TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * syncrep.c
4 : *
5 : * Synchronous replication is new as of PostgreSQL 9.1.
6 : *
7 : * If requested, transaction commits wait until their commit LSNs are
8 : * acknowledged by the synchronous standbys.
9 : *
10 : * This module contains the code for waiting and release of backends.
11 : * All code in this module executes on the primary. The core streaming
12 : * replication transport remains within WALreceiver/WALsender modules.
13 : *
14 : * The essence of this design is that it isolates all logic about
15 : * waiting/releasing onto the primary. The primary defines which standbys
16 : * it wishes to wait for. The standbys are completely unaware of the
17 : * durability requirements of transactions on the primary, reducing the
18 : * complexity of the code and streamlining both standby operations and
19 : * network bandwidth because there is no requirement to ship
20 : * per-transaction state information.
21 : *
22 : * Replication is either synchronous or not synchronous (async). If it is
23 : * async, we just fastpath out of here. If it is sync, then we wait for
24 : * the write, flush or apply location on the standby before releasing
25 : * the waiting backend. Further complexity in that interaction is
26 : * expected in later releases.
27 : *
28 : * The best performing way to manage the waiting backends is to have a
29 : * single ordered queue of waiting backends, so that we can avoid
30 : * searching through all the waiters each time we receive a reply.
31 : *
32 : * In 9.5 or before only a single standby could be considered as
33 : * synchronous. In 9.6 we support priority-based multiple synchronous
34 : * standbys. In 10.0 quorum-based multiple synchronous standbys are also
35 : * supported. The number of synchronous standbys that transactions
36 : * must wait for replies from is specified in synchronous_standby_names.
37 : * This parameter also specifies a list of standby names and the method
38 : * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39 : *
40 : * The method FIRST specifies a priority-based synchronous replication
41 : * and makes transaction commits wait until their WAL records are
42 : * replicated to the requested number of synchronous standbys chosen based
43 : * on their priorities. The standbys whose names appear earlier in the list
44 : * are given higher priority and will be considered as synchronous.
45 : * Other standby servers appearing later in this list represent potential
46 : * synchronous standbys. If any of the current synchronous standbys
47 : * disconnects for whatever reason, it will be replaced immediately with
48 : * the next-highest-priority standby.
49 : *
50 : * The method ANY specifies a quorum-based synchronous replication
51 : * and makes transaction commits wait until their WAL records are
52 : * replicated to at least the requested number of synchronous standbys
53 : * in the list. All the standbys appearing in the list are considered as
54 : * candidates for quorum synchronous standbys.
55 : *
56 : * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 : * This is for backward compatibility with 9.6 or before where only a
58 : * priority-based sync replication was supported.
59 : *
60 : * Before the standbys chosen from synchronous_standby_names can
61 : * become the synchronous standbys they must have caught up with
62 : * the primary; that may take some time. Once caught up,
63 : * the standbys which are considered as synchronous at that moment
64 : * will release waiters from the queue.
65 : *
66 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
67 : *
68 : * IDENTIFICATION
69 : * src/backend/replication/syncrep.c
70 : *
71 : *-------------------------------------------------------------------------
72 : */
73 : #include "postgres.h"
74 :
75 : #include <unistd.h>
76 :
77 : #include "access/xact.h"
78 : #include "miscadmin.h"
79 : #include "pgstat.h"
80 : #include "replication/syncrep.h"
81 : #include "replication/walsender.h"
82 : #include "replication/walsender_private.h"
83 : #include "storage/pmsignal.h"
84 : #include "storage/proc.h"
85 : #include "tcop/tcopprot.h"
86 : #include "utils/builtins.h"
87 : #include "utils/guc_hooks.h"
88 : #include "utils/ps_status.h"
89 :
90 : /* User-settable parameters for sync rep */
91 : char *SyncRepStandbyNames;
92 : /* True when SyncRepStandbyNames is set to a non-empty string. */
93 : #define SyncStandbysDefined() \
94 : (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
95 : /* When true, SyncRepReleaseWaiters() logs the next time this walsender is found to be a sync standby. */
96 : static bool announce_next_takeover = true;
97 : /* SyncRepConfig stays NULL when sync rep is not configured; SyncRepWaitMode is the wait mode used for commit records. */
98 : SyncRepConfigData *SyncRepConfig = NULL;
99 : static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
100 : /* Prototypes for functions private to this file. */
101 : static void SyncRepQueueInsert(int mode);
102 : static void SyncRepCancelWait(void);
103 : static int SyncRepWakeQueue(bool all, int mode);
104 :
105 : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
106 : XLogRecPtr *flushPtr,
107 : XLogRecPtr *applyPtr,
108 : bool *am_sync);
109 : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
110 : XLogRecPtr *flushPtr,
111 : XLogRecPtr *applyPtr,
112 : SyncRepStandbyData *sync_standbys,
113 : int num_standbys);
114 : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
115 : XLogRecPtr *flushPtr,
116 : XLogRecPtr *applyPtr,
117 : SyncRepStandbyData *sync_standbys,
118 : int num_standbys,
119 : uint8 nth);
120 : static int SyncRepGetStandbyPriority(void);
121 : static int standby_priority_comparator(const void *a, const void *b);
122 : static int cmp_lsn(const void *a, const void *b);
123 : /* Queue-order check used only inside Assert(), hence compiled only with assertions enabled. */
124 : #ifdef USE_ASSERT_CHECKING
125 : static bool SyncRepQueueIsOrderedByLSN(int mode);
126 : #endif
127 :
128 : /*
129 : * ===========================================================
130 : * Synchronous Replication functions for normal user backends
131 : * ===========================================================
132 : */
133 :
134 : /*
135 : * Wait for synchronous replication, if requested by user.
136 : *
137 : * Initially backends start in state SYNC_REP_NOT_WAITING and then
138 : * change that state to SYNC_REP_WAITING before adding ourselves
139 : * to the wait queue. During SyncRepWakeQueue() a WALSender changes
140 : * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
141 : * This backend then resets its state to SYNC_REP_NOT_WAITING.
142 : *
143 : * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
144 : * represents a commit record. If it doesn't, then we wait only for the WAL
145 : * to be flushed if synchronous_commit is set to the higher level of
146 : * remote_apply, because only commit records provide apply feedback.
147 : * Returns without waiting if sync rep is not requested, no sync standbys are defined, or 'lsn' is already acknowledged for the chosen mode. */
148 : void
149 GIC 290343 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
150 ECB : {
151 : int mode;
152 :
153 : /*
154 : * This should be called while holding interrupts during a transaction
155 : * commit to prevent the follow-up shared memory queue cleanups from being
156 : * influenced by external interruptions.
157 : */
158 GIC 290343 : Assert(InterruptHoldoffCount > 0);
159 :
160 : /*
161 : * Fast exit if user has not requested sync replication, or there are no
162 : * sync replication standby names defined.
163 : *
164 : * Since this routine gets called every commit time, it's important to
165 : * exit quickly if sync replication is not requested. So we check
166 : * WalSndCtl->sync_standbys_defined flag without the lock and exit
167 : * immediately if it's false. If it's true, we need to check it again
168 : * later while holding the lock, to check the flag and operate the sync
169 : * rep queue atomically. This is necessary to avoid the race condition
170 : * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
171 : * it's false, the lock is not necessary because we don't touch the queue.
172 ECB : */
173 CBC 290343 : if (!SyncRepRequested() ||
174 266442 : !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
175 GIC 290299 : return;
176 :
177 ECB : /* Cap the level for anything other than commit to remote flush only. */
178 CBC 44 : if (commit)
179 GIC 22 : mode = SyncRepWaitMode;
180 ECB : else
181 GIC 22 : mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
182 ECB :
183 GNC 44 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
184 GIC 44 : Assert(WalSndCtl != NULL);
185 ECB :
186 CBC 44 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
187 GIC 44 : Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
188 :
189 : /*
190 : * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
191 : * set. See SyncRepUpdateSyncStandbysDefined.
192 : *
193 : * Also check that the standby hasn't already replied. Unlikely race
194 : * condition but we'll be fetching that cache line anyway so it's likely
195 : * to be a low cost check.
196 ECB : */
197 CBC 44 : if (!WalSndCtl->sync_standbys_defined ||
198 GIC 44 : lsn <= WalSndCtl->lsn[mode])
199 ECB : {
200 CBC 10 : LWLockRelease(SyncRepLock);
201 GIC 10 : return;
202 : }
203 :
204 : /*
205 : * Set our waitLSN so WALSender will know when to wake us, and add
206 : * ourselves to the queue.
207 ECB : */
208 CBC 34 : MyProc->waitLSN = lsn;
209 34 : MyProc->syncRepState = SYNC_REP_WAITING;
210 34 : SyncRepQueueInsert(mode);
211 34 : Assert(SyncRepQueueIsOrderedByLSN(mode));
212 GIC 34 : LWLockRelease(SyncRepLock);
213 :
214 ECB : /* Alter ps display to show waiting for sync rep. */
215 GIC 34 : if (update_process_title)
216 : {
217 : char buffer[32];
218 ECB :
219 GNC 34 : sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
220 34 : set_ps_display_suffix(buffer);
221 : }
222 :
223 : /*
224 ECB : * Wait for specified LSN to be confirmed.
225 : *
226 : * Each proc has its own wait latch, so we perform a normal latch
227 : * check/wait loop here.
228 : */
229 : for (;;)
230 GIC 34 : {
231 : int rc;
232 :
233 : /* Must reset the latch before testing state. */
234 68 : ResetLatch(MyLatch);
235 :
236 : /*
237 ECB : * Acquiring the lock is not needed, the latch ensures proper
238 : * barriers. If it looks like we're done, we must really be done,
239 : * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
240 : * it will never update it again, so we can't be seeing a stale value
241 : * in that case.
242 : */
243 GIC 68 : if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
244 34 : break;
245 :
246 : /*
247 : * If a wait for synchronous replication is pending, we can neither
248 : * acknowledge the commit nor raise ERROR or FATAL. The latter would
249 : * lead the client to believe that the transaction aborted, which is
250 : * not true: it's already committed locally. The former is no good
251 : * either: the client has requested synchronous replication, and is
252 ECB : * entitled to assume that an acknowledged commit is also replicated,
253 : * which might not be true. So in this case we issue a WARNING (which
254 EUB : * some clients may be able to interpret) and shut off further output.
255 : * We do NOT reset ProcDiePending, so that the process will die after
256 : * the commit is cleaned up.
257 : */
258 GBC 34 : if (ProcDiePending)
259 EUB : {
260 UBC 0 : ereport(WARNING,
261 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
262 : errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
263 : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
264 UIC 0 : whereToSendOutput = DestNone;
265 0 : SyncRepCancelWait();
266 0 : break;
267 : }
268 :
269 ECB : /*
270 : * It's unclear what to do if a query cancel interrupt arrives. We
271 EUB : * can't actually abort at this point, but ignoring the interrupt
272 : * altogether is not helpful, so we just terminate the wait with a
273 : * suitable warning.
274 : */
275 GBC 34 : if (QueryCancelPending)
276 EUB : {
277 UIC 0 : QueryCancelPending = false;
278 0 : ereport(WARNING,
279 : (errmsg("canceling wait for synchronous replication due to user request"),
280 : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
281 0 : SyncRepCancelWait();
282 0 : break;
283 ECB : }
284 :
285 : /*
286 : * Wait on latch. Any condition that should wake us up will set the
287 : * latch, so no need for timeout.
288 : */
289 GIC 34 : rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
290 ECB : WAIT_EVENT_SYNC_REP);
291 :
292 EUB : /*
293 : * If the postmaster dies, we'll probably never get an acknowledgment,
294 : * because all the wal sender processes will exit. So just bail out.
295 : */
296 GIC 34 : if (rc & WL_POSTMASTER_DEATH)
297 : {
298 UIC 0 : ProcDiePending = true;
299 0 : whereToSendOutput = DestNone;
300 0 : SyncRepCancelWait();
301 0 : break;
302 : }
303 : }
304 :
305 : /*
306 : * Either a walsender has checked our LSN and removed us from the queue, or
307 ECB : * we removed ourselves in SyncRepCancelWait(). Clean up state and leave.
308 : * It's OK to reset these shared memory fields without
309 : * holding SyncRepLock, because any walsenders will ignore us anyway when
310 : * we're not on the queue. We need a read barrier to make sure we see the
311 : * changes to the queue link (this might be unnecessary without
312 : * assertions, but better safe than sorry).
313 : */
313 CBC 34 : pg_read_barrier();
314 GNC 34 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
315 GIC 34 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
316 34 : MyProc->waitLSN = 0;
317 :
318 : /* reset ps display to remove the suffix */
319 GNC 34 : if (update_process_title)
320 34 : set_ps_display_remove_suffix();
321 ECB : }
322 :
323 : /*
324 : * Insert MyProc into the specified SyncRepQueue, keeping the queue ordered by waitLSN.
325 : *
326 : * Usually we will go at tail of queue, though it's possible that we arrive
327 : * here out of order, so start at tail and work back to insertion point.
328 : * SyncRepWaitForLSN() calls this with SyncRepLock held and MyProc->waitLSN already set. */
329 : static void
330 GIC 34 : SyncRepQueueInsert(int mode)
331 EUB : {
332 : dlist_head *queue;
333 : dlist_iter iter;
334 :
335 GIC 34 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
336 GNC 34 : queue = &WalSndCtl->SyncRepQueue[mode];
337 :
338 34 : dlist_reverse_foreach(iter, queue)
339 EUB : {
340 UNC 0 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
341 :
342 : /*
343 : * Stop at the queue element that we should insert after to ensure the
344 : * queue is ordered by LSN.
345 : */
346 UIC 0 : if (proc->waitLSN < MyProc->waitLSN)
347 : {
348 UNC 0 : dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
349 0 : return;
350 : }
351 : }
352 :
353 : /*
354 : * If we get here, the list was either empty, or this process needs to be
355 : * at the head.
356 : */
357 GNC 34 : dlist_push_head(queue, &MyProc->syncRepLinks);
358 EUB : }
359 :
360 : /*
361 : * Acquire SyncRepLock and cancel any wait currently in progress.
362 : * MyProc is unlinked from its queue if still linked, and its syncRepState is reset to SYNC_REP_NOT_WAITING. */
363 : static void
364 UIC 0 : SyncRepCancelWait(void)
365 : {
366 LBC 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
367 UNC 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
368 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
369 UIC 0 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
370 0 : LWLockRelease(SyncRepLock);
371 0 : }
372 ECB : /* Remove this backend from the sync rep queue, if it is still linked into one. */
373 : void
374 GBC 11507 : SyncRepCleanupAtProcExit(void)
375 : {
376 : /*
377 EUB : * First check, without taking the lock, whether we have already been
378 : * removed from the queue, so as not to slow down backend exit.
379 : */
380 GNC 11507 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
381 : {
382 LBC 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
383 :
384 : /* maybe we have just been removed, so recheck */
385 UNC 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
386 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
387 :
388 UIC 0 : LWLockRelease(SyncRepLock);
389 : }
390 GIC 11507 : }
391 :
392 : /*
393 : * ===========================================================
394 : * Synchronous Replication functions for wal sender processes
395 ECB : * ===========================================================
396 : */
397 :
398 : /*
399 : * Take any action required to initialise sync rep state from config
400 : * data. Called at WALSender startup and after each SIGHUP.
401 : * Here that means refreshing MyWalSnd->sync_standby_priority when it has changed. */
402 : void
403 CBC 509 : SyncRepInitConfig(void)
404 ECB : {
405 : int priority;
406 :
407 : /*
408 : * Determine if we are a potential sync standby and remember the result
409 : * for handling replies from standby.
410 : */
411 GIC 509 : priority = SyncRepGetStandbyPriority();
412 509 : if (MyWalSnd->sync_standby_priority != priority)
413 : {
414 CBC 20 : SpinLockAcquire(&MyWalSnd->mutex);
415 GIC 20 : MyWalSnd->sync_standby_priority = priority;
416 20 : SpinLockRelease(&MyWalSnd->mutex);
417 :
418 20 : ereport(DEBUG1,
419 : (errmsg_internal("standby \"%s\" now has synchronous standby priority %u",
420 : application_name, priority)));
421 : }
422 509 : }
423 :
424 ECB : /*
425 : * Update the LSNs on each queue based upon our latest state. This
426 : * implements a simple policy of first-valid-sync-standby-releases-waiter.
427 : *
428 : * Other policies are possible, which would change what we do here and
429 : * perhaps also which information we store as well.
430 : * Waiters are released only when SyncRepGetSyncRecPtr() returns true and reports this walsender as sync. */
431 : void
432 CBC 90688 : SyncRepReleaseWaiters(void)
433 ECB : {
434 CBC 90688 : volatile WalSndCtlData *walsndctl = WalSndCtl;
435 : XLogRecPtr writePtr;
436 : XLogRecPtr flushPtr;
437 : XLogRecPtr applyPtr;
438 : bool got_recptr;
439 : bool am_sync;
440 GIC 90688 : int numwrite = 0;
441 90688 : int numflush = 0;
442 90688 : int numapply = 0;
443 ECB :
444 : /*
445 : * If this WALSender is serving a standby that is not on the list of
446 : * potential sync standbys then we have nothing to do. If we are still
447 : * starting up, still running base backup or the current flush position is
448 : * still invalid, then leave quickly also. Streaming or stopping WAL
449 : * senders are allowed to release waiters.
450 : */
451 GIC 90688 : if (MyWalSnd->sync_standby_priority == 0 ||
452 127 : (MyWalSnd->state != WALSNDSTATE_STREAMING &&
453 46 : MyWalSnd->state != WALSNDSTATE_STOPPING) ||
454 102 : XLogRecPtrIsInvalid(MyWalSnd->flush))
455 : {
456 CBC 90586 : announce_next_takeover = true;
457 GIC 90586 : return;
458 : }
459 :
460 : /*
461 : * We're a potential sync standby. Release waiters if there are enough
462 : * sync standbys and we are considered as sync.
463 : */
464 102 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
465 :
466 ECB : /*
467 : * Check whether we are a sync standby or not, and calculate the synced
468 : * positions among all sync standbys. (Note: although this step does not
469 : * of itself require holding SyncRepLock, it seems like a good idea to do
470 : * it after acquiring the lock. This ensures that the WAL pointers we use
471 : * to release waiters are newer than any previous execution of this
472 : * routine used.)
473 : */
474 CBC 102 : got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
475 :
476 ECB : /*
477 : * If we are managing a sync standby, though we weren't prior to this,
478 : * then announce we are now a sync standby.
479 : */
480 GIC 102 : if (announce_next_takeover && am_sync)
481 EUB : {
482 GIC 8 : announce_next_takeover = false;
483 :
484 8 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
485 8 : ereport(LOG,
486 : (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
487 : application_name, MyWalSnd->sync_standby_priority)));
488 : else
489 UIC 0 : ereport(LOG,
490 ECB : (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
491 : application_name)));
492 EUB : }
493 :
494 : /*
495 : * If the number of sync standbys is less than requested or we aren't
496 : * managing a sync standby then just leave.
497 : */
498 GIC 102 : if (!got_recptr || !am_sync)
499 : {
500 UIC 0 : LWLockRelease(SyncRepLock);
501 LBC 0 : announce_next_takeover = !am_sync;
502 UIC 0 : return;
503 ECB : }
504 :
505 : /*
506 : * Set the lsn first so that when we wake backends they will release up to
507 : * this location. Each mode's queue is woken only when its position has advanced.
508 : */
509 CBC 102 : if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
510 : {
511 36 : walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
512 GIC 36 : numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
513 ECB : }
514 CBC 102 : if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
515 : {
516 GIC 41 : walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
517 CBC 41 : numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
518 : }
519 102 : if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
520 : {
521 GIC 41 : walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
522 41 : numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
523 : }
524 :
525 102 : LWLockRelease(SyncRepLock);
526 :
527 102 : elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
528 : numwrite, LSN_FORMAT_ARGS(writePtr),
529 : numflush, LSN_FORMAT_ARGS(flushPtr),
530 : numapply, LSN_FORMAT_ARGS(applyPtr));
531 : }
532 :
533 : /*
534 : * Calculate the synced Write, Flush and Apply positions among sync standbys.
535 : *
536 ECB : * Return false if the number of sync standbys is less than
537 : * synchronous_standby_names specifies. Otherwise return true and
538 : * store the positions into *writePtr, *flushPtr and *applyPtr.
539 : *
540 : * On return, *am_sync is set to true if this walsender is connected to
541 : * a sync standby. Otherwise it's set to false.
542 : */
543 : static bool
544 CBC 102 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
545 ECB : XLogRecPtr *applyPtr, bool *am_sync)
546 : {
547 : SyncRepStandbyData *sync_standbys;
548 : int num_standbys;
549 : int i;
550 :
551 EUB : /* Initialize default results */
552 GIC 102 : *writePtr = InvalidXLogRecPtr;
553 102 : *flushPtr = InvalidXLogRecPtr;
554 CBC 102 : *applyPtr = InvalidXLogRecPtr;
555 GIC 102 : *am_sync = false;
556 :
557 ECB : /* Quick out if not even configured to be synchronous */
558 GIC 102 : if (SyncRepConfig == NULL)
559 LBC 0 : return false;
560 :
561 ECB : /* Get standbys that are considered as synchronous at this moment */
562 CBC 102 : num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
563 :
564 : /* Am I among the candidate sync standbys? */
565 GIC 102 : for (i = 0; i < num_standbys; i++)
566 : {
567 102 : if (sync_standbys[i].is_me)
568 : {
569 102 : *am_sync = true;
570 CBC 102 : break;
571 ECB : }
572 : }
573 EUB :
574 : /*
575 : * Nothing more to do if we are not managing a sync standby or there are
576 : * not enough synchronous standbys.
577 : */
578 GIC 102 : if (!(*am_sync) ||
579 102 : num_standbys < SyncRepConfig->num_sync)
580 : {
581 UIC 0 : pfree(sync_standbys);
582 0 : return false;
583 : }
584 :
585 : /*
586 : * In a priority-based sync replication, the synced positions are the
587 : * oldest ones among sync standbys. In a quorum-based, they are the Nth
588 : * latest ones.
589 : *
590 ECB : * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
591 : * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
592 : * because it's a bit more efficient.
593 : *
594 : * XXX If the numbers of current and requested sync standbys are the same,
595 : * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
596 : * positions even in a quorum-based sync replication.
597 EUB : */
598 GIC 102 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
599 EUB : {
600 GIC 102 : SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
601 : sync_standbys, num_standbys);
602 ECB : }
603 : else
604 : {
605 UIC 0 : SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
606 : sync_standbys, num_standbys,
607 0 : SyncRepConfig->num_sync);
608 : }
609 :
610 CBC 102 : pfree(sync_standbys);
611 GIC 102 : return true;
612 : }
613 :
614 : /*
615 : * Calculate the oldest Write, Flush and Apply positions among sync standbys.
616 : * The three outputs are expected to be InvalidXLogRecPtr on entry and are left untouched when num_standbys is 0. */
617 : static void
618 102 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
619 : XLogRecPtr *flushPtr,
620 : XLogRecPtr *applyPtr,
621 : SyncRepStandbyData *sync_standbys,
622 : int num_standbys)
623 ECB : {
624 : int i;
625 :
626 : /*
627 : * Scan through all sync standbys and calculate the oldest Write, Flush
628 : * and Apply positions. We assume *writePtr et al were initialized to
629 : * InvalidXLogRecPtr.
630 : */
631 CBC 204 : for (i = 0; i < num_standbys; i++)
632 ECB : {
633 CBC 102 : XLogRecPtr write = sync_standbys[i].write;
634 102 : XLogRecPtr flush = sync_standbys[i].flush;
635 GIC 102 : XLogRecPtr apply = sync_standbys[i].apply;
636 ECB :
637 GIC 102 : if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
638 102 : *writePtr = write;
639 102 : if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
640 102 : *flushPtr = flush;
641 102 : if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
642 102 : *applyPtr = apply;
643 EUB : }
644 GIC 102 : }
645 :
646 : /*
647 : * Calculate the Nth latest Write, Flush and Apply positions among sync
648 : * standbys.
649 : * 'nth' is 1-based and must not exceed num_standbys; each position kind is sorted and picked independently. */
650 : static void
651 UIC 0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
652 : XLogRecPtr *flushPtr,
653 : XLogRecPtr *applyPtr,
654 : SyncRepStandbyData *sync_standbys,
655 : int num_standbys,
656 EUB : uint8 nth)
657 : {
658 : XLogRecPtr *write_array;
659 : XLogRecPtr *flush_array;
660 : XLogRecPtr *apply_array;
661 : int i;
662 :
663 : /* Should have enough candidates, or somebody messed up */
664 UBC 0 : Assert(nth > 0 && nth <= num_standbys);
665 EUB :
666 UBC 0 : write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
667 UIC 0 : flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
668 0 : apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
669 :
670 UBC 0 : for (i = 0; i < num_standbys; i++)
671 EUB : {
672 UBC 0 : write_array[i] = sync_standbys[i].write;
673 UIC 0 : flush_array[i] = sync_standbys[i].flush;
674 0 : apply_array[i] = sync_standbys[i].apply;
675 EUB : }
676 :
677 : /* Sort each array in descending order */
678 UIC 0 : qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
679 UBC 0 : qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
680 0 : qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
681 EUB :
682 : /* Get Nth latest Write, Flush, Apply positions */
683 UIC 0 : *writePtr = write_array[nth - 1];
684 0 : *flushPtr = flush_array[nth - 1];
685 0 : *applyPtr = apply_array[nth - 1];
686 :
687 0 : pfree(write_array);
688 UBC 0 : pfree(flush_array);
689 UIC 0 : pfree(apply_array);
690 UBC 0 : }
691 EUB :
692 : /*
693 : * qsort comparator for XLogRecPtr values giving descending order: returns -1 if *a > *b, 0 if equal, 1 otherwise.
694 : */
695 : static int
696 UBC 0 : cmp_lsn(const void *a, const void *b)
697 : {
698 0 : XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
699 UIC 0 : XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
700 :
701 0 : if (lsn1 > lsn2)
702 0 : return -1;
703 0 : else if (lsn1 == lsn2)
704 0 : return 0;
705 : else
706 0 : return 1;
707 : }
708 :
709 ECB : /*
710 : * Return data about walsenders that are candidates to be sync standbys.
711 : *
712 : * *standbys is set to a palloc'd array of structs of per-walsender data,
713 : * and the number of valid entries (candidate sync senders) is returned.
714 : * (This might be more or fewer than num_sync; caller must check.)
715 : * The array is allocated before the SyncRepConfig check, so it exists even when 0 is returned. */
716 : int
717 GIC 655 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
718 : {
719 ECB : int i;
720 : int n;
721 :
722 : /* Create result array */
723 CBC 655 : *standbys = (SyncRepStandbyData *)
724 655 : palloc(max_wal_senders * sizeof(SyncRepStandbyData));
725 :
726 : /* Quick exit if sync replication is not requested */
727 GIC 655 : if (SyncRepConfig == NULL)
728 536 : return 0;
729 :
730 : /* Collect raw data from shared memory */
731 CBC 119 : n = 0;
732 1309 : for (i = 0; i < max_wal_senders; i++)
733 : {
734 ECB : volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
735 : * rearrangement */
736 : SyncRepStandbyData *stby;
737 : WalSndState state; /* not included in SyncRepStandbyData */
738 :
739 CBC 1190 : walsnd = &WalSndCtl->walsnds[i];
740 1190 : stby = *standbys + n;
741 ECB :
742 GIC 1190 : SpinLockAcquire(&walsnd->mutex);
743 1190 : stby->pid = walsnd->pid;
744 CBC 1190 : state = walsnd->state;
745 1190 : stby->write = walsnd->write;
746 GIC 1190 : stby->flush = walsnd->flush;
747 1190 : stby->apply = walsnd->apply;
748 CBC 1190 : stby->sync_standby_priority = walsnd->sync_standby_priority;
749 GIC 1190 : SpinLockRelease(&walsnd->mutex);
750 EUB :
751 : /* Must be active */
752 GIC 1190 : if (stby->pid == 0)
753 CBC 1043 : continue;
754 ECB :
755 : /* Must be streaming or stopping */
756 GIC 147 : if (state != WALSNDSTATE_STREAMING &&
757 ECB : state != WALSNDSTATE_STOPPING)
758 UBC 0 : continue;
759 :
760 : /* Must be synchronous */
761 CBC 147 : if (stby->sync_standby_priority == 0)
762 7 : continue;
763 ECB :
764 : /* Must have a valid flush position */
765 GIC 140 : if (XLogRecPtrIsInvalid(stby->flush))
766 UIC 0 : continue;
767 :
768 : /* OK, it's a candidate; only now advance n, so a rejected slot is overwritten by the next walsender */
769 GIC 140 : stby->walsnd_index = i;
770 140 : stby->is_me = (walsnd == MyWalSnd);
771 CBC 140 : n++;
772 ECB : }
773 :
774 : /*
775 : * In quorum mode, we return all the candidates. In priority mode, if we
776 : * have too many candidates then return only the num_sync ones of highest
777 : * priority.
778 : */
779 GIC 119 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
780 118 : n > SyncRepConfig->num_sync)
781 ECB : {
782 : /* Sort by priority ... */
783 GIC 8 : qsort(*standbys, n, sizeof(SyncRepStandbyData),
784 : standby_priority_comparator);
785 : /* ... then report just the first num_sync ones */
786 8 : n = SyncRepConfig->num_sync;
787 : }
788 ECB :
789 GIC 119 : return n;
790 ECB : }
791 :
792 : /*
793 : * qsort comparator to sort SyncRepStandbyData entries by ascending sync_standby_priority, then by walsnd_index
794 : */
795 : static int
796 GIC 19 : standby_priority_comparator(const void *a, const void *b)
797 : {
798 19 : const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
799 19 : const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
800 :
801 : /* First, sort by increasing priority value */
802 CBC 19 : if (sa->sync_standby_priority != sb->sync_standby_priority)
803 GIC 9 : return sa->sync_standby_priority - sb->sync_standby_priority;
804 :
805 : /*
806 : * We might have equal priority values; arbitrarily break ties by position
807 : * in the WALSnd array. (This is utterly bogus, since that is arrival
808 : * order dependent, but there are regression tests that rely on it.)
809 : */
810 10 : return sa->walsnd_index - sb->walsnd_index;
811 : }
812 :
813 :
814 : /*
815 ECB : * Check if we are in the list of sync standbys, and if so, determine
816 : * priority sequence. Return priority if set, or zero to indicate that
817 : * we are not a potential sync standby.
818 : *
819 : * Compare each name in SyncRepConfig->member_names, ignoring case, against the application_name
820 : * for this WALSender, or allow any name if we find a wildcard "*".
821 : */
822 : static int
823 GIC 509 : SyncRepGetStandbyPriority(void)
824 : {
825 ECB : const char *standby_name;
826 : int priority;
827 GIC 509 : bool found = false;
828 ECB :
829 : /*
830 : * Since synchronous cascade replication is not allowed, we always set the
831 : * priority of cascading walsender to zero.
832 : */
833 GIC 509 : if (am_cascading_walsender)
834 CBC 33 : return 0;
835 ECB :
836 GIC 476 : if (!SyncStandbysDefined() || SyncRepConfig == NULL)
837 CBC 453 : return 0;
838 ECB : /* member_names is walked as nmembers consecutive NUL-terminated strings; priority is the 1-based position. */
839 GIC 23 : standby_name = SyncRepConfig->member_names;
840 CBC 31 : for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
841 : {
842 GIC 30 : if (pg_strcasecmp(standby_name, application_name) == 0 ||
843 CBC 18 : strcmp(standby_name, "*") == 0)
844 ECB : {
845 GIC 22 : found = true;
846 22 : break;
847 : }
848 8 : standby_name += strlen(standby_name) + 1;
849 : }
850 ECB :
851 GIC 23 : if (!found)
852 1 : return 0;
853 :
854 : /*
855 : * In quorum-based sync replication, all the standbys in the list have the
856 : * same priority, one.
857 : */
858 22 : return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
859 : }
860 :
861 : /*
862 ECB : * Walk the specified queue from head. Set the state of any backends that
863 : * need to be woken, remove them from the queue, and then wake them.
864 : * Pass all = true to wake whole queue; otherwise, just wake up to
865 : * the walsender's LSN.
866 : *
867 : * The caller must hold SyncRepLock in exclusive mode.
868 : */
869 : static int
870 CBC 118 : SyncRepWakeQueue(bool all, int mode)
871 : {
872 118 : volatile WalSndCtlData *walsndctl = WalSndCtl;
873 GIC 118 : int numprocs = 0;
874 : dlist_mutable_iter iter;
875 :
876 118 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
877 118 : Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
878 CBC 118 : Assert(SyncRepQueueIsOrderedByLSN(mode));
879 ECB :
880 GNC 132 : dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
881 : {
882 19 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
883 :
884 : /*
885 : * Assume the queue is ordered by LSN
886 : */
887 GIC 19 : if (!all && walsndctl->lsn[mode] < proc->waitLSN)
888 5 : return numprocs;
889 ECB :
890 : /*
891 : * Remove from queue.
892 : */
893 GNC 14 : dlist_delete_thoroughly(&proc->syncRepLinks);
894 :
895 : /*
896 ECB : * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
897 : * make sure that it sees the queue link being removed before the
898 : * syncRepState change.
899 : */
900 GIC 14 : pg_write_barrier();
901 :
902 : /*
903 : * Set state to complete; see SyncRepWaitForLSN() for discussion of
904 : * the various states.
905 : */
906 GNC 14 : proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
907 ECB :
908 : /*
909 : * Wake only when we have set state and removed from queue.
910 : */
911 GNC 14 : SetLatch(&(proc->procLatch));
912 :
913 CBC 14 : numprocs++;
914 : }
915 :
916 GIC 113 : return numprocs;
917 : }
918 :
919 : /*
920 ECB : * The checkpointer calls this as needed to update the shared
921 : * sync_standbys_defined flag, so that backends don't remain permanently wedged
922 : * if synchronous_standby_names is unset. It's safe to check the current value
923 : * without the lock, because it's only ever updated by one process. But we
924 EUB : * must take the lock to change it.
925 : */
926 : void
927 GIC 391 : SyncRepUpdateSyncStandbysDefined(void)
928 : {
929 391 : bool sync_standbys_defined = SyncStandbysDefined();
930 :
931 391 : if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
932 : {
933 8 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
934 :
935 ECB : /*
936 : * If synchronous_standby_names has been reset to empty, it's futile
937 : * for backends to continue waiting. Since the user no longer wants
938 : * synchronous replication, we'd better wake them up.
939 : */
940 GIC 8 : if (!sync_standbys_defined)
941 : {
942 : int i;
943 ECB :
944 UIC 0 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
945 0 : SyncRepWakeQueue(true, i);
946 : }
947 :
948 ECB : /*
949 : * Only allow people to join the queue when there are synchronous
950 : * standbys defined. Without this interlock, there's a race
951 : * condition: we might wake up all the current waiters; then, some
952 : * backend that hasn't yet reloaded its config might go to sleep on
953 : * the queue (and never wake up). This prevents that.
954 : */
955 GIC 8 : WalSndCtl->sync_standbys_defined = sync_standbys_defined;
956 :
957 8 : LWLockRelease(SyncRepLock);
958 : }
959 391 : }
960 ECB :
961 EUB : #ifdef USE_ASSERT_CHECKING
962 : static bool
963 CBC 152 : SyncRepQueueIsOrderedByLSN(int mode)
964 : {
965 ECB : XLogRecPtr lastLSN;
966 : dlist_iter iter;
967 :
968 GIC 152 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
969 :
970 152 : lastLSN = 0;
971 :
972 GNC 205 : dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
973 ECB : {
974 GNC 53 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
975 :
976 : /*
977 ECB : * Check the queue is ordered by LSN and that multiple procs don't
978 : * have matching LSNs
979 : */
980 GIC 53 : if (proc->waitLSN <= lastLSN)
981 UIC 0 : return false;
982 :
983 CBC 53 : lastLSN = proc->waitLSN;
984 ECB : }
985 :
986 GIC 152 : return true;
987 ECB : }
988 : #endif
989 EUB :
990 : /*
991 : * ===========================================================
992 : * Synchronous Replication functions executed by any process
993 : * ===========================================================
994 : */
995 :
996 : bool
997 CBC 1926 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
998 : {
999 GBC 1926 : if (*newval != NULL && (*newval)[0] != '\0')
1000 67 : {
1001 EUB : int parse_rc;
1002 : SyncRepConfigData *pconf;
1003 :
1004 : /* Reset communication variables to ensure a fresh start */
1005 GIC 67 : syncrep_parse_result = NULL;
1006 CBC 67 : syncrep_parse_error_msg = NULL;
1007 ECB :
1008 EUB : /* Parse the synchronous_standby_names string */
1009 CBC 67 : syncrep_scanner_init(*newval);
1010 GIC 67 : parse_rc = syncrep_yyparse();
1011 CBC 67 : syncrep_scanner_finish();
1012 :
1013 GIC 67 : if (parse_rc != 0 || syncrep_parse_result == NULL)
1014 : {
1015 UIC 0 : GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
1016 0 : if (syncrep_parse_error_msg)
1017 0 : GUC_check_errdetail("%s", syncrep_parse_error_msg);
1018 : else
1019 0 : GUC_check_errdetail("synchronous_standby_names parser failed");
1020 0 : return false;
1021 : }
1022 ECB :
1023 GIC 67 : if (syncrep_parse_result->num_sync <= 0)
1024 ECB : {
1025 UIC 0 : GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1026 0 : syncrep_parse_result->num_sync);
1027 0 : return false;
1028 ECB : }
1029 :
1030 : /* GUC extra value must be guc_malloc'd, not palloc'd */
1031 : pconf = (SyncRepConfigData *)
1032 GNC 67 : guc_malloc(LOG, syncrep_parse_result->config_size);
1033 GIC 67 : if (pconf == NULL)
1034 LBC 0 : return false;
1035 GIC 67 : memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1036 ECB :
1037 GIC 67 : *extra = (void *) pconf;
1038 EUB :
1039 : /*
1040 : * We need not explicitly clean up syncrep_parse_result. It, and any
1041 ECB : * other cruft generated during parsing, will be freed when the
1042 : * current memory context is deleted. (This code is generally run in
1043 : * a short-lived context used for config file processing, so that will
1044 EUB : * not be very long.)
1045 : */
1046 : }
1047 ECB : else
1048 CBC 1859 : *extra = NULL;
1049 ECB :
1050 GIC 1926 : return true;
1051 ECB : }
1052 :
1053 : void
1054 GIC 1916 : assign_synchronous_standby_names(const char *newval, void *extra)
1055 : {
1056 1916 : SyncRepConfig = (SyncRepConfigData *) extra;
1057 1916 : }
1058 :
1059 : void
1060 2335 : assign_synchronous_commit(int newval, void *extra)
1061 : {
1062 2335 : switch (newval)
1063 : {
1064 UIC 0 : case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1065 0 : SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1066 0 : break;
1067 GIC 1976 : case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1068 1976 : SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1069 1976 : break;
1070 UIC 0 : case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1071 0 : SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1072 0 : break;
1073 GIC 359 : default:
1074 359 : SyncRepWaitMode = SYNC_REP_NO_WAIT;
1075 359 : break;
1076 : }
1077 2335 : }
|