Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * walreceiver.c
4 : : *
5 : : * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 : : * is the process in the standby server that takes charge of receiving
7 : : * XLOG records from a primary server during streaming replication.
8 : : *
9 : : * When the startup process determines that it's time to start streaming,
10 : : * it instructs postmaster to start walreceiver. Walreceiver first connects
11 : : * to the primary server (it will be served by a walsender process
12 : : * in the primary server), and then keeps receiving XLOG records and
13 : : * writing them to the disk as long as the connection is alive. As XLOG
14 : : * records are received and flushed to disk, it updates the
15 : : * WalRcv->flushedUpto variable in shared memory, to inform the startup
16 : : * process of how far it can proceed with XLOG replay.
17 : : *
18 : : * A WAL receiver cannot directly load GUC parameters used when establishing
19 : : * its connection to the primary. Instead it relies on parameter values
20 : : * that are passed down by the startup process when streaming is requested.
21 : : * This applies, for example, to the replication slot and the connection
22 : : * string to be used for the connection with the primary.
23 : : *
24 : : * If the primary server ends streaming, but doesn't disconnect, walreceiver
25 : : * goes into "waiting" mode, and waits for the startup process to give new
26 : : * instructions. The startup process will treat that the same as
27 : : * disconnection, and will rescan the archive/pg_wal directory. But when the
28 : : * startup process wants to try streaming replication again, it will just
29 : : * nudge the existing walreceiver process that's waiting, instead of launching
30 : : * a new one.
31 : : *
32 : : * Normal termination is by SIGTERM, which instructs the walreceiver to
33 : : * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
34 : : * process, the walreceiver will simply abort and exit on SIGQUIT. A close
35 : : * of the connection and a FATAL error are treated not as a crash but as
36 : : * normal operation.
37 : : *
38 : : * This file contains the server-facing parts of walreceiver. The libpq-
39 : : * specific parts are in the libpqwalreceiver module. It's loaded
40 : : * dynamically to avoid linking the server with libpq.
41 : : *
42 : : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
43 : : *
44 : : *
45 : : * IDENTIFICATION
46 : : * src/backend/replication/walreceiver.c
47 : : *
48 : : *-------------------------------------------------------------------------
49 : : */
50 : : #include "postgres.h"
51 : :
52 : : #include <unistd.h>
53 : :
54 : : #include "access/htup_details.h"
55 : : #include "access/timeline.h"
56 : : #include "access/transam.h"
57 : : #include "access/xlog_internal.h"
58 : : #include "access/xlogarchive.h"
59 : : #include "access/xlogrecovery.h"
60 : : #include "catalog/pg_authid.h"
61 : : #include "funcapi.h"
62 : : #include "libpq/pqformat.h"
63 : : #include "libpq/pqsignal.h"
64 : : #include "miscadmin.h"
65 : : #include "pgstat.h"
66 : : #include "postmaster/auxprocess.h"
67 : : #include "postmaster/interrupt.h"
68 : : #include "replication/walreceiver.h"
69 : : #include "replication/walsender.h"
70 : : #include "storage/ipc.h"
71 : : #include "storage/proc.h"
72 : : #include "storage/procarray.h"
73 : : #include "storage/procsignal.h"
74 : : #include "utils/acl.h"
75 : : #include "utils/builtins.h"
76 : : #include "utils/guc.h"
77 : : #include "utils/pg_lsn.h"
78 : : #include "utils/ps_status.h"
79 : : #include "utils/timestamp.h"
80 : :
81 : :
82 : : /*
83 : : * GUC variables. (Other variables that affect walreceiver are in xlog.c
84 : : * because they're passed down from the startup process, for better
85 : : * synchronization.)
 : : *
 : : * WalReceiverMain's comments tie wal_receiver_timeout to pinging the
 : : * primary (after half the timeout without traffic) and
 : : * wal_receiver_status_interval to how often a status update is sent.
 : : * NOTE(review): hot_standby_feedback presumably gates the messages sent by
 : : * XLogWalRcvSendHSFeedback() -- confirm against that function.
86 : : */
87 : : int wal_receiver_status_interval;
88 : : int wal_receiver_timeout;
89 : : bool hot_standby_feedback;
90 : :
91 : : /*
 : : * libpqwalreceiver connection.
 : : *
 : : * wrconn is assigned from walrcv_connect() in WalReceiverMain.
 : : * WalReceiverFunctions must be non-NULL after
 : : * load_file("libpqwalreceiver"); WalReceiverMain raises an error
 : : * otherwise.
 : : */
92 : : static WalReceiverConn *wrconn = NULL;
93 : : WalReceiverFunctionsType *WalReceiverFunctions = NULL;
94 : :
95 : : /*
96 : : * These variables are used similarly to openLogFile/SegNo,
97 : : * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
98 : : * corresponding to the filename of recvFile.
 : : *
 : : * recvFile is -1 while no segment file is open; WalReceiverMain resets it
 : : * to -1 after closing the last segment at end of streaming.
99 : : */
100 : : static int recvFile = -1;
101 : : static TimeLineID recvFileTLI = 0;
102 : : static XLogSegNo recvSegNo = 0;
103 : :
104 : : /*
105 : : * LogstreamResult indicates the byte positions that we have already
106 : : * written/fsynced.
 : : *
 : : * Both fields are reset to GetXLogReplayRecPtr(NULL) each time
 : : * WalReceiverMain successfully starts streaming.
107 : : */
108 : : static struct
109 : : {
110 : : XLogRecPtr Write; /* last byte + 1 written out in the standby */
111 : : XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
112 : : } LogstreamResult;
113 : :
114 : : /*
115 : : * Reasons to wake up and perform periodic tasks.
 : : *
 : : * The values double as indexes into wakeup[] below, and
 : : * NUM_WALRCV_WAKEUPS (last member + 1) is the number of members, so new
 : : * reasons must be added before the #define.
116 : : */
117 : : typedef enum WalRcvWakeupReason
118 : : {
119 : : WALRCV_WAKEUP_TERMINATE, /* deadline after which WalReceiverMain reports a timeout error */
120 : : WALRCV_WAKEUP_PING, /* deadline after which WalReceiverMain requests a reply from the primary */
121 : : WALRCV_WAKEUP_REPLY, /* NOTE(review): presumably the periodic status reply -- confirm */
122 : : WALRCV_WAKEUP_HSFEEDBACK, /* NOTE(review): presumably periodic hot standby feedback -- confirm */
123 : : #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
124 : : } WalRcvWakeupReason;
125 : :
126 : : /*
127 : : * Wake up times for periodic tasks.
 : : *
 : : * Indexed by WalRcvWakeupReason. WalReceiverMain naps until the earliest
 : : * entry, and stores TIMESTAMP_INFINITY in an entry to disable it.
128 : : */
129 : : static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
130 : :
 : : /* Message buffer; (re)initialized with initStringInfo() whenever streaming starts. */
131 : : static StringInfoData reply_message;
132 : :
133 : : /* Prototypes for private functions */
134 : : static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
135 : : static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
136 : : static void WalRcvDie(int code, Datum arg);
137 : : static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
138 : : TimeLineID tli);
139 : : static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
140 : : TimeLineID tli);
141 : : static void XLogWalRcvFlush(bool dying, TimeLineID tli);
142 : : static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
143 : : static void XLogWalRcvSendReply(bool force, bool requestReply);
144 : : static void XLogWalRcvSendHSFeedback(bool immed);
145 : : static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
146 : : static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
147 : :
148 : : /*
149 : : * Process any interrupts the walreceiver process may have received.
150 : : * This should be called any time the process's latch has become set.
151 : : *
152 : : * Currently, only SIGTERM is of interest. We can't just exit(1) within the
153 : : * SIGTERM signal handler, because the signal might arrive in the middle of
154 : : * some critical operation, like while we're holding a spinlock. Instead, the
155 : : * signal handler sets a flag variable as well as setting the process's latch.
156 : : * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
157 : : * latch has become set. Operations that could block for a long time, such as
158 : : * reading from a remote server, must pay attention to the latch too; see
159 : : * libpqrcv_PQgetResult for example.
160 : : */
161 : : void
5203 heikki.linnakangas@i 162 :CBC 46048 : ProcessWalRcvInterrupts(void)
163 : : {
164 : : /*
165 : : * Although walreceiver interrupt handling doesn't use the same scheme as
166 : : * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
167 : : * any incoming signals on Win32, and also to make sure we process any
168 : : * barrier events.
169 : : */
170 [ + + ]: 46048 : CHECK_FOR_INTERRUPTS();
171 : :
1249 fujii@postgresql.org 172 [ + + ]: 46045 : if (ShutdownRequestPending)
173 : : {
5203 heikki.linnakangas@i 174 [ + - ]: 73 : ereport(FATAL,
175 : : (errcode(ERRCODE_ADMIN_SHUTDOWN),
176 : : errmsg("terminating walreceiver process due to administrator command")));
177 : : }
178 : 45972 : }
179 : :
180 : :
181 : : /* Main entry point for walreceiver process */
182 : : void
27 heikki.linnakangas@i 183 :GNC 316 : WalReceiverMain(char *startup_data, size_t startup_data_len)
184 : : {
185 : : char conninfo[MAXCONNINFO];
186 : : char *tmp_conninfo;
187 : : char slotname[NAMEDATALEN];
188 : : bool is_temp_slot;
189 : : XLogRecPtr startpoint;
190 : : TimeLineID startpointTLI;
191 : : TimeLineID primaryTLI;
192 : : bool first_stream;
193 : : WalRcvData *walrcv;
194 : : TimestampTz now;
195 : : char *err;
2206 fujii@postgresql.org 196 :CBC 316 : char *sender_host = NULL;
197 : 316 : int sender_port = 0;
198 : :
27 heikki.linnakangas@i 199 [ - + ]:GNC 316 : Assert(startup_data_len == 0);
200 : :
201 : 316 : MyBackendType = B_WAL_RECEIVER;
202 : 316 : AuxiliaryProcessMainCommon();
203 : :
204 : : /*
205 : : * WalRcv should be set up already (if we are a backend, we inherit this
206 : : * by fork() or EXEC_BACKEND mechanism from the postmaster).
207 : : */
133 208 : 316 : walrcv = WalRcv;
5191 heikki.linnakangas@i 209 [ - + ]:CBC 316 : Assert(walrcv != NULL);
210 : :
211 : : /*
212 : : * Mark walreceiver as running in shared memory.
213 : : *
214 : : * Do this as early as possible, so that if we fail later on, we'll set
215 : : * state to STOPPED. If we die before this, the startup process will keep
216 : : * waiting for us to start up, until it times out.
217 : : */
218 [ - + ]: 316 : SpinLockAcquire(&walrcv->mutex);
219 [ - + ]: 316 : Assert(walrcv->pid == 0);
5161 bruce@momjian.us 220 [ - + + - ]: 316 : switch (walrcv->walRcvState)
221 : : {
5191 heikki.linnakangas@i 222 :UBC 0 : case WALRCV_STOPPING:
223 : : /* If we've already been requested to stop, don't start up. */
224 : 0 : walrcv->walRcvState = WALRCV_STOPPED;
225 : : /* fall through */
226 : :
5191 heikki.linnakangas@i 227 :GBC 3 : case WALRCV_STOPPED:
228 : 3 : SpinLockRelease(&walrcv->mutex);
1129 tmunro@postgresql.or 229 : 3 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
5191 heikki.linnakangas@i 230 : 3 : proc_exit(1);
231 : : break;
232 : :
5191 heikki.linnakangas@i 233 :CBC 313 : case WALRCV_STARTING:
234 : : /* The usual case */
235 : 313 : break;
236 : :
4140 heikki.linnakangas@i 237 :UBC 0 : case WALRCV_WAITING:
238 : : case WALRCV_STREAMING:
239 : : case WALRCV_RESTARTING:
240 : : default:
241 : : /* Shouldn't happen */
2385 alvherre@alvh.no-ip. 242 : 0 : SpinLockRelease(&walrcv->mutex);
5191 heikki.linnakangas@i 243 [ # # ]: 0 : elog(PANIC, "walreceiver still running according to shared memory state");
244 : : }
245 : : /* Advertise our PID so that the startup process can kill us */
5191 heikki.linnakangas@i 246 :CBC 313 : walrcv->pid = MyProcPid;
4140 247 : 313 : walrcv->walRcvState = WALRCV_STREAMING;
248 : :
249 : : /* Fetch information required to start streaming */
2844 alvherre@alvh.no-ip. 250 : 313 : walrcv->ready_to_display = false;
5191 heikki.linnakangas@i 251 : 313 : strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
3726 rhaas@postgresql.org 252 : 313 : strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
1552 peter@eisentraut.org 253 : 313 : is_temp_slot = walrcv->is_temp_slot;
4793 heikki.linnakangas@i 254 : 313 : startpoint = walrcv->receiveStart;
4140 255 : 313 : startpointTLI = walrcv->receiveStartTLI;
256 : :
257 : : /*
258 : : * At most one of is_temp_slot and slotname can be set; otherwise,
259 : : * RequestXLogStreaming messed up.
260 : : */
1479 alvherre@alvh.no-ip. 261 [ - + - - ]: 313 : Assert(!is_temp_slot || (slotname[0] == '\0'));
262 : :
263 : : /* Initialise to a sanish value */
444 tgl@sss.pgh.pa.us 264 : 313 : now = GetCurrentTimestamp();
2385 alvherre@alvh.no-ip. 265 : 313 : walrcv->lastMsgSendTime =
523 tmunro@postgresql.or 266 : 313 : walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
267 : :
268 : : /* Report the latch to use to awaken this process */
2385 tgl@sss.pgh.pa.us 269 : 313 : walrcv->latch = &MyProc->procLatch;
270 : :
5191 heikki.linnakangas@i 271 : 313 : SpinLockRelease(&walrcv->mutex);
272 : :
1151 fujii@postgresql.org 273 : 313 : pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
274 : :
275 : : /* Arrange to clean up at walreceiver exit */
891 rhaas@postgresql.org 276 : 313 : on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
277 : :
278 : : /* Properly accept or ignore signals the postmaster might send us */
1249 fujii@postgresql.org 279 : 313 : pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
280 : : * file */
5203 heikki.linnakangas@i 281 : 313 : pqsignal(SIGINT, SIG_IGN);
1249 fujii@postgresql.org 282 : 313 : pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
283 : : /* SIGQUIT handler was already set up by InitPostmasterChild */
5203 heikki.linnakangas@i 284 : 313 : pqsignal(SIGALRM, SIG_IGN);
285 : 313 : pqsignal(SIGPIPE, SIG_IGN);
1602 rhaas@postgresql.org 286 : 313 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
5203 heikki.linnakangas@i 287 : 313 : pqsignal(SIGUSR2, SIG_IGN);
288 : :
289 : : /* Reset some signals that are accepted by postmaster but not here */
290 : 313 : pqsignal(SIGCHLD, SIG_DFL);
291 : :
292 : : /* Load the libpq-specific functions */
5191 293 : 313 : load_file("libpqwalreceiver", false);
2692 peter_e@gmx.net 294 [ - + ]: 313 : if (WalReceiverFunctions == NULL)
5191 heikki.linnakangas@i 295 [ # # ]:UBC 0 : elog(ERROR, "libpqwalreceiver didn't initialize correctly");
296 : :
297 : : /* Unblock signals (they were blocked when the postmaster forked us) */
436 tmunro@postgresql.or 298 :CBC 313 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
299 : :
300 : : /* Establish the connection to the primary for XLOG streaming */
69 akapila@postgresql.o 301 [ + + ]:GNC 313 : wrconn = walrcv_connect(conninfo, true, false, false,
302 : : cluster_name[0] ? cluster_name : "walreceiver",
303 : : &err);
2642 peter_e@gmx.net 304 [ + + ]:CBC 313 : if (!wrconn)
305 [ + - ]: 176 : ereport(ERROR,
306 : : (errcode(ERRCODE_CONNECTION_FAILURE),
307 : : errmsg("could not connect to the primary server: %s", err)));
308 : :
309 : : /*
310 : : * Save user-visible connection string. This clobbers the original
311 : : * conninfo, for security. Also save host and port of the sender server
312 : : * this walreceiver is connected to.
313 : : */
2692 314 : 137 : tmp_conninfo = walrcv_get_conninfo(wrconn);
2206 fujii@postgresql.org 315 : 137 : walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
2846 alvherre@alvh.no-ip. 316 [ - + ]: 137 : SpinLockAcquire(&walrcv->mutex);
317 : 137 : memset(walrcv->conninfo, 0, MAXCONNINFO);
318 [ + - ]: 137 : if (tmp_conninfo)
319 : 137 : strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
320 : :
2206 fujii@postgresql.org 321 : 137 : memset(walrcv->sender_host, 0, NI_MAXHOST);
322 [ + - ]: 137 : if (sender_host)
323 : 137 : strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
324 : :
325 : 137 : walrcv->sender_port = sender_port;
2846 alvherre@alvh.no-ip. 326 : 137 : walrcv->ready_to_display = true;
327 : 137 : SpinLockRelease(&walrcv->mutex);
328 : :
2385 329 [ + - ]: 137 : if (tmp_conninfo)
330 : 137 : pfree(tmp_conninfo);
331 : :
2206 fujii@postgresql.org 332 [ + - ]: 137 : if (sender_host)
333 : 137 : pfree(sender_host);
334 : :
4140 heikki.linnakangas@i 335 : 137 : first_stream = true;
336 : : for (;;)
5203 heikki.linnakangas@i 337 :UBC 0 : {
338 : : char *primary_sysid;
339 : : char standby_sysid[32];
340 : : WalRcvStreamOptions options;
341 : :
342 : : /*
343 : : * Check that we're connected to a valid server using the
344 : : * IDENTIFY_SYSTEM replication command.
345 : : */
1857 peter@eisentraut.org 346 :CBC 137 : primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
347 : :
2692 peter_e@gmx.net 348 : 137 : snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
349 : : GetSystemIdentifier());
350 [ - + ]: 137 : if (strcmp(primary_sysid, standby_sysid) != 0)
351 : : {
2692 peter_e@gmx.net 352 [ # # ]:UBC 0 : ereport(ERROR,
353 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
354 : : errmsg("database system identifier differs between the primary and standby"),
355 : : errdetail("The primary's identifier is %s, the standby's identifier is %s.",
356 : : primary_sysid, standby_sysid)));
357 : : }
358 : :
359 : : /*
360 : : * Confirm that the current timeline of the primary is the same or
361 : : * ahead of ours.
362 : : */
4140 heikki.linnakangas@i 363 [ - + ]:CBC 137 : if (primaryTLI < startpointTLI)
4140 heikki.linnakangas@i 364 [ # # ]:UBC 0 : ereport(ERROR,
365 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
366 : : errmsg("highest timeline %u of the primary is behind recovery timeline %u",
367 : : primaryTLI, startpointTLI)));
368 : :
369 : : /*
370 : : * Get any missing history files. We do this always, even when we're
371 : : * not interested in that timeline, so that if we're promoted to
372 : : * become the primary later on, we don't select the same timeline that
373 : : * was already used in the current primary. This isn't bullet-proof -
374 : : * you'll need some external software to manage your cluster if you
375 : : * need to ensure that a unique timeline id is chosen in every case,
376 : : * but let's avoid the confusion of timeline id collisions where we
377 : : * can.
378 : : */
4119 heikki.linnakangas@i 379 :CBC 137 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
380 : :
381 : : /*
382 : : * Create temporary replication slot if requested, and update slot
383 : : * name in shared memory. (Note the slot name cannot already be set
384 : : * in this case.)
385 : : */
1479 alvherre@alvh.no-ip. 386 [ - + ]: 137 : if (is_temp_slot)
387 : : {
1479 alvherre@alvh.no-ip. 388 :UBC 0 : snprintf(slotname, sizeof(slotname),
389 : : "pg_walreceiver_%lld",
390 : 0 : (long long int) walrcv_get_backend_pid(wrconn));
391 : :
76 akapila@postgresql.o 392 :UNC 0 : walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
393 : :
1479 alvherre@alvh.no-ip. 394 [ # # ]:UBC 0 : SpinLockAcquire(&walrcv->mutex);
395 : 0 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
396 : 0 : SpinLockRelease(&walrcv->mutex);
397 : : }
398 : :
399 : : /*
400 : : * Start streaming.
401 : : *
402 : : * We'll try to start at the requested starting point and timeline,
403 : : * even if it's different from the server's latest timeline. In case
404 : : * we've already reached the end of the old timeline, the server will
405 : : * finish the streaming immediately, and we will go back to await
406 : : * orders from the startup process. If recovery_target_timeline is
407 : : * 'latest', the startup process will scan pg_wal and find the new
408 : : * history file, bump recovery target timeline, and ask us to restart
409 : : * on the new timeline.
410 : : */
2642 peter_e@gmx.net 411 :CBC 137 : options.logical = false;
412 : 137 : options.startpoint = startpoint;
413 [ + + ]: 137 : options.slotname = slotname[0] != '\0' ? slotname : NULL;
414 : 137 : options.proto.physical.startpointTLI = startpointTLI;
415 [ + - ]: 137 : if (walrcv_startstreaming(wrconn, &options))
416 : : {
4140 heikki.linnakangas@i 417 [ + - ]: 137 : if (first_stream)
418 [ + - ]: 137 : ereport(LOG,
419 : : (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
420 : : LSN_FORMAT_ARGS(startpoint), startpointTLI)));
421 : : else
4140 heikki.linnakangas@i 422 [ # # ]:UBC 0 : ereport(LOG,
423 : : (errmsg("restarted WAL streaming at %X/%X on timeline %u",
424 : : LSN_FORMAT_ARGS(startpoint), startpointTLI)));
4140 heikki.linnakangas@i 425 :CBC 137 : first_stream = false;
426 : :
427 : : /* Initialize LogstreamResult and buffers for processing messages */
4133 428 : 137 : LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
4140 429 : 137 : initStringInfo(&reply_message);
430 : :
431 : : /* Initialize nap wakeup times. */
523 tmunro@postgresql.or 432 : 137 : now = GetCurrentTimestamp();
433 [ + + ]: 685 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
434 : 548 : WalRcvComputeNextWakeup(i, now);
435 : :
436 : : /* Send initial reply/feedback messages. */
514 437 : 137 : XLogWalRcvSendReply(true, false);
438 : 137 : XLogWalRcvSendHSFeedback(true);
439 : :
440 : : /* Loop until end-of-streaming or error */
441 : : for (;;)
4203 heikki.linnakangas@i 442 : 33118 : {
443 : : char *buf;
444 : : int len;
2938 rhaas@postgresql.org 445 : 33255 : bool endofwal = false;
2922 tgl@sss.pgh.pa.us 446 : 33255 : pgsocket wait_fd = PGINVALID_SOCKET;
447 : : int rc;
448 : : TimestampTz nextWakeup;
449 : : long nap;
450 : :
451 : : /*
452 : : * Exit walreceiver if we're not in recovery. This should not
453 : : * happen, but cross-check the status here.
454 : : */
4140 heikki.linnakangas@i 455 [ - + ]: 33255 : if (!RecoveryInProgress())
4140 heikki.linnakangas@i 456 [ # # ]:UBC 0 : ereport(FATAL,
457 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
458 : : errmsg("cannot continue WAL streaming, recovery has already ended")));
459 : :
460 : : /* Process any requests or signals received recently */
4140 heikki.linnakangas@i 461 :CBC 33255 : ProcessWalRcvInterrupts();
462 : :
1249 fujii@postgresql.org 463 [ + + ]: 33255 : if (ConfigReloadPending)
464 : : {
465 : 23 : ConfigReloadPending = false;
4140 heikki.linnakangas@i 466 : 23 : ProcessConfigFile(PGC_SIGHUP);
467 : : /* recompute wakeup times */
523 tmunro@postgresql.or 468 : 23 : now = GetCurrentTimestamp();
469 [ + + ]: 115 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
470 : 92 : WalRcvComputeNextWakeup(i, now);
4087 simon@2ndQuadrant.co 471 : 23 : XLogWalRcvSendHSFeedback(true);
472 : : }
473 : :
474 : : /* See if we can read data immediately */
2692 peter_e@gmx.net 475 : 33255 : len = walrcv_receive(wrconn, &buf, &wait_fd);
4140 heikki.linnakangas@i 476 [ + + ]: 33224 : if (len != 0)
477 : : {
478 : : /*
479 : : * Process the received data, and any subsequent data we
480 : : * can read without blocking.
481 : : */
482 : : for (;;)
483 : : {
484 [ + + ]: 46977 : if (len > 0)
485 : : {
486 : : /*
487 : : * Something was received from primary, so adjust
488 : : * the ping and terminate wakeup times.
489 : : */
444 tgl@sss.pgh.pa.us 490 : 24997 : now = GetCurrentTimestamp();
523 tmunro@postgresql.or 491 : 24997 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
492 : : now);
493 : 24997 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
891 rhaas@postgresql.org 494 : 24997 : XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
495 : : startpointTLI);
496 : : }
4140 heikki.linnakangas@i 497 [ + + ]: 21980 : else if (len == 0)
498 : 21944 : break;
499 [ + - ]: 36 : else if (len < 0)
500 : : {
501 [ + - ]: 36 : ereport(LOG,
502 : : (errmsg("replication terminated by primary server"),
503 : : errdetail("End of WAL reached on timeline %u at %X/%X.",
504 : : startpointTLI,
505 : : LSN_FORMAT_ARGS(LogstreamResult.Write))));
506 : 36 : endofwal = true;
507 : 36 : break;
508 : : }
2692 peter_e@gmx.net 509 : 24997 : len = walrcv_receive(wrconn, &buf, &wait_fd);
510 : : }
511 : :
512 : : /* Let the primary know that we received some data. */
4140 heikki.linnakangas@i 513 : 21980 : XLogWalRcvSendReply(false, false);
514 : :
515 : : /*
516 : : * If we've written some records, flush them to disk and
517 : : * let the startup process and primary server know about
518 : : * them.
519 : : */
891 rhaas@postgresql.org 520 : 21980 : XLogWalRcvFlush(false, startpointTLI);
521 : : }
522 : :
523 : : /* Check if we need to exit the streaming loop. */
2938 524 [ + + ]: 33224 : if (endofwal)
525 : 36 : break;
526 : :
527 : : /* Find the soonest wakeup time, to limit our nap. */
444 tgl@sss.pgh.pa.us 528 : 33188 : nextWakeup = TIMESTAMP_INFINITY;
523 tmunro@postgresql.or 529 [ + + ]: 165940 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
530 : 132752 : nextWakeup = Min(wakeup[i], nextWakeup);
531 : :
532 : : /* Calculate the nap time, clamping as necessary. */
444 tgl@sss.pgh.pa.us 533 : 33188 : now = GetCurrentTimestamp();
534 : 33188 : nap = TimestampDifferenceMilliseconds(now, nextWakeup);
535 : :
536 : : /*
537 : : * Ideally we would reuse a WaitEventSet object repeatedly
538 : : * here to avoid the overheads of WaitLatchOrSocket on epoll
539 : : * systems, but we can't be sure that libpq (or any other
540 : : * walreceiver implementation) has the same socket (even if
541 : : * the fd is the same number, it may have been closed and
542 : : * reopened since the last time). In future, if there is a
543 : : * function for removing sockets from WaitEventSet, then we
544 : : * could add and remove just the socket each time, potentially
545 : : * avoiding some system calls.
546 : : */
2938 rhaas@postgresql.org 547 [ - + ]: 33188 : Assert(wait_fd != PGINVALID_SOCKET);
1249 fujii@postgresql.org 548 : 33188 : rc = WaitLatchOrSocket(MyLatch,
549 : : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
550 : : WL_TIMEOUT | WL_LATCH_SET,
551 : : wait_fd,
552 : : nap,
553 : : WAIT_EVENT_WAL_RECEIVER_MAIN);
2938 rhaas@postgresql.org 554 [ + + ]: 33179 : if (rc & WL_LATCH_SET)
555 : : {
1249 fujii@postgresql.org 556 : 11022 : ResetLatch(MyLatch);
1812 tgl@sss.pgh.pa.us 557 : 11022 : ProcessWalRcvInterrupts();
558 : :
2938 rhaas@postgresql.org 559 [ + + ]: 10961 : if (walrcv->force_reply)
560 : : {
561 : : /*
562 : : * The recovery process has asked us to send apply
563 : : * feedback now. Make sure the flag is really set to
564 : : * false in shared memory before sending the reply, so
565 : : * we don't miss a new request for a reply.
566 : : */
567 : 10906 : walrcv->force_reply = false;
568 : 10906 : pg_memory_barrier();
569 : 10906 : XLogWalRcvSendReply(true, false);
570 : : }
571 : : }
572 [ + + ]: 33118 : if (rc & WL_TIMEOUT)
573 : : {
574 : : /*
575 : : * We didn't receive anything new. If we haven't heard
576 : : * anything from the server for more than
577 : : * wal_receiver_timeout / 2, ping the server. Also, if
578 : : * it's been longer than wal_receiver_status_interval
579 : : * since the last update we sent, send a status update to
580 : : * the primary anyway, to report any progress in applying
581 : : * WAL.
582 : : */
3973 bruce@momjian.us 583 : 10 : bool requestReply = false;
584 : :
585 : : /*
586 : : * Check if time since last receive from primary has
587 : : * reached the configured limit.
588 : : */
444 tgl@sss.pgh.pa.us 589 : 10 : now = GetCurrentTimestamp();
523 tmunro@postgresql.or 590 [ - + ]: 10 : if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
523 tmunro@postgresql.or 591 [ # # ]:UBC 0 : ereport(ERROR,
592 : : (errcode(ERRCODE_CONNECTION_FAILURE),
593 : : errmsg("terminating walreceiver due to timeout")));
594 : :
595 : : /*
596 : : * If we didn't receive anything new for half of receiver
597 : : * replication timeout, then ping the server.
598 : : */
523 tmunro@postgresql.or 599 [ - + ]:CBC 10 : if (now >= wakeup[WALRCV_WAKEUP_PING])
600 : : {
523 tmunro@postgresql.or 601 :UBC 0 : requestReply = true;
444 tgl@sss.pgh.pa.us 602 : 0 : wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY;
603 : : }
604 : :
4140 heikki.linnakangas@i 605 :CBC 10 : XLogWalRcvSendReply(requestReply, requestReply);
4087 simon@2ndQuadrant.co 606 : 10 : XLogWalRcvSendHSFeedback(false);
607 : : }
608 : : }
609 : :
610 : : /*
611 : : * The backend finished streaming. Exit streaming COPY-mode from
612 : : * our side, too.
613 : : */
2692 peter_e@gmx.net 614 : 36 : walrcv_endstreaming(wrconn, &primaryTLI);
615 : :
616 : : /*
617 : : * If the server had switched to a new timeline that we didn't
618 : : * know about when we began streaming, fetch its timeline history
619 : : * file now.
620 : : */
4104 heikki.linnakangas@i 621 : 12 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
622 : : }
623 : : else
4140 heikki.linnakangas@i 624 [ # # ]:UBC 0 : ereport(LOG,
625 : : (errmsg("primary server contains no more WAL on requested timeline %u",
626 : : startpointTLI)));
627 : :
628 : : /*
629 : : * End of WAL reached on the requested timeline. Close the last
630 : : * segment, and await for new orders from the startup process.
631 : : */
4140 heikki.linnakangas@i 632 [ + + ]:CBC 12 : if (recvFile >= 0)
633 : : {
634 : : char xlogfname[MAXFNAMELEN];
635 : :
891 rhaas@postgresql.org 636 : 11 : XLogWalRcvFlush(false, startpointTLI);
1594 michael@paquier.xyz 637 : 11 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
4140 heikki.linnakangas@i 638 [ - + ]: 11 : if (close(recvFile) != 0)
4140 heikki.linnakangas@i 639 [ # # ]:UBC 0 : ereport(PANIC,
640 : : (errcode_for_file_access(),
641 : : errmsg("could not close WAL segment %s: %m",
642 : : xlogfname)));
643 : :
644 : : /*
645 : : * Create .done file forcibly to prevent the streamed segment from
646 : : * being archived later.
647 : : */
3257 heikki.linnakangas@i 648 [ + - ]:CBC 11 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
649 : 11 : XLogArchiveForceDone(xlogfname);
650 : : else
953 alvherre@alvh.no-ip. 651 :UBC 0 : XLogArchiveNotify(xlogfname);
652 : : }
4140 heikki.linnakangas@i 653 :CBC 12 : recvFile = -1;
654 : :
655 [ - + ]: 12 : elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
656 : 12 : WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
657 : : }
658 : : /* not reached */
659 : : }
660 : :
661 : : /*
662 : : * Wait for startup process to set receiveStart and receiveStartTLI.
663 : : */
664 : : static void
665 : 12 : WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
666 : : {
3113 rhaas@postgresql.org 667 : 12 : WalRcvData *walrcv = WalRcv;
668 : : int state;
669 : :
4140 heikki.linnakangas@i 670 [ - + ]: 12 : SpinLockAcquire(&walrcv->mutex);
671 : 12 : state = walrcv->walRcvState;
672 [ - + ]: 12 : if (state != WALRCV_STREAMING)
673 : : {
4140 heikki.linnakangas@i 674 :UBC 0 : SpinLockRelease(&walrcv->mutex);
675 [ # # ]: 0 : if (state == WALRCV_STOPPING)
676 : 0 : proc_exit(0);
677 : : else
678 [ # # ]: 0 : elog(FATAL, "unexpected walreceiver state");
679 : : }
4140 heikki.linnakangas@i 680 :CBC 12 : walrcv->walRcvState = WALRCV_WAITING;
681 : 12 : walrcv->receiveStart = InvalidXLogRecPtr;
682 : 12 : walrcv->receiveStartTLI = 0;
683 : 12 : SpinLockRelease(&walrcv->mutex);
684 : :
1495 peter@eisentraut.org 685 : 12 : set_ps_display("idle");
686 : :
687 : : /*
688 : : * nudge startup process to notice that we've stopped streaming and are
689 : : * now waiting for instructions.
690 : : */
4140 heikki.linnakangas@i 691 : 12 : WakeupRecovery();
692 : : for (;;)
693 : : {
1249 fujii@postgresql.org 694 : 23 : ResetLatch(MyLatch);
695 : :
4140 heikki.linnakangas@i 696 : 23 : ProcessWalRcvInterrupts();
697 : :
698 [ - + ]: 11 : SpinLockAcquire(&walrcv->mutex);
699 [ + - - + : 11 : Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
- - ]
700 : : walrcv->walRcvState == WALRCV_WAITING ||
701 : : walrcv->walRcvState == WALRCV_STOPPING);
702 [ - + ]: 11 : if (walrcv->walRcvState == WALRCV_RESTARTING)
703 : : {
704 : : /*
705 : : * No need to handle changes in primary_conninfo or
706 : : * primary_slot_name here. Startup process will signal us to
707 : : * terminate in case those change.
708 : : */
4140 heikki.linnakangas@i 709 :UBC 0 : *startpoint = walrcv->receiveStart;
710 : 0 : *startpointTLI = walrcv->receiveStartTLI;
711 : 0 : walrcv->walRcvState = WALRCV_STREAMING;
712 : 0 : SpinLockRelease(&walrcv->mutex);
713 : 0 : break;
714 : : }
4140 heikki.linnakangas@i 715 [ - + ]:CBC 11 : if (walrcv->walRcvState == WALRCV_STOPPING)
716 : : {
717 : : /*
718 : : * We should've received SIGTERM if the startup process wants us
719 : : * to die, but might as well check it here too.
720 : : */
4140 heikki.linnakangas@i 721 :UBC 0 : SpinLockRelease(&walrcv->mutex);
722 : 0 : exit(1);
723 : : }
4140 heikki.linnakangas@i 724 :CBC 11 : SpinLockRelease(&walrcv->mutex);
725 : :
1249 fujii@postgresql.org 726 : 11 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
727 : : WAIT_EVENT_WAL_RECEIVER_WAIT_START);
728 : : }
729 : :
4140 heikki.linnakangas@i 730 [ # # ]:UBC 0 : if (update_process_title)
731 : : {
732 : : char activitymsg[50];
733 : :
734 : 0 : snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
1146 peter@eisentraut.org 735 : 0 : LSN_FORMAT_ARGS(*startpoint));
1495 736 : 0 : set_ps_display(activitymsg);
737 : : }
4140 heikki.linnakangas@i 738 : 0 : }
739 : :
740 : : /*
741 : : * Fetch any missing timeline history files between 'first' and 'last'
742 : : * (inclusive) from the server.
743 : : */
744 : : static void
4140 heikki.linnakangas@i 745 :CBC 149 : WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
746 : : {
747 : : TimeLineID tli;
748 : :
749 [ + + ]: 321 : for (tli = first; tli <= last; tli++)
750 : : {
751 : : /* there's no history file for timeline 1 */
4119 752 [ + + + + ]: 172 : if (tli != 1 && !existsTimeLineHistory(tli))
753 : : {
754 : : char *fname;
755 : : char *content;
756 : : int len;
757 : : char expectedfname[MAXFNAMELEN];
758 : :
4140 759 [ + - ]: 11 : ereport(LOG,
760 : : (errmsg("fetching timeline history file for timeline %u from primary server",
761 : : tli)));
762 : :
2692 peter_e@gmx.net 763 : 11 : walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
764 : :
765 : : /*
766 : : * Check that the filename on the primary matches what we
767 : : * calculated ourselves. This is just a sanity check, it should
768 : : * always match.
769 : : */
4140 heikki.linnakangas@i 770 : 11 : TLHistoryFileName(expectedfname, tli);
771 [ - + ]: 11 : if (strcmp(fname, expectedfname) != 0)
4140 heikki.linnakangas@i 772 [ # # ]:UBC 0 : ereport(ERROR,
773 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
774 : : errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
775 : : tli)));
776 : :
777 : : /*
778 : : * Write the file to pg_wal.
779 : : */
4140 heikki.linnakangas@i 780 :CBC 11 : writeTimeLineHistoryFile(tli, content, len);
781 : :
782 : : /*
783 : : * Mark the streamed history file as ready for archiving if
784 : : * archive_mode is always.
785 : : */
1293 fujii@postgresql.org 786 [ + - ]: 11 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
787 : 11 : XLogArchiveForceDone(fname);
788 : : else
953 alvherre@alvh.no-ip. 789 :UBC 0 : XLogArchiveNotify(fname);
790 : :
4140 heikki.linnakangas@i 791 :CBC 11 : pfree(fname);
792 : 11 : pfree(content);
793 : : }
794 : : }
5203 795 : 149 : }
796 : :
797 : : /*
798 : : * Mark us as STOPPED in shared memory at exit.
799 : : */
800 : : static void
5191 801 : 304 : WalRcvDie(int code, Datum arg)
802 : : {
3113 rhaas@postgresql.org 803 : 304 : WalRcvData *walrcv = WalRcv;
891 804 : 304 : TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
805 : :
806 [ - + ]: 304 : Assert(*startpointTLI_p != 0);
807 : :
808 : : /* Ensure that all WAL records received are flushed to disk */
809 : 304 : XLogWalRcvFlush(true, *startpointTLI_p);
810 : :
811 : : /* Mark ourselves inactive in shared memory */
5203 heikki.linnakangas@i 812 [ - + ]: 304 : SpinLockAcquire(&walrcv->mutex);
4140 813 [ + + + - : 304 : Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+ - + - -
+ ]
814 : : walrcv->walRcvState == WALRCV_RESTARTING ||
815 : : walrcv->walRcvState == WALRCV_STARTING ||
816 : : walrcv->walRcvState == WALRCV_WAITING ||
817 : : walrcv->walRcvState == WALRCV_STOPPING);
818 [ - + ]: 304 : Assert(walrcv->pid == MyProcPid);
5191 819 : 304 : walrcv->walRcvState = WALRCV_STOPPED;
5203 820 : 304 : walrcv->pid = 0;
2844 alvherre@alvh.no-ip. 821 : 304 : walrcv->ready_to_display = false;
2385 tgl@sss.pgh.pa.us 822 : 304 : walrcv->latch = NULL;
5203 heikki.linnakangas@i 823 : 304 : SpinLockRelease(&walrcv->mutex);
824 : :
1129 tmunro@postgresql.or 825 : 304 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
826 : :
827 : : /* Terminate the connection gracefully. */
2692 peter_e@gmx.net 828 [ + + ]: 304 : if (wrconn != NULL)
829 : 128 : walrcv_disconnect(wrconn);
830 : :
831 : : /* Wake up the startup process to notice promptly that we're gone */
4140 heikki.linnakangas@i 832 : 304 : WakeupRecovery();
5203 833 : 304 : }
834 : :
835 : : /*
836 : : * Accept the message from XLOG stream, and process it.
837 : : */
838 : : static void
891 rhaas@postgresql.org 839 : 24997 : XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
840 : : {
841 : : int hdrlen;
842 : : XLogRecPtr dataStart;
843 : : XLogRecPtr walEnd;
844 : : TimestampTz sendTime;
845 : : bool replyRequested;
846 : :
5184 heikki.linnakangas@i 847 [ + - - ]: 24997 : switch (type)
848 : : {
5161 bruce@momjian.us 849 : 24997 : case 'w': /* WAL records */
850 : : {
851 : : StringInfoData incoming_message;
852 : :
4176 heikki.linnakangas@i 853 : 24997 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
854 [ - + ]: 24997 : if (len < hdrlen)
5161 bruce@momjian.us 855 [ # # ]:UBC 0 : ereport(ERROR,
856 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
857 : : errmsg_internal("invalid WAL message received from primary")));
858 : :
859 : : /* initialize a StringInfo with the given buffer */
159 drowley@postgresql.o 860 :GNC 24997 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
861 : :
862 : : /* read the fields */
4176 heikki.linnakangas@i 863 :CBC 24997 : dataStart = pq_getmsgint64(&incoming_message);
864 : 24997 : walEnd = pq_getmsgint64(&incoming_message);
2607 tgl@sss.pgh.pa.us 865 : 24997 : sendTime = pq_getmsgint64(&incoming_message);
4176 heikki.linnakangas@i 866 : 24997 : ProcessWalSndrMessage(walEnd, sendTime);
867 : :
868 : 24997 : buf += hdrlen;
869 : 24997 : len -= hdrlen;
891 rhaas@postgresql.org 870 : 24997 : XLogWalRcvWrite(buf, len, dataStart, tli);
5161 bruce@momjian.us 871 : 24997 : break;
872 : : }
4488 simon@2ndQuadrant.co 873 :UBC 0 : case 'k': /* Keepalive */
874 : : {
875 : : StringInfoData incoming_message;
876 : :
4176 heikki.linnakangas@i 877 : 0 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
878 [ # # ]: 0 : if (len != hdrlen)
4488 simon@2ndQuadrant.co 879 [ # # ]: 0 : ereport(ERROR,
880 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
881 : : errmsg_internal("invalid keepalive message received from primary")));
882 : :
883 : : /* initialize a StringInfo with the given buffer */
159 drowley@postgresql.o 884 :UNC 0 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
885 : :
886 : : /* read the fields */
4176 heikki.linnakangas@i 887 :UBC 0 : walEnd = pq_getmsgint64(&incoming_message);
2607 tgl@sss.pgh.pa.us 888 : 0 : sendTime = pq_getmsgint64(&incoming_message);
4176 heikki.linnakangas@i 889 : 0 : replyRequested = pq_getmsgbyte(&incoming_message);
890 : :
891 : 0 : ProcessWalSndrMessage(walEnd, sendTime);
892 : :
893 : : /* If the primary requested a reply, send one immediately */
894 [ # # ]: 0 : if (replyRequested)
4203 895 : 0 : XLogWalRcvSendReply(true, false);
4488 simon@2ndQuadrant.co 896 : 0 : break;
897 : : }
5184 heikki.linnakangas@i 898 : 0 : default:
899 [ # # ]: 0 : ereport(ERROR,
900 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
901 : : errmsg_internal("invalid replication message type %d",
902 : : type)));
903 : : }
5184 heikki.linnakangas@i 904 :CBC 24997 : }
905 : :
906 : : /*
907 : : * Write XLOG data to disk.
908 : : */
909 : : static void
891 rhaas@postgresql.org 910 : 24997 : XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
911 : : {
912 : : int startoff;
913 : : int byteswritten;
914 : :
915 [ - + ]: 24997 : Assert(tli != 0);
916 : :
5203 heikki.linnakangas@i 917 [ + + ]: 50011 : while (nbytes > 0)
918 : : {
919 : : int segbytes;
920 : :
921 : : /* Close the current segment if it's completed */
948 fujii@postgresql.org 922 [ + + + + ]: 25014 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
891 rhaas@postgresql.org 923 : 17 : XLogWalRcvClose(recptr, tli);
924 : :
948 fujii@postgresql.org 925 [ + + ]: 25014 : if (recvFile < 0)
926 : : {
927 : : /* Create/use new log file */
2399 andres@anarazel.de 928 : 169 : XLByteToSeg(recptr, recvSegNo, wal_segment_size);
891 rhaas@postgresql.org 929 : 169 : recvFile = XLogFileInit(recvSegNo, tli);
930 : 169 : recvFileTLI = tli;
931 : : }
932 : :
933 : : /* Calculate the start offset of the received logs */
2399 andres@anarazel.de 934 : 25014 : startoff = XLogSegmentOffset(recptr, wal_segment_size);
935 : :
936 [ + + ]: 25014 : if (startoff + nbytes > wal_segment_size)
937 : 17 : segbytes = wal_segment_size - startoff;
938 : : else
5203 heikki.linnakangas@i 939 : 24997 : segbytes = nbytes;
940 : :
941 : : /* OK to write the logs */
942 : 25014 : errno = 0;
943 : :
563 tmunro@postgresql.or 944 : 25014 : byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
5203 heikki.linnakangas@i 945 [ - + ]: 25014 : if (byteswritten <= 0)
946 : : {
947 : : char xlogfname[MAXFNAMELEN];
948 : : int save_errno;
949 : :
950 : : /* if write didn't set errno, assume no disk space */
5203 heikki.linnakangas@i 951 [ # # ]:UBC 0 : if (errno == 0)
952 : 0 : errno = ENOSPC;
953 : :
1594 michael@paquier.xyz 954 : 0 : save_errno = errno;
955 : 0 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
956 : 0 : errno = save_errno;
5203 heikki.linnakangas@i 957 [ # # ]: 0 : ereport(PANIC,
958 : : (errcode_for_file_access(),
959 : : errmsg("could not write to WAL segment %s "
960 : : "at offset %d, length %lu: %m",
961 : : xlogfname, startoff, (unsigned long) segbytes)));
962 : : }
963 : :
964 : : /* Update state for write */
4125 alvherre@alvh.no-ip. 965 :CBC 25014 : recptr += byteswritten;
966 : :
5203 heikki.linnakangas@i 967 : 25014 : nbytes -= byteswritten;
968 : 25014 : buf += byteswritten;
969 : :
5161 bruce@momjian.us 970 : 25014 : LogstreamResult.Write = recptr;
971 : : }
972 : :
973 : : /* Update shared-memory status */
1467 tmunro@postgresql.or 974 : 24997 : pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
975 : :
976 : : /*
977 : : * Close the current segment if it's fully written up in the last cycle of
978 : : * the loop, to create its archive notification file soon. Otherwise WAL
979 : : * archiving of the segment will be delayed until any data in the next
980 : : * segment is received and written.
981 : : */
948 fujii@postgresql.org 982 [ + - + + ]: 24997 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
891 rhaas@postgresql.org 983 : 37 : XLogWalRcvClose(recptr, tli);
5203 heikki.linnakangas@i 984 : 24997 : }
985 : :
986 : : /*
987 : : * Flush the log to disk.
988 : : *
989 : : * If we're in the midst of dying, it's unwise to do anything that might throw
990 : : * an error, so we skip sending a reply in that case.
991 : : */
992 : : static void
891 rhaas@postgresql.org 993 : 22349 : XLogWalRcvFlush(bool dying, TimeLineID tli)
994 : : {
995 [ - + ]: 22349 : Assert(tli != 0);
996 : :
4125 alvherre@alvh.no-ip. 997 [ + + ]: 22349 : if (LogstreamResult.Flush < LogstreamResult.Write)
998 : : {
3113 rhaas@postgresql.org 999 : 21914 : WalRcvData *walrcv = WalRcv;
1000 : :
891 1001 : 21914 : issue_xlog_fsync(recvFile, recvSegNo, tli);
1002 : :
5203 heikki.linnakangas@i 1003 : 21914 : LogstreamResult.Flush = LogstreamResult.Write;
1004 : :
1005 : : /* Update shared-memory status */
1006 [ - + ]: 21914 : SpinLockAcquire(&walrcv->mutex);
1467 tmunro@postgresql.or 1007 [ + - ]: 21914 : if (walrcv->flushedUpto < LogstreamResult.Flush)
1008 : : {
1009 : 21914 : walrcv->latestChunkStart = walrcv->flushedUpto;
1010 : 21914 : walrcv->flushedUpto = LogstreamResult.Flush;
891 rhaas@postgresql.org 1011 : 21914 : walrcv->receivedTLI = tli;
1012 : : }
5203 heikki.linnakangas@i 1013 : 21914 : SpinLockRelease(&walrcv->mutex);
1014 : :
1015 : : /* Signal the startup process and walsender that new WAL has arrived */
4960 1016 : 21914 : WakeupRecovery();
4653 simon@2ndQuadrant.co 1017 [ + - + - ]: 21914 : if (AllowCascadeReplication())
372 andres@anarazel.de 1018 : 21914 : WalSndWakeup(true, false);
1019 : :
1020 : : /* Report XLOG streaming progress in PS display */
5060 tgl@sss.pgh.pa.us 1021 [ + - ]: 21914 : if (update_process_title)
1022 : : {
1023 : : char activitymsg[50];
1024 : :
1025 : 21914 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1146 peter@eisentraut.org 1026 : 21914 : LSN_FORMAT_ARGS(LogstreamResult.Write));
1495 1027 : 21914 : set_ps_display(activitymsg);
1028 : : }
1029 : :
1030 : : /* Also let the primary know that we made some progress */
4806 rhaas@postgresql.org 1031 [ + - ]: 21914 : if (!dying)
1032 : : {
4203 heikki.linnakangas@i 1033 : 21914 : XLogWalRcvSendReply(false, false);
3741 1034 : 21914 : XLogWalRcvSendHSFeedback(false);
1035 : : }
1036 : : }
5203 1037 : 22349 : }
1038 : :
1039 : : /*
1040 : : * Close the current segment.
1041 : : *
1042 : : * Flush the segment to disk before closing it. Otherwise we have to
1043 : : * reopen and fsync it later.
1044 : : *
1045 : : * Create an archive notification file since the segment is known completed.
1046 : : */
1047 : : static void
891 rhaas@postgresql.org 1048 : 54 : XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
1049 : : {
1050 : : char xlogfname[MAXFNAMELEN];
1051 : :
948 fujii@postgresql.org 1052 [ + - - + ]: 54 : Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
891 rhaas@postgresql.org 1053 [ - + ]: 54 : Assert(tli != 0);
1054 : :
1055 : : /*
1056 : : * fsync() and close current file before we switch to next one. We would
1057 : : * otherwise have to reopen this file to fsync it later
1058 : : */
1059 : 54 : XLogWalRcvFlush(false, tli);
1060 : :
948 fujii@postgresql.org 1061 : 54 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1062 : :
1063 : : /*
1064 : : * XLOG segment files will be re-read by recovery in startup process soon,
1065 : : * so we don't advise the OS to release cache pages associated with the
1066 : : * file like XLogFileClose() does.
1067 : : */
1068 [ - + ]: 54 : if (close(recvFile) != 0)
948 fujii@postgresql.org 1069 [ # # ]:UBC 0 : ereport(PANIC,
1070 : : (errcode_for_file_access(),
1071 : : errmsg("could not close WAL segment %s: %m",
1072 : : xlogfname)));
1073 : :
1074 : : /*
1075 : : * Create .done file forcibly to prevent the streamed segment from being
1076 : : * archived later.
1077 : : */
948 fujii@postgresql.org 1078 [ + - ]:CBC 54 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1079 : 54 : XLogArchiveForceDone(xlogfname);
1080 : : else
948 fujii@postgresql.org 1081 :UBC 0 : XLogArchiveNotify(xlogfname);
1082 : :
948 fujii@postgresql.org 1083 :CBC 54 : recvFile = -1;
1084 : 54 : }
1085 : :
1086 : : /*
1087 : : * Send reply message to primary, indicating our current WAL locations, oldest
1088 : : * xmin and the current time.
1089 : : *
1090 : : * If 'force' is not set, the message is only sent if enough time has
1091 : : * passed since last status update to reach wal_receiver_status_interval.
1092 : : * If wal_receiver_status_interval is disabled altogether and 'force' is
1093 : : * false, this is a no-op.
1094 : : *
1095 : : * If 'requestReply' is true, requests the server to reply immediately upon
1096 : : * receiving this message. This is used for heartbeats, when approaching
1097 : : * wal_receiver_timeout.
1098 : : */
1099 : : static void
4203 heikki.linnakangas@i 1100 : 54947 : XLogWalRcvSendReply(bool force, bool requestReply)
1101 : : {
1102 : : static XLogRecPtr writePtr = 0;
1103 : : static XLogRecPtr flushPtr = 0;
1104 : : XLogRecPtr applyPtr;
1105 : : TimestampTz now;
1106 : :
1107 : : /*
1108 : : * If the user doesn't want status to be reported to the primary, be sure
1109 : : * to exit before doing anything at all.
1110 : : */
1111 [ + + - + ]: 54947 : if (!force && wal_receiver_status_interval <= 0)
4812 heikki.linnakangas@i 1112 :UBC 0 : return;
1113 : :
1114 : : /* Get current timestamp. */
4812 heikki.linnakangas@i 1115 :CBC 54947 : now = GetCurrentTimestamp();
1116 : :
1117 : : /*
1118 : : * We can compare the write and flush positions to the last message we
1119 : : * sent without taking any lock, but the apply position requires a spin
1120 : : * lock, so we don't check that unless something else has changed or 10
1121 : : * seconds have passed. This means that the apply WAL location will
1122 : : * appear, from the primary's point of view, to lag slightly, but since
1123 : : * this is only for reporting purposes and only on idle systems, that's
1124 : : * probably OK.
1125 : : */
4203 1126 [ + + ]: 54947 : if (!force
4125 alvherre@alvh.no-ip. 1127 [ + + ]: 43904 : && writePtr == LogstreamResult.Write
1128 [ + + ]: 21932 : && flushPtr == LogstreamResult.Flush
523 tmunro@postgresql.or 1129 [ + + ]: 72 : && now < wakeup[WALRCV_WAKEUP_REPLY])
4812 heikki.linnakangas@i 1130 : 69 : return;
1131 : :
1132 : : /* Make sure we wake up when it's time to send another reply. */
523 tmunro@postgresql.or 1133 : 54878 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
1134 : :
1135 : : /* Construct a new message */
4176 heikki.linnakangas@i 1136 : 54878 : writePtr = LogstreamResult.Write;
1137 : 54878 : flushPtr = LogstreamResult.Flush;
4133 1138 : 54878 : applyPtr = GetXLogReplayRecPtr(NULL);
1139 : :
4176 1140 : 54878 : resetStringInfo(&reply_message);
1141 : 54878 : pq_sendbyte(&reply_message, 'r');
1142 : 54878 : pq_sendint64(&reply_message, writePtr);
1143 : 54878 : pq_sendint64(&reply_message, flushPtr);
1144 : 54878 : pq_sendint64(&reply_message, applyPtr);
2607 tgl@sss.pgh.pa.us 1145 : 54878 : pq_sendint64(&reply_message, GetCurrentTimestamp());
4176 heikki.linnakangas@i 1146 : 54878 : pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1147 : :
1148 : : /* Send it */
1149 [ + + - + ]: 54878 : elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1150 : : LSN_FORMAT_ARGS(writePtr),
1151 : : LSN_FORMAT_ARGS(flushPtr),
1152 : : LSN_FORMAT_ARGS(applyPtr),
1153 : : requestReply ? " (reply requested)" : "");
1154 : :
2692 peter_e@gmx.net 1155 : 54878 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1156 : : }
1157 : :
1158 : : /*
1159 : : * Send hot standby feedback message to primary, plus the current time,
1160 : : * in case they don't have a watch.
1161 : : *
1162 : : * If the user disables feedback, send one final message to tell sender
1163 : : * to forget about the xmin on this standby. We also send this message
1164 : : * on first connect because a previous connection might have set xmin
1165 : : * on a replication slot. (If we're not using a slot it's harmless to
1166 : : * send a feedback message explicitly setting InvalidTransactionId).
1167 : : */
1168 : : static void
4087 simon@2ndQuadrant.co 1169 : 22084 : XLogWalRcvSendHSFeedback(bool immed)
1170 : : {
1171 : : TimestampTz now;
1172 : : FullTransactionId nextFullXid;
1173 : : TransactionId nextXid;
1174 : : uint32 xmin_epoch,
1175 : : catalog_xmin_epoch;
1176 : : TransactionId xmin,
1177 : : catalog_xmin;
1178 : :
1179 : : /* initially true so we always send at least one feedback message */
1180 : : static bool primary_has_standby_xmin = true;
1181 : :
1182 : : /*
1183 : : * If the user doesn't want status to be reported to the primary, be sure
1184 : : * to exit before doing anything at all.
1185 : : */
1186 [ + - + + ]: 22084 : if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1400 andres@anarazel.de 1187 [ + + ]: 21785 : !primary_has_standby_xmin)
4804 simon@2ndQuadrant.co 1188 : 21924 : return;
1189 : :
1190 : : /* Get current timestamp. */
1191 : 430 : now = GetCurrentTimestamp();
1192 : :
1193 : : /* Send feedback at most once per wal_receiver_status_interval. */
523 tmunro@postgresql.or 1194 [ + + + + ]: 430 : if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1195 : 268 : return;
1196 : :
1197 : : /* Make sure we wake up when it's time to send feedback again. */
1198 : 162 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
1199 : :
1200 : : /*
1201 : : * If Hot Standby is not yet accepting connections there is nothing to
1202 : : * send. Check this after the interval has expired to reduce number of
1203 : : * calls.
1204 : : *
1205 : : * Bailing out here also ensures that we don't send feedback until we've
1206 : : * read our own replication slot state, so we don't tell the primary to
1207 : : * discard needed xmin or catalog_xmin from any slots that may exist on
1208 : : * this replica.
1209 : : */
4804 simon@2ndQuadrant.co 1210 [ + + ]: 162 : if (!HotStandbyActive())
1211 : 2 : return;
1212 : :
1213 : : /*
1214 : : * Make the expensive call to get the oldest xmin once we are certain
1215 : : * everything else has been checked.
1216 : : */
4087 1217 [ + + ]: 160 : if (hot_standby_feedback)
1218 : : {
1341 andres@anarazel.de 1219 : 34 : GetReplicationHorizons(&xmin, &catalog_xmin);
1220 : : }
1221 : : else
1222 : : {
4087 simon@2ndQuadrant.co 1223 : 126 : xmin = InvalidTransactionId;
2577 1224 : 126 : catalog_xmin = InvalidTransactionId;
1225 : : }
1226 : :
1227 : : /*
1228 : : * Get epoch and adjust if nextXid and oldestXmin are different sides of
1229 : : * the epoch boundary.
1230 : : */
1844 tmunro@postgresql.or 1231 : 160 : nextFullXid = ReadNextFullTransactionId();
1232 : 160 : nextXid = XidFromFullTransactionId(nextFullXid);
1233 : 160 : xmin_epoch = EpochFromFullTransactionId(nextFullXid);
2577 simon@2ndQuadrant.co 1234 : 160 : catalog_xmin_epoch = xmin_epoch;
4804 1235 [ - + ]: 160 : if (nextXid < xmin)
2524 bruce@momjian.us 1236 :UBC 0 : xmin_epoch--;
2577 simon@2ndQuadrant.co 1237 [ - + ]:CBC 160 : if (nextXid < catalog_xmin)
2524 bruce@momjian.us 1238 :UBC 0 : catalog_xmin_epoch--;
1239 : :
2577 simon@2ndQuadrant.co 1240 [ + + ]:CBC 160 : elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1241 : : xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1242 : :
1243 : : /* Construct the message and send it. */
4176 heikki.linnakangas@i 1244 : 160 : resetStringInfo(&reply_message);
1245 : 160 : pq_sendbyte(&reply_message, 'h');
2607 tgl@sss.pgh.pa.us 1246 : 160 : pq_sendint64(&reply_message, GetCurrentTimestamp());
2377 andres@anarazel.de 1247 : 160 : pq_sendint32(&reply_message, xmin);
1248 : 160 : pq_sendint32(&reply_message, xmin_epoch);
1249 : 160 : pq_sendint32(&reply_message, catalog_xmin);
1250 : 160 : pq_sendint32(&reply_message, catalog_xmin_epoch);
2692 peter_e@gmx.net 1251 : 160 : walrcv_send(wrconn, reply_message.data, reply_message.len);
2577 simon@2ndQuadrant.co 1252 [ + + - + ]: 160 : if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1400 andres@anarazel.de 1253 : 34 : primary_has_standby_xmin = true;
1254 : : else
1255 : 126 : primary_has_standby_xmin = false;
1256 : : }
1257 : :
1258 : : /*
1259 : : * Update shared memory status upon receiving a message from primary.
1260 : : *
1261 : : * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1262 : : * message, reported by primary.
1263 : : */
1264 : : static void
4488 simon@2ndQuadrant.co 1265 : 24997 : ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1266 : : {
3113 rhaas@postgresql.org 1267 : 24997 : WalRcvData *walrcv = WalRcv;
4488 simon@2ndQuadrant.co 1268 : 24997 : TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1269 : :
1270 : : /* Update shared-memory status */
1271 [ + + ]: 24997 : SpinLockAcquire(&walrcv->mutex);
4125 alvherre@alvh.no-ip. 1272 [ + + ]: 24997 : if (walrcv->latestWalEnd < walEnd)
4266 simon@2ndQuadrant.co 1273 : 21784 : walrcv->latestWalEndTime = sendTime;
1274 : 24997 : walrcv->latestWalEnd = walEnd;
4488 1275 : 24997 : walrcv->lastMsgSendTime = sendTime;
1276 : 24997 : walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1277 : 24997 : SpinLockRelease(&walrcv->mutex);
1278 : :
1238 tgl@sss.pgh.pa.us 1279 [ + + ]: 24997 : if (message_level_is_interesting(DEBUG2))
1280 : : {
1281 : : char *sendtime;
1282 : : char *receipttime;
1283 : : int applyDelay;
1284 : :
1285 : : /* Copy because timestamptz_to_str returns a static buffer */
3663 1286 : 408 : sendtime = pstrdup(timestamptz_to_str(sendTime));
1287 : 408 : receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
3319 ishii@postgresql.org 1288 : 408 : applyDelay = GetReplicationApplyDelay();
1289 : :
1290 : : /* apply delay is not available */
1291 [ + + ]: 408 : if (applyDelay == -1)
1292 [ + - ]: 1 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1293 : : sendtime,
1294 : : receipttime,
1295 : : GetReplicationTransferLatency());
1296 : : else
1297 [ + - ]: 407 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1298 : : sendtime,
1299 : : receipttime,
1300 : : applyDelay,
1301 : : GetReplicationTransferLatency());
1302 : :
3663 tgl@sss.pgh.pa.us 1303 : 408 : pfree(sendtime);
1304 : 408 : pfree(receipttime);
1305 : : }
4488 simon@2ndQuadrant.co 1306 : 24997 : }
1307 : :
1308 : : /*
1309 : : * Compute the next wakeup time for a given wakeup reason. Can be called to
1310 : : * initialize a wakeup time, to adjust it for the next wakeup, or to
1311 : : * reinitialize it when GUCs have changed. We ask the caller to pass in the
1312 : : * value of "now" because this frequently avoids multiple calls of
1313 : : * GetCurrentTimestamp(). It had better be a reasonably up-to-date value
1314 : : * though.
1315 : : */
1316 : : static void
523 tmunro@postgresql.or 1317 : 105674 : WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
1318 : : {
1319 [ + + + + : 105674 : switch (reason)
- ]
1320 : : {
1321 : 25157 : case WALRCV_WAKEUP_TERMINATE:
1322 [ - + ]: 25157 : if (wal_receiver_timeout <= 0)
444 tgl@sss.pgh.pa.us 1323 :UBC 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1324 : : else
444 tgl@sss.pgh.pa.us 1325 :CBC 25157 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
523 tmunro@postgresql.or 1326 : 25157 : break;
1327 : 25157 : case WALRCV_WAKEUP_PING:
1328 [ - + ]: 25157 : if (wal_receiver_timeout <= 0)
444 tgl@sss.pgh.pa.us 1329 :UBC 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1330 : : else
444 tgl@sss.pgh.pa.us 1331 :CBC 25157 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
523 tmunro@postgresql.or 1332 : 25157 : break;
1333 : 322 : case WALRCV_WAKEUP_HSFEEDBACK:
1334 [ + + - + ]: 322 : if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
444 tgl@sss.pgh.pa.us 1335 : 261 : wakeup[reason] = TIMESTAMP_INFINITY;
1336 : : else
1337 : 61 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
523 tmunro@postgresql.or 1338 : 322 : break;
1339 : 55038 : case WALRCV_WAKEUP_REPLY:
1340 [ - + ]: 55038 : if (wal_receiver_status_interval <= 0)
444 tgl@sss.pgh.pa.us 1341 :UBC 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1342 : : else
444 tgl@sss.pgh.pa.us 1343 :CBC 55038 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
523 tmunro@postgresql.or 1344 : 55038 : break;
1345 : : /* there's intentionally no default: here */
1346 : : }
1347 : 105674 : }
1348 : :
1349 : : /*
1350 : : * Wake up the walreceiver main loop.
1351 : : *
1352 : : * This is called by the startup process whenever interesting xlog records
1353 : : * are applied, so that walreceiver can check if it needs to send an apply
1354 : : * notification back to the primary which may be waiting in a COMMIT with
1355 : : * synchronous_commit = remote_apply.
1356 : : */
1357 : : void
2938 rhaas@postgresql.org 1358 : 11192 : WalRcvForceReply(void)
1359 : : {
1360 : : Latch *latch;
1361 : :
1362 : 11192 : WalRcv->force_reply = true;
1363 : : /* fetching the latch pointer might not be atomic, so use spinlock */
2385 tgl@sss.pgh.pa.us 1364 [ + + ]: 11192 : SpinLockAcquire(&WalRcv->mutex);
1365 : 11192 : latch = WalRcv->latch;
1366 : 11192 : SpinLockRelease(&WalRcv->mutex);
1367 [ + + ]: 11192 : if (latch)
1368 : 10917 : SetLatch(latch);
2938 rhaas@postgresql.org 1369 : 11192 : }
1370 : :
1371 : : /*
1372 : : * Return a string constant representing the state. This is used
1373 : : * in system functions and views, and should *not* be translated.
1374 : : */
1375 : : static const char *
3020 alvherre@alvh.no-ip. 1376 :GBC 12 : WalRcvGetStateString(WalRcvState state)
1377 : : {
1378 [ - - + - : 12 : switch (state)
- - - ]
1379 : : {
3020 alvherre@alvh.no-ip. 1380 :UBC 0 : case WALRCV_STOPPED:
1381 : 0 : return "stopped";
1382 : 0 : case WALRCV_STARTING:
1383 : 0 : return "starting";
3020 alvherre@alvh.no-ip. 1384 :GBC 12 : case WALRCV_STREAMING:
1385 : 12 : return "streaming";
3020 alvherre@alvh.no-ip. 1386 :UBC 0 : case WALRCV_WAITING:
1387 : 0 : return "waiting";
1388 : 0 : case WALRCV_RESTARTING:
1389 : 0 : return "restarting";
1390 : 0 : case WALRCV_STOPPING:
1391 : 0 : return "stopping";
1392 : : }
1393 : 0 : return "UNKNOWN";
1394 : : }
1395 : :
1396 : : /*
1397 : : * Returns activity of WAL receiver, including pid, state and xlog locations
1398 : : * received from the WAL sender of another server.
1399 : : */
1400 : : Datum
3020 alvherre@alvh.no-ip. 1401 :CBC 15 : pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1402 : : {
1403 : : TupleDesc tupdesc;
1404 : : Datum *values;
1405 : : bool *nulls;
1406 : : int pid;
1407 : : bool ready_to_display;
1408 : : WalRcvState state;
1409 : : XLogRecPtr receive_start_lsn;
1410 : : TimeLineID receive_start_tli;
1411 : : XLogRecPtr written_lsn;
1412 : : XLogRecPtr flushed_lsn;
1413 : : TimeLineID received_tli;
1414 : : TimestampTz last_send_time;
1415 : : TimestampTz last_receipt_time;
1416 : : XLogRecPtr latest_end_lsn;
1417 : : TimestampTz latest_end_time;
1418 : : char sender_host[NI_MAXHOST];
2206 fujii@postgresql.org 1419 : 15 : int sender_port = 0;
1420 : : char slotname[NAMEDATALEN];
1421 : : char conninfo[MAXCONNINFO];
1422 : :
1423 : : /* Take a lock to ensure value consistency */
2480 alvherre@alvh.no-ip. 1424 [ - + ]: 15 : SpinLockAcquire(&WalRcv->mutex);
1425 : 15 : pid = (int) WalRcv->pid;
1426 : 15 : ready_to_display = WalRcv->ready_to_display;
1427 : 15 : state = WalRcv->walRcvState;
1428 : 15 : receive_start_lsn = WalRcv->receiveStart;
1429 : 15 : receive_start_tli = WalRcv->receiveStartTLI;
1428 michael@paquier.xyz 1430 : 15 : flushed_lsn = WalRcv->flushedUpto;
2480 alvherre@alvh.no-ip. 1431 : 15 : received_tli = WalRcv->receivedTLI;
1432 : 15 : last_send_time = WalRcv->lastMsgSendTime;
1433 : 15 : last_receipt_time = WalRcv->lastMsgReceiptTime;
1434 : 15 : latest_end_lsn = WalRcv->latestWalEnd;
1435 : 15 : latest_end_time = WalRcv->latestWalEndTime;
2385 1436 : 15 : strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
2206 fujii@postgresql.org 1437 : 15 : strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1438 : 15 : sender_port = WalRcv->sender_port;
2385 alvherre@alvh.no-ip. 1439 : 15 : strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
2480 1440 : 15 : SpinLockRelease(&WalRcv->mutex);
1441 : :
1442 : : /*
1443 : : * No WAL receiver (or not ready yet), just return a tuple with NULL
1444 : : * values
1445 : : */
1446 [ + + - + ]: 15 : if (pid == 0 || !ready_to_display)
2844 1447 : 3 : PG_RETURN_NULL();
1448 : :
1449 : : /*
1450 : : * Read "writtenUpto" without holding a spinlock. Note that it may not be
1451 : : * consistent with the other shared variables of the WAL receiver
1452 : : * protected by a spinlock, but this should not be used for data integrity
1453 : : * checks.
1454 : : */
1151 fujii@postgresql.org 1455 :GBC 12 : written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1456 : :
1457 : : /* determine result type */
2846 alvherre@alvh.no-ip. 1458 [ - + ]: 12 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
2846 alvherre@alvh.no-ip. 1459 [ # # ]:UBC 0 : elog(ERROR, "return type must be a row type");
1460 : :
2846 alvherre@alvh.no-ip. 1461 :GBC 12 : values = palloc0(sizeof(Datum) * tupdesc->natts);
1462 : 12 : nulls = palloc0(sizeof(bool) * tupdesc->natts);
1463 : :
1464 : : /* Fetch values */
2480 1465 : 12 : values[0] = Int32GetDatum(pid);
1466 : :
748 mail@joeconway.com 1467 [ - + ]: 12 : if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1468 : : {
1469 : : /*
1470 : : * Only superusers and roles with privileges of pg_read_all_stats can
1471 : : * see details. Other users only get the pid value to know whether it
1472 : : * is a WAL receiver, but no details.
1473 : : */
653 peter@eisentraut.org 1474 :UBC 0 : memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1475 : : }
1476 : : else
1477 : : {
3020 alvherre@alvh.no-ip. 1478 :GBC 12 : values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1479 : :
1480 [ - + ]: 12 : if (XLogRecPtrIsInvalid(receive_start_lsn))
3020 alvherre@alvh.no-ip. 1481 :UBC 0 : nulls[2] = true;
1482 : : else
3020 alvherre@alvh.no-ip. 1483 :GBC 12 : values[2] = LSNGetDatum(receive_start_lsn);
1484 : 12 : values[3] = Int32GetDatum(receive_start_tli);
1428 michael@paquier.xyz 1485 [ - + ]: 12 : if (XLogRecPtrIsInvalid(written_lsn))
3020 alvherre@alvh.no-ip. 1486 :UBC 0 : nulls[4] = true;
1487 : : else
1428 michael@paquier.xyz 1488 :GBC 12 : values[4] = LSNGetDatum(written_lsn);
1489 [ - + ]: 12 : if (XLogRecPtrIsInvalid(flushed_lsn))
1428 michael@paquier.xyz 1490 :UBC 0 : nulls[5] = true;
1491 : : else
1428 michael@paquier.xyz 1492 :GBC 12 : values[5] = LSNGetDatum(flushed_lsn);
1493 : 12 : values[6] = Int32GetDatum(received_tli);
3020 alvherre@alvh.no-ip. 1494 [ - + ]: 12 : if (last_send_time == 0)
1428 michael@paquier.xyz 1495 :UBC 0 : nulls[7] = true;
1496 : : else
1428 michael@paquier.xyz 1497 :GBC 12 : values[7] = TimestampTzGetDatum(last_send_time);
3020 alvherre@alvh.no-ip. 1498 [ - + ]: 12 : if (last_receipt_time == 0)
1428 michael@paquier.xyz 1499 :UBC 0 : nulls[8] = true;
1500 : : else
1428 michael@paquier.xyz 1501 :GBC 12 : values[8] = TimestampTzGetDatum(last_receipt_time);
3020 alvherre@alvh.no-ip. 1502 [ - + ]: 12 : if (XLogRecPtrIsInvalid(latest_end_lsn))
1428 michael@paquier.xyz 1503 :UBC 0 : nulls[9] = true;
1504 : : else
1428 michael@paquier.xyz 1505 :GBC 12 : values[9] = LSNGetDatum(latest_end_lsn);
3020 alvherre@alvh.no-ip. 1506 [ - + ]: 12 : if (latest_end_time == 0)
1428 michael@paquier.xyz 1507 :UBC 0 : nulls[10] = true;
1508 : : else
1428 michael@paquier.xyz 1509 :GBC 12 : values[10] = TimestampTzGetDatum(latest_end_time);
3020 alvherre@alvh.no-ip. 1510 [ - + ]: 12 : if (*slotname == '\0')
1428 michael@paquier.xyz 1511 :UBC 0 : nulls[11] = true;
1512 : : else
1428 michael@paquier.xyz 1513 :GBC 12 : values[11] = CStringGetTextDatum(slotname);
2206 fujii@postgresql.org 1514 [ - + ]: 12 : if (*sender_host == '\0')
1428 michael@paquier.xyz 1515 :UBC 0 : nulls[12] = true;
1516 : : else
1428 michael@paquier.xyz 1517 :GBC 12 : values[12] = CStringGetTextDatum(sender_host);
2206 fujii@postgresql.org 1518 [ - + ]: 12 : if (sender_port == 0)
1428 michael@paquier.xyz 1519 :UBC 0 : nulls[13] = true;
1520 : : else
1428 michael@paquier.xyz 1521 :GBC 12 : values[13] = Int32GetDatum(sender_port);
2206 fujii@postgresql.org 1522 [ - + ]: 12 : if (*conninfo == '\0')
1428 michael@paquier.xyz 1523 :UBC 0 : nulls[14] = true;
1524 : : else
1428 michael@paquier.xyz 1525 :GBC 12 : values[14] = CStringGetTextDatum(conninfo);
1526 : : }
1527 : :
1528 : : /* Returns the record as Datum */
2480 alvherre@alvh.no-ip. 1529 : 12 : PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1530 : : }
|