Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiverfuncs.c
4 : *
5 : * This file contains functions used by the startup process to communicate
6 : * with the walreceiver process. Functions implementing walreceiver itself
7 : * are in walreceiver.c.
8 : *
9 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
10 : *
11 : *
12 : * IDENTIFICATION
13 : * src/backend/replication/walreceiverfuncs.c
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 : #include "postgres.h"
18 :
19 : #include <sys/stat.h>
20 : #include <sys/time.h>
21 : #include <time.h>
22 : #include <unistd.h>
23 : #include <signal.h>
24 :
25 : #include "access/xlog_internal.h"
26 : #include "access/xlogrecovery.h"
27 : #include "pgstat.h"
28 : #include "postmaster/startup.h"
29 : #include "replication/walreceiver.h"
30 : #include "storage/pmsignal.h"
31 : #include "storage/shmem.h"
32 : #include "utils/timestamp.h"
33 :
34 : WalRcvData *WalRcv = NULL;
35 :
36 : /*
37 : * How long to wait for walreceiver to start up after requesting
38 : * postmaster to launch it. In seconds.
39 : */
40 : #define WALRCV_STARTUP_TIMEOUT 10
41 :
42 : /* Report shared memory space needed by WalRcvShmemInit */
43 : Size
4832 heikki.linnakangas 44 CBC 6390 : WalRcvShmemSize(void)
45 : {
4790 bruce 46 6390 : Size size = 0;
47 :
4832 heikki.linnakangas 48 6390 : size = add_size(size, sizeof(WalRcvData));
49 :
50 6390 : return size;
51 : }
52 :
53 : /* Allocate and initialize walreceiver-related shared memory */
54 : void
55 1826 : WalRcvShmemInit(void)
56 : {
57 : bool found;
58 :
59 1826 : WalRcv = (WalRcvData *)
60 1826 : ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
61 :
4729 tgl 62 1826 : if (!found)
63 : {
64 : /* First time through, so initialize */
65 1826 : MemSet(WalRcv, 0, WalRcvShmemSize());
66 1826 : WalRcv->walRcvState = WALRCV_STOPPED;
758 tmunro 67 1826 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
4729 tgl 68 1826 : SpinLockInit(&WalRcv->mutex);
780 fujii 69 1826 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
2321 peter_e 70 1826 : WalRcv->latch = NULL;
71 : }
4832 heikki.linnakangas 72 1826 : }
73 :
74 : /* Is walreceiver running (or starting up)? */
75 : bool
3769 76 1309 : WalRcvRunning(void)
77 : {
2742 rhaas 78 1309 : WalRcvData *walrcv = WalRcv;
79 : WalRcvState state;
80 : pg_time_t startTime;
81 :
4832 heikki.linnakangas 82 1309 : SpinLockAcquire(&walrcv->mutex);
83 :
4820 84 1309 : state = walrcv->walRcvState;
85 1309 : startTime = walrcv->startTime;
86 :
87 1309 : SpinLockRelease(&walrcv->mutex);
88 :
89 : /*
90 : * If it has taken too long for walreceiver to start up, give up. Setting
91 : * the state to STOPPED ensures that if walreceiver later does start up
92 : * after all, it will see that it's not supposed to be running and die
93 : * without doing anything.
94 : */
95 1309 : if (state == WALRCV_STARTING)
96 : {
4790 bruce 97 UBC 0 : pg_time_t now = (pg_time_t) time(NULL);
98 :
4820 heikki.linnakangas 99 0 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100 : {
758 tmunro 101 0 : bool stopped = false;
102 :
103 0 : SpinLockAcquire(&walrcv->mutex);
4820 heikki.linnakangas 104 0 : if (walrcv->walRcvState == WALRCV_STARTING)
105 : {
106 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
758 tmunro 107 0 : stopped = true;
108 : }
4820 heikki.linnakangas 109 0 : SpinLockRelease(&walrcv->mutex);
110 :
758 tmunro 111 0 : if (stopped)
112 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
113 : }
114 : }
115 :
4820 heikki.linnakangas 116 CBC 1309 : if (state != WALRCV_STOPPED)
117 26 : return true;
118 : else
119 1283 : return false;
120 : }
121 :
122 : /*
123 : * Is walreceiver running and streaming (or at least attempting to connect,
124 : * or starting up)?
125 : */
126 : bool
3769 127 29631 : WalRcvStreaming(void)
128 : {
2742 rhaas 129 29631 : WalRcvData *walrcv = WalRcv;
130 : WalRcvState state;
131 : pg_time_t startTime;
132 :
3769 heikki.linnakangas 133 29631 : SpinLockAcquire(&walrcv->mutex);
134 :
135 29631 : state = walrcv->walRcvState;
136 29631 : startTime = walrcv->startTime;
137 :
138 29631 : SpinLockRelease(&walrcv->mutex);
139 :
140 : /*
141 : * If it has taken too long for walreceiver to start up, give up. Setting
142 : * the state to STOPPED ensures that if walreceiver later does start up
143 : * after all, it will see that it's not supposed to be running and die
144 : * without doing anything.
145 : */
146 29631 : if (state == WALRCV_STARTING)
147 : {
148 216 : pg_time_t now = (pg_time_t) time(NULL);
149 :
150 216 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
151 : {
758 tmunro 152 UBC 0 : bool stopped = false;
153 :
154 0 : SpinLockAcquire(&walrcv->mutex);
3769 heikki.linnakangas 155 0 : if (walrcv->walRcvState == WALRCV_STARTING)
156 : {
157 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
758 tmunro 158 0 : stopped = true;
159 : }
3769 heikki.linnakangas 160 0 : SpinLockRelease(&walrcv->mutex);
161 :
758 tmunro 162 0 : if (stopped)
163 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
164 : }
165 : }
166 :
3769 heikki.linnakangas 167 CBC 29631 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
168 : state == WALRCV_RESTARTING)
169 26853 : return true;
170 : else
171 2778 : return false;
172 : }
173 :
174 : /*
175 : * Stop walreceiver (if running) and wait for it to die.
176 : * Executed by the Startup process.
177 : */
178 : void
4832 179 1283 : ShutdownWalRcv(void)
180 : {
2742 rhaas 181 1283 : WalRcvData *walrcv = WalRcv;
4790 bruce 182 1283 : pid_t walrcvpid = 0;
758 tmunro 183 1283 : bool stopped = false;
184 :
185 : /*
186 : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
187 : * mode once it's finished, and will also request postmaster to not
188 : * restart itself.
189 : */
4832 heikki.linnakangas 190 1283 : SpinLockAcquire(&walrcv->mutex);
4790 bruce 191 1283 : switch (walrcv->walRcvState)
192 : {
4820 heikki.linnakangas 193 1247 : case WALRCV_STOPPED:
194 1247 : break;
195 10 : case WALRCV_STARTING:
196 10 : walrcv->walRcvState = WALRCV_STOPPED;
758 tmunro 197 10 : stopped = true;
4820 heikki.linnakangas 198 10 : break;
199 :
3769 200 26 : case WALRCV_STREAMING:
201 : case WALRCV_WAITING:
202 : case WALRCV_RESTARTING:
4820 203 26 : walrcv->walRcvState = WALRCV_STOPPING;
204 : /* fall through */
205 26 : case WALRCV_STOPPING:
206 26 : walrcvpid = walrcv->pid;
207 26 : break;
208 : }
4832 209 1283 : SpinLockRelease(&walrcv->mutex);
210 :
211 : /* Unnecessary but consistent. */
758 tmunro 212 1283 : if (stopped)
213 10 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
214 :
215 : /*
216 : * Signal walreceiver process if it was still running.
217 : */
4832 heikki.linnakangas 218 1283 : if (walrcvpid != 0)
219 26 : kill(walrcvpid, SIGTERM);
220 :
221 : /*
222 : * Wait for walreceiver to acknowledge its death by setting state to
223 : * WALRCV_STOPPED.
224 : */
758 tmunro 225 1283 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
3769 heikki.linnakangas 226 1309 : while (WalRcvRunning())
758 tmunro 227 26 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
228 : WAIT_EVENT_WAL_RECEIVER_EXIT);
229 1283 : ConditionVariableCancelSleep();
4832 heikki.linnakangas 230 1283 : }
231 :
232 : /*
233 : * Request postmaster to start walreceiver.
234 : *
235 : * "recptr" indicates the position where streaming should begin. "conninfo"
236 : * is a libpq connection string to use. "slotname" is, optionally, the name
237 : * of a replication slot to acquire. "create_temp_slot" indicates to create
238 : * a temporary slot when no "slotname" is given.
239 : *
240 : * WAL receivers do not directly load GUC parameters used for the connection
241 : * to the primary, and rely on the values passed down by the caller of this
242 : * routine instead. Hence, the addition of any new parameters should happen
243 : * through this code path.
244 : */
245 : void
3355 rhaas 246 125 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
247 : const char *slotname, bool create_temp_slot)
248 : {
2742 249 125 : WalRcvData *walrcv = WalRcv;
3769 heikki.linnakangas 250 125 : bool launch = false;
4790 bruce 251 125 : pg_time_t now = (pg_time_t) time(NULL);
252 : Latch *latch;
253 :
254 : /*
255 : * We always start at the beginning of the segment. That prevents a broken
256 : * segment (i.e., with no records in the first half of a segment) from
257 : * being created by XLOG streaming, which might cause trouble later on if
258 : * the segment is e.g archived.
259 : */
2028 andres 260 125 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
261 125 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
262 :
4663 tgl 263 125 : SpinLockAcquire(&walrcv->mutex);
264 :
265 : /* It better be stopped if we try to restart it */
3769 heikki.linnakangas 266 125 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
267 : walrcv->walRcvState == WALRCV_WAITING);
268 :
4832 269 125 : if (conninfo != NULL)
270 125 : strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
271 : else
4832 heikki.linnakangas 272 UBC 0 : walrcv->conninfo[0] = '\0';
273 :
274 : /*
275 : * Use configured replication slot if present, and ignore the value of
276 : * create_temp_slot as the slot name should be persistent. Otherwise, use
277 : * create_temp_slot to determine whether this WAL receiver should create a
278 : * temporary slot by itself and use it, or not.
279 : */
1108 alvherre 280 CBC 125 : if (slotname != NULL && slotname[0] != '\0')
281 : {
3355 rhaas 282 21 : strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
1108 alvherre 283 21 : walrcv->is_temp_slot = false;
284 : }
285 : else
286 : {
3355 rhaas 287 104 : walrcv->slotname[0] = '\0';
1108 alvherre 288 104 : walrcv->is_temp_slot = create_temp_slot;
289 : }
290 :
3769 heikki.linnakangas 291 125 : if (walrcv->walRcvState == WALRCV_STOPPED)
292 : {
293 125 : launch = true;
294 125 : walrcv->walRcvState = WALRCV_STARTING;
295 : }
296 : else
3769 heikki.linnakangas 297 UBC 0 : walrcv->walRcvState = WALRCV_RESTARTING;
4820 heikki.linnakangas 298 CBC 125 : walrcv->startTime = now;
299 :
300 : /*
301 : * If this is the first startup of walreceiver (on this timeline),
302 : * initialize flushedUpto and latestChunkStart to the starting point.
303 : */
3623 304 125 : if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
305 : {
1096 tmunro 306 67 : walrcv->flushedUpto = recptr;
3623 heikki.linnakangas 307 67 : walrcv->receivedTLI = tli;
4422 308 67 : walrcv->latestChunkStart = recptr;
309 : }
310 125 : walrcv->receiveStart = recptr;
3769 311 125 : walrcv->receiveStartTLI = tli;
312 :
2014 tgl 313 125 : latch = walrcv->latch;
314 :
4832 heikki.linnakangas 315 125 : SpinLockRelease(&walrcv->mutex);
316 :
3769 317 125 : if (launch)
318 125 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
2014 tgl 319 UBC 0 : else if (latch)
320 0 : SetLatch(latch);
4832 heikki.linnakangas 321 CBC 125 : }
322 :
323 : /*
324 : * Returns the last+1 byte position that walreceiver has flushed.
325 : *
326 : * Optionally, returns the previous chunk start, that is the first byte
327 : * written in the most recent walreceiver flush cycle. Callers not
328 : * interested in that value may pass NULL for latestChunkStart. Same for
329 : * receiveTLI.
330 : */
331 : XLogRecPtr
1096 tmunro 332 28001 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
333 : {
2742 rhaas 334 28001 : WalRcvData *walrcv = WalRcv;
335 : XLogRecPtr recptr;
336 :
4832 heikki.linnakangas 337 28001 : SpinLockAcquire(&walrcv->mutex);
1096 tmunro 338 28001 : recptr = walrcv->flushedUpto;
4663 tgl 339 28001 : if (latestChunkStart)
340 26705 : *latestChunkStart = walrcv->latestChunkStart;
3769 heikki.linnakangas 341 28001 : if (receiveTLI)
342 27971 : *receiveTLI = walrcv->receivedTLI;
4832 343 28001 : SpinLockRelease(&walrcv->mutex);
344 :
345 28001 : return recptr;
346 : }
347 :
348 : /*
349 : * Returns the last+1 byte position that walreceiver has written.
350 : * This returns a recently written value without taking a lock.
351 : */
352 : XLogRecPtr
1096 tmunro 353 UBC 0 : GetWalRcvWriteRecPtr(void)
354 : {
355 0 : WalRcvData *walrcv = WalRcv;
356 :
357 0 : return pg_atomic_read_u64(&walrcv->writtenUpto);
358 : }
359 :
360 : /*
361 : * Returns the replication apply delay in ms or -1
362 : * if the apply delay info is not available
363 : */
364 : int
4117 simon 365 CBC 1212 : GetReplicationApplyDelay(void)
366 : {
2742 rhaas 367 1212 : WalRcvData *walrcv = WalRcv;
368 : XLogRecPtr receivePtr;
369 : XLogRecPtr replayPtr;
370 : TimestampTz chunkReplayStartTime;
371 :
4117 simon 372 1212 : SpinLockAcquire(&walrcv->mutex);
1096 tmunro 373 1212 : receivePtr = walrcv->flushedUpto;
4117 simon 374 1212 : SpinLockRelease(&walrcv->mutex);
375 :
3762 heikki.linnakangas 376 1212 : replayPtr = GetXLogReplayRecPtr(NULL);
377 :
3754 alvherre 378 1212 : if (receivePtr == replayPtr)
4117 simon 379 606 : return 0;
380 :
2217 peter_e 381 606 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
382 :
383 606 : if (chunkReplayStartTime == 0)
2948 ishii 384 82 : return -1;
385 :
880 tgl 386 524 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
387 : GetCurrentTimestamp());
388 : }
389 :
390 : /*
391 : * Returns the network latency in ms, note that this includes any
392 : * difference in clock settings between the servers, as well as timezone.
393 : */
394 : int
4117 simon 395 1212 : GetReplicationTransferLatency(void)
396 : {
2742 rhaas 397 1212 : WalRcvData *walrcv = WalRcv;
398 : TimestampTz lastMsgSendTime;
399 : TimestampTz lastMsgReceiptTime;
400 :
4117 simon 401 1212 : SpinLockAcquire(&walrcv->mutex);
402 1212 : lastMsgSendTime = walrcv->lastMsgSendTime;
403 1212 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
404 1212 : SpinLockRelease(&walrcv->mutex);
405 :
880 tgl 406 1212 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
407 : lastMsgReceiptTime);
408 : }
|