LCOV - differential code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 83.1 % 148 123 25 123
Current Date: 2023-04-08 17:13:01 Functions: 90.0 % 10 9 1 9
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (240..) days: 83.1 % 148 123 25 123
Legend: Lines: hit not hit Function coverage date bins:
(240..) days: 90.0 % 10 9 1 9

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * walreceiverfuncs.c
                                  4                 :  *
                                  5                 :  * This file contains functions used by the startup process to communicate
                                  6                 :  * with the walreceiver process. Functions implementing walreceiver itself
                                  7                 :  * are in walreceiver.c.
                                  8                 :  *
                                  9                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
                                 10                 :  *
                                 11                 :  *
                                 12                 :  * IDENTIFICATION
                                 13                 :  *    src/backend/replication/walreceiverfuncs.c
                                 14                 :  *
                                 15                 :  *-------------------------------------------------------------------------
                                 16                 :  */
                                 17                 : #include "postgres.h"
                                 18                 : 
                                 19                 : #include <sys/stat.h>
                                 20                 : #include <sys/time.h>
                                 21                 : #include <time.h>
                                 22                 : #include <unistd.h>
                                 23                 : #include <signal.h>
                                 24                 : 
                                 25                 : #include "access/xlog_internal.h"
                                 26                 : #include "access/xlogrecovery.h"
                                 27                 : #include "pgstat.h"
                                 28                 : #include "postmaster/startup.h"
                                 29                 : #include "replication/walreceiver.h"
                                 30                 : #include "storage/pmsignal.h"
                                 31                 : #include "storage/shmem.h"
                                 32                 : #include "utils/timestamp.h"
                                 33                 : 
                                 34                 : WalRcvData *WalRcv = NULL;
                                 35                 : 
                                 36                 : /*
                                 37                 :  * How long to wait for walreceiver to start up after requesting
                                 38                 :  * postmaster to launch it. In seconds.
                                 39                 :  */
                                 40                 : #define WALRCV_STARTUP_TIMEOUT 10
                                 41                 : 
                                 42                 : /* Report shared memory space needed by WalRcvShmemInit */
                                 43                 : Size
 4832 heikki.linnakangas         44 CBC        6390 : WalRcvShmemSize(void)
                                 45                 : {
 4790 bruce                      46            6390 :     Size        size = 0;
                                 47                 : 
 4832 heikki.linnakangas         48            6390 :     size = add_size(size, sizeof(WalRcvData));
                                 49                 : 
                                 50            6390 :     return size;
                                 51                 : }
                                 52                 : 
                                 53                 : /* Allocate and initialize walreceiver-related shared memory */
                                 54                 : void
                                 55            1826 : WalRcvShmemInit(void)
                                 56                 : {
                                 57                 :     bool        found;
                                 58                 : 
                                 59            1826 :     WalRcv = (WalRcvData *)
                                 60            1826 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
                                 61                 : 
 4729 tgl                        62            1826 :     if (!found)
                                 63                 :     {
                                 64                 :         /* First time through, so initialize */
                                 65            1826 :         MemSet(WalRcv, 0, WalRcvShmemSize());
                                 66            1826 :         WalRcv->walRcvState = WALRCV_STOPPED;
  758 tmunro                     67            1826 :         ConditionVariableInit(&WalRcv->walRcvStoppedCV);
 4729 tgl                        68            1826 :         SpinLockInit(&WalRcv->mutex);
  780 fujii                      69            1826 :         pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
 2321 peter_e                    70            1826 :         WalRcv->latch = NULL;
                                 71                 :     }
 4832 heikki.linnakangas         72            1826 : }
                                 73                 : 
                                 74                 : /* Is walreceiver running (or starting up)? */
                                 75                 : bool
 3769                            76            1309 : WalRcvRunning(void)
                                 77                 : {
 2742 rhaas                      78            1309 :     WalRcvData *walrcv = WalRcv;
                                 79                 :     WalRcvState state;
                                 80                 :     pg_time_t   startTime;
                                 81                 : 
 4832 heikki.linnakangas         82            1309 :     SpinLockAcquire(&walrcv->mutex);
                                 83                 : 
 4820                            84            1309 :     state = walrcv->walRcvState;
                                 85            1309 :     startTime = walrcv->startTime;
                                 86                 : 
                                 87            1309 :     SpinLockRelease(&walrcv->mutex);
                                 88                 : 
                                 89                 :     /*
                                 90                 :      * If it has taken too long for walreceiver to start up, give up. Setting
                                 91                 :      * the state to STOPPED ensures that if walreceiver later does start up
                                 92                 :      * after all, it will see that it's not supposed to be running and die
                                 93                 :      * without doing anything.
                                 94                 :      */
                                 95            1309 :     if (state == WALRCV_STARTING)
                                 96                 :     {
 4790 bruce                      97 UBC           0 :         pg_time_t   now = (pg_time_t) time(NULL);
                                 98                 : 
 4820 heikki.linnakangas         99               0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
                                100                 :         {
  758 tmunro                    101               0 :             bool        stopped = false;
                                102                 : 
                                103               0 :             SpinLockAcquire(&walrcv->mutex);
 4820 heikki.linnakangas        104               0 :             if (walrcv->walRcvState == WALRCV_STARTING)
                                105                 :             {
                                106               0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
  758 tmunro                    107               0 :                 stopped = true;
                                108                 :             }
 4820 heikki.linnakangas        109               0 :             SpinLockRelease(&walrcv->mutex);
                                110                 : 
  758 tmunro                    111               0 :             if (stopped)
                                112               0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
                                113                 :         }
                                114                 :     }
                                115                 : 
 4820 heikki.linnakangas        116 CBC        1309 :     if (state != WALRCV_STOPPED)
                                117              26 :         return true;
                                118                 :     else
                                119            1283 :         return false;
                                120                 : }
                                121                 : 
                                122                 : /*
                                123                 :  * Is walreceiver running and streaming (or at least attempting to connect,
                                124                 :  * or starting up)?
                                125                 :  */
                                126                 : bool
 3769                           127           29631 : WalRcvStreaming(void)
                                128                 : {
 2742 rhaas                     129           29631 :     WalRcvData *walrcv = WalRcv;
                                130                 :     WalRcvState state;
                                131                 :     pg_time_t   startTime;
                                132                 : 
 3769 heikki.linnakangas        133           29631 :     SpinLockAcquire(&walrcv->mutex);
                                134                 : 
                                135           29631 :     state = walrcv->walRcvState;
                                136           29631 :     startTime = walrcv->startTime;
                                137                 : 
                                138           29631 :     SpinLockRelease(&walrcv->mutex);
                                139                 : 
                                140                 :     /*
                                141                 :      * If it has taken too long for walreceiver to start up, give up. Setting
                                142                 :      * the state to STOPPED ensures that if walreceiver later does start up
                                143                 :      * after all, it will see that it's not supposed to be running and die
                                144                 :      * without doing anything.
                                145                 :      */
                                146           29631 :     if (state == WALRCV_STARTING)
                                147                 :     {
                                148             216 :         pg_time_t   now = (pg_time_t) time(NULL);
                                149                 : 
                                150             216 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
                                151                 :         {
  758 tmunro                    152 UBC           0 :             bool        stopped = false;
                                153                 : 
                                154               0 :             SpinLockAcquire(&walrcv->mutex);
 3769 heikki.linnakangas        155               0 :             if (walrcv->walRcvState == WALRCV_STARTING)
                                156                 :             {
                                157               0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
  758 tmunro                    158               0 :                 stopped = true;
                                159                 :             }
 3769 heikki.linnakangas        160               0 :             SpinLockRelease(&walrcv->mutex);
                                161                 : 
  758 tmunro                    162               0 :             if (stopped)
                                163               0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
                                164                 :         }
                                165                 :     }
                                166                 : 
 3769 heikki.linnakangas        167 CBC       29631 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
                                168                 :         state == WALRCV_RESTARTING)
                                169           26853 :         return true;
                                170                 :     else
                                171            2778 :         return false;
                                172                 : }
                                173                 : 
                                174                 : /*
                                175                 :  * Stop walreceiver (if running) and wait for it to die.
                                176                 :  * Executed by the Startup process.
                                177                 :  */
                                178                 : void
 4832                           179            1283 : ShutdownWalRcv(void)
                                180                 : {
 2742 rhaas                     181            1283 :     WalRcvData *walrcv = WalRcv;
 4790 bruce                     182            1283 :     pid_t       walrcvpid = 0;
  758 tmunro                    183            1283 :     bool        stopped = false;
                                184                 : 
                                185                 :     /*
                                186                 :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
                                187                 :      * mode once it's finished, and will also request postmaster to not
                                188                 :      * restart itself.
                                189                 :      */
 4832 heikki.linnakangas        190            1283 :     SpinLockAcquire(&walrcv->mutex);
 4790 bruce                     191            1283 :     switch (walrcv->walRcvState)
                                192                 :     {
 4820 heikki.linnakangas        193            1247 :         case WALRCV_STOPPED:
                                194            1247 :             break;
                                195              10 :         case WALRCV_STARTING:
                                196              10 :             walrcv->walRcvState = WALRCV_STOPPED;
  758 tmunro                    197              10 :             stopped = true;
 4820 heikki.linnakangas        198              10 :             break;
                                199                 : 
 3769                           200              26 :         case WALRCV_STREAMING:
                                201                 :         case WALRCV_WAITING:
                                202                 :         case WALRCV_RESTARTING:
 4820                           203              26 :             walrcv->walRcvState = WALRCV_STOPPING;
                                204                 :             /* fall through */
                                205              26 :         case WALRCV_STOPPING:
                                206              26 :             walrcvpid = walrcv->pid;
                                207              26 :             break;
                                208                 :     }
 4832                           209            1283 :     SpinLockRelease(&walrcv->mutex);
                                210                 : 
                                211                 :     /* Unnecessary but consistent. */
  758 tmunro                    212            1283 :     if (stopped)
                                213              10 :         ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
                                214                 : 
                                215                 :     /*
                                216                 :      * Signal walreceiver process if it was still running.
                                217                 :      */
 4832 heikki.linnakangas        218            1283 :     if (walrcvpid != 0)
                                219              26 :         kill(walrcvpid, SIGTERM);
                                220                 : 
                                221                 :     /*
                                222                 :      * Wait for walreceiver to acknowledge its death by setting state to
                                223                 :      * WALRCV_STOPPED.
                                224                 :      */
  758 tmunro                    225            1283 :     ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
 3769 heikki.linnakangas        226            1309 :     while (WalRcvRunning())
  758 tmunro                    227              26 :         ConditionVariableSleep(&walrcv->walRcvStoppedCV,
                                228                 :                                WAIT_EVENT_WAL_RECEIVER_EXIT);
                                229            1283 :     ConditionVariableCancelSleep();
 4832 heikki.linnakangas        230            1283 : }
                                231                 : 
                                232                 : /*
                                233                 :  * Request postmaster to start walreceiver.
                                234                 :  *
                                235                 :  * "recptr" indicates the position where streaming should begin.  "conninfo"
                                236                 :  * is a libpq connection string to use.  "slotname" is, optionally, the name
                                237                 :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
                                238                 :  * a temporary slot when no "slotname" is given.
                                239                 :  *
                                240                 :  * WAL receivers do not directly load GUC parameters used for the connection
                                241                 :  * to the primary, and rely on the values passed down by the caller of this
                                242                 :  * routine instead.  Hence, the addition of any new parameters should happen
                                243                 :  * through this code path.
                                244                 :  */
                                245                 : void
 3355 rhaas                     246             125 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
                                247                 :                      const char *slotname, bool create_temp_slot)
                                248                 : {
 2742                           249             125 :     WalRcvData *walrcv = WalRcv;
 3769 heikki.linnakangas        250             125 :     bool        launch = false;
 4790 bruce                     251             125 :     pg_time_t   now = (pg_time_t) time(NULL);
                                252                 :     Latch      *latch;
                                253                 : 
                                254                 :     /*
                                255                 :      * We always start at the beginning of the segment. That prevents a broken
                                256                 :      * segment (i.e., with no records in the first half of a segment) from
                                257                 :      * being created by XLOG streaming, which might cause trouble later on if
                                 258                 :      * the segment is e.g. archived.
                                259                 :      */
 2028 andres                    260             125 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
                                261             125 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
                                262                 : 
 4663 tgl                       263             125 :     SpinLockAcquire(&walrcv->mutex);
                                264                 : 
                                265                 :     /* It better be stopped if we try to restart it */
 3769 heikki.linnakangas        266             125 :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
                                267                 :            walrcv->walRcvState == WALRCV_WAITING);
                                268                 : 
 4832                           269             125 :     if (conninfo != NULL)
                                270             125 :         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
                                271                 :     else
 4832 heikki.linnakangas        272 UBC           0 :         walrcv->conninfo[0] = '\0';
                                273                 : 
                                274                 :     /*
                                275                 :      * Use configured replication slot if present, and ignore the value of
                                276                 :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
                                277                 :      * create_temp_slot to determine whether this WAL receiver should create a
                                278                 :      * temporary slot by itself and use it, or not.
                                279                 :      */
 1108 alvherre                  280 CBC         125 :     if (slotname != NULL && slotname[0] != '\0')
                                281                 :     {
 3355 rhaas                     282              21 :         strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
 1108 alvherre                  283              21 :         walrcv->is_temp_slot = false;
                                284                 :     }
                                285                 :     else
                                286                 :     {
 3355 rhaas                     287             104 :         walrcv->slotname[0] = '\0';
 1108 alvherre                  288             104 :         walrcv->is_temp_slot = create_temp_slot;
                                289                 :     }
                                290                 : 
 3769 heikki.linnakangas        291             125 :     if (walrcv->walRcvState == WALRCV_STOPPED)
                                292                 :     {
                                293             125 :         launch = true;
                                294             125 :         walrcv->walRcvState = WALRCV_STARTING;
                                295                 :     }
                                296                 :     else
 3769 heikki.linnakangas        297 UBC           0 :         walrcv->walRcvState = WALRCV_RESTARTING;
 4820 heikki.linnakangas        298 CBC         125 :     walrcv->startTime = now;
                                299                 : 
                                300                 :     /*
                                301                 :      * If this is the first startup of walreceiver (on this timeline),
                                302                 :      * initialize flushedUpto and latestChunkStart to the starting point.
                                303                 :      */
 3623                           304             125 :     if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
                                305                 :     {
 1096 tmunro                    306              67 :         walrcv->flushedUpto = recptr;
 3623 heikki.linnakangas        307              67 :         walrcv->receivedTLI = tli;
 4422                           308              67 :         walrcv->latestChunkStart = recptr;
                                309                 :     }
                                310             125 :     walrcv->receiveStart = recptr;
 3769                           311             125 :     walrcv->receiveStartTLI = tli;
                                312                 : 
 2014 tgl                       313             125 :     latch = walrcv->latch;
                                314                 : 
 4832 heikki.linnakangas        315             125 :     SpinLockRelease(&walrcv->mutex);
                                316                 : 
 3769                           317             125 :     if (launch)
                                318             125 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
 2014 tgl                       319 UBC           0 :     else if (latch)
                                320               0 :         SetLatch(latch);
 4832 heikki.linnakangas        321 CBC         125 : }
                                322                 : 
                                323                 : /*
                                324                 :  * Returns the last+1 byte position that walreceiver has flushed.
                                325                 :  *
                                326                 :  * Optionally, returns the previous chunk start, that is the first byte
                                327                 :  * written in the most recent walreceiver flush cycle.  Callers not
                                328                 :  * interested in that value may pass NULL for latestChunkStart. Same for
                                329                 :  * receiveTLI.
                                330                 :  */
                                331                 : XLogRecPtr
 1096 tmunro                    332           28001 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
                                333                 : {
 2742 rhaas                     334           28001 :     WalRcvData *walrcv = WalRcv;
                                335                 :     XLogRecPtr  recptr;
                                336                 : 
 4832 heikki.linnakangas        337           28001 :     SpinLockAcquire(&walrcv->mutex);
 1096 tmunro                    338           28001 :     recptr = walrcv->flushedUpto;
 4663 tgl                       339           28001 :     if (latestChunkStart)
                                340           26705 :         *latestChunkStart = walrcv->latestChunkStart;
 3769 heikki.linnakangas        341           28001 :     if (receiveTLI)
                                342           27971 :         *receiveTLI = walrcv->receivedTLI;
 4832                           343           28001 :     SpinLockRelease(&walrcv->mutex);
                                344                 : 
                                345           28001 :     return recptr;
                                346                 : }
                                347                 : 
                                348                 : /*
                                349                 :  * Returns the last+1 byte position that walreceiver has written.
                                350                 :  * This returns a recently written value without taking a lock.
                                351                 :  */
                                352                 : XLogRecPtr
 1096 tmunro                    353 UBC           0 : GetWalRcvWriteRecPtr(void)
                                354                 : {
                                355               0 :     WalRcvData *walrcv = WalRcv;
                                356                 : 
                                357               0 :     return pg_atomic_read_u64(&walrcv->writtenUpto);
                                358                 : }
                                359                 : 
                                360                 : /*
                                361                 :  * Returns the replication apply delay in ms or -1
                                362                 :  * if the apply delay info is not available
                                363                 :  */
                                364                 : int
 4117 simon                     365 CBC        1212 : GetReplicationApplyDelay(void)
                                366                 : {
 2742 rhaas                     367            1212 :     WalRcvData *walrcv = WalRcv;
                                368                 :     XLogRecPtr  receivePtr;
                                369                 :     XLogRecPtr  replayPtr;
                                370                 :     TimestampTz chunkReplayStartTime;
                                371                 : 
 4117 simon                     372            1212 :     SpinLockAcquire(&walrcv->mutex);
 1096 tmunro                    373            1212 :     receivePtr = walrcv->flushedUpto;
 4117 simon                     374            1212 :     SpinLockRelease(&walrcv->mutex);
                                375                 : 
 3762 heikki.linnakangas        376            1212 :     replayPtr = GetXLogReplayRecPtr(NULL);
                                377                 : 
 3754 alvherre                  378            1212 :     if (receivePtr == replayPtr)
 4117 simon                     379             606 :         return 0;
                                380                 : 
 2217 peter_e                   381             606 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
                                382                 : 
                                383             606 :     if (chunkReplayStartTime == 0)
 2948 ishii                     384              82 :         return -1;
                                385                 : 
  880 tgl                       386             524 :     return TimestampDifferenceMilliseconds(chunkReplayStartTime,
                                387                 :                                            GetCurrentTimestamp());
                                388                 : }
                                389                 : 
                                390                 : /*
                                 391                 :  * Returns the network latency in ms.  Note that this includes any
                                 392                 :  * difference in clock settings between the servers, as well as timezone.
                                393                 :  */
                                394                 : int
 4117 simon                     395            1212 : GetReplicationTransferLatency(void)
                                396                 : {
 2742 rhaas                     397            1212 :     WalRcvData *walrcv = WalRcv;
                                398                 :     TimestampTz lastMsgSendTime;
                                399                 :     TimestampTz lastMsgReceiptTime;
                                400                 : 
 4117 simon                     401            1212 :     SpinLockAcquire(&walrcv->mutex);
                                402            1212 :     lastMsgSendTime = walrcv->lastMsgSendTime;
                                403            1212 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
                                404            1212 :     SpinLockRelease(&walrcv->mutex);
                                405                 : 
  880 tgl                       406            1212 :     return TimestampDifferenceMilliseconds(lastMsgSendTime,
                                407                 :                                            lastMsgReceiptTime);
                                408                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a