LCOV - differential code coverage report
Current view: top level - src/backend/replication - walreceiver.c (source / functions)
Current: Differential Code Coverage HEAD vs 15
Current Date: 2023-04-08 17:13:01
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Lines: 75.2 % 525 395 7 46 66 11 20 224 48 103 98 231 1 15
Functions: 93.8 % 16 15 1 14 1 1 15
Line coverage date bins:
[..60] days: 100.0 % 2 2 2
(60,120] days: 75.0 % 16 12 4 12
(120,180] days: 94.6 % 37 35 2 34 1 1
(180,240] days: 100.0 % 1 1 1
(240..) days: 73.6 % 469 345 1 46 66 11 20 224 101 60 231
Function coverage date bins:
(120,180] days: 100.0 % 1 1 1
(240..) days: 45.2 % 31 14 1 14 1 15

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * walreceiver.c
                                  4                 :  *
                                  5                 :  * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
                                  6                 :  * is the process in the standby server that takes charge of receiving
                                  7                 :  * XLOG records from a primary server during streaming replication.
                                  8                 :  *
                                  9                 :  * When the startup process determines that it's time to start streaming,
                                 10                 :  * it instructs postmaster to start walreceiver. Walreceiver first connects
                                 11                 :  * to the primary server (it will be served by a walsender process
                                 12                 :  * in the primary server), and then keeps receiving XLOG records and
                                 13                 :  * writing them to the disk as long as the connection is alive. As XLOG
                                 14                 :  * records are received and flushed to disk, it updates the
                                 15                 :  * WalRcv->flushedUpto variable in shared memory, to inform the startup
                                 16                 :  * process of how far it can proceed with XLOG replay.
                                 17                 :  *
                                 18                 :  * A WAL receiver cannot directly load GUC parameters used when establishing
                                 19                 :  * its connection to the primary. Instead it relies on parameter values
                                 20                 :  * that are passed down by the startup process when streaming is requested.
                                 21                 :  * This applies, for example, to the replication slot and the connection
                                 22                 :  * string to be used for the connection with the primary.
                                 23                 :  *
                                 24                 :  * If the primary server ends streaming, but doesn't disconnect, walreceiver
                                 25                 :  * goes into "waiting" mode, and waits for the startup process to give new
                                 26                 :  * instructions. The startup process will treat that the same as
                                 27                 :  * disconnection, and will rescan the archive/pg_wal directory. But when the
                                 28                 :  * startup process wants to try streaming replication again, it will just
                                 29                 :  * nudge the existing walreceiver process that's waiting, instead of launching
                                 30                 :  * a new one.
                                 31                 :  *
                                 32                 :  * Normal termination is by SIGTERM, which instructs the walreceiver to
                                 33                 :  * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
                                 34                 :  * process, the walreceiver will simply abort and exit on SIGQUIT. A close
                                 35                 :  * of the connection and a FATAL error are treated not as a crash but as
                                 36                 :  * normal operation.
                                 37                 :  *
                                 38                 :  * This file contains the server-facing parts of walreceiver. The libpq-
                                 39                 :  * specific parts are in the libpqwalreceiver module. It's loaded
                                 40                 :  * dynamically to avoid linking the server with libpq.
                                 41                 :  *
                                 42                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
                                 43                 :  *
                                 44                 :  *
                                 45                 :  * IDENTIFICATION
                                 46                 :  *    src/backend/replication/walreceiver.c
                                 47                 :  *
                                 48                 :  *-------------------------------------------------------------------------
                                 49                 :  */
                                 50                 : #include "postgres.h"
                                 51                 : 
                                 52                 : #include <unistd.h>
                                 53                 : 
                                 54                 : #include "access/htup_details.h"
                                 55                 : #include "access/timeline.h"
                                 56                 : #include "access/transam.h"
                                 57                 : #include "access/xlog_internal.h"
                                 58                 : #include "access/xlogarchive.h"
                                 59                 : #include "access/xlogrecovery.h"
                                 60                 : #include "catalog/pg_authid.h"
                                 61                 : #include "catalog/pg_type.h"
                                 62                 : #include "common/ip.h"
                                 63                 : #include "funcapi.h"
                                 64                 : #include "libpq/pqformat.h"
                                 65                 : #include "libpq/pqsignal.h"
                                 66                 : #include "miscadmin.h"
                                 67                 : #include "pgstat.h"
                                 68                 : #include "postmaster/interrupt.h"
                                 69                 : #include "replication/walreceiver.h"
                                 70                 : #include "replication/walsender.h"
                                 71                 : #include "storage/ipc.h"
                                 72                 : #include "storage/pmsignal.h"
                                 73                 : #include "storage/proc.h"
                                 74                 : #include "storage/procarray.h"
                                 75                 : #include "storage/procsignal.h"
                                 76                 : #include "utils/acl.h"
                                 77                 : #include "utils/builtins.h"
                                 78                 : #include "utils/guc.h"
                                 79                 : #include "utils/pg_lsn.h"
                                 80                 : #include "utils/ps_status.h"
                                 81                 : #include "utils/resowner.h"
                                 82                 : #include "utils/timestamp.h"
                                 83                 : 
                                 84                 : 
                                 85                 : /*
                                 86                 :  * GUC variables.  (Other variables that affect walreceiver are in xlog.c
                                 87                 :  * because they're passed down from the startup process, for better
                                 88                 :  * synchronization.)
                                 89                 :  */
                                 90                 : int         wal_receiver_status_interval;
                                 91                 : int         wal_receiver_timeout;
                                 92                 : bool        hot_standby_feedback;
                                 93                 : 
                                 94                 : /* libpqwalreceiver connection */
                                 95                 : static WalReceiverConn *wrconn = NULL;
                                 96                 : WalReceiverFunctionsType *WalReceiverFunctions = NULL;
                                 97                 : 
                                 98                 : /*
                                 99                 :  * These variables are used similarly to openLogFile/SegNo,
                                100                 :  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
                                 101                 :  * corresponding to the filename of recvFile.
                                102                 :  */
                                103                 : static int  recvFile = -1;
                                104                 : static TimeLineID recvFileTLI = 0;
                                105                 : static XLogSegNo recvSegNo = 0;
                                106                 : 
                                107                 : /*
                                108                 :  * LogstreamResult indicates the byte positions that we have already
                                109                 :  * written/fsynced.
                                110                 :  */
                                111                 : static struct
                                112                 : {
                                113                 :     XLogRecPtr  Write;          /* last byte + 1 written out in the standby */
                                114                 :     XLogRecPtr  Flush;          /* last byte + 1 flushed in the standby */
                                115                 : }           LogstreamResult;
                                116                 : 
                                117                 : /*
                                118                 :  * Reasons to wake up and perform periodic tasks.
                                119                 :  */
                                120                 : typedef enum WalRcvWakeupReason
                                121                 : {
                                122                 :     WALRCV_WAKEUP_TERMINATE,
                                123                 :     WALRCV_WAKEUP_PING,
                                124                 :     WALRCV_WAKEUP_REPLY,
                                125                 :     WALRCV_WAKEUP_HSFEEDBACK
                                126                 : #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
                                127                 : } WalRcvWakeupReason;
                                128                 : 
                                129                 : /*
                                130                 :  * Wake up times for periodic tasks.
                                131                 :  */
                                132                 : static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
                                133                 : 
                                134                 : static StringInfoData reply_message;
                                135                 : static StringInfoData incoming_message;
                                136                 : 
                                137                 : /* Prototypes for private functions */
                                138                 : static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
                                139                 : static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
                                140                 : static void WalRcvDie(int code, Datum arg);
                                141                 : static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
                                142                 :                                  TimeLineID tli);
                                143                 : static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
                                144                 :                             TimeLineID tli);
                                145                 : static void XLogWalRcvFlush(bool dying, TimeLineID tli);
                                146                 : static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
                                147                 : static void XLogWalRcvSendReply(bool force, bool requestReply);
                                148                 : static void XLogWalRcvSendHSFeedback(bool immed);
                                149                 : static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
                                150                 : static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
                                151                 : 
                                152                 : /*
                                153                 :  * Process any interrupts the walreceiver process may have received.
                                154                 :  * This should be called any time the process's latch has become set.
                                155                 :  *
                                156                 :  * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
                                157                 :  * SIGTERM signal handler, because the signal might arrive in the middle of
                                158                 :  * some critical operation, like while we're holding a spinlock.  Instead, the
                                159                 :  * signal handler sets a flag variable as well as setting the process's latch.
                                160                 :  * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
                                161                 :  * latch has become set.  Operations that could block for a long time, such as
                                162                 :  * reading from a remote server, must pay attention to the latch too; see
                                163                 :  * libpqrcv_PQgetResult for example.
                                164                 :  */
                                165                 : void
 4832 heikki.linnakangas        166 GIC       45857 : ProcessWalRcvInterrupts(void)
                                167                 : {
                                168                 :     /*
                                169                 :      * Although walreceiver interrupt handling doesn't use the same scheme as
                                170                 :      * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
                                171                 :      * any incoming signals on Win32, and also to make sure we process any
                                172                 :      * barrier events.
                                173                 :      */
                                174           45857 :     CHECK_FOR_INTERRUPTS();
                                175                 : 
  878 fujii                     176           45857 :     if (ShutdownRequestPending)
                                177                 :     {
 4832 heikki.linnakangas        178              56 :         ereport(FATAL,
                                179                 :                 (errcode(ERRCODE_ADMIN_SHUTDOWN),
                                180                 :                  errmsg("terminating walreceiver process due to administrator command")));
                                181                 :     }
 4832 heikki.linnakangas        182 CBC       45801 : }
                                183                 : 
                                184                 : 
                                185                 : /* Main entry point for walreceiver process */
                                186                 : void
 4827 heikki.linnakangas        187 GIC         183 : WalReceiverMain(void)
                                188                 : {
                                189                 :     char        conninfo[MAXCONNINFO];
 2475 alvherre                  190 ECB             :     char       *tmp_conninfo;
                                191                 :     char        slotname[NAMEDATALEN];
 1181 peter                     192                 :     bool        is_temp_slot;
                                193                 :     XLogRecPtr  startpoint;
 3769 heikki.linnakangas        194                 :     TimeLineID  startpointTLI;
                                195                 :     TimeLineID  primaryTLI;
                                196                 :     bool        first_stream;
 2742 rhaas                     197 GIC         183 :     WalRcvData *walrcv = WalRcv;
                                198                 :     TimestampTz now;
                                199                 :     char       *err;
 1835 fujii                     200             183 :     char       *sender_host = NULL;
 1835 fujii                     201 CBC         183 :     int         sender_port = 0;
                                202                 : 
                                203                 :     /*
                                204                 :      * WalRcv should be set up already (if we are a backend, we inherit this
                                205                 :      * by fork() or EXEC_BACKEND mechanism from the postmaster).
                                206                 :      */
 4820 heikki.linnakangas        207 GIC         183 :     Assert(walrcv != NULL);
                                208                 : 
 4820 heikki.linnakangas        209 ECB             :     /*
                                210                 :      * Mark walreceiver as running in shared memory.
                                211                 :      *
 4790 bruce                     212                 :      * Do this as early as possible, so that if we fail later on, we'll set
                                213                 :      * state to STOPPED. If we die before this, the startup process will keep
                                214                 :      * waiting for us to start up, until it times out.
                                215                 :      */
 4820 heikki.linnakangas        216 GIC         183 :     SpinLockAcquire(&walrcv->mutex);
                                217             183 :     Assert(walrcv->pid == 0);
 4790 bruce                     218             183 :     switch (walrcv->walRcvState)
 4820 heikki.linnakangas        219 ECB             :     {
 4820 heikki.linnakangas        220 UIC           0 :         case WALRCV_STOPPING:
                                221                 :             /* If we've already been requested to stop, don't start up. */
                                222               0 :             walrcv->walRcvState = WALRCV_STOPPED;
                                223                 :             /* fall through */
                                224                 : 
 4820 heikki.linnakangas        225 GIC          10 :         case WALRCV_STOPPED:
                                226              10 :             SpinLockRelease(&walrcv->mutex);
  758 tmunro                    227              10 :             ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
 4820 heikki.linnakangas        228 CBC          10 :             proc_exit(1);
 4820 heikki.linnakangas        229 ECB             :             break;
                                230                 : 
 4820 heikki.linnakangas        231 GIC         173 :         case WALRCV_STARTING:
 4820 heikki.linnakangas        232 EUB             :             /* The usual case */
 4820 heikki.linnakangas        233 GIC         173 :             break;
 4820 heikki.linnakangas        234 EUB             : 
 3769 heikki.linnakangas        235 UIC           0 :         case WALRCV_WAITING:
                                236                 :         case WALRCV_STREAMING:
 3769 heikki.linnakangas        237 ECB             :         case WALRCV_RESTARTING:
                                238                 :         default:
 4820                           239                 :             /* Shouldn't happen */
 2014 alvherre                  240 LBC           0 :             SpinLockRelease(&walrcv->mutex);
 4820 heikki.linnakangas        241 UIC           0 :             elog(PANIC, "walreceiver still running according to shared memory state");
                                242                 :     }
 4820 heikki.linnakangas        243 ECB             :     /* Advertise our PID so that the startup process can kill us */
 4820 heikki.linnakangas        244 GIC         173 :     walrcv->pid = MyProcPid;
 3769 heikki.linnakangas        245 CBC         173 :     walrcv->walRcvState = WALRCV_STREAMING;
                                246                 : 
 4820 heikki.linnakangas        247 EUB             :     /* Fetch information required to start streaming */
 2473 alvherre                  248 GIC         173 :     walrcv->ready_to_display = false;
 4820 heikki.linnakangas        249             173 :     strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
 3355 rhaas                     250             173 :     strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
 1181 peter                     251             173 :     is_temp_slot = walrcv->is_temp_slot;
 4422 heikki.linnakangas        252 GBC         173 :     startpoint = walrcv->receiveStart;
 3769                           253             173 :     startpointTLI = walrcv->receiveStartTLI;
                                254                 : 
                                255                 :     /*
 1108 alvherre                  256 ECB             :      * At most one of is_temp_slot and slotname can be set; otherwise,
                                257                 :      * RequestXLogStreaming messed up.
                                258                 :      */
 1108 alvherre                  259 GIC         173 :     Assert(!is_temp_slot || (slotname[0] == '\0'));
 1108 alvherre                  260 ECB             : 
 4117 simon                     261                 :     /* Initialise to a sanish value */
   73 tgl                       262 GNC         173 :     now = GetCurrentTimestamp();
 2014 alvherre                  263 CBC         173 :     walrcv->lastMsgSendTime =
  152 tmunro                    264             173 :         walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
 4117 simon                     265 ECB             : 
 2014 tgl                       266                 :     /* Report the latch to use to awaken this process */
 2014 tgl                       267 GIC         173 :     walrcv->latch = &MyProc->procLatch;
                                268                 : 
 4820 heikki.linnakangas        269             173 :     SpinLockRelease(&walrcv->mutex);
                                270                 : 
  780 fujii                     271             173 :     pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
 1096 tmunro                    272 ECB             : 
                                273                 :     /* Arrange to clean up at walreceiver exit */
  520 rhaas                     274 GIC         173 :     on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
 4832 heikki.linnakangas        275 ECB             : 
                                276                 :     /* Properly accept or ignore signals the postmaster might send us */
  878 fujii                     277 CBC         173 :     pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
                                278                 :                                                      * file */
 4832 heikki.linnakangas        279 GIC         173 :     pqsignal(SIGINT, SIG_IGN);
  878 fujii                     280 CBC         173 :     pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
                                281                 :     /* SIGQUIT handler was already set up by InitPostmasterChild */
 4832 heikki.linnakangas        282             173 :     pqsignal(SIGALRM, SIG_IGN);
 4832 heikki.linnakangas        283 GIC         173 :     pqsignal(SIGPIPE, SIG_IGN);
 1231 rhaas                     284 CBC         173 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
 4832 heikki.linnakangas        285 GIC         173 :     pqsignal(SIGUSR2, SIG_IGN);
                                286                 : 
 4832 heikki.linnakangas        287 ECB             :     /* Reset some signals that are accepted by postmaster but not here */
 4832 heikki.linnakangas        288 GIC         173 :     pqsignal(SIGCHLD, SIG_DFL);
                                289                 : 
 4820 heikki.linnakangas        290 ECB             :     /* Load the libpq-specific functions */
 4820 heikki.linnakangas        291 GIC         173 :     load_file("libpqwalreceiver", false);
 2321 peter_e                   292 CBC         173 :     if (WalReceiverFunctions == NULL)
 4820 heikki.linnakangas        293 LBC           0 :         elog(ERROR, "libpqwalreceiver didn't initialize correctly");
                                294                 : 
 4832 heikki.linnakangas        295 ECB             :     /* Unblock signals (they were blocked when the postmaster forked us) */
   65 tmunro                    296 GNC         173 :     sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
 4832 heikki.linnakangas        297 ECB             : 
 4827                           298                 :     /* Establish the connection to the primary for XLOG streaming */
   10 rhaas                     299 GNC         173 :     wrconn = walrcv_connect(conninfo, false, false,
                                300                 :                             cluster_name[0] ? cluster_name : "walreceiver",
  662 tgl                       301 ECB             :                             &err);
 2271 peter_e                   302 GIC         173 :     if (!wrconn)
                                303              64 :         ereport(ERROR,
  662 tgl                       304 ECB             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                                305                 :                  errmsg("could not connect to the primary server: %s", err)));
 4832 heikki.linnakangas        306 EUB             : 
                                307                 :     /*
                                308                 :      * Save user-visible connection string.  This clobbers the original
 1835 fujii                     309 ECB             :      * conninfo, for security. Also save host and port of the sender server
                                310                 :      * this walreceiver is connected to.
                                311                 :      */
 2321 peter_e                   312 CBC         109 :     tmp_conninfo = walrcv_get_conninfo(wrconn);
 1835 fujii                     313 GIC         109 :     walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
 2475 alvherre                  314             109 :     SpinLockAcquire(&walrcv->mutex);
 2475 alvherre                  315 CBC         109 :     memset(walrcv->conninfo, 0, MAXCONNINFO);
                                316             109 :     if (tmp_conninfo)
 2475 alvherre                  317 GIC         109 :         strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
                                318                 : 
 1835 fujii                     319             109 :     memset(walrcv->sender_host, 0, NI_MAXHOST);
                                320             109 :     if (sender_host)
                                321             109 :         strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
                                322                 : 
                                323             109 :     walrcv->sender_port = sender_port;
 2475 alvherre                  324             109 :     walrcv->ready_to_display = true;
 2475 alvherre                  325 CBC         109 :     SpinLockRelease(&walrcv->mutex);
 2475 alvherre                  326 ECB             : 
 2014 alvherre                  327 CBC         109 :     if (tmp_conninfo)
                                328             109 :         pfree(tmp_conninfo);
 2014 alvherre                  329 ECB             : 
 1835 fujii                     330 CBC         109 :     if (sender_host)
 1835 fujii                     331 GIC         109 :         pfree(sender_host);
 1835 fujii                     332 ECB             : 
 3769 heikki.linnakangas        333 CBC         109 :     first_stream = true;
 4832 heikki.linnakangas        334 ECB             :     for (;;)
 4832 heikki.linnakangas        335 UIC           0 :     {
 2321 peter_e                   336 ECB             :         char       *primary_sysid;
                                337                 :         char        standby_sysid[32];
 2271                           338                 :         WalRcvStreamOptions options;
                                339                 : 
 4832 heikki.linnakangas        340                 :         /*
 3769                           341                 :          * Check that we're connected to a valid server using the
                                342                 :          * IDENTIFY_SYSTEM replication command.
 4832                           343                 :          */
 1486 peter                     344 CBC         109 :         primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
                                345                 : 
 2321 peter_e                   346             109 :         snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
                                347                 :                  GetSystemIdentifier());
 2321 peter_e                   348 GBC         109 :         if (strcmp(primary_sysid, standby_sysid) != 0)
                                349                 :         {
 2321 peter_e                   350 UIC           0 :             ereport(ERROR,
                                351                 :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                352                 :                      errmsg("database system identifier differs between the primary and standby"),
                                353                 :                      errdetail("The primary's identifier is %s, the standby's identifier is %s.",
                                354                 :                                primary_sysid, standby_sysid)));
                                355                 :         }
                                356                 : 
 4832 heikki.linnakangas        357 ECB             :         /*
                                358                 :          * Confirm that the current timeline of the primary is the same or
 3769                           359                 :          * ahead of ours.
                                360                 :          */
 3769 heikki.linnakangas        361 CBC         109 :         if (primaryTLI < startpointTLI)
 3769 heikki.linnakangas        362 UIC           0 :             ereport(ERROR,
  662 tgl                       363 EUB             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                364                 :                      errmsg("highest timeline %u of the primary is behind recovery timeline %u",
                                365                 :                             primaryTLI, startpointTLI)));
                                366                 : 
                                367                 :         /*
                                368                 :          * Get any missing history files. We do this always, even when we're
                                369                 :          * not interested in that timeline, so that if we're promoted to
                                370                 :          * become the primary later on, we don't select the same timeline that
                                371                 :          * was already used in the current primary. This isn't bullet-proof -
                                372                 :          * you'll need some external software to manage your cluster if you
                                373                 :          * need to ensure that a unique timeline id is chosen in every case,
 3602 bruce                     374 ECB             :          * but let's avoid the confusion of timeline id collisions where we
 3602 bruce                     375 EUB             :          * can.
                                376                 :          */
 3748 heikki.linnakangas        377 GIC         109 :         WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
                                378                 : 
                                379                 :         /*
                                380                 :          * Create temporary replication slot if requested, and update slot
                                381                 :          * name in shared memory.  (Note the slot name cannot already be set
                                382                 :          * in this case.)
                                383                 :          */
 1108 alvherre                  384             109 :         if (is_temp_slot)
                                385                 :         {
 1108 alvherre                  386 UIC           0 :             snprintf(slotname, sizeof(slotname),
                                387                 :                      "pg_walreceiver_%lld",
                                388               0 :                      (long long int) walrcv_get_backend_pid(wrconn));
                                389                 : 
  634 akapila                   390 LBC           0 :             walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
                                391                 : 
 1108 alvherre                  392 UIC           0 :             SpinLockAcquire(&walrcv->mutex);
                                393               0 :             strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
                                394               0 :             SpinLockRelease(&walrcv->mutex);
                                395                 :         }
                                396                 : 
 3769 heikki.linnakangas        397 ECB             :         /*
                                398                 :          * Start streaming.
 3769 heikki.linnakangas        399 EUB             :          *
                                400                 :          * We'll try to start at the requested starting point and timeline,
                                401                 :          * even if it's different from the server's latest timeline. In case
                                402                 :          * we've already reached the end of the old timeline, the server will
                                403                 :          * finish the streaming immediately, and we will go back to await
                                404                 :          * orders from the startup process. If recovery_target_timeline is
 2362 rhaas                     405                 :          * 'latest', the startup process will scan pg_wal and find the new
 3769 heikki.linnakangas        406                 :          * history file, bump recovery target timeline, and ask us to restart
                                407                 :          * on the new timeline.
                                408                 :          */
 2271 peter_e                   409 GIC         109 :         options.logical = false;
                                410             109 :         options.startpoint = startpoint;
                                411             109 :         options.slotname = slotname[0] != '\0' ? slotname : NULL;
                                412             109 :         options.proto.physical.startpointTLI = startpointTLI;
                                413             109 :         if (walrcv_startstreaming(wrconn, &options))
                                414                 :         {
 3769 heikki.linnakangas        415             109 :             if (first_stream)
                                416             109 :                 ereport(LOG,
                                417                 :                         (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
                                418                 :                                 LSN_FORMAT_ARGS(startpoint), startpointTLI)));
                                419                 :             else
 3769 heikki.linnakangas        420 UIC           0 :                 ereport(LOG,
                                421                 :                         (errmsg("restarted WAL streaming at %X/%X on timeline %u",
  775 peter                     422 ECB             :                                 LSN_FORMAT_ARGS(startpoint), startpointTLI)));
 3769 heikki.linnakangas        423 CBC         109 :             first_stream = false;
 3769 heikki.linnakangas        424 ECB             : 
                                425                 :             /* Initialize LogstreamResult and buffers for processing messages */
 3762 heikki.linnakangas        426 CBC         109 :             LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
 3769 heikki.linnakangas        427 GIC         109 :             initStringInfo(&reply_message);
 3769 heikki.linnakangas        428 CBC         109 :             initStringInfo(&incoming_message);
 3769 heikki.linnakangas        429 ECB             : 
                                430                 :             /* Initialize nap wakeup times. */
  152 tmunro                    431 GNC         109 :             now = GetCurrentTimestamp();
                                432             545 :             for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
                                433             436 :                 WalRcvComputeNextWakeup(i, now);
                                434                 : 
                                435                 :             /* Send initial reply/feedback messages. */
  143                           436             109 :             XLogWalRcvSendReply(true, false);
                                437             109 :             XLogWalRcvSendHSFeedback(true);
  143 tmunro                    438 EUB             : 
                                439                 :             /* Loop until end-of-streaming or error */
                                440                 :             for (;;)
 3832 heikki.linnakangas        441 CBC       32296 :             {
                                442                 :                 char       *buf;
                                443                 :                 int         len;
 2567 rhaas                     444           32405 :                 bool        endofwal = false;
 2551 tgl                       445           32405 :                 pgsocket    wait_fd = PGINVALID_SOCKET;
 2567 rhaas                     446 ECB             :                 int         rc;
                                447                 :                 TimestampTz nextWakeup;
                                448                 :                 long        nap;
                                449                 : 
                                450                 :                 /*
 3769 heikki.linnakangas        451                 :                  * Exit walreceiver if we're not in recovery. This should not
                                452                 :                  * happen, but cross-check the status here.
                                453                 :                  */
 3769 heikki.linnakangas        454 GIC       32405 :                 if (!RecoveryInProgress())
 3769 heikki.linnakangas        455 UIC           0 :                     ereport(FATAL,
  662 tgl                       456 ECB             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                457                 :                              errmsg("cannot continue WAL streaming, recovery has already ended")));
                                458                 : 
                                459                 :                 /* Process any requests or signals received recently */
 3769 heikki.linnakangas        460 GIC       32405 :                 ProcessWalRcvInterrupts();
 3769 heikki.linnakangas        461 ECB             : 
  878 fujii                     462 GIC       32405 :                 if (ConfigReloadPending)
                                463                 :                 {
  878 fujii                     464 CBC          14 :                     ConfigReloadPending = false;
 3769 heikki.linnakangas        465              14 :                     ProcessConfigFile(PGC_SIGHUP);
                                466                 :                     /* recompute wakeup times */
  152 tmunro                    467 GNC          14 :                     now = GetCurrentTimestamp();
                                468              70 :                     for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
                                469              56 :                         WalRcvComputeNextWakeup(i, now);
 3716 simon                     470 GIC          14 :                     XLogWalRcvSendHSFeedback(true);
                                471                 :                 }
                                472                 : 
                                473                 :                 /* See if we can read data immediately */
 2321 peter_e                   474           32405 :                 len = walrcv_receive(wrconn, &buf, &wait_fd);
 3769 heikki.linnakangas        475           32375 :                 if (len != 0)
                                476                 :                 {
                                477                 :                     /*
 3769 heikki.linnakangas        478 ECB             :                      * Process the received data, and any subsequent data we
 3769 heikki.linnakangas        479 EUB             :                      * can read without blocking.
                                480                 :                      */
                                481                 :                     for (;;)
                                482                 :                     {
 3769 heikki.linnakangas        483 GIC       40805 :                         if (len > 0)
 3769 heikki.linnakangas        484 ECB             :                         {
                                485                 :                             /*
                                486                 :                              * Something was received from primary, so adjust
                                487                 :                              * the ping and terminate wakeup times.
 3602 bruce                     488                 :                              */
   73 tgl                       489 GNC       21673 :                             now = GetCurrentTimestamp();
  152 tmunro                    490           21673 :                             WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
                                491                 :                                                     now);
                                492           21673 :                             WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
  520 rhaas                     493 CBC       21673 :                             XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
  520 rhaas                     494 ECB             :                                                  startpointTLI);
 3769 heikki.linnakangas        495                 :                         }
 3769 heikki.linnakangas        496 CBC       19132 :                         else if (len == 0)
 3769 heikki.linnakangas        497 GIC       19098 :                             break;
                                498              34 :                         else if (len < 0)
                                499                 :                         {
 3769 heikki.linnakangas        500 CBC          34 :                             ereport(LOG,
 3769 heikki.linnakangas        501 ECB             :                                     (errmsg("replication terminated by primary server"),
                                502                 :                                      errdetail("End of WAL reached on timeline %u at %X/%X.",
                                503                 :                                                startpointTLI,
                                504                 :                                                LSN_FORMAT_ARGS(LogstreamResult.Write))));
 3769 heikki.linnakangas        505 GIC          34 :                             endofwal = true;
                                506              34 :                             break;
                                507                 :                         }
 2321 peter_e                   508           21673 :                         len = walrcv_receive(wrconn, &buf, &wait_fd);
 3769 heikki.linnakangas        509 ECB             :                     }
                                510                 : 
                                511                 :                     /* Let the primary know that we received some data. */
 3769 heikki.linnakangas        512 GIC       19132 :                     XLogWalRcvSendReply(false, false);
                                513                 : 
                                514                 :                     /*
 3769 heikki.linnakangas        515 ECB             :                      * If we've written some records, flush them to disk and
                                516                 :                      * let the startup process and primary server know about
                                517                 :                      * them.
                                518                 :                      */
  520 rhaas                     519 CBC       19132 :                     XLogWalRcvFlush(false, startpointTLI);
                                520                 :                 }
                                521                 : 
 2567 rhaas                     522 ECB             :                 /* Check if we need to exit the streaming loop. */
 2567 rhaas                     523 CBC       32374 :                 if (endofwal)
                                524              34 :                     break;
                                525                 : 
                                526                 :                 /* Find the soonest wakeup time, to limit our nap. */
   73 tgl                       527 GNC       32340 :                 nextWakeup = TIMESTAMP_INFINITY;
  152 tmunro                    528          161700 :                 for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
                                529          129360 :                     nextWakeup = Min(wakeup[i], nextWakeup);
                                530                 : 
                                531                 :                 /* Calculate the nap time, clamping as necessary. */
   73 tgl                       532           32340 :                 now = GetCurrentTimestamp();
                                533           32340 :                 nap = TimestampDifferenceMilliseconds(now, nextWakeup);
                                534                 : 
 2567 rhaas                     535 ECB             :                 /*
                                536                 :                  * Ideally we would reuse a WaitEventSet object repeatedly
                                537                 :                  * here to avoid the overheads of WaitLatchOrSocket on epoll
                                538                 :                  * systems, but we can't be sure that libpq (or any other
                                539                 :                  * walreceiver implementation) has the same socket (even if
                                540                 :                  * the fd is the same number, it may have been closed and
                                541                 :                  * reopened since the last time).  In future, if there is a
                                542                 :                  * function for removing sockets from WaitEventSet, then we
                                543                 :                  * could add and remove just the socket each time, potentially
                                544                 :                  * avoiding some system calls.
                                545                 :                  */
 2567 rhaas                     546 GIC       32340 :                 Assert(wait_fd != PGINVALID_SOCKET);
  878 fujii                     547 CBC       32340 :                 rc = WaitLatchOrSocket(MyLatch,
                                548                 :                                        WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
                                549                 :                                        WL_TIMEOUT | WL_LATCH_SET,
                                550                 :                                        wait_fd,
                                551                 :                                        nap,
                                552                 :                                        WAIT_EVENT_WAL_RECEIVER_MAIN);
 2567 rhaas                     553 GIC       32340 :                 if (rc & WL_LATCH_SET)
 2567 rhaas                     554 ECB             :                 {
  878 fujii                     555 GIC       12343 :                     ResetLatch(MyLatch);
 1441 tgl                       556           12343 :                     ProcessWalRcvInterrupts();
                                557                 : 
 2567 rhaas                     558 CBC       12299 :                     if (walrcv->force_reply)
 2567 rhaas                     559 ECB             :                     {
                                560                 :                         /*
                                561                 :                          * The recovery process has asked us to send apply
                                562                 :                          * feedback now.  Make sure the flag is really set to
 2495                           563                 :                          * false in shared memory before sending the reply, so
                                564                 :                          * we don't miss a new request for a reply.
                                565                 :                          */
 2567 rhaas                     566 GIC       12268 :                         walrcv->force_reply = false;
 2567 rhaas                     567 CBC       12268 :                         pg_memory_barrier();
                                568           12268 :                         XLogWalRcvSendReply(true, false);
                                569                 :                     }
                                570                 :                 }
 2567 rhaas                     571 GIC       32296 :                 if (rc & WL_TIMEOUT)
                                572                 :                 {
                                573                 :                     /*
                                574                 :                      * We didn't receive anything new. If we haven't heard
                                575                 :                      * anything from the server for more than
                                576                 :                      * wal_receiver_timeout / 2, ping the server. Also, if
                                577                 :                      * it's been longer than wal_receiver_status_interval
                                578                 :                      * since the last update we sent, send a status update to
                                579                 :                      * the primary anyway, to report any progress in applying
                                580                 :                      * WAL.
 3769 heikki.linnakangas        581 ECB             :                      */
 3602 bruce                     582 CBC          25 :                     bool        requestReply = false;
                                583                 : 
                                584                 :                     /*
                                585                 :                      * Check if time since last receive from primary has
                                586                 :                      * reached the configured limit.
                                587                 :                      */
   73 tgl                       588 GNC          25 :                     now = GetCurrentTimestamp();
  152 tmunro                    589              25 :                     if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
  152 tmunro                    590 UNC           0 :                         ereport(ERROR,
                                591                 :                                 (errcode(ERRCODE_CONNECTION_FAILURE),
                                592                 :                                  errmsg("terminating walreceiver due to timeout")));
                                593                 : 
                                594                 :                     /*
                                595                 :                      * If we haven't received anything new for half of
                                596                 :                      * wal_receiver_timeout, then ping the server.
                                597                 :                      */
  152 tmunro                    598 GNC          25 :                     if (now >= wakeup[WALRCV_WAKEUP_PING])
                                599                 :                     {
  152 tmunro                    600 UNC           0 :                         requestReply = true;
   73 tgl                       601               0 :                         wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY;
                                602                 :                     }
 3769 heikki.linnakangas        603 ECB             : 
 3769 heikki.linnakangas        604 GIC          25 :                     XLogWalRcvSendReply(requestReply, requestReply);
 3716 simon                     605              25 :                     XLogWalRcvSendHSFeedback(false);
                                606                 :                 }
                                607                 :             }
                                608                 : 
 4827 heikki.linnakangas        609 ECB             :             /*
 3769                           610                 :              * The backend finished streaming. Exit streaming COPY-mode from
 3769 heikki.linnakangas        611 EUB             :              * our side, too.
                                612                 :              */
 2321 peter_e                   613 GIC          34 :             walrcv_endstreaming(wrconn, &primaryTLI);
                                614                 : 
                                615                 :             /*
                                616                 :              * If the server had switched to a new timeline that we didn't
                                617                 :              * know about when we began streaming, fetch its timeline history
                                618                 :              * file now.
 3733 heikki.linnakangas        619 ECB             :              */
 3733 heikki.linnakangas        620 GIC          12 :             WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
 3769 heikki.linnakangas        621 EUB             :         }
                                622                 :         else
 3769 heikki.linnakangas        623 UIC           0 :             ereport(LOG,
                                624                 :                     (errmsg("primary server contains no more WAL on requested timeline %u",
 3769 heikki.linnakangas        625 ECB             :                             startpointTLI)));
                                626                 : 
                                627                 :         /*
                                628                 :          * End of WAL reached on the requested timeline. Close the last
                                629                 :          * segment, and wait for new orders from the startup process.
                                630                 :          */
 3769 heikki.linnakangas        631 GIC          12 :         if (recvFile >= 0)
                                632                 :         {
                                633                 :             char        xlogfname[MAXFNAMELEN];
 3896 simon                     634 ECB             : 
  520 rhaas                     635 GIC          11 :             XLogWalRcvFlush(false, startpointTLI);
 1223 michael                   636              11 :             XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 3769 heikki.linnakangas        637              11 :             if (close(recvFile) != 0)
 3769 heikki.linnakangas        638 UIC           0 :                 ereport(PANIC,
                                639                 :                         (errcode_for_file_access(),
                                640                 :                          errmsg("could not close WAL segment %s: %m",
 1223 michael                   641 ECB             :                                 xlogfname)));
                                642                 : 
                                643                 :             /*
 3896 simon                     644 EUB             :              * Create .done file forcibly to prevent the streamed segment from
                                645                 :              * being archived later.
                                646                 :              */
 2886 heikki.linnakangas        647 GIC          11 :             if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
                                648              11 :                 XLogArchiveForceDone(xlogfname);
                                649                 :             else
  582 alvherre                  650 UIC           0 :                 XLogArchiveNotify(xlogfname);
                                651                 :         }
 3769 heikki.linnakangas        652 CBC          12 :         recvFile = -1;
                                653                 : 
 3769 heikki.linnakangas        654 GIC          12 :         elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
                                655              12 :         WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
 3769 heikki.linnakangas        656 ECB             :     }
                                657                 :     /* not reached */
                                658                 : }
 3769 heikki.linnakangas        659 EUB             : 
                                660                 : /*
                                661                 :  * Wait for startup process to set receiveStart and receiveStartTLI.
                                662                 :  */
                                663                 : static void
 3769 heikki.linnakangas        664 GIC          12 : WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
                                665                 : {
 2742 rhaas                     666              12 :     WalRcvData *walrcv = WalRcv;
                                667                 :     int         state;
 3769 heikki.linnakangas        668 ECB             : 
 3769 heikki.linnakangas        669 CBC          12 :     SpinLockAcquire(&walrcv->mutex);
 3769 heikki.linnakangas        670 GIC          12 :     state = walrcv->walRcvState;
 3769 heikki.linnakangas        671 GBC          12 :     if (state != WALRCV_STREAMING)
                                672                 :     {
 3769 heikki.linnakangas        673 LBC           0 :         SpinLockRelease(&walrcv->mutex);
 3769 heikki.linnakangas        674 UIC           0 :         if (state == WALRCV_STOPPING)
 3769 heikki.linnakangas        675 LBC           0 :             proc_exit(0);
 4441 heikki.linnakangas        676 ECB             :         else
 3769 heikki.linnakangas        677 UIC           0 :             elog(FATAL, "unexpected walreceiver state");
                                678                 :     }
 3769 heikki.linnakangas        679 GIC          12 :     walrcv->walRcvState = WALRCV_WAITING;
                                680              12 :     walrcv->receiveStart = InvalidXLogRecPtr;
                                681              12 :     walrcv->receiveStartTLI = 0;
                                682              12 :     SpinLockRelease(&walrcv->mutex);
                                683                 : 
 1124 peter                     684              12 :     set_ps_display("idle");
 3769 heikki.linnakangas        685 ECB             : 
                                686                 :     /*
                                687                 :      * Nudge the startup process to notice that we've stopped streaming and are
                                688                 :      * now waiting for instructions.
                                689                 :      */
 3769 heikki.linnakangas        690 CBC          12 :     WakeupRecovery();
 3769 heikki.linnakangas        691 ECB             :     for (;;)
                                692                 :     {
  878 fujii                     693 GIC          22 :         ResetLatch(MyLatch);
 3769 heikki.linnakangas        694 EUB             : 
 3769 heikki.linnakangas        695 GBC          22 :         ProcessWalRcvInterrupts();
 3769 heikki.linnakangas        696 EUB             : 
 3769 heikki.linnakangas        697 GIC          10 :         SpinLockAcquire(&walrcv->mutex);
 3769 heikki.linnakangas        698 GBC          10 :         Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
                                699                 :                walrcv->walRcvState == WALRCV_WAITING ||
 3769 heikki.linnakangas        700 ECB             :                walrcv->walRcvState == WALRCV_STOPPING);
 3769 heikki.linnakangas        701 CBC          10 :         if (walrcv->walRcvState == WALRCV_RESTARTING)
 3769 heikki.linnakangas        702 ECB             :         {
 1108 alvherre                  703                 :             /*
                                704                 :              * No need to handle changes in primary_conninfo or
  326 michael                   705                 :              * primary_slot_name here. Startup process will signal us to
                                706                 :              * terminate in case those change.
                                707                 :              */
 3769 heikki.linnakangas        708 UIC           0 :             *startpoint = walrcv->receiveStart;
                                709               0 :             *startpointTLI = walrcv->receiveStartTLI;
                                710               0 :             walrcv->walRcvState = WALRCV_STREAMING;
 3769 heikki.linnakangas        711 LBC           0 :             SpinLockRelease(&walrcv->mutex);
 3769 heikki.linnakangas        712 UIC           0 :             break;
                                713                 :         }
 3769 heikki.linnakangas        714 CBC          10 :         if (walrcv->walRcvState == WALRCV_STOPPING)
                                715                 :         {
 4441 heikki.linnakangas        716 ECB             :             /*
                                717                 :              * We should've received SIGTERM if the startup process wants us
 3602 bruce                     718                 :              * to die, but might as well check it here too.
 3832 heikki.linnakangas        719                 :              */
 3769 heikki.linnakangas        720 UIC           0 :             SpinLockRelease(&walrcv->mutex);
                                721               0 :             exit(1);
 3769 heikki.linnakangas        722 ECB             :         }
 3769 heikki.linnakangas        723 GIC          10 :         SpinLockRelease(&walrcv->mutex);
                                724                 : 
  878 fujii                     725              10 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                                726                 :                          WAIT_EVENT_WAL_RECEIVER_WAIT_START);
                                727                 :     }
                                728                 : 
 3769 heikki.linnakangas        729 UBC           0 :     if (update_process_title)
 3769 heikki.linnakangas        730 EUB             :     {
                                731                 :         char        activitymsg[50];
 3832                           732                 : 
 3769 heikki.linnakangas        733 UBC           0 :         snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
  775 peter                     734 UIC           0 :                  LSN_FORMAT_ARGS(*startpoint));
 1124 peter                     735 LBC           0 :         set_ps_display(activitymsg);
                                736                 :     }
 3769 heikki.linnakangas        737 UIC           0 : }
                                738                 : 
                                739                 : /*
                                740                 :  * Fetch any missing timeline history files between 'first' and 'last'
 3769 heikki.linnakangas        741 EUB             :  * (inclusive) from the server.
                                742                 :  */
                                743                 : static void
 3769 heikki.linnakangas        744 CBC         121 : WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
                                745                 : {
 3602 bruce                     746 ECB             :     TimeLineID  tli;
                                747                 : 
 3769 heikki.linnakangas        748 GIC         265 :     for (tli = first; tli <= last; tli++)
                                749                 :     {
 3748 heikki.linnakangas        750 EUB             :         /* there's no history file for timeline 1 */
 3748 heikki.linnakangas        751 GIC         144 :         if (tli != 1 && !existsTimeLineHistory(tli))
                                752                 :         {
                                753                 :             char       *fname;
 3769 heikki.linnakangas        754 EUB             :             char       *content;
                                755                 :             int         len;
                                756                 :             char        expectedfname[MAXFNAMELEN];
                                757                 : 
 3769 heikki.linnakangas        758 GBC          11 :             ereport(LOG,
                                759                 :                     (errmsg("fetching timeline history file for timeline %u from primary server",
                                760                 :                             tli)));
                                761                 : 
 2321 peter_e                   762 GIC          11 :             walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
                                763                 : 
                                764                 :             /*
 1029 andres                    765 ECB             :              * Check that the filename on the primary matches what we
                                766                 :              * calculated ourselves. This is just a sanity check, it should
                                767                 :              * always match.
                                768                 :              */
 3769 heikki.linnakangas        769 CBC          11 :             TLHistoryFileName(expectedfname, tli);
 3769 heikki.linnakangas        770 GIC          11 :             if (strcmp(fname, expectedfname) != 0)
 3769 heikki.linnakangas        771 UIC           0 :                 ereport(ERROR,
 3769 heikki.linnakangas        772 ECB             :                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                773                 :                          errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
                                774                 :                                          tli)));
                                775                 : 
                                776                 :             /*
                                777                 :              * Write the file to pg_wal.
                                778                 :              */
 3769 heikki.linnakangas        779 CBC          11 :             writeTimeLineHistoryFile(tli, content, len);
                                780                 : 
                                781                 :             /*
                                782                 :              * Mark the streamed history file as ready for archiving if
  697 tgl                       783 ECB             :              * archive_mode is always.
                                784                 :              */
  922 fujii                     785 GIC          11 :             if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
                                786              11 :                 XLogArchiveForceDone(fname);
                                787                 :             else
  582 alvherre                  788 UIC           0 :                 XLogArchiveNotify(fname);
                                789                 : 
 3769 heikki.linnakangas        790 CBC          11 :             pfree(fname);
                                791              11 :             pfree(content);
 4441 heikki.linnakangas        792 EUB             :         }
                                793                 :     }
 4832 heikki.linnakangas        794 GIC         121 : }
                                795                 : 
                                796                 : /*
                                797                 :  * Mark us as STOPPED in shared memory at exit.
                                798                 :  */
                                799                 : static void
 4820 heikki.linnakangas        800 CBC         173 : WalRcvDie(int code, Datum arg)
                                801                 : {
 2742 rhaas                     802 GIC         173 :     WalRcvData *walrcv = WalRcv;
  520                           803             173 :     TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
                                804                 : 
                                805             173 :     Assert(*startpointTLI_p != 0);
 4832 heikki.linnakangas        806 ECB             : 
 4465                           807                 :     /* Ensure that all WAL records received are flushed to disk */
  520 rhaas                     808 GIC         173 :     XLogWalRcvFlush(true, *startpointTLI_p);
 4465 heikki.linnakangas        809 EUB             : 
                                810                 :     /* Mark ourselves inactive in shared memory */
 4832 heikki.linnakangas        811 CBC         173 :     SpinLockAcquire(&walrcv->mutex);
 3769                           812             173 :     Assert(walrcv->walRcvState == WALRCV_STREAMING ||
                                813                 :            walrcv->walRcvState == WALRCV_RESTARTING ||
                                814                 :            walrcv->walRcvState == WALRCV_STARTING ||
 3769 heikki.linnakangas        815 ECB             :            walrcv->walRcvState == WALRCV_WAITING ||
                                816                 :            walrcv->walRcvState == WALRCV_STOPPING);
 3769 heikki.linnakangas        817 GIC         173 :     Assert(walrcv->pid == MyProcPid);
 4820                           818             173 :     walrcv->walRcvState = WALRCV_STOPPED;
 4832                           819             173 :     walrcv->pid = 0;
 2473 alvherre                  820             173 :     walrcv->ready_to_display = false;
 2014 tgl                       821 CBC         173 :     walrcv->latch = NULL;
 4832 heikki.linnakangas        822 GIC         173 :     SpinLockRelease(&walrcv->mutex);
 4832 heikki.linnakangas        823 ECB             : 
  758 tmunro                    824 CBC         173 :     ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
                                825                 : 
 4820 heikki.linnakangas        826 ECB             :     /* Terminate the connection gracefully. */
 2321 peter_e                   827 GIC         173 :     if (wrconn != NULL)
                                828             109 :         walrcv_disconnect(wrconn);
 3769 heikki.linnakangas        829 ECB             : 
                                830                 :     /* Wake up the startup process to notice promptly that we're gone */
 3769 heikki.linnakangas        831 GIC         173 :     WakeupRecovery();
 4832 heikki.linnakangas        832 CBC         173 : }
 4832 heikki.linnakangas        833 ECB             : 
                                834                 : /*
                                835                 :  * Accept the message from XLOG stream, and process it.
                                836                 :  */
                                837                 : static void
  520 rhaas                     838 CBC       21673 : XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
 4813 heikki.linnakangas        839 ECB             : {
 3805                           840                 :     int         hdrlen;
                                841                 :     XLogRecPtr  dataStart;
                                842                 :     XLogRecPtr  walEnd;
 3602 bruce                     843                 :     TimestampTz sendTime;
                                844                 :     bool        replyRequested;
 3805 heikki.linnakangas        845                 : 
 3805 heikki.linnakangas        846 GIC       21673 :     resetStringInfo(&incoming_message);
                                847                 : 
 4813 heikki.linnakangas        848 CBC       21673 :     switch (type)
 4813 heikki.linnakangas        849 ECB             :     {
 4790 bruce                     850 GIC       21673 :         case 'w':               /* WAL records */
                                851                 :             {
 3805 heikki.linnakangas        852 ECB             :                 /* copy message to StringInfo */
 3805 heikki.linnakangas        853 CBC       21673 :                 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
 3805 heikki.linnakangas        854 GIC       21673 :                 if (len < hdrlen)
 4790 bruce                     855 UIC           0 :                     ereport(ERROR,
                                856                 :                             (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                857                 :                              errmsg_internal("invalid WAL message received from primary")));
 3805 heikki.linnakangas        858 GIC       21673 :                 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
 3805 heikki.linnakangas        859 ECB             : 
                                860                 :                 /* read the fields */
 3805 heikki.linnakangas        861 GIC       21673 :                 dataStart = pq_getmsgint64(&incoming_message);
                                862           21673 :                 walEnd = pq_getmsgint64(&incoming_message);
 2236 tgl                       863           21673 :                 sendTime = pq_getmsgint64(&incoming_message);
 3805 heikki.linnakangas        864           21673 :                 ProcessWalSndrMessage(walEnd, sendTime);
                                865                 : 
                                866           21673 :                 buf += hdrlen;
 3805 heikki.linnakangas        867 CBC       21673 :                 len -= hdrlen;
  520 rhaas                     868 GIC       21673 :                 XLogWalRcvWrite(buf, len, dataStart, tli);
 4790 bruce                     869 CBC       21673 :                 break;
                                870                 :             }
 4117 simon                     871 LBC           0 :         case 'k':               /* Keepalive */
                                872                 :             {
                                873                 :                 /* copy message to StringInfo */
 3805 heikki.linnakangas        874               0 :                 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
                                875               0 :                 if (len != hdrlen)
 4117 simon                     876 UBC           0 :                     ereport(ERROR,
                                877                 :                             (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                878                 :                              errmsg_internal("invalid keepalive message received from primary")));
 3805 heikki.linnakangas        879 LBC           0 :                 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
                                880                 : 
                                881                 :                 /* read the fields */
                                882               0 :                 walEnd = pq_getmsgint64(&incoming_message);
 2236 tgl                       883               0 :                 sendTime = pq_getmsgint64(&incoming_message);
 3805 heikki.linnakangas        884               0 :                 replyRequested = pq_getmsgbyte(&incoming_message);
 3805 heikki.linnakangas        885 ECB             : 
 3805 heikki.linnakangas        886 UIC           0 :                 ProcessWalSndrMessage(walEnd, sendTime);
 3832 heikki.linnakangas        887 ECB             : 
                                888                 :                 /* If the primary requested a reply, send one immediately */
 3805 heikki.linnakangas        889 LBC           0 :                 if (replyRequested)
 3832                           890               0 :                     XLogWalRcvSendReply(true, false);
 4117 simon                     891 UIC           0 :                 break;
 4117 simon                     892 EUB             :             }
 4813 heikki.linnakangas        893 UIC           0 :         default:
                                894               0 :             ereport(ERROR,
 4813 heikki.linnakangas        895 EUB             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
 4737 tgl                       896                 :                      errmsg_internal("invalid replication message type %d",
                                897                 :                                      type)));
                                898                 :     }
 4813 heikki.linnakangas        899 GIC       21673 : }
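
The 'w' branch above reads three 64-bit fields (dataStart, walEnd, sendTime) from the front of the message before handing the rest to XLogWalRcvWrite. The following is a minimal standalone sketch of that header decoding, not the PostgreSQL implementation: WalDataHeader, read_be64 and parse_wal_data_header are hypothetical names introduced for illustration, and the sketch assumes the fields arrive in network byte order, as pq_getmsgint64 expects.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical container for the three header fields of a 'w' message. */
typedef struct WalDataHeader
{
    uint64_t    data_start;     /* WAL position of the first payload byte */
    uint64_t    wal_end;        /* sender's current end of WAL */
    int64_t     send_time;      /* sender's timestamp */
} WalDataHeader;

/* Read one 64-bit integer stored in network byte order (big-endian). */
static uint64_t
read_be64(const unsigned char *p)
{
    uint64_t    v = 0;

    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

/*
 * Decode the fixed-size header at the start of buf.
 *
 * Returns the header length (24 bytes) on success, or 0 if the message is
 * shorter than the header, which the listing above treats as a protocol
 * violation.
 */
static size_t
parse_wal_data_header(const unsigned char *buf, size_t len, WalDataHeader *hdr)
{
    const size_t hdrlen = 3 * sizeof(int64_t);

    assert(buf != NULL && hdr != NULL);

    if (len < hdrlen)
        return 0;

    hdr->data_start = read_be64(buf);
    hdr->wal_end = read_be64(buf + 8);
    hdr->send_time = (int64_t) read_be64(buf + 16);
    return hdrlen;
}
```

A caller would advance the buffer by the returned length and treat the remaining len - 24 bytes as WAL payload, mirroring the buf += hdrlen and len -= hdrlen steps in the listing.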
 4813 heikki.linnakangas        900 EUB             : 
                                901                 : /*
                                902                 :  * Write XLOG data to disk.
 4832                           903                 :  */
                                904                 : static void
  520 rhaas                     905 GBC       21673 : XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
                                906                 : {
 4790 bruce                     907 EUB             :     int         startoff;
                                908                 :     int         byteswritten;
                                909                 : 
  520 rhaas                     910 GBC       21673 :     Assert(tli != 0);
  520 rhaas                     911 EUB             : 
 4832 heikki.linnakangas        912 GBC       43364 :     while (nbytes > 0)
                                913                 :     {
 4790 bruce                     914 EUB             :         int         segbytes;
 4832 heikki.linnakangas        915                 : 
                                916                 :         /* Close the current segment if it's completed */
  577 fujii                     917 GIC       21691 :         if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
  520 rhaas                     918              18 :             XLogWalRcvClose(recptr, tli);
                                919                 : 
  577 fujii                     920 CBC       21691 :         if (recvFile < 0)
                                921                 :         {
                                922                 :             /* Create/use new log file */
 2028 andres                    923 GIC         142 :             XLByteToSeg(recptr, recvSegNo, wal_segment_size);
  520 rhaas                     924             142 :             recvFile = XLogFileInit(recvSegNo, tli);
                                925             142 :             recvFileTLI = tli;
 4832 heikki.linnakangas        926 ECB             :         }
                                927                 : 
                                928                 :         /* Calculate the start offset of the received logs */
 2028 andres                    929 GIC       21691 :         startoff = XLogSegmentOffset(recptr, wal_segment_size);
                                930                 : 
 2028 andres                    931 CBC       21691 :         if (startoff + nbytes > wal_segment_size)
 2028 andres                    932 GIC          18 :             segbytes = wal_segment_size - startoff;
 4832 heikki.linnakangas        933 ECB             :         else
 4832 heikki.linnakangas        934 GIC       21673 :             segbytes = nbytes;
                                935                 : 
                                936                 :         /* OK to write the logs */
                                937           21691 :         errno = 0;
 4832 heikki.linnakangas        938 ECB             : 
  192 tmunro                    939 CBC       21691 :         byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
 4832 heikki.linnakangas        940 GIC       21691 :         if (byteswritten <= 0)
 4832 heikki.linnakangas        941 ECB             :         {
                                942                 :             char        xlogfname[MAXFNAMELEN];
                                943                 :             int         save_errno;
 1223 michael                   944                 : 
 4832 heikki.linnakangas        945                 :             /* if write didn't set errno, assume no disk space */
 4832 heikki.linnakangas        946 LBC           0 :             if (errno == 0)
 4832 heikki.linnakangas        947 UIC           0 :                 errno = ENOSPC;
                                948                 : 
 1223 michael                   949               0 :             save_errno = errno;
 1223 michael                   950 LBC           0 :             XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 1223 michael                   951 UIC           0 :             errno = save_errno;
 4832 heikki.linnakangas        952 LBC           0 :             ereport(PANIC,
 4832 heikki.linnakangas        953 ECB             :                     (errcode_for_file_access(),
                                954                 :                      errmsg("could not write to WAL segment %s "
                                955                 :                             "at offset %u, length %lu: %m",
                                956                 :                             xlogfname, startoff, (unsigned long) segbytes)));
                                957                 :         }
                                958                 : 
                                959                 :         /* Update state for write */
 3754 alvherre                  960 CBC       21691 :         recptr += byteswritten;
 4832 heikki.linnakangas        961 ECB             : 
 4832 heikki.linnakangas        962 GIC       21691 :         nbytes -= byteswritten;
                                963           21691 :         buf += byteswritten;
                                964                 : 
 4790 bruce                     965           21691 :         LogstreamResult.Write = recptr;
                                966                 :     }
 1096 tmunro                    967 EUB             : 
                                968                 :     /* Update shared-memory status */
 1096 tmunro                    969 GIC       21673 :     pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
  577 fujii                     970 EUB             : 
                                971                 :     /*
                                972                 :      * Close the current segment if the last iteration of the loop filled
                                973                 :      * it completely, so that its archive notification file is created
                                974                 :      * promptly. Otherwise, archiving of the segment would be delayed until
                                975                 :      * data in the next segment is received and written.
                                976                 :      */
  577 fujii                     977 GIC       21673 :     if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
  520 rhaas                     978              31 :         XLogWalRcvClose(recptr, tli);
 4832 heikki.linnakangas        979           21673 : }
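
The write loop above splits an incoming chunk at WAL segment boundaries: it computes the offset of recptr within its segment and writes at most the bytes that still fit. The sketch below isolates that arithmetic under stated assumptions. The function names are hypothetical, the segment size is assumed to be a power of two, and no file I/O is performed; it only counts how many writes such a loop would issue.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Offset of a WAL position within its segment; seg_size must be a power of two. */
static uint32_t
segment_offset(uint64_t recptr, uint32_t seg_size)
{
    assert(seg_size != 0 && (seg_size & (seg_size - 1)) == 0);
    return (uint32_t) (recptr & (seg_size - 1));
}

/* How many of nbytes, starting at recptr, fit into recptr's own segment. */
static size_t
bytes_in_current_segment(uint64_t recptr, size_t nbytes, uint32_t seg_size)
{
    uint32_t    startoff = segment_offset(recptr, seg_size);
    size_t      room = seg_size - startoff;     /* always at least 1 */

    return nbytes < room ? nbytes : room;
}

/* Count the separate writes needed when splitting at segment boundaries. */
static int
count_segment_writes(uint64_t recptr, size_t nbytes, uint32_t seg_size)
{
    int         writes = 0;

    while (nbytes > 0)
    {
        size_t      segbytes = bytes_in_current_segment(recptr, nbytes, seg_size);

        recptr += segbytes;
        nbytes -= segbytes;
        writes++;
    }
    return writes;
}
```

With a 16 MB segment, a 300-byte chunk that starts 100 bytes before a boundary is split into a 100-byte write and a 200-byte write, which is consistent with the listing showing a few more loop iterations than function calls.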
                                980                 : 
 4435 rhaas                     981 ECB             : /*
                                982                 :  * Flush the log to disk.
                                983                 :  *
                                984                 :  * If we're in the midst of dying, it's unwise to do anything that might throw
                                985                 :  * an error, so we skip sending a reply in that case.
                                986                 :  */
                                987                 : static void
  520 rhaas                     988 GIC       19365 : XLogWalRcvFlush(bool dying, TimeLineID tli)
                                989                 : {
  520 rhaas                     990 CBC       19365 :     Assert(tli != 0);
                                991                 : 
 3754 alvherre                  992 GIC       19365 :     if (LogstreamResult.Flush < LogstreamResult.Write)
                                993                 :     {
 2742 rhaas                     994           18943 :         WalRcvData *walrcv = WalRcv;
                                995                 : 
  520                           996           18943 :         issue_xlog_fsync(recvFile, recvSegNo, tli);
                                997                 : 
 4832 heikki.linnakangas        998 CBC       18943 :         LogstreamResult.Flush = LogstreamResult.Write;
 4832 heikki.linnakangas        999 ECB             : 
                               1000                 :         /* Update shared-memory status */
 4832 heikki.linnakangas       1001 GIC       18943 :         SpinLockAcquire(&walrcv->mutex);
 1096 tmunro                   1002           18943 :         if (walrcv->flushedUpto < LogstreamResult.Flush)
                               1003                 :         {
                               1004           18943 :             walrcv->latestChunkStart = walrcv->flushedUpto;
                               1005           18943 :             walrcv->flushedUpto = LogstreamResult.Flush;
  520 rhaas                    1006           18943 :             walrcv->receivedTLI = tli;
                               1007                 :         }
 4832 heikki.linnakangas       1008           18943 :         SpinLockRelease(&walrcv->mutex);
 4832 heikki.linnakangas       1009 ECB             : 
                               1010                 :         /* Signal the startup process and walsender that new WAL has arrived */
 4589 heikki.linnakangas       1011 CBC       18943 :         WakeupRecovery();
 4282 simon                    1012 GIC       18943 :         if (AllowCascadeReplication())
    1 andres                   1013 GNC       18943 :             WalSndWakeup(true, false);
                               1014                 : 
 4832 heikki.linnakangas       1015 ECB             :         /* Report XLOG streaming progress in PS display */
 4689 tgl                      1016 GIC       18943 :         if (update_process_title)
 4689 tgl                      1017 ECB             :         {
                               1018                 :             char        activitymsg[50];
                               1019                 : 
 4689 tgl                      1020 GIC       18943 :             snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
  775 peter                    1021           18943 :                      LSN_FORMAT_ARGS(LogstreamResult.Write));
 1124 peter                    1022 CBC       18943 :             set_ps_display(activitymsg);
 4689 tgl                      1023 ECB             :         }
                               1024                 : 
 1029 andres                   1025                 :         /* Also let the primary know that we made some progress */
 4435 rhaas                    1026 CBC       18943 :         if (!dying)
 3370 heikki.linnakangas       1027 ECB             :         {
 3832 heikki.linnakangas       1028 GIC       18942 :             XLogWalRcvSendReply(false, false);
 3370 heikki.linnakangas       1029 CBC       18942 :             XLogWalRcvSendHSFeedback(false);
                               1030                 :         }
                               1031                 :     }
 4832                          1032           19365 : }
 4441 heikki.linnakangas       1033 ECB             : 
  577 fujii                    1034                 : /*
                               1035                 :  * Close the current segment.
                               1036                 :  *
                               1037                 :  * Flush the segment to disk before closing it. Otherwise we have to
                               1038                 :  * reopen and fsync it later.
                               1039                 :  *
                               1040                 :  * Create an archive notification file since the segment is known to be complete.
                               1041                 :  */
                               1042                 : static void
  520 rhaas                    1043 CBC          49 : XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
                               1044                 : {
                               1045                 :     char        xlogfname[MAXFNAMELEN];
                               1046                 : 
  577 fujii                    1047              49 :     Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
  520 rhaas                    1048 GIC          49 :     Assert(tli != 0);
  577 fujii                    1049 ECB             : 
                               1050                 :     /*
                               1051                 :      * fsync() and close current file before we switch to next one. We would
                                1052                 :      * otherwise have to reopen this file to fsync it later.
                               1053                 :      */
  520 rhaas                    1054 GIC          49 :     XLogWalRcvFlush(false, tli);
                               1055                 : 
  577 fujii                    1056              49 :     XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
                               1057                 : 
                               1058                 :     /*
                               1059                 :      * XLOG segment files will be re-read by recovery in startup process soon,
                               1060                 :      * so we don't advise the OS to release cache pages associated with the
                               1061                 :      * file like XLogFileClose() does.
                               1062                 :      */
                               1063              49 :     if (close(recvFile) != 0)
  577 fujii                    1064 LBC           0 :         ereport(PANIC,
                               1065                 :                 (errcode_for_file_access(),
                               1066                 :                  errmsg("could not close WAL segment %s: %m",
                               1067                 :                         xlogfname)));
  577 fujii                    1068 ECB             : 
                               1069                 :     /*
                               1070                 :      * Create .done file forcibly to prevent the streamed segment from being
                               1071                 :      * archived later.
                               1072                 :      */
  577 fujii                    1073 GIC          49 :     if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
                               1074              49 :         XLogArchiveForceDone(xlogfname);
  577 fujii                    1075 ECB             :     else
  577 fujii                    1076 UIC           0 :         XLogArchiveNotify(xlogfname);
  577 fujii                    1077 ECB             : 
  577 fujii                    1078 GIC          49 :     recvFile = -1;
                               1079              49 : }
                               1080                 : 
                               1081                 : /*
                                1082                 :  * Send reply message to primary, indicating our current WAL locations
                                1083                 :  * and the current time.
 3832 heikki.linnakangas       1084 ECB             :  *
 3832 heikki.linnakangas       1085 EUB             :  * If 'force' is not set, the message is only sent if enough time has
                               1086                 :  * passed since last status update to reach wal_receiver_status_interval.
                               1087                 :  * If wal_receiver_status_interval is disabled altogether and 'force' is
                               1088                 :  * false, this is a no-op.
                               1089                 :  *
                               1090                 :  * If 'requestReply' is true, requests the server to reply immediately upon
                               1091                 :  * receiving this message. This is used for heartbeats, when approaching
                               1092                 :  * wal_receiver_timeout.
                               1093                 :  */
 4441 heikki.linnakangas       1094 ECB             : static void
 3832 heikki.linnakangas       1095 CBC       50476 : XLogWalRcvSendReply(bool force, bool requestReply)
                               1096                 : {
 3805 heikki.linnakangas       1097 EUB             :     static XLogRecPtr writePtr = 0;
                               1098                 :     static XLogRecPtr flushPtr = 0;
 3805 heikki.linnakangas       1099 ECB             :     XLogRecPtr  applyPtr;
                               1100                 :     TimestampTz now;
                               1101                 : 
                               1102                 :     /*
                               1103                 :      * If the user doesn't want status to be reported to the primary, be sure
                               1104                 :      * to exit before doing anything at all.
                               1105                 :      */
 3832 heikki.linnakangas       1106 GIC       50476 :     if (!force && wal_receiver_status_interval <= 0)
 4441 heikki.linnakangas       1107 UIC           0 :         return;
                               1108                 : 
                               1109                 :     /* Get current timestamp. */
 4441 heikki.linnakangas       1110 GIC       50476 :     now = GetCurrentTimestamp();
                               1111                 : 
                               1112                 :     /*
                               1113                 :      * We can compare the write and flush positions to the last message we
                                1114                 :      * sent without taking any lock, but the apply position requires a spinlock,
  4441 heikki.linnakangas       1115 ECB             :      * so we don't check that unless something else has changed or the reply
                                1116                 :      * wakeup time has passed.  This means that the apply WAL location will
                               1117                 :      * appear, from the primary's point of view, to lag slightly, but since
                               1118                 :      * this is only for reporting purposes and only on idle systems, that's
                               1119                 :      * probably OK.
                               1120                 :      */
 3832 heikki.linnakangas       1121 GIC       50476 :     if (!force
 3754 alvherre                 1122           38099 :         && writePtr == LogstreamResult.Write
                               1123           18969 :         && flushPtr == LogstreamResult.Flush
  152 tmunro                   1124 GNC          76 :         && now < wakeup[WALRCV_WAKEUP_REPLY])
 4441 heikki.linnakangas       1125 CBC          61 :         return;
                               1126                 : 
                               1127                 :     /* Make sure we wake up when it's time to send another reply. */
  152 tmunro                   1128 GNC       50415 :     WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
                               1129                 : 
                               1130                 :     /* Construct a new message */
 3805 heikki.linnakangas       1131 CBC       50415 :     writePtr = LogstreamResult.Write;
 3805 heikki.linnakangas       1132 GIC       50415 :     flushPtr = LogstreamResult.Flush;
 3762                          1133           50415 :     applyPtr = GetXLogReplayRecPtr(NULL);
                               1134                 : 
 3805                          1135           50415 :     resetStringInfo(&reply_message);
                               1136           50415 :     pq_sendbyte(&reply_message, 'r');
                               1137           50415 :     pq_sendint64(&reply_message, writePtr);
                               1138           50415 :     pq_sendint64(&reply_message, flushPtr);
                               1139           50415 :     pq_sendint64(&reply_message, applyPtr);
 2236 tgl                      1140           50415 :     pq_sendint64(&reply_message, GetCurrentTimestamp());
 3805 heikki.linnakangas       1141           50415 :     pq_sendbyte(&reply_message, requestReply ? 1 : 0);
 3805 heikki.linnakangas       1142 ECB             : 
                               1143                 :     /* Send it */
 3805 heikki.linnakangas       1144 CBC       50415 :     elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
  775 peter                    1145 ECB             :          LSN_FORMAT_ARGS(writePtr),
                               1146                 :          LSN_FORMAT_ARGS(flushPtr),
                               1147                 :          LSN_FORMAT_ARGS(applyPtr),
                               1148                 :          requestReply ? " (reply requested)" : "");
 3805 heikki.linnakangas       1149                 : 
 2321 peter_e                  1150 GIC       50415 :     walrcv_send(wrconn, reply_message.data, reply_message.len);
                               1151                 : }
 4433 simon                    1152 ECB             : 
                               1153                 : /*
                               1154                 :  * Send hot standby feedback message to primary, plus the current time,
                               1155                 :  * in case they don't have a watch.
 3716                          1156                 :  *
                               1157                 :  * If the user disables feedback, send one final message to tell sender
 2264                          1158                 :  * to forget about the xmin on this standby. We also send this message
                               1159                 :  * on first connect because a previous connection might have set xmin
                               1160                 :  * on a replication slot. (If we're not using a slot it's harmless to
                               1161                 :  * send a feedback message explicitly setting InvalidTransactionId).
 4433                          1162                 :  */
                               1163                 : static void
 3716 simon                    1164 GIC       19090 : XLogWalRcvSendHSFeedback(bool immed)
 4433 simon                    1165 ECB             : {
                               1166                 :     TimestampTz now;
                               1167                 :     FullTransactionId nextFullXid;
                               1168                 :     TransactionId nextXid;
                               1169                 :     uint32      xmin_epoch,
                               1170                 :                 catalog_xmin_epoch;
 2153 bruce                    1171                 :     TransactionId xmin,
                               1172                 :                 catalog_xmin;
                               1173                 : 
                               1174                 :     /* initially true so we always send at least one feedback message */
                               1175                 :     static bool primary_has_standby_xmin = true;
                               1176                 : 
                               1177                 :     /*
                               1178                 :      * If the user doesn't want status to be reported to the primary, be sure
                               1179                 :      * to exit before doing anything at all.
                               1180                 :      */
 3716 simon                    1181 GIC       19090 :     if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
 1029 andres                   1182           18943 :         !primary_has_standby_xmin)
 4433 simon                    1183           18962 :         return;
 4433 simon                    1184 ECB             : 
                               1185                 :     /* Get current timestamp. */
 4433 simon                    1186 GIC         257 :     now = GetCurrentTimestamp();
                               1187                 : 
                               1188                 :     /* Send feedback at most once per wal_receiver_status_interval. */
  152 tmunro                   1189 GNC         257 :     if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
                               1190             128 :         return;
                               1191                 : 
                               1192                 :     /* Make sure we wake up when it's time to send feedback again. */
                               1193             129 :     WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
                               1194                 : 
                               1195                 :     /*
                               1196                 :      * If Hot Standby is not yet accepting connections there is nothing to
  2264 simon                    1197 ECB             :      * send. Check this after the interval has expired to reduce the number of
                               1198                 :      * calls.
                               1199                 :      *
                               1200                 :      * Bailing out here also ensures that we don't send feedback until we've
                               1201                 :      * read our own replication slot state, so we don't tell the primary to
 2153 bruce                    1202                 :      * discard needed xmin or catalog_xmin from any slots that may exist on
                               1203                 :      * this replica.
                               1204                 :      */
 4433 simon                    1205 CBC         129 :     if (!HotStandbyActive())
                               1206               1 :         return;
                               1207                 : 
                               1208                 :     /*
 4382 bruce                    1209 ECB             :      * Make the expensive call to get the oldest xmin once we are certain
                               1210                 :      * everything else has been checked.
                               1211                 :      */
 3716 simon                    1212 GIC         128 :     if (hot_standby_feedback)
                               1213                 :     {
  970 andres                   1214              21 :         GetReplicationHorizons(&xmin, &catalog_xmin);
                               1215                 :     }
                               1216                 :     else
                               1217                 :     {
 3716 simon                    1218             107 :         xmin = InvalidTransactionId;
 2206                          1219             107 :         catalog_xmin = InvalidTransactionId;
                               1220                 :     }
 4433 simon                    1221 ECB             : 
                               1222                 :     /*
                                1223                 :      * Get epoch and adjust if nextXid and xmin are on different sides of
                               1224                 :      * the epoch boundary.
                               1225                 :      */
 1473 tmunro                   1226 GIC         128 :     nextFullXid = ReadNextFullTransactionId();
                               1227             128 :     nextXid = XidFromFullTransactionId(nextFullXid);
 1473 tmunro                   1228 CBC         128 :     xmin_epoch = EpochFromFullTransactionId(nextFullXid);
 2206 simon                    1229 GIC         128 :     catalog_xmin_epoch = xmin_epoch;
 4433 simon                    1230 CBC         128 :     if (nextXid < xmin)
 2153 bruce                    1231 UIC           0 :         xmin_epoch--;
 2206 simon                    1232 GIC         128 :     if (nextXid < catalog_xmin)
 2153 bruce                    1233 UIC           0 :         catalog_xmin_epoch--;
 4433 simon                    1234 ECB             : 
 2206 simon                    1235 CBC         128 :     elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
                               1236                 :          xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
                               1237                 : 
                               1238                 :     /* Construct the message and send it. */
 3805 heikki.linnakangas       1239 GIC         128 :     resetStringInfo(&reply_message);
                               1240             128 :     pq_sendbyte(&reply_message, 'h');
 2236 tgl                      1241             128 :     pq_sendint64(&reply_message, GetCurrentTimestamp());
 2006 andres                   1242 CBC         128 :     pq_sendint32(&reply_message, xmin);
                               1243             128 :     pq_sendint32(&reply_message, xmin_epoch);
                               1244             128 :     pq_sendint32(&reply_message, catalog_xmin);
                               1245             128 :     pq_sendint32(&reply_message, catalog_xmin_epoch);
 2321 peter_e                  1246             128 :     walrcv_send(wrconn, reply_message.data, reply_message.len);
 2206 simon                    1247 GBC         128 :     if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
 1029 andres                   1248 CBC          21 :         primary_has_standby_xmin = true;
 3716 simon                    1249 EUB             :     else
 1029 andres                   1250 GIC         107 :         primary_has_standby_xmin = false;
 4433 simon                    1251 ECB             : }
                               1252                 : 
                               1253                 : /*
                               1254                 :  * Update shared memory status upon receiving a message from primary.
 3805 heikki.linnakangas       1255                 :  *
                               1256                 :  * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
                               1257                 :  * message, reported by primary.
 4117 simon                    1258                 :  */
                               1259                 : static void
 4117 simon                    1260 CBC       21673 : ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
 4117 simon                    1261 ECB             : {
 2742 rhaas                    1262 CBC       21673 :     WalRcvData *walrcv = WalRcv;
 4117 simon                    1263           21673 :     TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
                               1264                 : 
 4117 simon                    1265 ECB             :     /* Update shared-memory status */
 4117 simon                    1266 GIC       21673 :     SpinLockAcquire(&walrcv->mutex);
 3754 alvherre                 1267           21673 :     if (walrcv->latestWalEnd < walEnd)
 3895 simon                    1268           18767 :         walrcv->latestWalEndTime = sendTime;
                               1269           21673 :     walrcv->latestWalEnd = walEnd;
 4117                          1270           21673 :     walrcv->lastMsgSendTime = sendTime;
                               1271           21673 :     walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
                               1272           21673 :     SpinLockRelease(&walrcv->mutex);
                               1273                 : 
  867 tgl                      1274           21673 :     if (message_level_is_interesting(DEBUG2))
 3292 tgl                      1275 ECB             :     {
                               1276                 :         char       *sendtime;
                               1277                 :         char       *receipttime;
 2948 ishii                    1278                 :         int         applyDelay;
                               1279                 : 
                               1280                 :         /* Copy because timestamptz_to_str returns a static buffer */
 3292 tgl                      1281 CBC        1212 :         sendtime = pstrdup(timestamptz_to_str(sendTime));
                               1282            1212 :         receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
 2948 ishii                    1283            1212 :         applyDelay = GetReplicationApplyDelay();
 2948 ishii                    1284 ECB             : 
                               1285                 :         /* apply delay is not available */
 2948 ishii                    1286 CBC        1212 :         if (applyDelay == -1)
                               1287              82 :             elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
                               1288                 :                  sendtime,
 2948 ishii                    1289 ECB             :                  receipttime,
                               1290                 :                  GetReplicationTransferLatency());
                               1291                 :         else
 2948 ishii                    1292 GIC        1130 :             elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
                               1293                 :                  sendtime,
                               1294                 :                  receipttime,
                               1295                 :                  applyDelay,
 2948 ishii                    1296 ECB             :                  GetReplicationTransferLatency());
                               1297                 : 
 3292 tgl                      1298 CBC        1212 :         pfree(sendtime);
 3292 tgl                      1299 GIC        1212 :         pfree(receipttime);
                               1300                 :     }
 4117 simon                    1301 CBC       21673 : }
 2649 alvherre                 1302 ECB             : 
                               1303                 : /*
                               1304                 :  * Compute the next wakeup time for a given wakeup reason.  Can be called to
                               1305                 :  * initialize a wakeup time, to adjust it for the next wakeup, or to
                               1306                 :  * reinitialize it when GUCs have changed.  We ask the caller to pass in the
                               1307                 :  * value of "now" because this frequently avoids multiple calls of
                               1308                 :  * GetCurrentTimestamp().  It had better be a reasonably up-to-date value
                               1309                 :  * though.
                               1310                 :  */
                               1311                 : static void
  152 tmunro                   1312 GNC       94382 : WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
                               1313                 : {
                               1314           94382 :     switch (reason)
                               1315                 :     {
                               1316           21796 :         case WALRCV_WAKEUP_TERMINATE:
                               1317           21796 :             if (wal_receiver_timeout <= 0)
   73 tgl                      1318 UNC           0 :                 wakeup[reason] = TIMESTAMP_INFINITY;
                               1319                 :             else
   73 tgl                      1320 GNC       21796 :                 wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
  152 tmunro                   1321           21796 :             break;
                               1322           21796 :         case WALRCV_WAKEUP_PING:
                               1323           21796 :             if (wal_receiver_timeout <= 0)
   73 tgl                      1324 UNC           0 :                 wakeup[reason] = TIMESTAMP_INFINITY;
                               1325                 :             else
   73 tgl                      1326 GNC       21796 :                 wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
  152 tmunro                   1327           21796 :             break;
                               1328             252 :         case WALRCV_WAKEUP_HSFEEDBACK:
                               1329             252 :             if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
   73 tgl                      1330             220 :                 wakeup[reason] = TIMESTAMP_INFINITY;
                               1331                 :             else
                               1332              32 :                 wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
  152 tmunro                   1333             252 :             break;
                               1334           50538 :         case WALRCV_WAKEUP_REPLY:
                               1335           50538 :             if (wal_receiver_status_interval <= 0)
   73 tgl                      1336 UNC           0 :                 wakeup[reason] = TIMESTAMP_INFINITY;
                               1337                 :             else
   73 tgl                      1338 GNC       50538 :                 wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
  152 tmunro                   1339           50538 :             break;
                               1340                 :             /* there's intentionally no default: here */
                               1341                 :     }
                               1342           94382 : }
                               1343                 : 
                               1344                 : /*
                               1345                 :  * Wake up the walreceiver main loop.
                               1346                 :  *
                               1347                 :  * This is called by the startup process whenever interesting xlog records
 2567 rhaas                    1348 ECB             :  * are applied, so that walreceiver can check if it needs to send an apply
                                1349                 :  * notification back to the primary, which may be waiting in a COMMIT with
                               1350                 :  * synchronous_commit = remote_apply.
                               1351                 :  */
                               1352                 : void
 2567 rhaas                    1353 GIC       11679 : WalRcvForceReply(void)
 2567 rhaas                    1354 ECB             : {
 2014 tgl                      1355                 :     Latch      *latch;
                               1356                 : 
 2567 rhaas                    1357 CBC       11679 :     WalRcv->force_reply = true;
                               1358                 :     /* fetching the latch pointer might not be atomic, so use spinlock */
 2014 tgl                      1359 GIC       11679 :     SpinLockAcquire(&WalRcv->mutex);
                               1360           11679 :     latch = WalRcv->latch;
                               1361           11679 :     SpinLockRelease(&WalRcv->mutex);
                               1362           11679 :     if (latch)
                               1363           11558 :         SetLatch(latch);
 2567 rhaas                    1364           11679 : }
                               1365                 : 
                               1366                 : /*
                               1367                 :  * Return a string constant representing the state. This is used
 2649 alvherre                 1368 ECB             :  * in system functions and views, and should *not* be translated.
                               1369                 :  */
                               1370                 : static const char *
 2649 alvherre                 1371 UIC           0 : WalRcvGetStateString(WalRcvState state)
 2649 alvherre                 1372 ECB             : {
 2649 alvherre                 1373 LBC           0 :     switch (state)
 2649 alvherre                 1374 EUB             :     {
 2649 alvherre                 1375 UIC           0 :         case WALRCV_STOPPED:
 2649 alvherre                 1376 LBC           0 :             return "stopped";
                               1377               0 :         case WALRCV_STARTING:
                               1378               0 :             return "starting";
                               1379               0 :         case WALRCV_STREAMING:
 2649 alvherre                 1380 UBC           0 :             return "streaming";
 2649 alvherre                 1381 UIC           0 :         case WALRCV_WAITING:
 2649 alvherre                 1382 LBC           0 :             return "waiting";
                               1383               0 :         case WALRCV_RESTARTING:
                               1384               0 :             return "restarting";
                               1385               0 :         case WALRCV_STOPPING:
                               1386               0 :             return "stopping";
                               1387                 :     }
                               1388               0 :     return "UNKNOWN";
 2649 alvherre                 1389 ECB             : }
                               1390                 : 
                               1391                 : /*
 2649 alvherre                 1392 EUB             :  * Returns activity of WAL receiver, including pid, state and xlog locations
                               1393                 :  * received from the WAL sender of another server.
 2649 alvherre                 1394 ECB             :  */
                               1395                 : Datum
 2649 alvherre                 1396 GIC           3 : pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
                               1397                 : {
 2649 alvherre                 1398 ECB             :     TupleDesc   tupdesc;
                               1399                 :     Datum      *values;
                               1400                 :     bool       *nulls;
                               1401                 :     int         pid;
                               1402                 :     bool        ready_to_display;
                               1403                 :     WalRcvState state;
                               1404                 :     XLogRecPtr  receive_start_lsn;
                               1405                 :     TimeLineID  receive_start_tli;
                               1406                 :     XLogRecPtr  written_lsn;
                               1407                 :     XLogRecPtr  flushed_lsn;
                               1408                 :     TimeLineID  received_tli;
 2495 rhaas                    1409                 :     TimestampTz last_send_time;
                               1410                 :     TimestampTz last_receipt_time;
                               1411                 :     XLogRecPtr  latest_end_lsn;
                               1412                 :     TimestampTz latest_end_time;
 1835 fujii                    1413                 :     char        sender_host[NI_MAXHOST];
 1835 fujii                    1414 GIC           3 :     int         sender_port = 0;
 2014 alvherre                 1415 ECB             :     char        slotname[NAMEDATALEN];
                               1416                 :     char        conninfo[MAXCONNINFO];
 2649                          1417                 : 
 2109                          1418                 :     /* Take a lock to ensure value consistency */
 2109 alvherre                 1419 CBC           3 :     SpinLockAcquire(&WalRcv->mutex);
                               1420               3 :     pid = (int) WalRcv->pid;
 2109 alvherre                 1421 GIC           3 :     ready_to_display = WalRcv->ready_to_display;
                               1422               3 :     state = WalRcv->walRcvState;
                               1423               3 :     receive_start_lsn = WalRcv->receiveStart;
                               1424               3 :     receive_start_tli = WalRcv->receiveStartTLI;
 1057 michael                  1425               3 :     flushed_lsn = WalRcv->flushedUpto;
 2109 alvherre                 1426               3 :     received_tli = WalRcv->receivedTLI;
 2109 alvherre                 1427 GBC           3 :     last_send_time = WalRcv->lastMsgSendTime;
 2109 alvherre                 1428 GIC           3 :     last_receipt_time = WalRcv->lastMsgReceiptTime;
 2109 alvherre                 1429 GBC           3 :     latest_end_lsn = WalRcv->latestWalEnd;
 2109 alvherre                 1430 GIC           3 :     latest_end_time = WalRcv->latestWalEndTime;
 2014 alvherre                 1431 GBC           3 :     strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
 1835 fujii                    1432               3 :     strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
                               1433               3 :     sender_port = WalRcv->sender_port;
 2014 alvherre                 1434               3 :     strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
 2109                          1435               3 :     SpinLockRelease(&WalRcv->mutex);
 2109 alvherre                 1436 EUB             : 
 2475                          1437                 :     /*
 2473                          1438                 :      * No WAL receiver (or not ready yet), just return a tuple with NULL
                               1439                 :      * values
 2475                          1440                 :      */
 2109 alvherre                 1441 GBC           3 :     if (pid == 0 || !ready_to_display)
 2473                          1442               3 :         PG_RETURN_NULL();
                               1443                 : 
  780 fujii                    1444 EUB             :     /*
                               1445                 :      * Read "writtenUpto" without holding a spinlock.  Note that it may not be
                               1446                 :      * consistent with the other shared variables of the WAL receiver
                               1447                 :      * protected by a spinlock, but this should not be used for data integrity
                               1448                 :      * checks.
                               1449                 :      */
  780 fujii                    1450 UIC           0 :     written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
                               1451                 : 
 2475 alvherre                 1452 ECB             :     /* determine result type */
 2475 alvherre                 1453 UIC           0 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
                               1454               0 :         elog(ERROR, "return type must be a row type");
                               1455                 : 
                               1456               0 :     values = palloc0(sizeof(Datum) * tupdesc->natts);
                               1457               0 :     nulls = palloc0(sizeof(bool) * tupdesc->natts);
                               1458                 : 
                               1459                 :     /* Fetch values */
 2109                          1460               0 :     values[0] = Int32GetDatum(pid);
                               1461                 : 
  377 mail                     1462               0 :     if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
                               1463                 :     {
                               1464                 :         /*
                               1465                 :          * Only superusers and roles with privileges of pg_read_all_stats can
                               1466                 :          * see details. Other users only get the pid value, which tells them
                               1467                 :          * whether a WAL receiver is running, but no further details.
                               1468                 :          */
  282 peter                    1469 UNC           0 :         memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
 2649 alvherre                 1470 ECB             :     }
                               1471                 :     else
                               1472                 :     {
 2649 alvherre                 1473 UIC           0 :         values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
                               1474                 : 
 2649 alvherre                 1475 LBC           0 :         if (XLogRecPtrIsInvalid(receive_start_lsn))
                               1476               0 :             nulls[2] = true;
 2649 alvherre                 1477 ECB             :         else
 2649 alvherre                 1478 LBC           0 :             values[2] = LSNGetDatum(receive_start_lsn);
                               1479               0 :         values[3] = Int32GetDatum(receive_start_tli);
 1057 michael                  1480               0 :         if (XLogRecPtrIsInvalid(written_lsn))
 2649 alvherre                 1481               0 :             nulls[4] = true;
 2649 alvherre                 1482 ECB             :         else
 1057 michael                  1483 LBC           0 :             values[4] = LSNGetDatum(written_lsn);
                               1484               0 :         if (XLogRecPtrIsInvalid(flushed_lsn))
                               1485               0 :             nulls[5] = true;
 1057 michael                  1486 ECB             :         else
 1057 michael                  1487 LBC           0 :             values[5] = LSNGetDatum(flushed_lsn);
                               1488               0 :         values[6] = Int32GetDatum(received_tli);
 2649 alvherre                 1489               0 :         if (last_send_time == 0)
 1057 michael                  1490               0 :             nulls[7] = true;
 2649 alvherre                 1491 ECB             :         else
 1057 michael                  1492 UIC           0 :             values[7] = TimestampTzGetDatum(last_send_time);
 2649 alvherre                 1493               0 :         if (last_receipt_time == 0)
 1057 michael                  1494               0 :             nulls[8] = true;
                               1495                 :         else
                               1496               0 :             values[8] = TimestampTzGetDatum(last_receipt_time);
 2649 alvherre                 1497 LBC           0 :         if (XLogRecPtrIsInvalid(latest_end_lsn))
 1057 michael                  1498               0 :             nulls[9] = true;
                               1499                 :         else
 1057 michael                  1500 UIC           0 :             values[9] = LSNGetDatum(latest_end_lsn);
 2649 alvherre                 1501               0 :         if (latest_end_time == 0)
 1057 michael                  1502               0 :             nulls[10] = true;
                               1503                 :         else
                               1504               0 :             values[10] = TimestampTzGetDatum(latest_end_time);
 2649 alvherre                 1505               0 :         if (*slotname == '\0')
 1057 michael                  1506 UBC           0 :             nulls[11] = true;
                               1507                 :         else
 1057 michael                  1508 UIC           0 :             values[11] = CStringGetTextDatum(slotname);
 1835 fujii                    1509 UBC           0 :         if (*sender_host == '\0')
 1057 michael                  1510               0 :             nulls[12] = true;
                               1511                 :         else
                               1512               0 :             values[12] = CStringGetTextDatum(sender_host);
 1835 fujii                    1513               0 :         if (sender_port == 0)
 1057 michael                  1514 UIC           0 :             nulls[13] = true;
                               1515                 :         else
 1057 michael                  1516 UBC           0 :             values[13] = Int32GetDatum(sender_port);
 1835 fujii                    1517 UIC           0 :         if (*conninfo == '\0')
 1057 michael                  1518 UBC           0 :             nulls[14] = true;
                               1519                 :         else
 1057 michael                  1520 UIC           0 :             values[14] = CStringGetTextDatum(conninfo);
                               1521                 :     }
                               1522                 : 
                               1523                 :     /* Return the record as a Datum */
 2109 alvherre                 1524               0 :     PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 2649 alvherre                 1525 EUB             : }
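
The unprivileged branch of pg_stat_get_wal_receiver() above (source line 1469) shows only the pid column and marks every other column NULL with a single memset. The following is a minimal standalone sketch of that masking step, not code from walreceiver.c: the helper name make_pid_only_nulls is hypothetical, and calloc stands in for palloc0 so the example compiles outside the PostgreSQL tree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper (not part of walreceiver.c): build the null-flag
 * array for a result row of natts columns in which only column 0 (the
 * pid) is visible.  calloc() is used here as a stand-in for palloc0().
 */
static bool *
make_pid_only_nulls(int natts)
{
    bool       *nulls;

    assert(natts >= 1);

    /* Zero-initialized: every column starts out as "not null". */
    nulls = calloc((size_t) natts, sizeof(bool));
    if (nulls == NULL)
        return NULL;

    /* Flag columns 1 .. natts-1 as NULL; column 0 stays visible. */
    memset(&nulls[1], true, sizeof(bool) * (size_t) (natts - 1));

    return nulls;
}
```

With natts = 15, matching the indexes 0 through 14 used in the listing, nulls[0] stays false and nulls[1] through nulls[14] become true. The caller is assumed to free() the array.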
        

Generated by: LCOV version v1.16-55-g56c0a2a