Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiver.c
4 : *
5 : * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 : * is the process in the standby server that takes charge of receiving
7 : * XLOG records from a primary server during streaming replication.
8 : *
9 : * When the startup process determines that it's time to start streaming,
10 : * it instructs postmaster to start walreceiver. Walreceiver first connects
11 : * to the primary server (it will be served by a walsender process
12 : * in the primary server), and then keeps receiving XLOG records and
13 : * writing them to the disk as long as the connection is alive. As XLOG
14 : * records are received and flushed to disk, it updates the
15 : * WalRcv->flushedUpto variable in shared memory, to inform the startup
16 : * process of how far it can proceed with XLOG replay.
17 : *
18 : * A WAL receiver cannot directly load GUC parameters used when establishing
19 : * its connection to the primary. Instead it relies on parameter values
20 : * that are passed down by the startup process when streaming is requested.
21 : * This applies, for example, to the replication slot and the connection
22 : * string to be used for the connection with the primary.
23 : *
24 : * If the primary server ends streaming, but doesn't disconnect, walreceiver
25 : * goes into "waiting" mode, and waits for the startup process to give new
26 : * instructions. The startup process will treat that the same as
27 : * disconnection, and will rescan the archive/pg_wal directory. But when the
28 : * startup process wants to try streaming replication again, it will just
29 : * nudge the existing walreceiver process that's waiting, instead of launching
30 : * a new one.
31 : *
32 : * Normal termination is by SIGTERM, which instructs the walreceiver to
33 : * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
34 : * process, the walreceiver will simply abort and exit on SIGQUIT. A close
35 : * of the connection and a FATAL error are treated not as a crash but as
36 : * normal operation.
37 : *
38 : * This file contains the server-facing parts of walreceiver. The libpq-
39 : * specific parts are in the libpqwalreceiver module. It's loaded
40 : * dynamically to avoid linking the server with libpq.
41 : *
42 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
43 : *
44 : *
45 : * IDENTIFICATION
46 : * src/backend/replication/walreceiver.c
47 : *
48 : *-------------------------------------------------------------------------
49 : */
50 : #include "postgres.h"
51 :
52 : #include <unistd.h>
53 :
54 : #include "access/htup_details.h"
55 : #include "access/timeline.h"
56 : #include "access/transam.h"
57 : #include "access/xlog_internal.h"
58 : #include "access/xlogarchive.h"
59 : #include "access/xlogrecovery.h"
60 : #include "catalog/pg_authid.h"
61 : #include "catalog/pg_type.h"
62 : #include "common/ip.h"
63 : #include "funcapi.h"
64 : #include "libpq/pqformat.h"
65 : #include "libpq/pqsignal.h"
66 : #include "miscadmin.h"
67 : #include "pgstat.h"
68 : #include "postmaster/interrupt.h"
69 : #include "replication/walreceiver.h"
70 : #include "replication/walsender.h"
71 : #include "storage/ipc.h"
72 : #include "storage/pmsignal.h"
73 : #include "storage/proc.h"
74 : #include "storage/procarray.h"
75 : #include "storage/procsignal.h"
76 : #include "utils/acl.h"
77 : #include "utils/builtins.h"
78 : #include "utils/guc.h"
79 : #include "utils/pg_lsn.h"
80 : #include "utils/ps_status.h"
81 : #include "utils/resowner.h"
82 : #include "utils/timestamp.h"
83 :
84 :
85 : /*
86 : * GUC variables. (Other variables that affect walreceiver are in xlog.c
87 : * because they're passed down from the startup process, for better
88 : * synchronization.)
89 : */
90 : int wal_receiver_status_interval; /* a status update is sent once this long has passed since the last one */
91 : int wal_receiver_timeout; /* silence from the primary tolerated before terminating; a ping is sent at half of it */
92 : bool hot_standby_feedback; /* NOTE(review): presumably gates XLogWalRcvSendHSFeedback() - confirm */
93 :
94 : /* libpqwalreceiver connection; established by walrcv_connect() in WalReceiverMain */
95 : static WalReceiverConn *wrconn = NULL;
96 : WalReceiverFunctionsType *WalReceiverFunctions = NULL; /* WalReceiverMain requires this to be non-NULL after loading "libpqwalreceiver" */
97 :
98 : /*
99 : * These variables are used similarly to openLogFile/SegNo,
100 : * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
101 : * corresponding to the filename of recvFile. recvFile is -1 while no
101 : * segment file is open.
102 : */
103 : static int recvFile = -1;
104 : static TimeLineID recvFileTLI = 0;
105 : static XLogSegNo recvSegNo = 0;
106 :
107 : /*
108 : * LogstreamResult indicates the byte positions that we have already
109 : * written/fsynced. Both fields are reset from GetXLogReplayRecPtr() each
109 : * time WalReceiverMain starts streaming.
110 : */
111 : static struct
112 : {
113 : XLogRecPtr Write; /* last byte + 1 written out in the standby */
114 : XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
115 : } LogstreamResult;
116 :
/*
 * WalRcvWakeupReason
 *
 * One enumerator per periodic task the walreceiver may need to wake up for.
 * The values are used as indexes into the wakeup[] array, so they must stay
 * dense and start at zero.
 */
typedef enum WalRcvWakeupReason
{
	WALRCV_WAKEUP_TERMINATE = 0,
	WALRCV_WAKEUP_PING,
	WALRCV_WAKEUP_REPLY,
	WALRCV_WAKEUP_HSFEEDBACK
} WalRcvWakeupReason;

/* Number of wakeup reasons; derived from the last enumerator above. */
#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
128 :
129 : /*
130 : * Wake up times for periodic tasks, indexed by WalRcvWakeupReason.
131 : */
132 : static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
133 :
134 : static StringInfoData reply_message; /* set up with initStringInfo() when streaming starts */
135 : static StringInfoData incoming_message; /* set up with initStringInfo() when streaming starts */
136 :
137 : /* Prototypes for private (file-local, static) functions */
138 : static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
139 : static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
140 : static void WalRcvDie(int code, Datum arg); /* registered via on_shmem_exit() in WalReceiverMain */
141 : static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
142 : TimeLineID tli);
143 : static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
144 : TimeLineID tli);
145 : static void XLogWalRcvFlush(bool dying, TimeLineID tli);
146 : static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
147 : static void XLogWalRcvSendReply(bool force, bool requestReply);
148 : static void XLogWalRcvSendHSFeedback(bool immed);
149 : static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
150 : static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
151 :
152 : /*
153 : * Process any interrupts the walreceiver process may have received.
154 : * This should be called any time the process's latch has become set.
155 : *
156 : * Currently, only SIGTERM is of interest. We can't just exit(1) within the
157 : * SIGTERM signal handler, because the signal might arrive in the middle of
158 : * some critical operation, like while we're holding a spinlock. Instead, the
159 : * signal handler sets a flag variable as well as setting the process's latch.
160 : * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
161 : * latch has become set. Operations that could block for a long time, such as
162 : * reading from a remote server, must pay attention to the latch too; see
163 : * libpqrcv_PQgetResult for example.
164 : */
165 : void
4832 heikki.linnakangas 166 GIC 45857 : ProcessWalRcvInterrupts(void)
167 : {
168 : /*
169 : * Although walreceiver interrupt handling doesn't use the same scheme as
170 : * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
171 : * any incoming signals on Win32, and also to make sure we process any
172 : * barrier events.
173 : */
174 45857 : CHECK_FOR_INTERRUPTS();
175 :
878 fujii 176 45857 : if (ShutdownRequestPending)
177 : {
4832 heikki.linnakangas 178 56 : ereport(FATAL,
179 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
180 : errmsg("terminating walreceiver process due to administrator command")));
181 : }
4832 heikki.linnakangas 182 CBC 45801 : }
183 :
184 :
185 : /* Main entry point for walreceiver process */
186 : void
4827 heikki.linnakangas 187 GIC 183 : WalReceiverMain(void)
188 : {
189 : char conninfo[MAXCONNINFO];
2475 alvherre 190 ECB : char *tmp_conninfo;
191 : char slotname[NAMEDATALEN];
1181 peter 192 : bool is_temp_slot;
193 : XLogRecPtr startpoint;
3769 heikki.linnakangas 194 : TimeLineID startpointTLI;
195 : TimeLineID primaryTLI;
196 : bool first_stream;
2742 rhaas 197 GIC 183 : WalRcvData *walrcv = WalRcv;
198 : TimestampTz now;
199 : char *err;
1835 fujii 200 183 : char *sender_host = NULL;
1835 fujii 201 CBC 183 : int sender_port = 0;
202 :
203 : /*
204 : * WalRcv should be set up already (if we are a backend, we inherit this
205 : * by fork() or EXEC_BACKEND mechanism from the postmaster).
206 : */
4820 heikki.linnakangas 207 GIC 183 : Assert(walrcv != NULL);
208 :
4820 heikki.linnakangas 209 ECB : /*
210 : * Mark walreceiver as running in shared memory.
211 : *
4790 bruce 212 : * Do this as early as possible, so that if we fail later on, we'll set
213 : * state to STOPPED. If we die before this, the startup process will keep
214 : * waiting for us to start up, until it times out.
215 : */
4820 heikki.linnakangas 216 GIC 183 : SpinLockAcquire(&walrcv->mutex);
217 183 : Assert(walrcv->pid == 0);
4790 bruce 218 183 : switch (walrcv->walRcvState)
4820 heikki.linnakangas 219 ECB : {
4820 heikki.linnakangas 220 UIC 0 : case WALRCV_STOPPING:
221 : /* If we've already been requested to stop, don't start up. */
222 0 : walrcv->walRcvState = WALRCV_STOPPED;
223 : /* fall through */
224 :
4820 heikki.linnakangas 225 GIC 10 : case WALRCV_STOPPED:
226 10 : SpinLockRelease(&walrcv->mutex);
758 tmunro 227 10 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
4820 heikki.linnakangas 228 CBC 10 : proc_exit(1);
4820 heikki.linnakangas 229 ECB : break;
230 :
4820 heikki.linnakangas 231 GIC 173 : case WALRCV_STARTING:
4820 heikki.linnakangas 232 EUB : /* The usual case */
4820 heikki.linnakangas 233 GIC 173 : break;
4820 heikki.linnakangas 234 EUB :
3769 heikki.linnakangas 235 UIC 0 : case WALRCV_WAITING:
236 : case WALRCV_STREAMING:
3769 heikki.linnakangas 237 ECB : case WALRCV_RESTARTING:
238 : default:
4820 239 : /* Shouldn't happen */
2014 alvherre 240 LBC 0 : SpinLockRelease(&walrcv->mutex);
4820 heikki.linnakangas 241 UIC 0 : elog(PANIC, "walreceiver still running according to shared memory state");
242 : }
4820 heikki.linnakangas 243 ECB : /* Advertise our PID so that the startup process can kill us */
4820 heikki.linnakangas 244 GIC 173 : walrcv->pid = MyProcPid;
3769 heikki.linnakangas 245 CBC 173 : walrcv->walRcvState = WALRCV_STREAMING;
246 :
4820 heikki.linnakangas 247 EUB : /* Fetch information required to start streaming */
2473 alvherre 248 GIC 173 : walrcv->ready_to_display = false;
4820 heikki.linnakangas 249 173 : strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
3355 rhaas 250 173 : strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
1181 peter 251 173 : is_temp_slot = walrcv->is_temp_slot;
4422 heikki.linnakangas 252 GBC 173 : startpoint = walrcv->receiveStart;
3769 253 173 : startpointTLI = walrcv->receiveStartTLI;
254 :
255 : /*
1108 alvherre 256 ECB : * At most one of is_temp_slot and slotname can be set; otherwise,
257 : * RequestXLogStreaming messed up.
258 : */
1108 alvherre 259 GIC 173 : Assert(!is_temp_slot || (slotname[0] == '\0'));
1108 alvherre 260 ECB :
4117 simon 261 : /* Initialise to a sanish value */
73 tgl 262 GNC 173 : now = GetCurrentTimestamp();
2014 alvherre 263 CBC 173 : walrcv->lastMsgSendTime =
152 tmunro 264 173 : walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
4117 simon 265 ECB :
2014 tgl 266 : /* Report the latch to use to awaken this process */
2014 tgl 267 GIC 173 : walrcv->latch = &MyProc->procLatch;
268 :
4820 heikki.linnakangas 269 173 : SpinLockRelease(&walrcv->mutex);
270 :
780 fujii 271 173 : pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
1096 tmunro 272 ECB :
273 : /* Arrange to clean up at walreceiver exit */
520 rhaas 274 GIC 173 : on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
4832 heikki.linnakangas 275 ECB :
276 : /* Properly accept or ignore signals the postmaster might send us */
878 fujii 277 CBC 173 : pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
278 : * file */
4832 heikki.linnakangas 279 GIC 173 : pqsignal(SIGINT, SIG_IGN);
878 fujii 280 CBC 173 : pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
281 : /* SIGQUIT handler was already set up by InitPostmasterChild */
4832 heikki.linnakangas 282 173 : pqsignal(SIGALRM, SIG_IGN);
4832 heikki.linnakangas 283 GIC 173 : pqsignal(SIGPIPE, SIG_IGN);
1231 rhaas 284 CBC 173 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
4832 heikki.linnakangas 285 GIC 173 : pqsignal(SIGUSR2, SIG_IGN);
286 :
4832 heikki.linnakangas 287 ECB : /* Reset some signals that are accepted by postmaster but not here */
4832 heikki.linnakangas 288 GIC 173 : pqsignal(SIGCHLD, SIG_DFL);
289 :
4820 heikki.linnakangas 290 ECB : /* Load the libpq-specific functions */
4820 heikki.linnakangas 291 GIC 173 : load_file("libpqwalreceiver", false);
2321 peter_e 292 CBC 173 : if (WalReceiverFunctions == NULL)
4820 heikki.linnakangas 293 LBC 0 : elog(ERROR, "libpqwalreceiver didn't initialize correctly");
294 :
4832 heikki.linnakangas 295 ECB : /* Unblock signals (they were blocked when the postmaster forked us) */
65 tmunro 296 GNC 173 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
4832 heikki.linnakangas 297 ECB :
4827 298 : /* Establish the connection to the primary for XLOG streaming */
10 rhaas 299 GNC 173 : wrconn = walrcv_connect(conninfo, false, false,
300 : cluster_name[0] ? cluster_name : "walreceiver",
662 tgl 301 ECB : &err);
2271 peter_e 302 GIC 173 : if (!wrconn)
303 64 : ereport(ERROR,
662 tgl 304 ECB : (errcode(ERRCODE_CONNECTION_FAILURE),
305 : errmsg("could not connect to the primary server: %s", err)));
4832 heikki.linnakangas 306 EUB :
307 : /*
308 : * Save user-visible connection string. This clobbers the original
1835 fujii 309 ECB : * conninfo, for security. Also save host and port of the sender server
310 : * this walreceiver is connected to.
311 : */
2321 peter_e 312 CBC 109 : tmp_conninfo = walrcv_get_conninfo(wrconn);
1835 fujii 313 GIC 109 : walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
2475 alvherre 314 109 : SpinLockAcquire(&walrcv->mutex);
2475 alvherre 315 CBC 109 : memset(walrcv->conninfo, 0, MAXCONNINFO);
316 109 : if (tmp_conninfo)
2475 alvherre 317 GIC 109 : strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
318 :
1835 fujii 319 109 : memset(walrcv->sender_host, 0, NI_MAXHOST);
320 109 : if (sender_host)
321 109 : strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
322 :
323 109 : walrcv->sender_port = sender_port;
2475 alvherre 324 109 : walrcv->ready_to_display = true;
2475 alvherre 325 CBC 109 : SpinLockRelease(&walrcv->mutex);
2475 alvherre 326 ECB :
2014 alvherre 327 CBC 109 : if (tmp_conninfo)
328 109 : pfree(tmp_conninfo);
2014 alvherre 329 ECB :
1835 fujii 330 CBC 109 : if (sender_host)
1835 fujii 331 GIC 109 : pfree(sender_host);
1835 fujii 332 ECB :
3769 heikki.linnakangas 333 CBC 109 : first_stream = true;
4832 heikki.linnakangas 334 ECB : for (;;)
4832 heikki.linnakangas 335 UIC 0 : {
2321 peter_e 336 ECB : char *primary_sysid;
337 : char standby_sysid[32];
2271 338 : WalRcvStreamOptions options;
339 :
4832 heikki.linnakangas 340 : /*
3769 341 : * Check that we're connected to a valid server using the
342 : * IDENTIFY_SYSTEM replication command.
4832 343 : */
1486 peter 344 CBC 109 : primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
345 :
2321 peter_e 346 109 : snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
347 : GetSystemIdentifier());
2321 peter_e 348 GBC 109 : if (strcmp(primary_sysid, standby_sysid) != 0)
349 : {
2321 peter_e 350 UIC 0 : ereport(ERROR,
351 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
352 : errmsg("database system identifier differs between the primary and standby"),
353 : errdetail("The primary's identifier is %s, the standby's identifier is %s.",
354 : primary_sysid, standby_sysid)));
355 : }
356 :
4832 heikki.linnakangas 357 ECB : /*
358 : * Confirm that the current timeline of the primary is the same or
3769 359 : * ahead of ours.
360 : */
3769 heikki.linnakangas 361 CBC 109 : if (primaryTLI < startpointTLI)
3769 heikki.linnakangas 362 UIC 0 : ereport(ERROR,
662 tgl 363 EUB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
364 : errmsg("highest timeline %u of the primary is behind recovery timeline %u",
365 : primaryTLI, startpointTLI)));
366 :
367 : /*
368 : * Get any missing history files. We do this always, even when we're
369 : * not interested in that timeline, so that if we're promoted to
370 : * become the primary later on, we don't select the same timeline that
371 : * was already used in the current primary. This isn't bullet-proof -
372 : * you'll need some external software to manage your cluster if you
373 : * need to ensure that a unique timeline id is chosen in every case,
3602 bruce 374 ECB : * but let's avoid the confusion of timeline id collisions where we
3602 bruce 375 EUB : * can.
376 : */
3748 heikki.linnakangas 377 GIC 109 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
378 :
379 : /*
380 : * Create temporary replication slot if requested, and update slot
381 : * name in shared memory. (Note the slot name cannot already be set
382 : * in this case.)
383 : */
1108 alvherre 384 109 : if (is_temp_slot)
385 : {
1108 alvherre 386 UIC 0 : snprintf(slotname, sizeof(slotname),
387 : "pg_walreceiver_%lld",
388 0 : (long long int) walrcv_get_backend_pid(wrconn));
389 :
634 akapila 390 LBC 0 : walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
391 :
1108 alvherre 392 UIC 0 : SpinLockAcquire(&walrcv->mutex);
393 0 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
394 0 : SpinLockRelease(&walrcv->mutex);
395 : }
396 :
3769 heikki.linnakangas 397 ECB : /*
398 : * Start streaming.
3769 heikki.linnakangas 399 EUB : *
400 : * We'll try to start at the requested starting point and timeline,
401 : * even if it's different from the server's latest timeline. In case
402 : * we've already reached the end of the old timeline, the server will
403 : * finish the streaming immediately, and we will go back to await
404 : * orders from the startup process. If recovery_target_timeline is
2362 rhaas 405 : * 'latest', the startup process will scan pg_wal and find the new
3769 heikki.linnakangas 406 : * history file, bump recovery target timeline, and ask us to restart
407 : * on the new timeline.
408 : */
2271 peter_e 409 GIC 109 : options.logical = false;
410 109 : options.startpoint = startpoint;
411 109 : options.slotname = slotname[0] != '\0' ? slotname : NULL;
412 109 : options.proto.physical.startpointTLI = startpointTLI;
413 109 : if (walrcv_startstreaming(wrconn, &options))
414 : {
3769 heikki.linnakangas 415 109 : if (first_stream)
416 109 : ereport(LOG,
417 : (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
418 : LSN_FORMAT_ARGS(startpoint), startpointTLI)));
419 : else
3769 heikki.linnakangas 420 UIC 0 : ereport(LOG,
421 : (errmsg("restarted WAL streaming at %X/%X on timeline %u",
775 peter 422 ECB : LSN_FORMAT_ARGS(startpoint), startpointTLI)));
3769 heikki.linnakangas 423 CBC 109 : first_stream = false;
3769 heikki.linnakangas 424 ECB :
425 : /* Initialize LogstreamResult and buffers for processing messages */
3762 heikki.linnakangas 426 CBC 109 : LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
3769 heikki.linnakangas 427 GIC 109 : initStringInfo(&reply_message);
3769 heikki.linnakangas 428 CBC 109 : initStringInfo(&incoming_message);
3769 heikki.linnakangas 429 ECB :
430 : /* Initialize nap wakeup times. */
152 tmunro 431 GNC 109 : now = GetCurrentTimestamp();
432 545 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
433 436 : WalRcvComputeNextWakeup(i, now);
434 :
435 : /* Send initial reply/feedback messages. */
143 436 109 : XLogWalRcvSendReply(true, false);
437 109 : XLogWalRcvSendHSFeedback(true);
143 tmunro 438 EUB :
439 : /* Loop until end-of-streaming or error */
440 : for (;;)
3832 heikki.linnakangas 441 CBC 32296 : {
442 : char *buf;
443 : int len;
2567 rhaas 444 32405 : bool endofwal = false;
2551 tgl 445 32405 : pgsocket wait_fd = PGINVALID_SOCKET;
2567 rhaas 446 ECB : int rc;
447 : TimestampTz nextWakeup;
448 : long nap;
449 :
450 : /*
3769 heikki.linnakangas 451 : * Exit walreceiver if we're not in recovery. This should not
452 : * happen, but cross-check the status here.
453 : */
3769 heikki.linnakangas 454 GIC 32405 : if (!RecoveryInProgress())
3769 heikki.linnakangas 455 UIC 0 : ereport(FATAL,
662 tgl 456 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
457 : errmsg("cannot continue WAL streaming, recovery has already ended")));
458 :
459 : /* Process any requests or signals received recently */
3769 heikki.linnakangas 460 GIC 32405 : ProcessWalRcvInterrupts();
3769 heikki.linnakangas 461 ECB :
878 fujii 462 GIC 32405 : if (ConfigReloadPending)
463 : {
878 fujii 464 CBC 14 : ConfigReloadPending = false;
3769 heikki.linnakangas 465 14 : ProcessConfigFile(PGC_SIGHUP);
466 : /* recompute wakeup times */
152 tmunro 467 GNC 14 : now = GetCurrentTimestamp();
468 70 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
469 56 : WalRcvComputeNextWakeup(i, now);
3716 simon 470 GIC 14 : XLogWalRcvSendHSFeedback(true);
471 : }
472 :
473 : /* See if we can read data immediately */
2321 peter_e 474 32405 : len = walrcv_receive(wrconn, &buf, &wait_fd);
3769 heikki.linnakangas 475 32375 : if (len != 0)
476 : {
477 : /*
3769 heikki.linnakangas 478 ECB : * Process the received data, and any subsequent data we
3769 heikki.linnakangas 479 EUB : * can read without blocking.
480 : */
481 : for (;;)
482 : {
3769 heikki.linnakangas 483 GIC 40805 : if (len > 0)
3769 heikki.linnakangas 484 ECB : {
485 : /*
486 : * Something was received from primary, so adjust
487 : * the ping and terminate wakeup times.
3602 bruce 488 : */
73 tgl 489 GNC 21673 : now = GetCurrentTimestamp();
152 tmunro 490 21673 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
491 : now);
492 21673 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
520 rhaas 493 CBC 21673 : XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
520 rhaas 494 ECB : startpointTLI);
3769 heikki.linnakangas 495 : }
3769 heikki.linnakangas 496 CBC 19132 : else if (len == 0)
3769 heikki.linnakangas 497 GIC 19098 : break;
498 34 : else if (len < 0)
499 : {
3769 heikki.linnakangas 500 CBC 34 : ereport(LOG,
3769 heikki.linnakangas 501 ECB : (errmsg("replication terminated by primary server"),
502 : errdetail("End of WAL reached on timeline %u at %X/%X.",
503 : startpointTLI,
504 : LSN_FORMAT_ARGS(LogstreamResult.Write))));
3769 heikki.linnakangas 505 GIC 34 : endofwal = true;
506 34 : break;
507 : }
2321 peter_e 508 21673 : len = walrcv_receive(wrconn, &buf, &wait_fd);
3769 heikki.linnakangas 509 ECB : }
510 :
511 : /* Let the primary know that we received some data. */
3769 heikki.linnakangas 512 GIC 19132 : XLogWalRcvSendReply(false, false);
513 :
514 : /*
3769 heikki.linnakangas 515 ECB : * If we've written some records, flush them to disk and
516 : * let the startup process and primary server know about
517 : * them.
518 : */
520 rhaas 519 CBC 19132 : XLogWalRcvFlush(false, startpointTLI);
520 : }
521 :
2567 rhaas 522 ECB : /* Check if we need to exit the streaming loop. */
2567 rhaas 523 CBC 32374 : if (endofwal)
524 34 : break;
525 :
526 : /* Find the soonest wakeup time, to limit our nap. */
73 tgl 527 GNC 32340 : nextWakeup = TIMESTAMP_INFINITY;
152 tmunro 528 161700 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
529 129360 : nextWakeup = Min(wakeup[i], nextWakeup);
530 :
531 : /* Calculate the nap time, clamping as necessary. */
73 tgl 532 32340 : now = GetCurrentTimestamp();
533 32340 : nap = TimestampDifferenceMilliseconds(now, nextWakeup);
534 :
2567 rhaas 535 ECB : /*
536 : * Ideally we would reuse a WaitEventSet object repeatedly
537 : * here to avoid the overheads of WaitLatchOrSocket on epoll
538 : * systems, but we can't be sure that libpq (or any other
539 : * walreceiver implementation) has the same socket (even if
540 : * the fd is the same number, it may have been closed and
541 : * reopened since the last time). In future, if there is a
542 : * function for removing sockets from WaitEventSet, then we
543 : * could add and remove just the socket each time, potentially
544 : * avoiding some system calls.
545 : */
2567 rhaas 546 GIC 32340 : Assert(wait_fd != PGINVALID_SOCKET);
878 fujii 547 CBC 32340 : rc = WaitLatchOrSocket(MyLatch,
548 : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
549 : WL_TIMEOUT | WL_LATCH_SET,
550 : wait_fd,
551 : nap,
552 : WAIT_EVENT_WAL_RECEIVER_MAIN);
2567 rhaas 553 GIC 32340 : if (rc & WL_LATCH_SET)
2567 rhaas 554 ECB : {
878 fujii 555 GIC 12343 : ResetLatch(MyLatch);
1441 tgl 556 12343 : ProcessWalRcvInterrupts();
557 :
2567 rhaas 558 CBC 12299 : if (walrcv->force_reply)
2567 rhaas 559 ECB : {
560 : /*
561 : * The recovery process has asked us to send apply
562 : * feedback now. Make sure the flag is really set to
2495 563 : * false in shared memory before sending the reply, so
564 : * we don't miss a new request for a reply.
565 : */
2567 rhaas 566 GIC 12268 : walrcv->force_reply = false;
2567 rhaas 567 CBC 12268 : pg_memory_barrier();
568 12268 : XLogWalRcvSendReply(true, false);
569 : }
570 : }
2567 rhaas 571 GIC 32296 : if (rc & WL_TIMEOUT)
572 : {
573 : /*
574 : * We didn't receive anything new. If we haven't heard
575 : * anything from the server for more than
576 : * wal_receiver_timeout / 2, ping the server. Also, if
577 : * it's been longer than wal_receiver_status_interval
578 : * since the last update we sent, send a status update to
579 : * the primary anyway, to report any progress in applying
580 : * WAL.
3769 heikki.linnakangas 581 ECB : */
3602 bruce 582 CBC 25 : bool requestReply = false;
583 :
584 : /*
585 : * Check if time since last receive from primary has
586 : * reached the configured limit.
587 : */
73 tgl 588 GNC 25 : now = GetCurrentTimestamp();
152 tmunro 589 25 : if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
152 tmunro 590 UNC 0 : ereport(ERROR,
591 : (errcode(ERRCODE_CONNECTION_FAILURE),
592 : errmsg("terminating walreceiver due to timeout")));
593 :
594 : /*
595 : * If we didn't receive anything new for half of receiver
596 : * replication timeout, then ping the server.
597 : */
152 tmunro 598 GNC 25 : if (now >= wakeup[WALRCV_WAKEUP_PING])
599 : {
152 tmunro 600 UNC 0 : requestReply = true;
73 tgl 601 0 : wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY;
602 : }
3769 heikki.linnakangas 603 ECB :
3769 heikki.linnakangas 604 GIC 25 : XLogWalRcvSendReply(requestReply, requestReply);
3716 simon 605 25 : XLogWalRcvSendHSFeedback(false);
606 : }
607 : }
608 :
4827 heikki.linnakangas 609 ECB : /*
3769 610 : * The backend finished streaming. Exit streaming COPY-mode from
3769 heikki.linnakangas 611 EUB : * our side, too.
612 : */
2321 peter_e 613 GIC 34 : walrcv_endstreaming(wrconn, &primaryTLI);
614 :
615 : /*
616 : * If the server had switched to a new timeline that we didn't
617 : * know about when we began streaming, fetch its timeline history
618 : * file now.
3733 heikki.linnakangas 619 ECB : */
3733 heikki.linnakangas 620 GIC 12 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
3769 heikki.linnakangas 621 EUB : }
622 : else
3769 heikki.linnakangas 623 UIC 0 : ereport(LOG,
624 : (errmsg("primary server contains no more WAL on requested timeline %u",
3769 heikki.linnakangas 625 ECB : startpointTLI)));
626 :
627 : /*
628 : * End of WAL reached on the requested timeline. Close the last
629 : * segment, and await for new orders from the startup process.
630 : */
3769 heikki.linnakangas 631 GIC 12 : if (recvFile >= 0)
632 : {
633 : char xlogfname[MAXFNAMELEN];
3896 simon 634 ECB :
520 rhaas 635 GIC 11 : XLogWalRcvFlush(false, startpointTLI);
1223 michael 636 11 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
3769 heikki.linnakangas 637 11 : if (close(recvFile) != 0)
3769 heikki.linnakangas 638 UIC 0 : ereport(PANIC,
639 : (errcode_for_file_access(),
640 : errmsg("could not close WAL segment %s: %m",
1223 michael 641 ECB : xlogfname)));
642 :
643 : /*
3896 simon 644 EUB : * Create .done file forcibly to prevent the streamed segment from
645 : * being archived later.
646 : */
2886 heikki.linnakangas 647 GIC 11 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
648 11 : XLogArchiveForceDone(xlogfname);
649 : else
582 alvherre 650 UIC 0 : XLogArchiveNotify(xlogfname);
651 : }
3769 heikki.linnakangas 652 CBC 12 : recvFile = -1;
653 :
3769 heikki.linnakangas 654 GIC 12 : elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
655 12 : WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
3769 heikki.linnakangas 656 ECB : }
657 : /* not reached */
658 : }
3769 heikki.linnakangas 659 EUB :
660 : /*
661 : * Wait for startup process to set receiveStart and receiveStartTLI.
662 : */
663 : static void
3769 heikki.linnakangas 664 GIC 12 : WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
665 : {
2742 rhaas 666 12 : WalRcvData *walrcv = WalRcv;
667 : int state;
3769 heikki.linnakangas 668 ECB :
3769 heikki.linnakangas 669 CBC 12 : SpinLockAcquire(&walrcv->mutex);
3769 heikki.linnakangas 670 GIC 12 : state = walrcv->walRcvState;
3769 heikki.linnakangas 671 GBC 12 : if (state != WALRCV_STREAMING)
672 : {
3769 heikki.linnakangas 673 LBC 0 : SpinLockRelease(&walrcv->mutex);
3769 heikki.linnakangas 674 UIC 0 : if (state == WALRCV_STOPPING)
3769 heikki.linnakangas 675 LBC 0 : proc_exit(0);
4441 heikki.linnakangas 676 ECB : else
3769 heikki.linnakangas 677 UIC 0 : elog(FATAL, "unexpected walreceiver state");
678 : }
3769 heikki.linnakangas 679 GIC 12 : walrcv->walRcvState = WALRCV_WAITING;
680 12 : walrcv->receiveStart = InvalidXLogRecPtr;
681 12 : walrcv->receiveStartTLI = 0;
682 12 : SpinLockRelease(&walrcv->mutex);
683 :
1124 peter 684 12 : set_ps_display("idle");
3769 heikki.linnakangas 685 ECB :
686 : /*
687 : * nudge startup process to notice that we've stopped streaming and are
688 : * now waiting for instructions.
689 : */
3769 heikki.linnakangas 690 CBC 12 : WakeupRecovery();
3769 heikki.linnakangas 691 ECB : for (;;)
692 : {
878 fujii 693 GIC 22 : ResetLatch(MyLatch);
3769 heikki.linnakangas 694 EUB :
3769 heikki.linnakangas 695 GBC 22 : ProcessWalRcvInterrupts();
3769 heikki.linnakangas 696 EUB :
3769 heikki.linnakangas 697 GIC 10 : SpinLockAcquire(&walrcv->mutex);
3769 heikki.linnakangas 698 GBC 10 : Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
699 : walrcv->walRcvState == WALRCV_WAITING ||
3769 heikki.linnakangas 700 ECB : walrcv->walRcvState == WALRCV_STOPPING);
3769 heikki.linnakangas 701 CBC 10 : if (walrcv->walRcvState == WALRCV_RESTARTING)
3769 heikki.linnakangas 702 ECB : {
1108 alvherre 703 : /*
704 : * No need to handle changes in primary_conninfo or
326 michael 705 : * primary_slot_name here. Startup process will signal us to
706 : * terminate in case those change.
707 : */
3769 heikki.linnakangas 708 UIC 0 : *startpoint = walrcv->receiveStart;
709 0 : *startpointTLI = walrcv->receiveStartTLI;
710 0 : walrcv->walRcvState = WALRCV_STREAMING;
3769 heikki.linnakangas 711 LBC 0 : SpinLockRelease(&walrcv->mutex);
3769 heikki.linnakangas 712 UIC 0 : break;
713 : }
3769 heikki.linnakangas 714 CBC 10 : if (walrcv->walRcvState == WALRCV_STOPPING)
715 : {
4441 heikki.linnakangas 716 ECB : /*
717 : * We should've received SIGTERM if the startup process wants us
3602 bruce 718 : * to die, but might as well check it here too.
3832 heikki.linnakangas 719 : */
3769 heikki.linnakangas 720 UIC 0 : SpinLockRelease(&walrcv->mutex);
721 0 : exit(1);
3769 heikki.linnakangas 722 ECB : }
3769 heikki.linnakangas 723 GIC 10 : SpinLockRelease(&walrcv->mutex);
724 :
878 fujii 725 10 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
726 : WAIT_EVENT_WAL_RECEIVER_WAIT_START);
727 : }
728 :
3769 heikki.linnakangas 729 UBC 0 : if (update_process_title)
3769 heikki.linnakangas 730 EUB : {
731 : char activitymsg[50];
3832 732 :
3769 heikki.linnakangas 733 UBC 0 : snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
775 peter 734 UIC 0 : LSN_FORMAT_ARGS(*startpoint));
1124 peter 735 LBC 0 : set_ps_display(activitymsg);
736 : }
3769 heikki.linnakangas 737 UIC 0 : }
738 :
739 : /*
740 : * Fetch any missing timeline history files between 'first' and 'last'
3769 heikki.linnakangas 741 EUB : * (inclusive) from the server.
742 : */
743 : static void
3769 heikki.linnakangas 744 CBC 121 : WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
745 : {
3602 bruce 746 ECB : TimeLineID tli;
747 :
3769 heikki.linnakangas 748 GIC 265 : for (tli = first; tli <= last; tli++)
749 : {
3748 heikki.linnakangas 750 EUB : /* there's no history file for timeline 1 */
3748 heikki.linnakangas 751 GIC 144 : if (tli != 1 && !existsTimeLineHistory(tli))
752 : {
753 : char *fname;
3769 heikki.linnakangas 754 EUB : char *content;
755 : int len;
756 : char expectedfname[MAXFNAMELEN];
757 :
3769 heikki.linnakangas 758 GBC 11 : ereport(LOG,
759 : (errmsg("fetching timeline history file for timeline %u from primary server",
760 : tli)));
761 :
2321 peter_e 762 GIC 11 : walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
763 :
764 : /*
1029 andres 765 ECB : * Check that the filename on the primary matches what we
766 : * calculated ourselves. This is just a sanity check, it should
767 : * always match.
768 : */
3769 heikki.linnakangas 769 CBC 11 : TLHistoryFileName(expectedfname, tli);
3769 heikki.linnakangas 770 GIC 11 : if (strcmp(fname, expectedfname) != 0)
3769 heikki.linnakangas 771 UIC 0 : ereport(ERROR,
3769 heikki.linnakangas 772 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
773 : errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
774 : tli)));
775 :
776 : /*
777 : * Write the file to pg_wal.
778 : */
3769 heikki.linnakangas 779 CBC 11 : writeTimeLineHistoryFile(tli, content, len);
780 :
781 : /*
782 : * Mark the streamed history file as ready for archiving if
697 tgl 783 ECB : * archive_mode is always.
784 : */
922 fujii 785 GIC 11 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
786 11 : XLogArchiveForceDone(fname);
787 : else
582 alvherre 788 UIC 0 : XLogArchiveNotify(fname);
789 :
3769 heikki.linnakangas 790 CBC 11 : pfree(fname);
791 11 : pfree(content);
4441 heikki.linnakangas 792 EUB : }
793 : }
4832 heikki.linnakangas 794 GIC 121 : }
795 :
796 : /*
797 : * Mark us as STOPPED in shared memory at exit.
798 : */
799 : static void
4820 heikki.linnakangas 800 CBC 173 : WalRcvDie(int code, Datum arg)
801 : {
2742 rhaas 802 GIC 173 : WalRcvData *walrcv = WalRcv;
520 803 173 : TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
804 :
805 173 : Assert(*startpointTLI_p != 0);
4832 heikki.linnakangas 806 ECB :
4465 807 : /* Ensure that all WAL records received are flushed to disk */
520 rhaas 808 GIC 173 : XLogWalRcvFlush(true, *startpointTLI_p);
4465 heikki.linnakangas 809 EUB :
810 : /* Mark ourselves inactive in shared memory */
4832 heikki.linnakangas 811 CBC 173 : SpinLockAcquire(&walrcv->mutex);
3769 812 173 : Assert(walrcv->walRcvState == WALRCV_STREAMING ||
813 : walrcv->walRcvState == WALRCV_RESTARTING ||
814 : walrcv->walRcvState == WALRCV_STARTING ||
3769 heikki.linnakangas 815 ECB : walrcv->walRcvState == WALRCV_WAITING ||
816 : walrcv->walRcvState == WALRCV_STOPPING);
3769 heikki.linnakangas 817 GIC 173 : Assert(walrcv->pid == MyProcPid);
4820 818 173 : walrcv->walRcvState = WALRCV_STOPPED;
4832 819 173 : walrcv->pid = 0;
2473 alvherre 820 173 : walrcv->ready_to_display = false;
2014 tgl 821 CBC 173 : walrcv->latch = NULL;
4832 heikki.linnakangas 822 GIC 173 : SpinLockRelease(&walrcv->mutex);
4832 heikki.linnakangas 823 ECB :
758 tmunro 824 CBC 173 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
825 :
4820 heikki.linnakangas 826 ECB : /* Terminate the connection gracefully. */
2321 peter_e 827 GIC 173 : if (wrconn != NULL)
828 109 : walrcv_disconnect(wrconn);
3769 heikki.linnakangas 829 ECB :
830 : /* Wake up the startup process to notice promptly that we're gone */
3769 heikki.linnakangas 831 GIC 173 : WakeupRecovery();
4832 heikki.linnakangas 832 CBC 173 : }
4832 heikki.linnakangas 833 ECB :
834 : /*
835 : * Accept the message from XLOG stream, and process it.
836 : */
837 : static void
520 rhaas 838 CBC 21673 : XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
4813 heikki.linnakangas 839 ECB : {
3805 840 : int hdrlen;
841 : XLogRecPtr dataStart;
842 : XLogRecPtr walEnd;
3602 bruce 843 : TimestampTz sendTime;
844 : bool replyRequested;
3805 heikki.linnakangas 845 :
3805 heikki.linnakangas 846 GIC 21673 : resetStringInfo(&incoming_message);
847 :
4813 heikki.linnakangas 848 CBC 21673 : switch (type)
4813 heikki.linnakangas 849 ECB : {
4790 bruce 850 GIC 21673 : case 'w': /* WAL records */
851 : {
3805 heikki.linnakangas 852 ECB : /* copy message to StringInfo */
3805 heikki.linnakangas 853 CBC 21673 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
3805 heikki.linnakangas 854 GIC 21673 : if (len < hdrlen)
4790 bruce 855 UIC 0 : ereport(ERROR,
856 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
857 : errmsg_internal("invalid WAL message received from primary")));
3805 heikki.linnakangas 858 GIC 21673 : appendBinaryStringInfo(&incoming_message, buf, hdrlen);
3805 heikki.linnakangas 859 ECB :
860 : /* read the fields */
3805 heikki.linnakangas 861 GIC 21673 : dataStart = pq_getmsgint64(&incoming_message);
862 21673 : walEnd = pq_getmsgint64(&incoming_message);
2236 tgl 863 21673 : sendTime = pq_getmsgint64(&incoming_message);
3805 heikki.linnakangas 864 21673 : ProcessWalSndrMessage(walEnd, sendTime);
865 :
866 21673 : buf += hdrlen;
3805 heikki.linnakangas 867 CBC 21673 : len -= hdrlen;
520 rhaas 868 GIC 21673 : XLogWalRcvWrite(buf, len, dataStart, tli);
4790 bruce 869 CBC 21673 : break;
870 : }
4117 simon 871 LBC 0 : case 'k': /* Keepalive */
872 : {
873 : /* copy message to StringInfo */
3805 heikki.linnakangas 874 0 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
875 0 : if (len != hdrlen)
4117 simon 876 UBC 0 : ereport(ERROR,
877 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
878 : errmsg_internal("invalid keepalive message received from primary")));
3805 heikki.linnakangas 879 LBC 0 : appendBinaryStringInfo(&incoming_message, buf, hdrlen);
880 :
881 : /* read the fields */
882 0 : walEnd = pq_getmsgint64(&incoming_message);
2236 tgl 883 0 : sendTime = pq_getmsgint64(&incoming_message);
3805 heikki.linnakangas 884 0 : replyRequested = pq_getmsgbyte(&incoming_message);
3805 heikki.linnakangas 885 ECB :
3805 heikki.linnakangas 886 UIC 0 : ProcessWalSndrMessage(walEnd, sendTime);
3832 heikki.linnakangas 887 ECB :
888 : /* If the primary requested a reply, send one immediately */
3805 heikki.linnakangas 889 LBC 0 : if (replyRequested)
3832 890 0 : XLogWalRcvSendReply(true, false);
4117 simon 891 UIC 0 : break;
4117 simon 892 EUB : }
4813 heikki.linnakangas 893 UIC 0 : default:
894 0 : ereport(ERROR,
4813 heikki.linnakangas 895 EUB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
4737 tgl 896 : errmsg_internal("invalid replication message type %d",
897 : type)));
898 : }
4813 heikki.linnakangas 899 GIC 21673 : }
4813 heikki.linnakangas 900 EUB :
901 : /*
902 : * Write XLOG data to disk.
4832 903 : */
904 : static void
520 rhaas 905 GBC 21673 : XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
906 : {
4790 bruce 907 EUB : int startoff;
908 : int byteswritten;
909 :
520 rhaas 910 GBC 21673 : Assert(tli != 0);
520 rhaas 911 EUB :
4832 heikki.linnakangas 912 GBC 43364 : while (nbytes > 0)
913 : {
4790 bruce 914 EUB : int segbytes;
4832 heikki.linnakangas 915 :
916 : /* Close the current segment if it's completed */
577 fujii 917 GIC 21691 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
520 rhaas 918 18 : XLogWalRcvClose(recptr, tli);
919 :
577 fujii 920 CBC 21691 : if (recvFile < 0)
921 : {
922 : /* Create/use new log file */
2028 andres 923 GIC 142 : XLByteToSeg(recptr, recvSegNo, wal_segment_size);
520 rhaas 924 142 : recvFile = XLogFileInit(recvSegNo, tli);
925 142 : recvFileTLI = tli;
4832 heikki.linnakangas 926 ECB : }
927 :
928 : /* Calculate the start offset of the received logs */
2028 andres 929 GIC 21691 : startoff = XLogSegmentOffset(recptr, wal_segment_size);
930 :
2028 andres 931 CBC 21691 : if (startoff + nbytes > wal_segment_size)
2028 andres 932 GIC 18 : segbytes = wal_segment_size - startoff;
4832 heikki.linnakangas 933 ECB : else
4832 heikki.linnakangas 934 GIC 21673 : segbytes = nbytes;
935 :
936 : /* OK to write the logs */
937 21691 : errno = 0;
4832 heikki.linnakangas 938 ECB :
192 tmunro 939 CBC 21691 : byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
4832 heikki.linnakangas 940 GIC 21691 : if (byteswritten <= 0)
4832 heikki.linnakangas 941 ECB : {
942 : char xlogfname[MAXFNAMELEN];
943 : int save_errno;
1223 michael 944 :
4832 heikki.linnakangas 945 : /* if write didn't set errno, assume no disk space */
4832 heikki.linnakangas 946 LBC 0 : if (errno == 0)
4832 heikki.linnakangas 947 UIC 0 : errno = ENOSPC;
948 :
1223 michael 949 0 : save_errno = errno;
1223 michael 950 LBC 0 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1223 michael 951 UIC 0 : errno = save_errno;
4832 heikki.linnakangas 952 LBC 0 : ereport(PANIC,
4832 heikki.linnakangas 953 ECB : (errcode_for_file_access(),
954 : errmsg("could not write to WAL segment %s "
955 : "at offset %u, length %lu: %m",
956 : xlogfname, startoff, (unsigned long) segbytes)));
957 : }
958 :
959 : /* Update state for write */
3754 alvherre 960 CBC 21691 : recptr += byteswritten;
4832 heikki.linnakangas 961 ECB :
4832 heikki.linnakangas 962 GIC 21691 : nbytes -= byteswritten;
963 21691 : buf += byteswritten;
964 :
4790 bruce 965 21691 : LogstreamResult.Write = recptr;
966 : }
1096 tmunro 967 EUB :
968 : /* Update shared-memory status */
1096 tmunro 969 GIC 21673 : pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
577 fujii 970 EUB :
971 : /*
972 : * Close the current segment if it's fully written up in the last cycle of
973 : * the loop, to create its archive notification file soon. Otherwise WAL
974 : * archiving of the segment will be delayed until any data in the next
975 : * segment is received and written.
976 : */
577 fujii 977 GIC 21673 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
520 rhaas 978 31 : XLogWalRcvClose(recptr, tli);
4832 heikki.linnakangas 979 21673 : }
980 :
4435 rhaas 981 ECB : /*
982 : * Flush the log to disk.
983 : *
984 : * If we're in the midst of dying, it's unwise to do anything that might throw
985 : * an error, so we skip sending a reply in that case.
986 : */
987 : static void
520 rhaas 988 GIC 19365 : XLogWalRcvFlush(bool dying, TimeLineID tli)
989 : {
520 rhaas 990 CBC 19365 : Assert(tli != 0);
991 :
3754 alvherre 992 GIC 19365 : if (LogstreamResult.Flush < LogstreamResult.Write)
993 : {
2742 rhaas 994 18943 : WalRcvData *walrcv = WalRcv;
995 :
520 996 18943 : issue_xlog_fsync(recvFile, recvSegNo, tli);
997 :
4832 heikki.linnakangas 998 CBC 18943 : LogstreamResult.Flush = LogstreamResult.Write;
4832 heikki.linnakangas 999 ECB :
1000 : /* Update shared-memory status */
4832 heikki.linnakangas 1001 GIC 18943 : SpinLockAcquire(&walrcv->mutex);
1096 tmunro 1002 18943 : if (walrcv->flushedUpto < LogstreamResult.Flush)
1003 : {
1004 18943 : walrcv->latestChunkStart = walrcv->flushedUpto;
1005 18943 : walrcv->flushedUpto = LogstreamResult.Flush;
520 rhaas 1006 18943 : walrcv->receivedTLI = tli;
1007 : }
4832 heikki.linnakangas 1008 18943 : SpinLockRelease(&walrcv->mutex);
4832 heikki.linnakangas 1009 ECB :
1010 : /* Signal the startup process and walsender that new WAL has arrived */
4589 heikki.linnakangas 1011 CBC 18943 : WakeupRecovery();
4282 simon 1012 GIC 18943 : if (AllowCascadeReplication())
1 andres 1013 GNC 18943 : WalSndWakeup(true, false);
1014 :
4832 heikki.linnakangas 1015 ECB : /* Report XLOG streaming progress in PS display */
4689 tgl 1016 GIC 18943 : if (update_process_title)
4689 tgl 1017 ECB : {
1018 : char activitymsg[50];
1019 :
4689 tgl 1020 GIC 18943 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
775 peter 1021 18943 : LSN_FORMAT_ARGS(LogstreamResult.Write));
1124 peter 1022 CBC 18943 : set_ps_display(activitymsg);
4689 tgl 1023 ECB : }
1024 :
1029 andres 1025 : /* Also let the primary know that we made some progress */
4435 rhaas 1026 CBC 18943 : if (!dying)
3370 heikki.linnakangas 1027 ECB : {
3832 heikki.linnakangas 1028 GIC 18942 : XLogWalRcvSendReply(false, false);
3370 heikki.linnakangas 1029 CBC 18942 : XLogWalRcvSendHSFeedback(false);
1030 : }
1031 : }
4832 1032 19365 : }
4441 heikki.linnakangas 1033 ECB :
577 fujii 1034 : /*
1035 : * Close the current segment.
1036 : *
1037 : * Flush the segment to disk before closing it. Otherwise we have to
1038 : * reopen and fsync it later.
1039 : *
1040 : * Create an archive notification file since the segment is known completed.
1041 : */
1042 : static void
520 rhaas 1043 CBC 49 : XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
1044 : {
1045 : char xlogfname[MAXFNAMELEN];
1046 :
577 fujii 1047 49 : Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
520 rhaas 1048 GIC 49 : Assert(tli != 0);
577 fujii 1049 ECB :
1050 : /*
1051 : * fsync() and close current file before we switch to next one. We would
1052 : * otherwise have to reopen this file to fsync it later
1053 : */
520 rhaas 1054 GIC 49 : XLogWalRcvFlush(false, tli);
1055 :
577 fujii 1056 49 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1057 :
1058 : /*
1059 : * XLOG segment files will be re-read by recovery in startup process soon,
1060 : * so we don't advise the OS to release cache pages associated with the
1061 : * file like XLogFileClose() does.
1062 : */
1063 49 : if (close(recvFile) != 0)
577 fujii 1064 LBC 0 : ereport(PANIC,
1065 : (errcode_for_file_access(),
1066 : errmsg("could not close WAL segment %s: %m",
1067 : xlogfname)));
577 fujii 1068 ECB :
1069 : /*
1070 : * Create .done file forcibly to prevent the streamed segment from being
1071 : * archived later.
1072 : */
577 fujii 1073 GIC 49 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1074 49 : XLogArchiveForceDone(xlogfname);
577 fujii 1075 ECB : else
577 fujii 1076 UIC 0 : XLogArchiveNotify(xlogfname);
577 fujii 1077 ECB :
577 fujii 1078 GIC 49 : recvFile = -1;
1079 49 : }
1080 :
1081 : /*
1082 : * Send reply message to primary, indicating our current WAL locations, oldest
1083 : * xmin and the current time.
3832 heikki.linnakangas 1084 ECB : *
3832 heikki.linnakangas 1085 EUB : * If 'force' is not set, the message is only sent if enough time has
1086 : * passed since last status update to reach wal_receiver_status_interval.
1087 : * If wal_receiver_status_interval is disabled altogether and 'force' is
1088 : * false, this is a no-op.
1089 : *
1090 : * If 'requestReply' is true, requests the server to reply immediately upon
1091 : * receiving this message. This is used for heartbeats, when approaching
1092 : * wal_receiver_timeout.
1093 : */
4441 heikki.linnakangas 1094 ECB : static void
3832 heikki.linnakangas 1095 CBC 50476 : XLogWalRcvSendReply(bool force, bool requestReply)
1096 : {
3805 heikki.linnakangas 1097 EUB : static XLogRecPtr writePtr = 0;
1098 : static XLogRecPtr flushPtr = 0;
3805 heikki.linnakangas 1099 ECB : XLogRecPtr applyPtr;
1100 : TimestampTz now;
1101 :
1102 : /*
1103 : * If the user doesn't want status to be reported to the primary, be sure
1104 : * to exit before doing anything at all.
1105 : */
3832 heikki.linnakangas 1106 GIC 50476 : if (!force && wal_receiver_status_interval <= 0)
4441 heikki.linnakangas 1107 UIC 0 : return;
1108 :
1109 : /* Get current timestamp. */
4441 heikki.linnakangas 1110 GIC 50476 : now = GetCurrentTimestamp();
1111 :
1112 : /*
1113 : * We can compare the write and flush positions to the last message we
1114 : * sent without taking any lock, but the apply position requires a spin
4441 heikki.linnakangas 1115 ECB : * lock, so we don't check that unless something else has changed or 10
1116 : * seconds have passed. This means that the apply WAL location will
1117 : * appear, from the primary's point of view, to lag slightly, but since
1118 : * this is only for reporting purposes and only on idle systems, that's
1119 : * probably OK.
1120 : */
3832 heikki.linnakangas 1121 GIC 50476 : if (!force
3754 alvherre 1122 38099 : && writePtr == LogstreamResult.Write
1123 18969 : && flushPtr == LogstreamResult.Flush
152 tmunro 1124 GNC 76 : && now < wakeup[WALRCV_WAKEUP_REPLY])
4441 heikki.linnakangas 1125 CBC 61 : return;
1126 :
1127 : /* Make sure we wake up when it's time to send another reply. */
152 tmunro 1128 GNC 50415 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
1129 :
1130 : /* Construct a new message */
3805 heikki.linnakangas 1131 CBC 50415 : writePtr = LogstreamResult.Write;
3805 heikki.linnakangas 1132 GIC 50415 : flushPtr = LogstreamResult.Flush;
3762 1133 50415 : applyPtr = GetXLogReplayRecPtr(NULL);
1134 :
3805 1135 50415 : resetStringInfo(&reply_message);
1136 50415 : pq_sendbyte(&reply_message, 'r');
1137 50415 : pq_sendint64(&reply_message, writePtr);
1138 50415 : pq_sendint64(&reply_message, flushPtr);
1139 50415 : pq_sendint64(&reply_message, applyPtr);
2236 tgl 1140 50415 : pq_sendint64(&reply_message, GetCurrentTimestamp());
3805 heikki.linnakangas 1141 50415 : pq_sendbyte(&reply_message, requestReply ? 1 : 0);
3805 heikki.linnakangas 1142 ECB :
1143 : /* Send it */
3805 heikki.linnakangas 1144 CBC 50415 : elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
775 peter 1145 ECB : LSN_FORMAT_ARGS(writePtr),
1146 : LSN_FORMAT_ARGS(flushPtr),
1147 : LSN_FORMAT_ARGS(applyPtr),
1148 : requestReply ? " (reply requested)" : "");
3805 heikki.linnakangas 1149 :
2321 peter_e 1150 GIC 50415 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1151 : }
4433 simon 1152 ECB :
1153 : /*
1154 : * Send hot standby feedback message to primary, plus the current time,
1155 : * in case they don't have a watch.
3716 1156 : *
1157 : * If the user disables feedback, send one final message to tell sender
2264 1158 : * to forget about the xmin on this standby. We also send this message
1159 : * on first connect because a previous connection might have set xmin
1160 : * on a replication slot. (If we're not using a slot it's harmless to
1161 : * send a feedback message explicitly setting InvalidTransactionId).
4433 1162 : */
1163 : static void
3716 simon 1164 GIC 19090 : XLogWalRcvSendHSFeedback(bool immed)
4433 simon 1165 ECB : {
1166 : TimestampTz now;
1167 : FullTransactionId nextFullXid;
1168 : TransactionId nextXid;
1169 : uint32 xmin_epoch,
1170 : catalog_xmin_epoch;
2153 bruce 1171 : TransactionId xmin,
1172 : catalog_xmin;
1173 :
1174 : /* initially true so we always send at least one feedback message */
1175 : static bool primary_has_standby_xmin = true;
1176 :
1177 : /*
1178 : * If the user doesn't want status to be reported to the primary, be sure
1179 : * to exit before doing anything at all.
1180 : */
3716 simon 1181 GIC 19090 : if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1029 andres 1182 18943 : !primary_has_standby_xmin)
4433 simon 1183 18962 : return;
4433 simon 1184 ECB :
1185 : /* Get current timestamp. */
4433 simon 1186 GIC 257 : now = GetCurrentTimestamp();
1187 :
1188 : /* Send feedback at most once per wal_receiver_status_interval. */
152 tmunro 1189 GNC 257 : if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1190 128 : return;
1191 :
1192 : /* Make sure we wake up when it's time to send feedback again. */
1193 129 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
1194 :
1195 : /*
1196 : * If Hot Standby is not yet accepting connections there is nothing to
2264 simon 1197 ECB : * send. Check this after the interval has expired to reduce number of
1198 : * calls.
1199 : *
1200 : * Bailing out here also ensures that we don't send feedback until we've
1201 : * read our own replication slot state, so we don't tell the primary to
2153 bruce 1202 : * discard needed xmin or catalog_xmin from any slots that may exist on
1203 : * this replica.
1204 : */
4433 simon 1205 CBC 129 : if (!HotStandbyActive())
1206 1 : return;
1207 :
1208 : /*
4382 bruce 1209 ECB : * Make the expensive call to get the oldest xmin once we are certain
1210 : * everything else has been checked.
1211 : */
3716 simon 1212 GIC 128 : if (hot_standby_feedback)
1213 : {
970 andres 1214 21 : GetReplicationHorizons(&xmin, &catalog_xmin);
1215 : }
1216 : else
1217 : {
3716 simon 1218 107 : xmin = InvalidTransactionId;
2206 1219 107 : catalog_xmin = InvalidTransactionId;
1220 : }
4433 simon 1221 ECB :
1222 : /*
1223 : * Get epoch and adjust if nextXid and oldestXmin are different sides of
1224 : * the epoch boundary.
1225 : */
1473 tmunro 1226 GIC 128 : nextFullXid = ReadNextFullTransactionId();
1227 128 : nextXid = XidFromFullTransactionId(nextFullXid);
1473 tmunro 1228 CBC 128 : xmin_epoch = EpochFromFullTransactionId(nextFullXid);
2206 simon 1229 GIC 128 : catalog_xmin_epoch = xmin_epoch;
4433 simon 1230 CBC 128 : if (nextXid < xmin)
2153 bruce 1231 UIC 0 : xmin_epoch--;
2206 simon 1232 GIC 128 : if (nextXid < catalog_xmin)
2153 bruce 1233 UIC 0 : catalog_xmin_epoch--;
4433 simon 1234 ECB :
2206 simon 1235 CBC 128 : elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1236 : xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1237 :
1238 : /* Construct the message and send it. */
3805 heikki.linnakangas 1239 GIC 128 : resetStringInfo(&reply_message);
1240 128 : pq_sendbyte(&reply_message, 'h');
2236 tgl 1241 128 : pq_sendint64(&reply_message, GetCurrentTimestamp());
2006 andres 1242 CBC 128 : pq_sendint32(&reply_message, xmin);
1243 128 : pq_sendint32(&reply_message, xmin_epoch);
1244 128 : pq_sendint32(&reply_message, catalog_xmin);
1245 128 : pq_sendint32(&reply_message, catalog_xmin_epoch);
2321 peter_e 1246 128 : walrcv_send(wrconn, reply_message.data, reply_message.len);
2206 simon 1247 GBC 128 : if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1029 andres 1248 CBC 21 : primary_has_standby_xmin = true;
3716 simon 1249 EUB : else
1029 andres 1250 GIC 107 : primary_has_standby_xmin = false;
4433 simon 1251 ECB : }
1252 :
1253 : /*
1254 : * Update shared memory status upon receiving a message from primary.
3805 heikki.linnakangas 1255 : *
1256 : * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1257 : * message, reported by primary.
4117 simon 1258 : */
1259 : static void
4117 simon 1260 CBC 21673 : ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
4117 simon 1261 ECB : {
2742 rhaas 1262 CBC 21673 : WalRcvData *walrcv = WalRcv;
4117 simon 1263 21673 : TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1264 :
4117 simon 1265 ECB : /* Update shared-memory status */
4117 simon 1266 GIC 21673 : SpinLockAcquire(&walrcv->mutex);
3754 alvherre 1267 21673 : if (walrcv->latestWalEnd < walEnd)
3895 simon 1268 18767 : walrcv->latestWalEndTime = sendTime;
1269 21673 : walrcv->latestWalEnd = walEnd;
4117 1270 21673 : walrcv->lastMsgSendTime = sendTime;
1271 21673 : walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1272 21673 : SpinLockRelease(&walrcv->mutex);
1273 :
867 tgl 1274 21673 : if (message_level_is_interesting(DEBUG2))
3292 tgl 1275 ECB : {
1276 : char *sendtime;
1277 : char *receipttime;
2948 ishii 1278 : int applyDelay;
1279 :
1280 : /* Copy because timestamptz_to_str returns a static buffer */
3292 tgl 1281 CBC 1212 : sendtime = pstrdup(timestamptz_to_str(sendTime));
1282 1212 : receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
2948 ishii 1283 1212 : applyDelay = GetReplicationApplyDelay();
2948 ishii 1284 ECB :
1285 : /* apply delay is not available */
2948 ishii 1286 CBC 1212 : if (applyDelay == -1)
1287 82 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1288 : sendtime,
2948 ishii 1289 ECB : receipttime,
1290 : GetReplicationTransferLatency());
1291 : else
2948 ishii 1292 GIC 1130 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1293 : sendtime,
1294 : receipttime,
1295 : applyDelay,
2948 ishii 1296 ECB : GetReplicationTransferLatency());
1297 :
3292 tgl 1298 CBC 1212 : pfree(sendtime);
3292 tgl 1299 GIC 1212 : pfree(receipttime);
1300 : }
4117 simon 1301 CBC 21673 : }
2649 alvherre 1302 ECB :
1303 : /*
1304 : * Compute the next wakeup time for a given wakeup reason. Can be called to
1305 : * initialize a wakeup time, to adjust it for the next wakeup, or to
1306 : * reinitialize it when GUCs have changed. We ask the caller to pass in the
1307 : * value of "now" because this frequently avoids multiple calls of
1308 : * GetCurrentTimestamp(). It had better be a reasonably up-to-date value
1309 : * though.
1310 : */
1311 : static void
152 tmunro 1312 GNC 94382 : WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
1313 : {
1314 94382 : switch (reason)
1315 : {
1316 21796 : case WALRCV_WAKEUP_TERMINATE:
1317 21796 : if (wal_receiver_timeout <= 0)
73 tgl 1318 UNC 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1319 : else
73 tgl 1320 GNC 21796 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
152 tmunro 1321 21796 : break;
1322 21796 : case WALRCV_WAKEUP_PING:
1323 21796 : if (wal_receiver_timeout <= 0)
73 tgl 1324 UNC 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1325 : else
73 tgl 1326 GNC 21796 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
152 tmunro 1327 21796 : break;
1328 252 : case WALRCV_WAKEUP_HSFEEDBACK:
1329 252 : if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
73 tgl 1330 220 : wakeup[reason] = TIMESTAMP_INFINITY;
1331 : else
1332 32 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
152 tmunro 1333 252 : break;
1334 50538 : case WALRCV_WAKEUP_REPLY:
1335 50538 : if (wal_receiver_status_interval <= 0)
73 tgl 1336 UNC 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1337 : else
73 tgl 1338 GNC 50538 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
152 tmunro 1339 50538 : break;
1340 : /* there's intentionally no default: here */
1341 : }
1342 94382 : }
1343 :
1344 : /*
1345 : * Wake up the walreceiver main loop.
1346 : *
1347 : * This is called by the startup process whenever interesting xlog records
2567 rhaas 1348 ECB : * are applied, so that walreceiver can check if it needs to send an apply
1349 : * notification back to the primary which may be waiting in a COMMIT with
1350 : * synchronous_commit = remote_apply.
1351 : */
1352 : void
2567 rhaas 1353 GIC 11679 : WalRcvForceReply(void)
2567 rhaas 1354 ECB : {
2014 tgl 1355 : Latch *latch;
1356 :
2567 rhaas 1357 CBC 11679 : WalRcv->force_reply = true;
1358 : /* fetching the latch pointer might not be atomic, so use spinlock */
2014 tgl 1359 GIC 11679 : SpinLockAcquire(&WalRcv->mutex);
1360 11679 : latch = WalRcv->latch;
1361 11679 : SpinLockRelease(&WalRcv->mutex);
1362 11679 : if (latch)
1363 11558 : SetLatch(latch);
2567 rhaas 1364 11679 : }
1365 :
1366 : /*
1367 : * Return a string constant representing the state. This is used
2649 alvherre 1368 ECB : * in system functions and views, and should *not* be translated.
1369 : */
1370 : static const char *
2649 alvherre 1371 UIC 0 : WalRcvGetStateString(WalRcvState state)
2649 alvherre 1372 ECB : {
2649 alvherre 1373 LBC 0 : switch (state)
2649 alvherre 1374 EUB : {
2649 alvherre 1375 UIC 0 : case WALRCV_STOPPED:
2649 alvherre 1376 LBC 0 : return "stopped";
1377 0 : case WALRCV_STARTING:
1378 0 : return "starting";
1379 0 : case WALRCV_STREAMING:
2649 alvherre 1380 UBC 0 : return "streaming";
2649 alvherre 1381 UIC 0 : case WALRCV_WAITING:
2649 alvherre 1382 LBC 0 : return "waiting";
1383 0 : case WALRCV_RESTARTING:
1384 0 : return "restarting";
1385 0 : case WALRCV_STOPPING:
1386 0 : return "stopping";
1387 : }
1388 0 : return "UNKNOWN";
2649 alvherre 1389 ECB : }
1390 :
1391 : /*
2649 alvherre 1392 EUB : * Returns activity of WAL receiver, including pid, state and xlog locations
1393 : * received from the WAL sender of another server.
2649 alvherre 1394 ECB : */
1395 : Datum
2649 alvherre 1396 GIC 3 : pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1397 : {
2649 alvherre 1398 ECB : TupleDesc tupdesc;
1399 : Datum *values;
1400 : bool *nulls;
1401 : int pid;
1402 : bool ready_to_display;
1403 : WalRcvState state;
1404 : XLogRecPtr receive_start_lsn;
1405 : TimeLineID receive_start_tli;
1406 : XLogRecPtr written_lsn;
1407 : XLogRecPtr flushed_lsn;
1408 : TimeLineID received_tli;
2495 rhaas 1409 : TimestampTz last_send_time;
1410 : TimestampTz last_receipt_time;
1411 : XLogRecPtr latest_end_lsn;
1412 : TimestampTz latest_end_time;
1835 fujii 1413 : char sender_host[NI_MAXHOST];
1835 fujii 1414 GIC 3 : int sender_port = 0;
2014 alvherre 1415 ECB : char slotname[NAMEDATALEN];
1416 : char conninfo[MAXCONNINFO];
2649 1417 :
2109 1418 : /* Take a lock to ensure value consistency */
2109 alvherre 1419 CBC 3 : SpinLockAcquire(&WalRcv->mutex);
1420 3 : pid = (int) WalRcv->pid;
2109 alvherre 1421 GIC 3 : ready_to_display = WalRcv->ready_to_display;
1422 3 : state = WalRcv->walRcvState;
1423 3 : receive_start_lsn = WalRcv->receiveStart;
1424 3 : receive_start_tli = WalRcv->receiveStartTLI;
1057 michael 1425 3 : flushed_lsn = WalRcv->flushedUpto;
2109 alvherre 1426 3 : received_tli = WalRcv->receivedTLI;
2109 alvherre 1427 GBC 3 : last_send_time = WalRcv->lastMsgSendTime;
2109 alvherre 1428 GIC 3 : last_receipt_time = WalRcv->lastMsgReceiptTime;
2109 alvherre 1429 GBC 3 : latest_end_lsn = WalRcv->latestWalEnd;
2109 alvherre 1430 GIC 3 : latest_end_time = WalRcv->latestWalEndTime;
2014 alvherre 1431 GBC 3 : strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1835 fujii 1432 3 : strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1433 3 : sender_port = WalRcv->sender_port;
2014 alvherre 1434 3 : strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
2109 1435 3 : SpinLockRelease(&WalRcv->mutex);
2109 alvherre 1436 EUB :
2475 1437 : /*
2473 1438 : * No WAL receiver (or not ready yet), just return a tuple with NULL
1439 : * values
2475 1440 : */
2109 alvherre 1441 GBC 3 : if (pid == 0 || !ready_to_display)
2473 1442 3 : PG_RETURN_NULL();
1443 :
780 fujii 1444 EUB : /*
1445 : * Read "writtenUpto" without holding a spinlock. Note that it may not be
1446 : * consistent with the other shared variables of the WAL receiver
1447 : * protected by a spinlock, but this should not be used for data integrity
1448 : * checks.
1449 : */
780 fujii 1450 UIC 0 : written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1451 :
2475 alvherre 1452 ECB : /* determine result type */
2475 alvherre 1453 UIC 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1454 0 : elog(ERROR, "return type must be a row type");
1455 :
1456 0 : values = palloc0(sizeof(Datum) * tupdesc->natts);
1457 0 : nulls = palloc0(sizeof(bool) * tupdesc->natts);
1458 :
1459 : /* Fetch values */
2109 1460 0 : values[0] = Int32GetDatum(pid);
1461 :
377 mail 1462 0 : if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1463 : {
1464 : /*
1465 : * Only superusers and roles with privileges of pg_read_all_stats can
1466 : * see details. Other users only get the pid value to know whether it
1467 : * is a WAL receiver, but no details.
1468 : */
282 peter 1469 UNC 0 : memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
2649 alvherre 1470 ECB : }
1471 : else
1472 : {
2649 alvherre 1473 UIC 0 : values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1474 :
2649 alvherre 1475 LBC 0 : if (XLogRecPtrIsInvalid(receive_start_lsn))
1476 0 : nulls[2] = true;
2649 alvherre 1477 ECB : else
2649 alvherre 1478 LBC 0 : values[2] = LSNGetDatum(receive_start_lsn);
1479 0 : values[3] = Int32GetDatum(receive_start_tli);
1057 michael 1480 0 : if (XLogRecPtrIsInvalid(written_lsn))
2649 alvherre 1481 0 : nulls[4] = true;
2649 alvherre 1482 ECB : else
1057 michael 1483 LBC 0 : values[4] = LSNGetDatum(written_lsn);
1484 0 : if (XLogRecPtrIsInvalid(flushed_lsn))
1485 0 : nulls[5] = true;
1057 michael 1486 ECB : else
1057 michael 1487 LBC 0 : values[5] = LSNGetDatum(flushed_lsn);
1488 0 : values[6] = Int32GetDatum(received_tli);
2649 alvherre 1489 0 : if (last_send_time == 0)
1057 michael 1490 0 : nulls[7] = true;
2649 alvherre 1491 ECB : else
1057 michael 1492 UIC 0 : values[7] = TimestampTzGetDatum(last_send_time);
2649 alvherre 1493 0 : if (last_receipt_time == 0)
1057 michael 1494 0 : nulls[8] = true;
1495 : else
1496 0 : values[8] = TimestampTzGetDatum(last_receipt_time);
2649 alvherre 1497 LBC 0 : if (XLogRecPtrIsInvalid(latest_end_lsn))
1057 michael 1498 0 : nulls[9] = true;
1499 : else
1057 michael 1500 UIC 0 : values[9] = LSNGetDatum(latest_end_lsn);
2649 alvherre 1501 0 : if (latest_end_time == 0)
1057 michael 1502 0 : nulls[10] = true;
1503 : else
1504 0 : values[10] = TimestampTzGetDatum(latest_end_time);
2649 alvherre 1505 0 : if (*slotname == '\0')
1057 michael 1506 UBC 0 : nulls[11] = true;
1507 : else
1057 michael 1508 UIC 0 : values[11] = CStringGetTextDatum(slotname);
1835 fujii 1509 UBC 0 : if (*sender_host == '\0')
1057 michael 1510 0 : nulls[12] = true;
1511 : else
1512 0 : values[12] = CStringGetTextDatum(sender_host);
1835 fujii 1513 0 : if (sender_port == 0)
1057 michael 1514 UIC 0 : nulls[13] = true;
1515 : else
1057 michael 1516 UBC 0 : values[13] = Int32GetDatum(sender_port);
1835 fujii 1517 UIC 0 : if (*conninfo == '\0')
1057 michael 1518 UBC 0 : nulls[14] = true;
1519 : else
1057 michael 1520 UIC 0 : values[14] = CStringGetTextDatum(conninfo);
1521 : }
1522 :
1523 : /* Returns the record as Datum */
2109 alvherre 1524 0 : PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
2649 alvherre 1525 EUB : }
|