Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * syncrep.c
4 : : *
5 : : * Synchronous replication is new as of PostgreSQL 9.1.
6 : : *
7 : : * If requested, transaction commits wait until their commit LSN are
8 : : * acknowledged by the synchronous standbys.
9 : : *
10 : : * This module contains the code for waiting and release of backends.
11 : : * All code in this module executes on the primary. The core streaming
12 : : * replication transport remains within WALreceiver/WALsender modules.
13 : : *
14 : : * The essence of this design is that it isolates all logic about
15 : : * waiting/releasing onto the primary. The primary defines which standbys
16 : : * it wishes to wait for. The standbys are completely unaware of the
17 : : * durability requirements of transactions on the primary, reducing the
18 : : * complexity of the code and streamlining both standby operations and
19 : : * network bandwidth because there is no requirement to ship
20 : : * per-transaction state information.
21 : : *
22 : : * Replication is either synchronous or not synchronous (async). If it is
23 : : * async, we just fastpath out of here. If it is sync, then we wait for
24 : : * the write, flush or apply location on the standby before releasing
25 : : * the waiting backend. Further complexity in that interaction is
26 : : * expected in later releases.
27 : : *
28 : : * The best performing way to manage the waiting backends is to have a
29 : : * single ordered queue of waiting backends, so that we can avoid
30 : : * searching the through all waiters each time we receive a reply.
31 : : *
32 : : * In 9.5 or before only a single standby could be considered as
33 : : * synchronous. In 9.6 we support a priority-based multiple synchronous
34 : : * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
35 : : * supported. The number of synchronous standbys that transactions
36 : : * must wait for replies from is specified in synchronous_standby_names.
37 : : * This parameter also specifies a list of standby names and the method
38 : : * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39 : : *
40 : : * The method FIRST specifies a priority-based synchronous replication
41 : : * and makes transaction commits wait until their WAL records are
42 : : * replicated to the requested number of synchronous standbys chosen based
43 : : * on their priorities. The standbys whose names appear earlier in the list
44 : : * are given higher priority and will be considered as synchronous.
45 : : * Other standby servers appearing later in this list represent potential
46 : : * synchronous standbys. If any of the current synchronous standbys
47 : : * disconnects for whatever reason, it will be replaced immediately with
48 : : * the next-highest-priority standby.
49 : : *
50 : : * The method ANY specifies a quorum-based synchronous replication
51 : : * and makes transaction commits wait until their WAL records are
52 : : * replicated to at least the requested number of synchronous standbys
53 : : * in the list. All the standbys appearing in the list are considered as
54 : : * candidates for quorum synchronous standbys.
55 : : *
56 : : * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 : : * This is for backward compatibility with 9.6 or before where only a
58 : : * priority-based sync replication was supported.
59 : : *
60 : : * Before the standbys chosen from synchronous_standby_names can
61 : : * become the synchronous standbys they must have caught up with
62 : : * the primary; that may take some time. Once caught up,
63 : : * the standbys which are considered as synchronous at that moment
64 : : * will release waiters from the queue.
65 : : *
66 : : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
67 : : *
68 : : * IDENTIFICATION
69 : : * src/backend/replication/syncrep.c
70 : : *
71 : : *-------------------------------------------------------------------------
72 : : */
73 : : #include "postgres.h"
74 : :
75 : : #include <unistd.h>
76 : :
77 : : #include "access/xact.h"
78 : : #include "common/int.h"
79 : : #include "miscadmin.h"
80 : : #include "pgstat.h"
81 : : #include "replication/syncrep.h"
82 : : #include "replication/walsender.h"
83 : : #include "replication/walsender_private.h"
84 : : #include "storage/proc.h"
85 : : #include "tcop/tcopprot.h"
86 : : #include "utils/guc_hooks.h"
87 : : #include "utils/ps_status.h"
88 : :
89 : : /* User-settable parameters for sync rep */
90 : : char *SyncRepStandbyNames;
91 : :
92 : : #define SyncStandbysDefined() \
93 : : (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
94 : :
95 : : static bool announce_next_takeover = true;
96 : :
97 : : SyncRepConfigData *SyncRepConfig = NULL;
98 : : static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
99 : :
100 : : static void SyncRepQueueInsert(int mode);
101 : : static void SyncRepCancelWait(void);
102 : : static int SyncRepWakeQueue(bool all, int mode);
103 : :
104 : : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
105 : : XLogRecPtr *flushPtr,
106 : : XLogRecPtr *applyPtr,
107 : : bool *am_sync);
108 : : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
109 : : XLogRecPtr *flushPtr,
110 : : XLogRecPtr *applyPtr,
111 : : SyncRepStandbyData *sync_standbys,
112 : : int num_standbys);
113 : : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
114 : : XLogRecPtr *flushPtr,
115 : : XLogRecPtr *applyPtr,
116 : : SyncRepStandbyData *sync_standbys,
117 : : int num_standbys,
118 : : uint8 nth);
119 : : static int SyncRepGetStandbyPriority(void);
120 : : static int standby_priority_comparator(const void *a, const void *b);
121 : : static int cmp_lsn(const void *a, const void *b);
122 : :
123 : : #ifdef USE_ASSERT_CHECKING
124 : : static bool SyncRepQueueIsOrderedByLSN(int mode);
125 : : #endif
126 : :
127 : : /*
128 : : * ===========================================================
129 : : * Synchronous Replication functions for normal user backends
130 : : * ===========================================================
131 : : */
132 : :
133 : : /*
134 : : * Wait for synchronous replication, if requested by user.
135 : : *
136 : : * Initially backends start in state SYNC_REP_NOT_WAITING and then
137 : : * change that state to SYNC_REP_WAITING before adding ourselves
138 : : * to the wait queue. During SyncRepWakeQueue() a WALSender changes
139 : : * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
140 : : * This backend then resets its state to SYNC_REP_NOT_WAITING.
141 : : *
142 : : * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
143 : : * represents a commit record. If it doesn't, then we wait only for the WAL
144 : : * to be flushed if synchronous_commit is set to the higher level of
145 : : * remote_apply, because only commit records provide apply feedback.
146 : : */
147 : : void
2938 rhaas@postgresql.org 148 :CBC 106381 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
149 : : {
150 : : int mode;
151 : :
152 : : /*
153 : : * This should be called while holding interrupts during a transaction
154 : : * commit to prevent the follow-up shared memory queue cleanups to be
155 : : * influenced by external interruptions.
156 : : */
1626 michael@paquier.xyz 157 [ - + ]: 106381 : Assert(InterruptHoldoffCount > 0);
158 : :
159 : : /*
160 : : * Fast exit if user has not requested sync replication, or there are no
161 : : * sync replication standby names defined.
162 : : *
163 : : * Since this routine gets called every commit time, it's important to
164 : : * exit quickly if sync replication is not requested. So we check
165 : : * WalSndCtl->sync_standbys_defined flag without the lock and exit
166 : : * immediately if it's false. If it's true, we need to check it again
167 : : * later while holding the lock, to check the flag and operate the sync
168 : : * rep queue atomically. This is necessary to avoid the race condition
169 : : * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
170 : : * it's false, the lock is not necessary because we don't touch the queue.
171 : : */
1320 fujii@postgresql.org 172 [ + + + + ]: 106381 : if (!SyncRepRequested() ||
173 [ + + ]: 80980 : !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
174 : 106345 : return;
175 : :
176 : : /* Cap the level for anything other than commit to remote flush only. */
2938 rhaas@postgresql.org 177 [ + + ]: 36 : if (commit)
178 : 20 : mode = SyncRepWaitMode;
179 : : else
180 : 16 : mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
181 : :
452 andres@anarazel.de 182 [ - + ]: 36 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
4777 rhaas@postgresql.org 183 [ - + ]: 36 : Assert(WalSndCtl != NULL);
184 : :
185 : 36 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
186 [ - + ]: 36 : Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
187 : :
188 : : /*
189 : : * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
190 : : * set. See SyncRepUpdateSyncStandbysDefined.
191 : : *
192 : : * Also check that the standby hasn't already replied. Unlikely race
193 : : * condition but we'll be fetching that cache line anyway so it's likely
194 : : * to be a low cost check.
195 : : */
4768 simon@2ndQuadrant.co 196 [ + - ]: 36 : if (!WalSndCtl->sync_standbys_defined ||
2938 rhaas@postgresql.org 197 [ - + ]: 36 : lsn <= WalSndCtl->lsn[mode])
198 : : {
4777 rhaas@postgresql.org 199 :LBC (2) : LWLockRelease(SyncRepLock);
200 : (2) : return;
201 : : }
202 : :
203 : : /*
204 : : * Set our waitLSN so WALSender will know when to wake us, and add
205 : : * ourselves to the queue.
206 : : */
2938 rhaas@postgresql.org 207 :CBC 36 : MyProc->waitLSN = lsn;
4777 208 : 36 : MyProc->syncRepState = SYNC_REP_WAITING;
4458 simon@2ndQuadrant.co 209 : 36 : SyncRepQueueInsert(mode);
210 [ - + ]: 36 : Assert(SyncRepQueueIsOrderedByLSN(mode));
4777 rhaas@postgresql.org 211 : 36 : LWLockRelease(SyncRepLock);
212 : :
213 : : /* Alter ps display to show waiting for sync rep. */
214 [ + - ]: 36 : if (update_process_title)
215 : : {
216 : : char buffer[32];
217 : :
419 drowley@postgresql.o 218 : 36 : sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
219 : 36 : set_ps_display_suffix(buffer);
220 : : }
221 : :
222 : : /*
223 : : * Wait for specified LSN to be confirmed.
224 : : *
225 : : * Each proc has its own wait latch, so we perform a normal latch
226 : : * check/wait loop here.
227 : : */
228 : : for (;;)
4788 simon@2ndQuadrant.co 229 : 36 : {
230 : : int rc;
231 : :
232 : : /* Must reset the latch before testing state. */
3378 andres@anarazel.de 233 : 72 : ResetLatch(MyLatch);
234 : :
235 : : /*
236 : : * Acquiring the lock is not needed, the latch ensures proper
237 : : * barriers. If it looks like we're done, we must really be done,
238 : : * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
239 : : * it will never update it again, so we can't be seeing a stale value
240 : : * in that case.
241 : : */
2802 simon@2ndQuadrant.co 242 [ + + ]: 72 : if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
4777 rhaas@postgresql.org 243 : 36 : break;
244 : :
245 : : /*
246 : : * If a wait for synchronous replication is pending, we can neither
247 : : * acknowledge the commit nor raise ERROR or FATAL. The latter would
248 : : * lead the client to believe that the transaction aborted, which is
249 : : * not true: it's already committed locally. The former is no good
250 : : * either: the client has requested synchronous replication, and is
251 : : * entitled to assume that an acknowledged commit is also replicated,
252 : : * which might not be true. So in this case we issue a WARNING (which
253 : : * some clients may be able to interpret) and shut off further output.
254 : : * We do NOT reset ProcDiePending, so that the process will die after
255 : : * the commit is cleaned up.
256 : : */
257 [ - + ]: 36 : if (ProcDiePending)
258 : : {
4777 rhaas@postgresql.org 259 [ # # ]:UBC 0 : ereport(WARNING,
260 : : (errcode(ERRCODE_ADMIN_SHUTDOWN),
261 : : errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
262 : : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
263 : 0 : whereToSendOutput = DestNone;
264 : 0 : SyncRepCancelWait();
265 : 0 : break;
266 : : }
267 : :
268 : : /*
269 : : * It's unclear what to do if a query cancel interrupt arrives. We
270 : : * can't actually abort at this point, but ignoring the interrupt
271 : : * altogether is not helpful, so we just terminate the wait with a
272 : : * suitable warning.
273 : : */
4777 rhaas@postgresql.org 274 [ - + ]:CBC 36 : if (QueryCancelPending)
275 : : {
4777 rhaas@postgresql.org 276 :UBC 0 : QueryCancelPending = false;
277 [ # # ]: 0 : ereport(WARNING,
278 : : (errmsg("canceling wait for synchronous replication due to user request"),
279 : : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
280 : 0 : SyncRepCancelWait();
281 : 0 : break;
282 : : }
283 : :
284 : : /*
285 : : * Wait on latch. Any condition that should wake us up will set the
286 : : * latch, so no need for timeout.
287 : : */
1969 tmunro@postgresql.or 288 :CBC 36 : rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
289 : : WAIT_EVENT_SYNC_REP);
290 : :
291 : : /*
292 : : * If the postmaster dies, we'll probably never get an acknowledgment,
293 : : * because all the wal sender processes will exit. So just bail out.
294 : : */
295 [ - + ]: 36 : if (rc & WL_POSTMASTER_DEATH)
296 : : {
4777 rhaas@postgresql.org 297 :UBC 0 : ProcDiePending = true;
298 : 0 : whereToSendOutput = DestNone;
299 : 0 : SyncRepCancelWait();
300 : 0 : break;
301 : : }
302 : : }
303 : :
304 : : /*
305 : : * WalSender has checked our LSN and has removed us from queue. Clean up
306 : : * state and leave. It's OK to reset these shared memory fields without
307 : : * holding SyncRepLock, because any walsenders will ignore us anyway when
308 : : * we're not on the queue. We need a read barrier to make sure we see the
309 : : * changes to the queue link (this might be unnecessary without
310 : : * assertions, but better safe than sorry).
311 : : */
2468 heikki.linnakangas@i 312 :CBC 36 : pg_read_barrier();
452 andres@anarazel.de 313 [ - + ]: 36 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
4777 rhaas@postgresql.org 314 : 36 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
4312 heikki.linnakangas@i 315 : 36 : MyProc->waitLSN = 0;
316 : :
317 : : /* reset ps display to remove the suffix */
419 drowley@postgresql.o 318 [ + - ]: 36 : if (update_process_title)
319 : 36 : set_ps_display_remove_suffix();
320 : : }
321 : :
322 : : /*
323 : : * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
324 : : *
325 : : * Usually we will go at tail of queue, though it's possible that we arrive
326 : : * here out of order, so start at tail and work back to insertion point.
327 : : */
328 : : static void
4464 simon@2ndQuadrant.co 329 : 36 : SyncRepQueueInsert(int mode)
330 : : {
331 : : dlist_head *queue;
332 : : dlist_iter iter;
333 : :
334 [ + - - + ]: 36 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
452 andres@anarazel.de 335 : 36 : queue = &WalSndCtl->SyncRepQueue[mode];
336 : :
337 [ + - - + ]: 36 : dlist_reverse_foreach(iter, queue)
338 : : {
452 andres@anarazel.de 339 :UBC 0 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
340 : :
341 : : /*
342 : : * Stop at the queue element that we should insert after to ensure the
343 : : * queue is ordered by LSN.
344 : : */
4125 alvherre@alvh.no-ip. 345 [ # # ]: 0 : if (proc->waitLSN < MyProc->waitLSN)
346 : : {
452 andres@anarazel.de 347 : 0 : dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
348 : 0 : return;
349 : : }
350 : : }
351 : :
352 : : /*
353 : : * If we get here, the list was either empty, or this process needs to be
354 : : * at the head.
355 : : */
452 andres@anarazel.de 356 :CBC 36 : dlist_push_head(queue, &MyProc->syncRepLinks);
357 : : }
358 : :
359 : : /*
360 : : * Acquire SyncRepLock and cancel any wait currently in progress.
361 : : */
362 : : static void
4777 rhaas@postgresql.org 363 :UBC 0 : SyncRepCancelWait(void)
364 : : {
365 : 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
452 andres@anarazel.de 366 [ # # ]: 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
367 : 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
4777 rhaas@postgresql.org 368 : 0 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
369 : 0 : LWLockRelease(SyncRepLock);
370 : 0 : }
371 : :
372 : : void
4631 tgl@sss.pgh.pa.us 373 :CBC 15793 : SyncRepCleanupAtProcExit(void)
374 : : {
375 : : /*
376 : : * First check if we are removed from the queue without the lock to not
377 : : * slow down backend exit.
378 : : */
452 andres@anarazel.de 379 [ - + ]: 15793 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
380 : : {
4788 simon@2ndQuadrant.co 381 :UBC 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
382 : :
383 : : /* maybe we have just been removed, so recheck */
452 andres@anarazel.de 384 [ # # ]: 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
385 : 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
386 : :
4788 simon@2ndQuadrant.co 387 : 0 : LWLockRelease(SyncRepLock);
388 : : }
4788 simon@2ndQuadrant.co 389 :CBC 15793 : }
390 : :
391 : : /*
392 : : * ===========================================================
393 : : * Synchronous Replication functions for wal sender processes
394 : : * ===========================================================
395 : : */
396 : :
397 : : /*
398 : : * Take any action required to initialise sync rep state from config
399 : : * data. Called at WALSender startup and after each SIGHUP.
400 : : */
401 : : void
402 : 667 : SyncRepInitConfig(void)
403 : : {
404 : : int priority;
405 : :
406 : : /*
407 : : * Determine if we are a potential sync standby and remember the result
408 : : * for handling replies from standby.
409 : : */
410 : 667 : priority = SyncRepGetStandbyPriority();
411 [ + + ]: 667 : if (MyWalSnd->sync_standby_priority != priority)
412 : : {
1457 tgl@sss.pgh.pa.us 413 [ - + ]: 43 : SpinLockAcquire(&MyWalSnd->mutex);
4788 simon@2ndQuadrant.co 414 : 43 : MyWalSnd->sync_standby_priority = priority;
1457 tgl@sss.pgh.pa.us 415 : 43 : SpinLockRelease(&MyWalSnd->mutex);
416 : :
4788 simon@2ndQuadrant.co 417 [ - + ]: 43 : ereport(DEBUG1,
418 : : (errmsg_internal("standby \"%s\" now has synchronous standby priority %d",
419 : : application_name, priority)));
420 : : }
421 : 667 : }
422 : :
423 : : /*
424 : : * Update the LSNs on each queue based upon our latest state. This
425 : : * implements a simple policy of first-valid-sync-standby-releases-waiter.
426 : : *
427 : : * Other policies are possible, which would change what we do here and
428 : : * perhaps also which information we store as well.
429 : : */
430 : : void
431 : 101251 : SyncRepReleaseWaiters(void)
432 : : {
433 : 101251 : volatile WalSndCtlData *walsndctl = WalSndCtl;
434 : : XLogRecPtr writePtr;
435 : : XLogRecPtr flushPtr;
436 : : XLogRecPtr applyPtr;
437 : : bool got_recptr;
438 : : bool am_sync;
4464 439 : 101251 : int numwrite = 0;
440 : 101251 : int numflush = 0;
2938 rhaas@postgresql.org 441 : 101251 : int numapply = 0;
442 : :
443 : : /*
444 : : * If this WALSender is serving a standby that is not on the list of
445 : : * potential sync standbys then we have nothing to do. If we are still
446 : : * starting up, still running base backup or the current flush position is
447 : : * still invalid, then leave quickly also. Streaming or stopping WAL
448 : : * senders are allowed to release waiters.
449 : : */
4788 simon@2ndQuadrant.co 450 [ + + ]: 101251 : if (MyWalSnd->sync_standby_priority == 0 ||
1963 michael@paquier.xyz 451 [ + + ]: 199 : (MyWalSnd->state != WALSNDSTATE_STREAMING &&
452 [ + + ]: 41 : MyWalSnd->state != WALSNDSTATE_STOPPING) ||
4302 magnus@hagander.net 453 [ - + ]: 183 : XLogRecPtrIsInvalid(MyWalSnd->flush))
454 : : {
2930 fujii@postgresql.org 455 : 101068 : announce_next_takeover = true;
4788 simon@2ndQuadrant.co 456 : 101069 : return;
457 : : }
458 : :
459 : : /*
460 : : * We're a potential sync standby. Release waiters if there are enough
461 : : * sync standbys and we are considered as sync.
462 : : */
463 : 183 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
464 : :
465 : : /*
466 : : * Check whether we are a sync standby or not, and calculate the synced
467 : : * positions among all sync standbys. (Note: although this step does not
468 : : * of itself require holding SyncRepLock, it seems like a good idea to do
469 : : * it after acquiring the lock. This ensures that the WAL pointers we use
470 : : * to release waiters are newer than any previous execution of this
471 : : * routine used.)
472 : : */
2673 fujii@postgresql.org 473 : 183 : got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
474 : :
475 : : /*
476 : : * If we are managing a sync standby, though we weren't prior to this,
477 : : * then announce we are now a sync standby.
478 : : */
2930 479 [ + + + + ]: 183 : if (announce_next_takeover && am_sync)
480 : : {
481 : 16 : announce_next_takeover = false;
482 : :
2673 483 [ + - ]: 16 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
484 [ + - ]: 16 : ereport(LOG,
485 : : (errmsg("standby \"%s\" is now a synchronous standby with priority %d",
486 : : application_name, MyWalSnd->sync_standby_priority)));
487 : : else
2673 fujii@postgresql.org 488 [ # # ]:UBC 0 : ereport(LOG,
489 : : (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
490 : : application_name)));
491 : : }
492 : :
493 : : /*
494 : : * If the number of sync standbys is less than requested or we aren't
495 : : * managing a sync standby then just leave.
496 : : */
2673 fujii@postgresql.org 497 [ + + - + ]:CBC 183 : if (!got_recptr || !am_sync)
498 : : {
4788 simon@2ndQuadrant.co 499 : 1 : LWLockRelease(SyncRepLock);
2930 fujii@postgresql.org 500 : 1 : announce_next_takeover = !am_sync;
4788 simon@2ndQuadrant.co 501 : 1 : return;
502 : : }
503 : :
504 : : /*
505 : : * Set the lsn first so that when we wake backends they will release up to
506 : : * this location.
507 : : */
2930 fujii@postgresql.org 508 [ + + ]: 182 : if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
509 : : {
510 : 67 : walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
4464 simon@2ndQuadrant.co 511 : 67 : numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
512 : : }
2930 fujii@postgresql.org 513 [ + + ]: 182 : if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
514 : : {
515 : 77 : walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
4464 simon@2ndQuadrant.co 516 : 77 : numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
517 : : }
2930 fujii@postgresql.org 518 [ + + ]: 182 : if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
519 : : {
520 : 76 : walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
2938 rhaas@postgresql.org 521 : 76 : numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
522 : : }
523 : :
4788 simon@2ndQuadrant.co 524 : 182 : LWLockRelease(SyncRepLock);
525 : :
2930 fujii@postgresql.org 526 [ - + ]: 182 : elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
527 : : numwrite, LSN_FORMAT_ARGS(writePtr),
528 : : numflush, LSN_FORMAT_ARGS(flushPtr),
529 : : numapply, LSN_FORMAT_ARGS(applyPtr));
530 : : }
531 : :
532 : : /*
533 : : * Calculate the synced Write, Flush and Apply positions among sync standbys.
534 : : *
535 : : * Return false if the number of sync standbys is less than
536 : : * synchronous_standby_names specifies. Otherwise return true and
537 : : * store the positions into *writePtr, *flushPtr and *applyPtr.
538 : : *
539 : : * On return, *am_sync is set to true if this walsender is connecting to
540 : : * sync standby. Otherwise it's set to false.
541 : : */
542 : : static bool
2673 543 : 183 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
544 : : XLogRecPtr *applyPtr, bool *am_sync)
545 : : {
546 : : SyncRepStandbyData *sync_standbys;
547 : : int num_standbys;
548 : : int i;
549 : :
550 : : /* Initialize default results */
2930 551 : 183 : *writePtr = InvalidXLogRecPtr;
552 : 183 : *flushPtr = InvalidXLogRecPtr;
553 : 183 : *applyPtr = InvalidXLogRecPtr;
554 : 183 : *am_sync = false;
555 : :
556 : : /* Quick out if not even configured to be synchronous */
1457 tgl@sss.pgh.pa.us 557 [ - + ]: 183 : if (SyncRepConfig == NULL)
1457 tgl@sss.pgh.pa.us 558 :UBC 0 : return false;
559 : :
560 : : /* Get standbys that are considered as synchronous at this moment */
1457 tgl@sss.pgh.pa.us 561 :CBC 183 : num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
562 : :
563 : : /* Am I among the candidate sync standbys? */
564 [ + + ]: 184 : for (i = 0; i < num_standbys; i++)
565 : : {
566 [ + + ]: 183 : if (sync_standbys[i].is_me)
567 : : {
568 : 182 : *am_sync = true;
569 : 182 : break;
570 : : }
571 : : }
572 : :
573 : : /*
574 : : * Nothing more to do if we are not managing a sync standby or there are
575 : : * not enough synchronous standbys.
576 : : */
2909 577 [ + + ]: 183 : if (!(*am_sync) ||
1457 578 [ - + ]: 182 : num_standbys < SyncRepConfig->num_sync)
579 : : {
580 : 1 : pfree(sync_standbys);
2930 fujii@postgresql.org 581 : 1 : return false;
582 : : }
583 : :
584 : : /*
585 : : * In a priority-based sync replication, the synced positions are the
586 : : * oldest ones among sync standbys. In a quorum-based, they are the Nth
587 : : * latest ones.
588 : : *
589 : : * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
590 : : * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
591 : : * because it's a bit more efficient.
592 : : *
593 : : * XXX If the numbers of current and requested sync standbys are the same,
594 : : * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
595 : : * positions even in a quorum-based sync replication.
596 : : */
2673 597 [ + - ]: 182 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
598 : : {
599 : 182 : SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
600 : : sync_standbys, num_standbys);
601 : : }
602 : : else
603 : : {
2673 fujii@postgresql.org 604 :UBC 0 : SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
605 : : sync_standbys, num_standbys,
1457 tgl@sss.pgh.pa.us 606 : 0 : SyncRepConfig->num_sync);
607 : : }
608 : :
1457 tgl@sss.pgh.pa.us 609 :CBC 182 : pfree(sync_standbys);
2673 fujii@postgresql.org 610 : 182 : return true;
611 : : }
612 : :
613 : : /*
614 : : * Calculate the oldest Write, Flush and Apply positions among sync standbys.
615 : : */
616 : : static void
1457 tgl@sss.pgh.pa.us 617 : 182 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
618 : : XLogRecPtr *flushPtr,
619 : : XLogRecPtr *applyPtr,
620 : : SyncRepStandbyData *sync_standbys,
621 : : int num_standbys)
622 : : {
623 : : int i;
624 : :
625 : : /*
626 : : * Scan through all sync standbys and calculate the oldest Write, Flush
627 : : * and Apply positions. We assume *writePtr et al were initialized to
628 : : * InvalidXLogRecPtr.
629 : : */
630 [ + + ]: 365 : for (i = 0; i < num_standbys; i++)
631 : : {
632 : 183 : XLogRecPtr write = sync_standbys[i].write;
633 : 183 : XLogRecPtr flush = sync_standbys[i].flush;
634 : 183 : XLogRecPtr apply = sync_standbys[i].apply;
635 : :
2930 fujii@postgresql.org 636 [ + + - + ]: 183 : if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
637 : 182 : *writePtr = write;
638 [ + + - + ]: 183 : if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
639 : 182 : *flushPtr = flush;
640 [ + + - + ]: 183 : if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
641 : 182 : *applyPtr = apply;
642 : : }
2673 643 : 182 : }
644 : :
645 : : /*
646 : : * Calculate the Nth latest Write, Flush and Apply positions among sync
647 : : * standbys.
648 : : */
649 : : static void
1457 tgl@sss.pgh.pa.us 650 :UBC 0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
651 : : XLogRecPtr *flushPtr,
652 : : XLogRecPtr *applyPtr,
653 : : SyncRepStandbyData *sync_standbys,
654 : : int num_standbys,
655 : : uint8 nth)
656 : : {
657 : : XLogRecPtr *write_array;
658 : : XLogRecPtr *flush_array;
659 : : XLogRecPtr *apply_array;
660 : : int i;
661 : :
662 : : /* Should have enough candidates, or somebody messed up */
663 [ # # # # ]: 0 : Assert(nth > 0 && nth <= num_standbys);
664 : :
665 : 0 : write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
666 : 0 : flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
667 : 0 : apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
668 : :
669 [ # # ]: 0 : for (i = 0; i < num_standbys; i++)
670 : : {
671 : 0 : write_array[i] = sync_standbys[i].write;
672 : 0 : flush_array[i] = sync_standbys[i].flush;
673 : 0 : apply_array[i] = sync_standbys[i].apply;
674 : : }
675 : :
676 : : /* Sort each array in descending order */
677 : 0 : qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
678 : 0 : qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
679 : 0 : qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
680 : :
681 : : /* Get Nth latest Write, Flush, Apply positions */
2673 fujii@postgresql.org 682 : 0 : *writePtr = write_array[nth - 1];
683 : 0 : *flushPtr = flush_array[nth - 1];
684 : 0 : *applyPtr = apply_array[nth - 1];
685 : :
686 : 0 : pfree(write_array);
687 : 0 : pfree(flush_array);
688 : 0 : pfree(apply_array);
689 : 0 : }
690 : :
691 : : /*
692 : : * Compare lsn in order to sort array in descending order.
693 : : */
694 : : static int
695 : 0 : cmp_lsn(const void *a, const void *b)
696 : : {
2524 bruce@momjian.us 697 : 0 : XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
698 : 0 : XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
699 : :
58 nathan@postgresql.or 700 :UNC 0 : return pg_cmp_u64(lsn2, lsn1);
701 : : }
702 : :
703 : : /*
704 : : * Return data about walsenders that are candidates to be sync standbys.
705 : : *
706 : : * *standbys is set to a palloc'd array of structs of per-walsender data,
707 : : * and the number of valid entries (candidate sync senders) is returned.
708 : : * (This might be more or fewer than num_sync; caller must check.)
709 : : */
710 : : int
1457 tgl@sss.pgh.pa.us 711 :CBC 887 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
712 : : {
713 : : int i;
714 : : int n;
715 : :
716 : : /* Create result array */
717 : 887 : *standbys = (SyncRepStandbyData *)
718 : 887 : palloc(max_wal_senders * sizeof(SyncRepStandbyData));
719 : :
720 : : /* Quick exit if sync replication is not requested */
2673 fujii@postgresql.org 721 [ + + ]: 887 : if (SyncRepConfig == NULL)
1457 tgl@sss.pgh.pa.us 722 : 687 : return 0;
723 : :
724 : : /* Collect raw data from shared memory */
725 : 200 : n = 0;
2673 fujii@postgresql.org 726 [ + + ]: 2200 : for (i = 0; i < max_wal_senders; i++)
727 : : {
728 : : volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
729 : : * rearrangement */
730 : : SyncRepStandbyData *stby;
731 : : WalSndState state; /* not included in SyncRepStandbyData */
732 : :
733 : 2000 : walsnd = &WalSndCtl->walsnds[i];
1457 tgl@sss.pgh.pa.us 734 : 2000 : stby = *standbys + n;
735 : :
2480 alvherre@alvh.no-ip. 736 [ - + ]: 2000 : SpinLockAcquire(&walsnd->mutex);
1457 tgl@sss.pgh.pa.us 737 : 2000 : stby->pid = walsnd->pid;
2480 alvherre@alvh.no-ip. 738 : 2000 : state = walsnd->state;
1457 tgl@sss.pgh.pa.us 739 : 2000 : stby->write = walsnd->write;
740 : 2000 : stby->flush = walsnd->flush;
741 : 2000 : stby->apply = walsnd->apply;
742 : 2000 : stby->sync_standby_priority = walsnd->sync_standby_priority;
2480 alvherre@alvh.no-ip. 743 : 2000 : SpinLockRelease(&walsnd->mutex);
744 : :
745 : : /* Must be active */
1457 tgl@sss.pgh.pa.us 746 [ + + ]: 2000 : if (stby->pid == 0)
2673 fujii@postgresql.org 747 : 1765 : continue;
748 : :
749 : : /* Must be streaming or stopping */
1963 michael@paquier.xyz 750 [ + + - + ]: 235 : if (state != WALSNDSTATE_STREAMING &&
751 : : state != WALSNDSTATE_STOPPING)
2673 fujii@postgresql.org 752 :UBC 0 : continue;
753 : :
754 : : /* Must be synchronous */
1457 tgl@sss.pgh.pa.us 755 [ + + ]:CBC 235 : if (stby->sync_standby_priority == 0)
2673 fujii@postgresql.org 756 : 11 : continue;
757 : :
758 : : /* Must have a valid flush position */
1457 tgl@sss.pgh.pa.us 759 [ - + ]: 224 : if (XLogRecPtrIsInvalid(stby->flush))
2673 fujii@postgresql.org 760 :UBC 0 : continue;
761 : :
762 : : /* OK, it's a candidate */
1457 tgl@sss.pgh.pa.us 763 :CBC 224 : stby->walsnd_index = i;
764 : 224 : stby->is_me = (walsnd == MyWalSnd);
765 : 224 : n++;
766 : : }
767 : :
768 : : /*
769 : : * In quorum mode, we return all the candidates. In priority mode, if we
770 : : * have too many candidates then return only the num_sync ones of highest
771 : : * priority.
772 : : */
773 [ + + ]: 200 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
774 [ + + ]: 199 : n > SyncRepConfig->num_sync)
775 : : {
776 : : /* Sort by priority ... */
777 : 10 : qsort(*standbys, n, sizeof(SyncRepStandbyData),
778 : : standby_priority_comparator);
779 : : /* ... then report just the first num_sync ones */
780 : 10 : n = SyncRepConfig->num_sync;
781 : : }
782 : :
783 : 200 : return n;
784 : : }
785 : :
786 : : /*
787 : : * qsort comparator to sort SyncRepStandbyData entries by priority
788 : : */
789 : : static int
790 : 23 : standby_priority_comparator(const void *a, const void *b)
791 : : {
792 : 23 : const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
793 : 23 : const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
794 : :
795 : : /* First, sort by increasing priority value */
796 [ + + ]: 23 : if (sa->sync_standby_priority != sb->sync_standby_priority)
797 : 12 : return sa->sync_standby_priority - sb->sync_standby_priority;
798 : :
799 : : /*
800 : : * We might have equal priority values; arbitrarily break ties by position
801 : : * in the WalSnd array. (This is utterly bogus, since that is arrival
802 : : * order dependent, but there are regression tests that rely on it.)
803 : : */
804 : 11 : return sa->walsnd_index - sb->walsnd_index;
805 : : }
806 : :
807 : :
808 : : /*
809 : : * Check if we are in the list of sync standbys, and if so, determine
810 : : * priority sequence. Return priority if set, or zero to indicate that
811 : : * we are not a potential sync standby.
812 : : *
813 : : * Compare the parameter SyncRepStandbyNames against the application_name
814 : : * for this WALSender, or allow any name if we find a wildcard "*".
815 : : */
816 : : static int
4788 simon@2ndQuadrant.co 817 : 667 : SyncRepGetStandbyPriority(void)
818 : : {
819 : : const char *standby_name;
820 : : int priority;
821 : 667 : bool found = false;
822 : :
823 : : /*
824 : : * Since synchronous cascade replication is not allowed, we always set the
825 : : * priority of cascading walsender to zero.
826 : : */
4653 827 [ + + ]: 667 : if (am_cascading_walsender)
828 : 28 : return 0;
829 : :
2909 tgl@sss.pgh.pa.us 830 [ + - + + : 639 : if (!SyncStandbysDefined() || SyncRepConfig == NULL)
- + ]
4788 simon@2ndQuadrant.co 831 : 585 : return 0;
832 : :
2909 tgl@sss.pgh.pa.us 833 : 54 : standby_name = SyncRepConfig->member_names;
834 [ + + ]: 84 : for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
835 : : {
4788 simon@2ndQuadrant.co 836 [ + + ]: 77 : if (pg_strcasecmp(standby_name, application_name) == 0 ||
2909 tgl@sss.pgh.pa.us 837 [ + + ]: 50 : strcmp(standby_name, "*") == 0)
838 : : {
4788 simon@2ndQuadrant.co 839 : 47 : found = true;
840 : 47 : break;
841 : : }
2909 tgl@sss.pgh.pa.us 842 : 30 : standby_name += strlen(standby_name) + 1;
843 : : }
844 : :
2545 fujii@postgresql.org 845 [ + + ]: 54 : if (!found)
846 : 7 : return 0;
847 : :
848 : : /*
849 : : * In quorum-based sync replication, all the standbys in the list have the
850 : : * same priority, one.
851 : : */
852 [ + + ]: 47 : return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
853 : : }
854 : :
855 : : /*
856 : : * Walk the specified queue from head. Set the state of any backends that
857 : : * need to be woken, remove them from the queue, and then wake them.
858 : : * Pass all = true to wake whole queue; otherwise, just wake up to
859 : : * the walsender's LSN.
860 : : *
861 : : * The caller must hold SyncRepLock in exclusive mode.
862 : : */
863 : : static int
4464 simon@2ndQuadrant.co 864 : 226 : SyncRepWakeQueue(bool all, int mode)
865 : : {
4788 866 : 226 : volatile WalSndCtlData *walsndctl = WalSndCtl;
4753 bruce@momjian.us 867 : 226 : int numprocs = 0;
868 : : dlist_mutable_iter iter;
869 : :
4464 simon@2ndQuadrant.co 870 [ + - - + ]: 226 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
1626 michael@paquier.xyz 871 [ - + ]: 226 : Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
4464 simon@2ndQuadrant.co 872 [ - + ]: 226 : Assert(SyncRepQueueIsOrderedByLSN(mode));
873 : :
452 andres@anarazel.de 874 [ + - + + ]: 270 : dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
875 : : {
331 tgl@sss.pgh.pa.us 876 : 55 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
877 : :
878 : : /*
879 : : * Assume the queue is ordered by LSN
880 : : */
4125 alvherre@alvh.no-ip. 881 [ + - + + ]: 55 : if (!all && walsndctl->lsn[mode] < proc->waitLSN)
4788 simon@2ndQuadrant.co 882 : 11 : return numprocs;
883 : :
884 : : /*
885 : : * Remove from queue.
886 : : */
452 andres@anarazel.de 887 : 44 : dlist_delete_thoroughly(&proc->syncRepLinks);
888 : :
889 : : /*
890 : : * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
891 : : * make sure that it sees the queue link being removed before the
892 : : * syncRepState change.
893 : : */
2468 heikki.linnakangas@i 894 : 44 : pg_write_barrier();
895 : :
896 : : /*
897 : : * Set state to complete; see SyncRepWaitForLSN() for discussion of
898 : : * the various states.
899 : : */
452 andres@anarazel.de 900 : 44 : proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
901 : :
902 : : /*
903 : : * Wake only when we have set state and removed from queue.
904 : : */
905 : 44 : SetLatch(&(proc->procLatch));
906 : :
4788 simon@2ndQuadrant.co 907 : 44 : numprocs++;
908 : : }
909 : :
910 : 215 : return numprocs;
911 : : }
912 : :
913 : : /*
914 : : * The checkpointer calls this as needed to update the shared
915 : : * sync_standbys_defined flag, so that backends don't remain permanently wedged
916 : : * if synchronous_standby_names is unset. It's safe to check the current value
917 : : * without the lock, because it's only ever updated by one process. But we
918 : : * must take the lock to change it.
919 : : */
920 : : void
4777 rhaas@postgresql.org 921 : 863 : SyncRepUpdateSyncStandbysDefined(void)
922 : : {
923 [ + - + + ]: 863 : bool sync_standbys_defined = SyncStandbysDefined();
924 : :
925 [ + + ]: 863 : if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
926 : : {
927 : 21 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
928 : :
929 : : /*
930 : : * If synchronous_standby_names has been reset to empty, it's futile
931 : : * for backends to continue waiting. Since the user no longer wants
932 : : * synchronous replication, we'd better wake them up.
933 : : */
934 [ + + ]: 21 : if (!sync_standbys_defined)
935 : : {
936 : : int i;
937 : :
4464 simon@2ndQuadrant.co 938 [ + + ]: 8 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
939 : 6 : SyncRepWakeQueue(true, i);
940 : : }
941 : :
942 : : /*
943 : : * Only allow people to join the queue when there are synchronous
944 : : * standbys defined. Without this interlock, there's a race
945 : : * condition: we might wake up all the current waiters; then, some
946 : : * backend that hasn't yet reloaded its config might go to sleep on
947 : : * the queue (and never wake up). This prevents that.
948 : : */
4777 rhaas@postgresql.org 949 : 21 : WalSndCtl->sync_standbys_defined = sync_standbys_defined;
950 : :
951 : 21 : LWLockRelease(SyncRepLock);
952 : : }
953 : 863 : }
954 : :
955 : : #ifdef USE_ASSERT_CHECKING
956 : : static bool
4464 simon@2ndQuadrant.co 957 : 262 : SyncRepQueueIsOrderedByLSN(int mode)
958 : : {
959 : : XLogRecPtr lastLSN;
960 : : dlist_iter iter;
961 : :
962 [ + - - + ]: 262 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
963 : :
4312 heikki.linnakangas@i 964 : 262 : lastLSN = 0;
965 : :
452 andres@anarazel.de 966 [ + - + + ]: 353 : dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
967 : : {
968 : 91 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
969 : :
970 : : /*
971 : : * Check the queue is ordered by LSN and that multiple procs don't
972 : : * have matching LSNs
973 : : */
4125 alvherre@alvh.no-ip. 974 [ - + ]: 91 : if (proc->waitLSN <= lastLSN)
4788 simon@2ndQuadrant.co 975 :UBC 0 : return false;
976 : :
4788 simon@2ndQuadrant.co 977 :CBC 91 : lastLSN = proc->waitLSN;
978 : : }
979 : :
980 : 262 : return true;
981 : : }
982 : : #endif
983 : :
984 : : /*
985 : : * ===========================================================
986 : : * Synchronous Replication functions executed by any process
987 : : * ===========================================================
988 : : */
989 : :
990 : : bool
4756 tgl@sss.pgh.pa.us 991 : 1079 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
992 : : {
2930 fujii@postgresql.org 993 [ + - + + ]: 1079 : if (*newval != NULL && (*newval)[0] != '\0')
4788 simon@2ndQuadrant.co 994 : 136 : {
995 : : int parse_rc;
996 : : SyncRepConfigData *pconf;
997 : :
998 : : /* Reset communication variables to ensure a fresh start */
2909 tgl@sss.pgh.pa.us 999 : 136 : syncrep_parse_result = NULL;
1000 : 136 : syncrep_parse_error_msg = NULL;
1001 : :
1002 : : /* Parse the synchronous_standby_names string */
2930 fujii@postgresql.org 1003 : 136 : syncrep_scanner_init(*newval);
1004 : 136 : parse_rc = syncrep_yyparse();
1005 : 136 : syncrep_scanner_finish();
1006 : :
2909 tgl@sss.pgh.pa.us 1007 [ + - - + ]: 136 : if (parse_rc != 0 || syncrep_parse_result == NULL)
1008 : : {
2930 fujii@postgresql.org 1009 :UBC 0 : GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
2909 tgl@sss.pgh.pa.us 1010 [ # # ]: 0 : if (syncrep_parse_error_msg)
1011 : 0 : GUC_check_errdetail("%s", syncrep_parse_error_msg);
1012 : : else
1013 : 0 : GUC_check_errdetail("synchronous_standby_names parser failed");
2930 fujii@postgresql.org 1014 : 0 : return false;
1015 : : }
1016 : :
2675 fujii@postgresql.org 1017 [ - + ]:CBC 136 : if (syncrep_parse_result->num_sync <= 0)
1018 : : {
2675 fujii@postgresql.org 1019 :UBC 0 : GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1020 : 0 : syncrep_parse_result->num_sync);
1021 : 0 : return false;
1022 : : }
1023 : :
1024 : : /* GUC extra value must be guc_malloc'd, not palloc'd */
1025 : : pconf = (SyncRepConfigData *)
548 tgl@sss.pgh.pa.us 1026 :CBC 136 : guc_malloc(LOG, syncrep_parse_result->config_size);
2909 1027 [ - + ]: 136 : if (pconf == NULL)
2909 tgl@sss.pgh.pa.us 1028 :UBC 0 : return false;
2909 tgl@sss.pgh.pa.us 1029 :CBC 136 : memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1030 : :
1031 : 136 : *extra = (void *) pconf;
1032 : :
1033 : : /*
1034 : : * We need not explicitly clean up syncrep_parse_result. It, and any
1035 : : * other cruft generated during parsing, will be freed when the
1036 : : * current memory context is deleted. (This code is generally run in
1037 : : * a short-lived context used for config file processing, so that will
1038 : : * not be very long.)
1039 : : */
1040 : : }
1041 : : else
1042 : 943 : *extra = NULL;
1043 : :
4756 1044 : 1079 : return true;
1045 : : }
1046 : :
1047 : : void
2909 1048 : 1069 : assign_synchronous_standby_names(const char *newval, void *extra)
1049 : : {
1050 : 1069 : SyncRepConfig = (SyncRepConfigData *) extra;
1051 : 1069 : }
1052 : :
1053 : : void
4464 simon@2ndQuadrant.co 1054 : 2292 : assign_synchronous_commit(int newval, void *extra)
1055 : : {
1056 [ - + - + ]: 2292 : switch (newval)
1057 : : {
4464 simon@2ndQuadrant.co 1058 :UBC 0 : case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1059 : 0 : SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1060 : 0 : break;
4464 simon@2ndQuadrant.co 1061 :CBC 1047 : case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1062 : 1047 : SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1063 : 1047 : break;
2938 rhaas@postgresql.org 1064 :UBC 0 : case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1065 : 0 : SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1066 : 0 : break;
4464 simon@2ndQuadrant.co 1067 :CBC 1245 : default:
1068 : 1245 : SyncRepWaitMode = SYNC_REP_NO_WAIT;
1069 : 1245 : break;
1070 : : }
1071 : 2292 : }
|