LCOV - differential code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 83.1 % 148 123 25 123
Current Date: 2023-04-08 15:15:32 Functions: 90.0 % 10 9 1 9
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * walreceiverfuncs.c
       4                 :  *
       5                 :  * This file contains functions used by the startup process to communicate
       6                 :  * with the walreceiver process. Functions implementing walreceiver itself
       7                 :  * are in walreceiver.c.
       8                 :  *
       9                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
      10                 :  *
      11                 :  *
      12                 :  * IDENTIFICATION
      13                 :  *    src/backend/replication/walreceiverfuncs.c
      14                 :  *
      15                 :  *-------------------------------------------------------------------------
      16                 :  */
      17                 : #include "postgres.h"
      18                 : 
      19                 : #include <sys/stat.h>
      20                 : #include <sys/time.h>
      21                 : #include <time.h>
      22                 : #include <unistd.h>
      23                 : #include <signal.h>
      24                 : 
      25                 : #include "access/xlog_internal.h"
      26                 : #include "access/xlogrecovery.h"
      27                 : #include "pgstat.h"
      28                 : #include "postmaster/startup.h"
      29                 : #include "replication/walreceiver.h"
      30                 : #include "storage/pmsignal.h"
      31                 : #include "storage/shmem.h"
      32                 : #include "utils/timestamp.h"
      33                 : 
      34                 : WalRcvData *WalRcv = NULL;
      35                 : 
      36                 : /*
      37                 :  * How long to wait for walreceiver to start up after requesting
      38                 :  * postmaster to launch it. In seconds.
      39                 :  */
      40                 : #define WALRCV_STARTUP_TIMEOUT 10
      41                 : 
      42                 : /* Report shared memory space needed by WalRcvShmemInit */
      43                 : Size
      44 CBC        6390 : WalRcvShmemSize(void)
      45                 : {
      46            6390 :     Size        size = 0;
      47                 : 
      48            6390 :     size = add_size(size, sizeof(WalRcvData));
      49                 : 
      50            6390 :     return size;
      51                 : }
      52                 : 
      53                 : /* Allocate and initialize walreceiver-related shared memory */
      54                 : void
      55            1826 : WalRcvShmemInit(void)
      56                 : {
      57                 :     bool        found;
      58                 : 
      59            1826 :     WalRcv = (WalRcvData *)
      60            1826 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
      61                 : 
      62            1826 :     if (!found)
      63                 :     {
      64                 :         /* First time through, so initialize */
      65            1826 :         MemSet(WalRcv, 0, WalRcvShmemSize());
      66            1826 :         WalRcv->walRcvState = WALRCV_STOPPED;
      67            1826 :         ConditionVariableInit(&WalRcv->walRcvStoppedCV);
      68            1826 :         SpinLockInit(&WalRcv->mutex);
      69            1826 :         pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
      70            1826 :         WalRcv->latch = NULL;
      71                 :     }
      72            1826 : }
      73                 : 
      74                 : /* Is walreceiver running (or starting up)? */
      75                 : bool
      76            1309 : WalRcvRunning(void)
      77                 : {
      78            1309 :     WalRcvData *walrcv = WalRcv;
      79                 :     WalRcvState state;
      80                 :     pg_time_t   startTime;
      81                 : 
      82            1309 :     SpinLockAcquire(&walrcv->mutex);
      83                 : 
      84            1309 :     state = walrcv->walRcvState;
      85            1309 :     startTime = walrcv->startTime;
      86                 : 
      87            1309 :     SpinLockRelease(&walrcv->mutex);
      88                 : 
      89                 :     /*
      90                 :      * If it has taken too long for walreceiver to start up, give up. Setting
      91                 :      * the state to STOPPED ensures that if walreceiver later does start up
      92                 :      * after all, it will see that it's not supposed to be running and die
      93                 :      * without doing anything.
      94                 :      */
      95            1309 :     if (state == WALRCV_STARTING)
      96                 :     {
      97 UBC           0 :         pg_time_t   now = (pg_time_t) time(NULL);
      98                 : 
      99               0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     100                 :         {
     101               0 :             bool        stopped = false;
     102                 : 
     103               0 :             SpinLockAcquire(&walrcv->mutex);
     104               0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     105                 :             {
     106               0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     107               0 :                 stopped = true;
     108                 :             }
     109               0 :             SpinLockRelease(&walrcv->mutex);
     110                 : 
     111               0 :             if (stopped)
     112               0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     113                 :         }
     114                 :     }
     115                 : 
     116 CBC        1309 :     if (state != WALRCV_STOPPED)
     117              26 :         return true;
     118                 :     else
     119            1283 :         return false;
     120                 : }
     121                 : 
     122                 : /*
     123                 :  * Is walreceiver running and streaming (or at least attempting to connect,
     124                 :  * or starting up)?
     125                 :  */
     126                 : bool
     127           29631 : WalRcvStreaming(void)
     128                 : {
     129           29631 :     WalRcvData *walrcv = WalRcv;
     130                 :     WalRcvState state;
     131                 :     pg_time_t   startTime;
     132                 : 
     133           29631 :     SpinLockAcquire(&walrcv->mutex);
     134                 : 
     135           29631 :     state = walrcv->walRcvState;
     136           29631 :     startTime = walrcv->startTime;
     137                 : 
     138           29631 :     SpinLockRelease(&walrcv->mutex);
     139                 : 
     140                 :     /*
     141                 :      * If it has taken too long for walreceiver to start up, give up. Setting
     142                 :      * the state to STOPPED ensures that if walreceiver later does start up
     143                 :      * after all, it will see that it's not supposed to be running and die
     144                 :      * without doing anything.
     145                 :      */
     146           29631 :     if (state == WALRCV_STARTING)
     147                 :     {
     148             216 :         pg_time_t   now = (pg_time_t) time(NULL);
     149                 : 
     150             216 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     151                 :         {
     152 UBC           0 :             bool        stopped = false;
     153                 : 
     154               0 :             SpinLockAcquire(&walrcv->mutex);
     155               0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     156                 :             {
     157               0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     158               0 :                 stopped = true;
     159                 :             }
     160               0 :             SpinLockRelease(&walrcv->mutex);
     161                 : 
     162               0 :             if (stopped)
     163               0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     164                 :         }
     165                 :     }
     166                 : 
     167 CBC       29631 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     168                 :         state == WALRCV_RESTARTING)
     169           26853 :         return true;
     170                 :     else
     171            2778 :         return false;
     172                 : }
     173                 : 
     174                 : /*
     175                 :  * Stop walreceiver (if running) and wait for it to die.
     176                 :  * Executed by the Startup process.
     177                 :  */
     178                 : void
     179            1283 : ShutdownWalRcv(void)
     180                 : {
     181            1283 :     WalRcvData *walrcv = WalRcv;
     182            1283 :     pid_t       walrcvpid = 0;
     183            1283 :     bool        stopped = false;
     184                 : 
     185                 :     /*
     186                 :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     187                 :      * mode once it's finished, and will also request postmaster to not
     188                 :      * restart itself.
     189                 :      */
     190            1283 :     SpinLockAcquire(&walrcv->mutex);
     191            1283 :     switch (walrcv->walRcvState)
     192                 :     {
     193            1247 :         case WALRCV_STOPPED:
     194            1247 :             break;
     195              10 :         case WALRCV_STARTING:
     196              10 :             walrcv->walRcvState = WALRCV_STOPPED;
     197              10 :             stopped = true;
     198              10 :             break;
     199                 : 
     200              26 :         case WALRCV_STREAMING:
     201                 :         case WALRCV_WAITING:
     202                 :         case WALRCV_RESTARTING:
     203              26 :             walrcv->walRcvState = WALRCV_STOPPING;
     204                 :             /* fall through */
     205              26 :         case WALRCV_STOPPING:
     206              26 :             walrcvpid = walrcv->pid;
     207              26 :             break;
     208                 :     }
     209            1283 :     SpinLockRelease(&walrcv->mutex);
     210                 : 
     211                 :     /* Unnecessary but consistent. */
     212            1283 :     if (stopped)
     213              10 :         ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     214                 : 
     215                 :     /*
     216                 :      * Signal walreceiver process if it was still running.
     217                 :      */
     218            1283 :     if (walrcvpid != 0)
     219              26 :         kill(walrcvpid, SIGTERM);
     220                 : 
     221                 :     /*
     222                 :      * Wait for walreceiver to acknowledge its death by setting state to
     223                 :      * WALRCV_STOPPED.
     224                 :      */
     225            1283 :     ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
     226            1309 :     while (WalRcvRunning())
     227              26 :         ConditionVariableSleep(&walrcv->walRcvStoppedCV,
     228                 :                                WAIT_EVENT_WAL_RECEIVER_EXIT);
     229            1283 :     ConditionVariableCancelSleep();
     230            1283 : }
     231                 : 
     232                 : /*
     233                 :  * Request postmaster to start walreceiver.
     234                 :  *
     235                 :  * "recptr" indicates the position where streaming should begin.  "conninfo"
     236                 :  * is a libpq connection string to use.  "slotname" is, optionally, the name
     237                 :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
     238                 :  * a temporary slot when no "slotname" is given.
     239                 :  *
     240                 :  * WAL receivers do not directly load GUC parameters used for the connection
     241                 :  * to the primary, and rely on the values passed down by the caller of this
     242                 :  * routine instead.  Hence, the addition of any new parameters should happen
     243                 :  * through this code path.
     244                 :  */
     245                 : void
     246             125 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     247                 :                      const char *slotname, bool create_temp_slot)
     248                 : {
     249             125 :     WalRcvData *walrcv = WalRcv;
     250             125 :     bool        launch = false;
     251             125 :     pg_time_t   now = (pg_time_t) time(NULL);
     252                 :     Latch      *latch;
     253                 : 
     254                 :     /*
     255                 :      * We always start at the beginning of the segment. That prevents a broken
     256                 :      * segment (i.e., with no records in the first half of a segment) from
     257                 :      * being created by XLOG streaming, which might cause trouble later on if
     258                 :      * the segment is e.g archived.
     259                 :      */
     260             125 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     261             125 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     262                 : 
     263             125 :     SpinLockAcquire(&walrcv->mutex);
     264                 : 
     265                 :     /* It better be stopped if we try to restart it */
     266             125 :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     267                 :            walrcv->walRcvState == WALRCV_WAITING);
     268                 : 
     269             125 :     if (conninfo != NULL)
     270             125 :         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
     271                 :     else
     272 UBC           0 :         walrcv->conninfo[0] = '\0';
     273                 : 
     274                 :     /*
     275                 :      * Use configured replication slot if present, and ignore the value of
     276                 :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
     277                 :      * create_temp_slot to determine whether this WAL receiver should create a
     278                 :      * temporary slot by itself and use it, or not.
     279                 :      */
     280 CBC         125 :     if (slotname != NULL && slotname[0] != '\0')
     281                 :     {
     282              21 :         strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
     283              21 :         walrcv->is_temp_slot = false;
     284                 :     }
     285                 :     else
     286                 :     {
     287             104 :         walrcv->slotname[0] = '\0';
     288             104 :         walrcv->is_temp_slot = create_temp_slot;
     289                 :     }
     290                 : 
     291             125 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     292                 :     {
     293             125 :         launch = true;
     294             125 :         walrcv->walRcvState = WALRCV_STARTING;
     295                 :     }
     296                 :     else
     297 UBC           0 :         walrcv->walRcvState = WALRCV_RESTARTING;
     298 CBC         125 :     walrcv->startTime = now;
     299                 : 
     300                 :     /*
     301                 :      * If this is the first startup of walreceiver (on this timeline),
     302                 :      * initialize flushedUpto and latestChunkStart to the starting point.
     303                 :      */
     304             125 :     if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
     305                 :     {
     306              67 :         walrcv->flushedUpto = recptr;
     307              67 :         walrcv->receivedTLI = tli;
     308              67 :         walrcv->latestChunkStart = recptr;
     309                 :     }
     310             125 :     walrcv->receiveStart = recptr;
     311             125 :     walrcv->receiveStartTLI = tli;
     312                 : 
     313             125 :     latch = walrcv->latch;
     314                 : 
     315             125 :     SpinLockRelease(&walrcv->mutex);
     316                 : 
     317             125 :     if (launch)
     318             125 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     319 UBC           0 :     else if (latch)
     320               0 :         SetLatch(latch);
     321 CBC         125 : }
     322                 : 
     323                 : /*
     324                 :  * Returns the last+1 byte position that walreceiver has flushed.
     325                 :  *
     326                 :  * Optionally, returns the previous chunk start, that is the first byte
     327                 :  * written in the most recent walreceiver flush cycle.  Callers not
     328                 :  * interested in that value may pass NULL for latestChunkStart. Same for
     329                 :  * receiveTLI.
     330                 :  */
     331                 : XLogRecPtr
     332           28001 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     333                 : {
     334           28001 :     WalRcvData *walrcv = WalRcv;
     335                 :     XLogRecPtr  recptr;
     336                 : 
     337           28001 :     SpinLockAcquire(&walrcv->mutex);
     338           28001 :     recptr = walrcv->flushedUpto;
     339           28001 :     if (latestChunkStart)
     340           26705 :         *latestChunkStart = walrcv->latestChunkStart;
     341           28001 :     if (receiveTLI)
     342           27971 :         *receiveTLI = walrcv->receivedTLI;
     343           28001 :     SpinLockRelease(&walrcv->mutex);
     344                 : 
     345           28001 :     return recptr;
     346                 : }
     347                 : 
     348                 : /*
     349                 :  * Returns the last+1 byte position that walreceiver has written.
     350                 :  * This returns a recently written value without taking a lock.
     351                 :  */
     352                 : XLogRecPtr
     353 UBC           0 : GetWalRcvWriteRecPtr(void)
     354                 : {
     355               0 :     WalRcvData *walrcv = WalRcv;
     356                 : 
     357               0 :     return pg_atomic_read_u64(&walrcv->writtenUpto);
     358                 : }
     359                 : 
     360                 : /*
     361                 :  * Returns the replication apply delay in ms or -1
     362                 :  * if the apply delay info is not available
     363                 :  */
     364                 : int
     365 CBC        1212 : GetReplicationApplyDelay(void)
     366                 : {
     367            1212 :     WalRcvData *walrcv = WalRcv;
     368                 :     XLogRecPtr  receivePtr;
     369                 :     XLogRecPtr  replayPtr;
     370                 :     TimestampTz chunkReplayStartTime;
     371                 : 
     372            1212 :     SpinLockAcquire(&walrcv->mutex);
     373            1212 :     receivePtr = walrcv->flushedUpto;
     374            1212 :     SpinLockRelease(&walrcv->mutex);
     375                 : 
     376            1212 :     replayPtr = GetXLogReplayRecPtr(NULL);
     377                 : 
     378            1212 :     if (receivePtr == replayPtr)
     379             606 :         return 0;
     380                 : 
     381             606 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     382                 : 
     383             606 :     if (chunkReplayStartTime == 0)
     384              82 :         return -1;
     385                 : 
     386             524 :     return TimestampDifferenceMilliseconds(chunkReplayStartTime,
     387                 :                                            GetCurrentTimestamp());
     388                 : }
     389                 : 
     390                 : /*
     391                 :  * Returns the network latency in ms, note that this includes any
     392                 :  * difference in clock settings between the servers, as well as timezone.
     393                 :  */
     394                 : int
     395            1212 : GetReplicationTransferLatency(void)
     396                 : {
     397            1212 :     WalRcvData *walrcv = WalRcv;
     398                 :     TimestampTz lastMsgSendTime;
     399                 :     TimestampTz lastMsgReceiptTime;
     400                 : 
     401            1212 :     SpinLockAcquire(&walrcv->mutex);
     402            1212 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     403            1212 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     404            1212 :     SpinLockRelease(&walrcv->mutex);
     405                 : 
     406            1212 :     return TimestampDifferenceMilliseconds(lastMsgSendTime,
     407                 :                                            lastMsgReceiptTime);
     408                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a