Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * walreceiverfuncs.c
4 : : *
5 : : * This file contains functions used by the startup process to communicate
6 : : * with the walreceiver process. Functions implementing walreceiver itself
7 : : * are in walreceiver.c.
8 : : *
9 : : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
10 : : *
11 : : *
12 : : * IDENTIFICATION
13 : : * src/backend/replication/walreceiverfuncs.c
14 : : *
15 : : *-------------------------------------------------------------------------
16 : : */
17 : : #include "postgres.h"
18 : :
19 : : #include <sys/stat.h>
20 : : #include <sys/time.h>
21 : : #include <time.h>
22 : : #include <unistd.h>
23 : : #include <signal.h>
24 : :
25 : : #include "access/xlog_internal.h"
26 : : #include "access/xlogrecovery.h"
27 : : #include "pgstat.h"
28 : : #include "replication/walreceiver.h"
29 : : #include "storage/pmsignal.h"
30 : : #include "storage/shmem.h"
31 : : #include "utils/timestamp.h"
32 : :
33 : : WalRcvData *WalRcv = NULL;
34 : :
35 : : /*
36 : : * How long to wait for walreceiver to start up after requesting
37 : : * postmaster to launch it. In seconds.
38 : : */
39 : : #define WALRCV_STARTUP_TIMEOUT 10
40 : :
41 : : /* Report shared memory space needed by WalRcvShmemInit */
42 : : Size
5203 heikki.linnakangas@i 43 :CBC 3475 : WalRcvShmemSize(void)
44 : : {
5161 bruce@momjian.us 45 : 3475 : Size size = 0;
46 : :
5203 heikki.linnakangas@i 47 : 3475 : size = add_size(size, sizeof(WalRcvData));
48 : :
49 : 3475 : return size;
50 : : }
51 : :
52 : : /* Allocate and initialize walreceiver-related shared memory */
53 : : void
54 : 898 : WalRcvShmemInit(void)
55 : : {
56 : : bool found;
57 : :
58 : 898 : WalRcv = (WalRcvData *)
59 : 898 : ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
60 : :
5100 tgl@sss.pgh.pa.us 61 [ + - ]: 898 : if (!found)
62 : : {
63 : : /* First time through, so initialize */
64 [ + - + - : 898 : MemSet(WalRcv, 0, WalRcvShmemSize());
+ - - + -
- ]
65 : 898 : WalRcv->walRcvState = WALRCV_STOPPED;
1129 tmunro@postgresql.or 66 : 898 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
5100 tgl@sss.pgh.pa.us 67 : 898 : SpinLockInit(&WalRcv->mutex);
1151 fujii@postgresql.org 68 : 898 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
2692 peter_e@gmx.net 69 : 898 : WalRcv->latch = NULL;
70 : : }
5203 heikki.linnakangas@i 71 : 898 : }
72 : :
73 : : /* Is walreceiver running (or starting up)? */
74 : : bool
4140 75 : 1105 : WalRcvRunning(void)
76 : : {
3113 rhaas@postgresql.org 77 : 1105 : WalRcvData *walrcv = WalRcv;
78 : : WalRcvState state;
79 : : pg_time_t startTime;
80 : :
5203 heikki.linnakangas@i 81 [ - + ]: 1105 : SpinLockAcquire(&walrcv->mutex);
82 : :
5191 83 : 1105 : state = walrcv->walRcvState;
84 : 1105 : startTime = walrcv->startTime;
85 : :
86 : 1105 : SpinLockRelease(&walrcv->mutex);
87 : :
88 : : /*
89 : : * If it has taken too long for walreceiver to start up, give up. Setting
90 : : * the state to STOPPED ensures that if walreceiver later does start up
91 : : * after all, it will see that it's not supposed to be running and die
92 : : * without doing anything.
93 : : */
94 [ + + ]: 1105 : if (state == WALRCV_STARTING)
95 : : {
5161 bruce@momjian.us 96 :GBC 1 : pg_time_t now = (pg_time_t) time(NULL);
97 : :
5191 heikki.linnakangas@i 98 [ - + ]: 1 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
99 : : {
1129 tmunro@postgresql.or 100 :UBC 0 : bool stopped = false;
101 : :
102 [ # # ]: 0 : SpinLockAcquire(&walrcv->mutex);
5191 heikki.linnakangas@i 103 [ # # ]: 0 : if (walrcv->walRcvState == WALRCV_STARTING)
104 : : {
105 : 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
1129 tmunro@postgresql.or 106 : 0 : stopped = true;
107 : : }
5191 heikki.linnakangas@i 108 : 0 : SpinLockRelease(&walrcv->mutex);
109 : :
1129 tmunro@postgresql.or 110 [ # # ]: 0 : if (stopped)
111 : 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
112 : : }
113 : : }
114 : :
5191 heikki.linnakangas@i 115 [ + + ]:CBC 1105 : if (state != WALRCV_STOPPED)
116 : 41 : return true;
117 : : else
118 : 1064 : return false;
119 : : }
120 : :
121 : : /*
122 : : * Is walreceiver running and streaming (or at least attempting to connect,
123 : : * or starting up)?
124 : : */
125 : : bool
4140 126 : 29185 : WalRcvStreaming(void)
127 : : {
3113 rhaas@postgresql.org 128 : 29185 : WalRcvData *walrcv = WalRcv;
129 : : WalRcvState state;
130 : : pg_time_t startTime;
131 : :
4140 heikki.linnakangas@i 132 [ + + ]: 29185 : SpinLockAcquire(&walrcv->mutex);
133 : :
134 : 29185 : state = walrcv->walRcvState;
135 : 29185 : startTime = walrcv->startTime;
136 : :
137 : 29185 : SpinLockRelease(&walrcv->mutex);
138 : :
139 : : /*
140 : : * If it has taken too long for walreceiver to start up, give up. Setting
141 : : * the state to STOPPED ensures that if walreceiver later does start up
142 : : * after all, it will see that it's not supposed to be running and die
143 : : * without doing anything.
144 : : */
145 [ + + ]: 29185 : if (state == WALRCV_STARTING)
146 : : {
147 : 517 : pg_time_t now = (pg_time_t) time(NULL);
148 : :
149 [ - + ]: 517 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
150 : : {
1129 tmunro@postgresql.or 151 :UBC 0 : bool stopped = false;
152 : :
153 [ # # ]: 0 : SpinLockAcquire(&walrcv->mutex);
4140 heikki.linnakangas@i 154 [ # # ]: 0 : if (walrcv->walRcvState == WALRCV_STARTING)
155 : : {
156 : 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
1129 tmunro@postgresql.or 157 : 0 : stopped = true;
158 : : }
4140 heikki.linnakangas@i 159 : 0 : SpinLockRelease(&walrcv->mutex);
160 : :
1129 tmunro@postgresql.or 161 [ # # ]: 0 : if (stopped)
162 : 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
163 : : }
164 : : }
165 : :
4140 heikki.linnakangas@i 166 [ + + + + :CBC 29185 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
- + ]
167 : : state == WALRCV_RESTARTING)
168 : 26720 : return true;
169 : : else
170 : 2465 : return false;
171 : : }
172 : :
173 : : /*
174 : : * Stop walreceiver (if running) and wait for it to die.
175 : : * Executed by the Startup process.
176 : : */
177 : : void
5203 178 : 1063 : ShutdownWalRcv(void)
179 : : {
3113 rhaas@postgresql.org 180 : 1063 : WalRcvData *walrcv = WalRcv;
5161 bruce@momjian.us 181 : 1063 : pid_t walrcvpid = 0;
1129 tmunro@postgresql.or 182 : 1063 : bool stopped = false;
183 : :
184 : : /*
185 : : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
186 : : * mode once it's finished, and will also request postmaster to not
187 : : * restart itself.
188 : : */
5203 heikki.linnakangas@i 189 [ - + ]: 1063 : SpinLockAcquire(&walrcv->mutex);
5161 bruce@momjian.us 190 [ + + + - : 1063 : switch (walrcv->walRcvState)
- ]
191 : : {
5191 heikki.linnakangas@i 192 : 1020 : case WALRCV_STOPPED:
193 : 1020 : break;
5191 heikki.linnakangas@i 194 :GBC 4 : case WALRCV_STARTING:
195 : 4 : walrcv->walRcvState = WALRCV_STOPPED;
1129 tmunro@postgresql.or 196 : 4 : stopped = true;
5191 heikki.linnakangas@i 197 : 4 : break;
198 : :
4140 heikki.linnakangas@i 199 :CBC 39 : case WALRCV_STREAMING:
200 : : case WALRCV_WAITING:
201 : : case WALRCV_RESTARTING:
5191 202 : 39 : walrcv->walRcvState = WALRCV_STOPPING;
203 : : /* fall through */
204 : 39 : case WALRCV_STOPPING:
205 : 39 : walrcvpid = walrcv->pid;
206 : 39 : break;
207 : : }
5203 208 : 1063 : SpinLockRelease(&walrcv->mutex);
209 : :
210 : : /* Unnecessary but consistent. */
1129 tmunro@postgresql.or 211 [ + + ]: 1063 : if (stopped)
1129 tmunro@postgresql.or 212 :GBC 4 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
213 : :
214 : : /*
215 : : * Signal walreceiver process if it was still running.
216 : : */
5203 heikki.linnakangas@i 217 [ + + ]:CBC 1063 : if (walrcvpid != 0)
218 : 39 : kill(walrcvpid, SIGTERM);
219 : :
220 : : /*
221 : : * Wait for walreceiver to acknowledge its death by setting state to
222 : : * WALRCV_STOPPED.
223 : : */
1129 tmunro@postgresql.or 224 : 1063 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
4140 heikki.linnakangas@i 225 [ + + ]: 1100 : while (WalRcvRunning())
1129 tmunro@postgresql.or 226 : 37 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
227 : : WAIT_EVENT_WAL_RECEIVER_EXIT);
228 : 1063 : ConditionVariableCancelSleep();
5203 heikki.linnakangas@i 229 : 1063 : }
230 : :
231 : : /*
232 : : * Request postmaster to start walreceiver.
233 : : *
234 : : * "recptr" indicates the position where streaming should begin. "conninfo"
235 : : * is a libpq connection string to use. "slotname" is, optionally, the name
236 : : * of a replication slot to acquire. "create_temp_slot" indicates to create
237 : : * a temporary slot when no "slotname" is given.
238 : : *
239 : : * WAL receivers do not directly load GUC parameters used for the connection
240 : : * to the primary, and rely on the values passed down by the caller of this
241 : : * routine instead. Hence, the addition of any new parameters should happen
242 : : * through this code path.
243 : : */
244 : : void
3726 rhaas@postgresql.org 245 : 321 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
246 : : const char *slotname, bool create_temp_slot)
247 : : {
3113 248 : 321 : WalRcvData *walrcv = WalRcv;
4140 heikki.linnakangas@i 249 : 321 : bool launch = false;
5161 bruce@momjian.us 250 : 321 : pg_time_t now = (pg_time_t) time(NULL);
251 : : Latch *latch;
252 : :
253 : : /*
254 : : * We always start at the beginning of the segment. That prevents a broken
255 : : * segment (i.e., with no records in the first half of a segment) from
256 : : * being created by XLOG streaming, which might cause trouble later on if
257 : : * the segment is e.g archived.
258 : : */
2399 andres@anarazel.de 259 [ + - ]: 321 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
260 : 321 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
261 : :
5034 tgl@sss.pgh.pa.us 262 [ - + ]: 321 : SpinLockAcquire(&walrcv->mutex);
263 : :
264 : : /* It better be stopped if we try to restart it */
4140 heikki.linnakangas@i 265 [ - + - - ]: 321 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
266 : : walrcv->walRcvState == WALRCV_WAITING);
267 : :
5203 268 [ + - ]: 321 : if (conninfo != NULL)
269 : 321 : strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
270 : : else
5203 heikki.linnakangas@i 271 :UBC 0 : walrcv->conninfo[0] = '\0';
272 : :
273 : : /*
274 : : * Use configured replication slot if present, and ignore the value of
275 : : * create_temp_slot as the slot name should be persistent. Otherwise, use
276 : : * create_temp_slot to determine whether this WAL receiver should create a
277 : : * temporary slot by itself and use it, or not.
278 : : */
1479 alvherre@alvh.no-ip. 279 [ + - + + ]:CBC 321 : if (slotname != NULL && slotname[0] != '\0')
280 : : {
3726 rhaas@postgresql.org 281 : 75 : strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
1479 alvherre@alvh.no-ip. 282 : 75 : walrcv->is_temp_slot = false;
283 : : }
284 : : else
285 : : {
3726 rhaas@postgresql.org 286 : 246 : walrcv->slotname[0] = '\0';
1479 alvherre@alvh.no-ip. 287 : 246 : walrcv->is_temp_slot = create_temp_slot;
288 : : }
289 : :
4140 heikki.linnakangas@i 290 [ + - ]: 321 : if (walrcv->walRcvState == WALRCV_STOPPED)
291 : : {
292 : 321 : launch = true;
293 : 321 : walrcv->walRcvState = WALRCV_STARTING;
294 : : }
295 : : else
4140 heikki.linnakangas@i 296 :UBC 0 : walrcv->walRcvState = WALRCV_RESTARTING;
5191 heikki.linnakangas@i 297 :CBC 321 : walrcv->startTime = now;
298 : :
299 : : /*
300 : : * If this is the first startup of walreceiver (on this timeline),
301 : : * initialize flushedUpto and latestChunkStart to the starting point.
302 : : */
3994 303 [ + + - + ]: 321 : if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
304 : : {
1467 tmunro@postgresql.or 305 : 128 : walrcv->flushedUpto = recptr;
3994 heikki.linnakangas@i 306 : 128 : walrcv->receivedTLI = tli;
4793 307 : 128 : walrcv->latestChunkStart = recptr;
308 : : }
309 : 321 : walrcv->receiveStart = recptr;
4140 310 : 321 : walrcv->receiveStartTLI = tli;
311 : :
2385 tgl@sss.pgh.pa.us 312 : 321 : latch = walrcv->latch;
313 : :
5203 heikki.linnakangas@i 314 : 321 : SpinLockRelease(&walrcv->mutex);
315 : :
4140 316 [ + - ]: 321 : if (launch)
317 : 321 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
2385 tgl@sss.pgh.pa.us 318 [ # # ]:UBC 0 : else if (latch)
319 : 0 : SetLatch(latch);
5203 heikki.linnakangas@i 320 :CBC 321 : }
321 : :
322 : : /*
323 : : * Returns the last+1 byte position that walreceiver has flushed.
324 : : *
325 : : * Optionally, returns the previous chunk start, that is the first byte
326 : : * written in the most recent walreceiver flush cycle. Callers not
327 : : * interested in that value may pass NULL for latestChunkStart. Same for
328 : : * receiveTLI.
329 : : */
330 : : XLogRecPtr
1467 tmunro@postgresql.or 331 : 27513 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
332 : : {
3113 rhaas@postgresql.org 333 : 27513 : WalRcvData *walrcv = WalRcv;
334 : : XLogRecPtr recptr;
335 : :
5203 heikki.linnakangas@i 336 [ + + ]: 27513 : SpinLockAcquire(&walrcv->mutex);
1467 tmunro@postgresql.or 337 : 27513 : recptr = walrcv->flushedUpto;
5034 tgl@sss.pgh.pa.us 338 [ + + ]: 27513 : if (latestChunkStart)
339 : 26472 : *latestChunkStart = walrcv->latestChunkStart;
4140 heikki.linnakangas@i 340 [ + + ]: 27513 : if (receiveTLI)
341 : 27463 : *receiveTLI = walrcv->receivedTLI;
5203 342 : 27513 : SpinLockRelease(&walrcv->mutex);
343 : :
344 : 27513 : return recptr;
345 : : }
346 : :
347 : : /*
348 : : * Returns the last+1 byte position that walreceiver has written.
349 : : * This returns a recently written value without taking a lock.
350 : : */
351 : : XLogRecPtr
1467 tmunro@postgresql.or 352 :UBC 0 : GetWalRcvWriteRecPtr(void)
353 : : {
354 : 0 : WalRcvData *walrcv = WalRcv;
355 : :
356 : 0 : return pg_atomic_read_u64(&walrcv->writtenUpto);
357 : : }
358 : :
359 : : /*
360 : : * Returns the replication apply delay in ms or -1
361 : : * if the apply delay info is not available
362 : : */
363 : : int
4488 simon@2ndQuadrant.co 364 :CBC 408 : GetReplicationApplyDelay(void)
365 : : {
3113 rhaas@postgresql.org 366 : 408 : WalRcvData *walrcv = WalRcv;
367 : : XLogRecPtr receivePtr;
368 : : XLogRecPtr replayPtr;
369 : : TimestampTz chunkReplayStartTime;
370 : :
4488 simon@2ndQuadrant.co 371 [ - + ]: 408 : SpinLockAcquire(&walrcv->mutex);
1467 tmunro@postgresql.or 372 : 408 : receivePtr = walrcv->flushedUpto;
4488 simon@2ndQuadrant.co 373 : 408 : SpinLockRelease(&walrcv->mutex);
374 : :
4133 heikki.linnakangas@i 375 : 408 : replayPtr = GetXLogReplayRecPtr(NULL);
376 : :
4125 alvherre@alvh.no-ip. 377 [ + + ]: 408 : if (receivePtr == replayPtr)
4488 simon@2ndQuadrant.co 378 : 64 : return 0;
379 : :
2588 peter_e@gmx.net 380 : 344 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
381 : :
382 [ + + ]: 344 : if (chunkReplayStartTime == 0)
3319 ishii@postgresql.org 383 : 1 : return -1;
384 : :
1251 tgl@sss.pgh.pa.us 385 : 343 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
386 : : GetCurrentTimestamp());
387 : : }
388 : :
389 : : /*
390 : : * Returns the network latency in ms, note that this includes any
391 : : * difference in clock settings between the servers, as well as timezone.
392 : : */
393 : : int
4488 simon@2ndQuadrant.co 394 : 408 : GetReplicationTransferLatency(void)
395 : : {
3113 rhaas@postgresql.org 396 : 408 : WalRcvData *walrcv = WalRcv;
397 : : TimestampTz lastMsgSendTime;
398 : : TimestampTz lastMsgReceiptTime;
399 : :
4488 simon@2ndQuadrant.co 400 [ - + ]: 408 : SpinLockAcquire(&walrcv->mutex);
401 : 408 : lastMsgSendTime = walrcv->lastMsgSendTime;
402 : 408 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
403 : 408 : SpinLockRelease(&walrcv->mutex);
404 : :
1251 tgl@sss.pgh.pa.us 405 : 408 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
406 : : lastMsgReceiptTime);
407 : : }
|