Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * syncrep.c
4 : *
5 : * Synchronous replication is new as of PostgreSQL 9.1.
6 : *
7 : * If requested, transaction commits wait until their commit LSNs are
8 : * acknowledged by the synchronous standbys.
9 : *
10 : * This module contains the code for waiting and release of backends.
11 : * All code in this module executes on the primary. The core streaming
12 : * replication transport remains within WALreceiver/WALsender modules.
13 : *
14 : * The essence of this design is that it isolates all logic about
15 : * waiting/releasing onto the primary. The primary defines which standbys
16 : * it wishes to wait for. The standbys are completely unaware of the
17 : * durability requirements of transactions on the primary, reducing the
18 : * complexity of the code and streamlining both standby operations and
19 : * network bandwidth because there is no requirement to ship
20 : * per-transaction state information.
21 : *
22 : * Replication is either synchronous or not synchronous (async). If it is
23 : * async, we just fastpath out of here. If it is sync, then we wait for
24 : * the write, flush or apply location on the standby before releasing
25 : * the waiting backend. Further complexity in that interaction is
26 : * expected in later releases.
27 : *
28 : * The best performing way to manage the waiting backends is to have a
29 : * single ordered queue of waiting backends, so that we can avoid
30 : * searching through all waiters each time we receive a reply.
31 : *
32 : * In 9.5 or before only a single standby could be considered as
33 : * synchronous. In 9.6 we support a priority-based multiple synchronous
34 : * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
35 : * supported. The number of synchronous standbys that transactions
36 : * must wait for replies from is specified in synchronous_standby_names.
37 : * This parameter also specifies a list of standby names and the method
38 : * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39 : *
40 : * The method FIRST specifies a priority-based synchronous replication
41 : * and makes transaction commits wait until their WAL records are
42 : * replicated to the requested number of synchronous standbys chosen based
43 : * on their priorities. The standbys whose names appear earlier in the list
44 : * are given higher priority and will be considered as synchronous.
45 : * Other standby servers appearing later in this list represent potential
46 : * synchronous standbys. If any of the current synchronous standbys
47 : * disconnects for whatever reason, it will be replaced immediately with
48 : * the next-highest-priority standby.
49 : *
50 : * The method ANY specifies a quorum-based synchronous replication
51 : * and makes transaction commits wait until their WAL records are
52 : * replicated to at least the requested number of synchronous standbys
53 : * in the list. All the standbys appearing in the list are considered as
54 : * candidates for quorum synchronous standbys.
55 : *
56 : * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 : * This is for backward compatibility with 9.6 or before where only a
58 : * priority-based sync replication was supported.
59 : *
60 : * Before the standbys chosen from synchronous_standby_names can
61 : * become the synchronous standbys they must have caught up with
62 : * the primary; that may take some time. Once caught up,
63 : * the standbys which are considered as synchronous at that moment
64 : * will release waiters from the queue.
65 : *
66 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
67 : *
68 : * IDENTIFICATION
69 : * src/backend/replication/syncrep.c
70 : *
71 : *-------------------------------------------------------------------------
72 : */
73 : #include "postgres.h"
74 :
75 : #include <unistd.h>
76 :
77 : #include "access/xact.h"
78 : #include "miscadmin.h"
79 : #include "pgstat.h"
80 : #include "replication/syncrep.h"
81 : #include "replication/walsender.h"
82 : #include "replication/walsender_private.h"
83 : #include "storage/pmsignal.h"
84 : #include "storage/proc.h"
85 : #include "tcop/tcopprot.h"
86 : #include "utils/builtins.h"
87 : #include "utils/guc_hooks.h"
88 : #include "utils/ps_status.h"
89 :
90 : /* User-settable parameters for sync rep */
91 : char *SyncRepStandbyNames;
92 :
93 : #define SyncStandbysDefined() \
94 : (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
95 :
96 : static bool announce_next_takeover = true;
97 :
98 : SyncRepConfigData *SyncRepConfig = NULL;
99 : static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
100 :
101 : static void SyncRepQueueInsert(int mode);
102 : static void SyncRepCancelWait(void);
103 : static int SyncRepWakeQueue(bool all, int mode);
104 :
105 : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
106 : XLogRecPtr *flushPtr,
107 : XLogRecPtr *applyPtr,
108 : bool *am_sync);
109 : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
110 : XLogRecPtr *flushPtr,
111 : XLogRecPtr *applyPtr,
112 : SyncRepStandbyData *sync_standbys,
113 : int num_standbys);
114 : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
115 : XLogRecPtr *flushPtr,
116 : XLogRecPtr *applyPtr,
117 : SyncRepStandbyData *sync_standbys,
118 : int num_standbys,
119 : uint8 nth);
120 : static int SyncRepGetStandbyPriority(void);
121 : static int standby_priority_comparator(const void *a, const void *b);
122 : static int cmp_lsn(const void *a, const void *b);
123 :
124 : #ifdef USE_ASSERT_CHECKING
125 : static bool SyncRepQueueIsOrderedByLSN(int mode);
126 : #endif
127 :
128 : /*
129 : * ===========================================================
130 : * Synchronous Replication functions for normal user backends
131 : * ===========================================================
132 : */
133 :
134 : /*
135 : * Wait for synchronous replication, if requested by user.
136 : *
137 : * Initially backends start in state SYNC_REP_NOT_WAITING and then
138 : * change that state to SYNC_REP_WAITING before adding ourselves
139 : * to the wait queue. During SyncRepWakeQueue() a WALSender changes
140 : * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
141 : * This backend then resets its state to SYNC_REP_NOT_WAITING.
142 : *
143 : * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
144 : * represents a commit record. If it doesn't, then we wait only for the WAL
145 : * to be flushed if synchronous_commit is set to the higher level of
146 : * remote_apply, because only commit records provide apply feedback.
147 : */
148 : void
2567 rhaas 149 GIC 290343 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
4417 simon 150 ECB : {
151 : int mode;
152 :
153 : /*
154 : * This should be called while holding interrupts during a transaction
155 : * commit to prevent the follow-up shared memory queue cleanups to be
156 : * influenced by external interruptions.
1255 michael 157 : */
1255 michael 158 GIC 290343 : Assert(InterruptHoldoffCount > 0);
159 :
160 : /*
161 : * Fast exit if user has not requested sync replication, or there are no
162 : * sync replication standby names defined.
163 : *
164 : * Since this routine gets called every commit time, it's important to
165 : * exit quickly if sync replication is not requested. So we check
166 : * WalSndCtl->sync_standbys_defined flag without the lock and exit
167 : * immediately if it's false. If it's true, we need to check it again
168 : * later while holding the lock, to check the flag and operate the sync
169 : * rep queue atomically. This is necessary to avoid the race condition
170 : * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
171 : * it's false, the lock is not necessary because we don't touch the queue.
949 fujii 172 ECB : */
949 fujii 173 CBC 290343 : if (!SyncRepRequested() ||
174 266442 : !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
949 fujii 175 GIC 290299 : return;
176 :
2567 rhaas 177 ECB : /* Cap the level for anything other than commit to remote flush only. */
2567 rhaas 178 CBC 44 : if (commit)
2567 rhaas 179 GIC 22 : mode = SyncRepWaitMode;
2567 rhaas 180 ECB : else
2567 rhaas 181 GIC 22 : mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
4417 simon 182 ECB :
81 andres 183 GNC 44 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
4406 rhaas 184 GIC 44 : Assert(WalSndCtl != NULL);
4406 rhaas 185 ECB :
4406 rhaas 186 CBC 44 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
4406 rhaas 187 GIC 44 : Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
188 :
189 : /*
190 : * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
191 : * set. See SyncRepUpdateSyncStandbysDefined.
192 : *
193 : * Also check that the standby hasn't already replied. Unlikely race
194 : * condition but we'll be fetching that cache line anyway so it's likely
195 : * to be a low cost check.
4397 simon 196 ECB : */
4397 simon 197 CBC 44 : if (!WalSndCtl->sync_standbys_defined ||
2567 rhaas 198 GIC 44 : lsn <= WalSndCtl->lsn[mode])
4406 rhaas 199 ECB : {
4406 rhaas 200 CBC 10 : LWLockRelease(SyncRepLock);
4406 rhaas 201 GIC 10 : return;
202 : }
203 :
204 : /*
205 : * Set our waitLSN so WALSender will know when to wake us, and add
206 : * ourselves to the queue.
4397 simon 207 ECB : */
2567 rhaas 208 CBC 34 : MyProc->waitLSN = lsn;
4406 209 34 : MyProc->syncRepState = SYNC_REP_WAITING;
4087 simon 210 34 : SyncRepQueueInsert(mode);
211 34 : Assert(SyncRepQueueIsOrderedByLSN(mode));
4406 rhaas 212 GIC 34 : LWLockRelease(SyncRepLock);
213 :
4406 rhaas 214 ECB : /* Alter ps display to show waiting for sync rep. */
4406 rhaas 215 GIC 34 : if (update_process_title)
216 : {
217 : char buffer[32];
4406 rhaas 218 ECB :
48 drowley 219 GNC 34 : sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
220 34 : set_ps_display_suffix(buffer);
221 : }
222 :
223 : /*
4417 simon 224 ECB : * Wait for specified LSN to be confirmed.
225 : *
226 : * Each proc has its own wait latch, so we perform a normal latch
227 : * check/wait loop here.
228 : */
229 : for (;;)
4417 simon 230 GIC 34 : {
231 : int rc;
232 :
233 : /* Must reset the latch before testing state. */
3007 andres 234 68 : ResetLatch(MyLatch);
235 :
236 : /*
2428 tgl 237 ECB : * Acquiring the lock is not needed, the latch ensures proper
238 : * barriers. If it looks like we're done, we must really be done,
239 : * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
240 : * it will never update it again, so we can't be seeing a stale value
241 : * in that case.
242 : */
2431 simon 243 GIC 68 : if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
4406 rhaas 244 34 : break;
245 :
246 : /*
247 : * If a wait for synchronous replication is pending, we can neither
248 : * acknowledge the commit nor raise ERROR or FATAL. The latter would
249 : * lead the client to believe that the transaction aborted, which is
250 : * not true: it's already committed locally. The former is no good
251 : * either: the client has requested synchronous replication, and is
4382 bruce 252 ECB : * entitled to assume that an acknowledged commit is also replicated,
253 : * which might not be true. So in this case we issue a WARNING (which
4382 bruce 254 EUB : * some clients may be able to interpret) and shut off further output.
255 : * We do NOT reset ProcDiePending, so that the process will die after
256 : * the commit is cleaned up.
257 : */
4406 rhaas 258 GBC 34 : if (ProcDiePending)
4406 rhaas 259 EUB : {
4406 rhaas 260 UBC 0 : ereport(WARNING,
261 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
262 : errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
263 : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
4406 rhaas 264 UIC 0 : whereToSendOutput = DestNone;
265 0 : SyncRepCancelWait();
266 0 : break;
267 : }
268 :
4406 rhaas 269 ECB : /*
270 : * It's unclear what to do if a query cancel interrupt arrives. We
4406 rhaas 271 EUB : * can't actually abort at this point, but ignoring the interrupt
4382 bruce 272 : * altogether is not helpful, so we just terminate the wait with a
273 : * suitable warning.
274 : */
4406 rhaas 275 GBC 34 : if (QueryCancelPending)
4406 rhaas 276 EUB : {
4406 rhaas 277 UIC 0 : QueryCancelPending = false;
278 0 : ereport(WARNING,
279 : (errmsg("canceling wait for synchronous replication due to user request"),
280 : errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
281 0 : SyncRepCancelWait();
282 0 : break;
4406 rhaas 283 ECB : }
284 :
285 : /*
286 : * Wait on latch. Any condition that should wake us up will set the
287 : * latch, so no need for timeout.
288 : */
1598 tmunro 289 GIC 34 : rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
1598 tmunro 290 ECB : WAIT_EVENT_SYNC_REP);
291 :
4406 rhaas 292 EUB : /*
1418 tgl 293 : * If the postmaster dies, we'll probably never get an acknowledgment,
294 : * because all the wal sender processes will exit. So just bail out.
4406 rhaas 295 : */
1598 tmunro 296 GIC 34 : if (rc & WL_POSTMASTER_DEATH)
297 : {
4406 rhaas 298 UIC 0 : ProcDiePending = true;
299 0 : whereToSendOutput = DestNone;
300 0 : SyncRepCancelWait();
301 0 : break;
302 : }
303 : }
304 :
305 : /*
306 : * WalSender has checked our LSN and has removed us from queue. Clean up
4406 rhaas 307 ECB : * state and leave. It's OK to reset these shared memory fields without
308 : * holding SyncRepLock, because any walsenders will ignore us anyway when
2064 tgl 309 : * we're not on the queue. We need a read barrier to make sure we see the
310 : * changes to the queue link (this might be unnecessary without
311 : * assertions, but better safe than sorry).
312 : */
2097 heikki.linnakangas 313 CBC 34 : pg_read_barrier();
81 andres 314 GNC 34 : Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
4406 rhaas 315 GIC 34 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
3941 heikki.linnakangas 316 34 : MyProc->waitLSN = 0;
317 :
318 : /* reset ps display to remove the suffix */
48 drowley 319 GNC 34 : if (update_process_title)
320 34 : set_ps_display_remove_suffix();
4417 simon 321 ECB : }
322 :
323 : /*
324 : * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
325 : *
4413 rhaas 326 : * Usually we will go at tail of queue, though it's possible that we arrive
4417 simon 327 : * here out of order, so start at tail and work back to insertion point.
328 : */
329 : static void
4093 simon 330 GIC 34 : SyncRepQueueInsert(int mode)
4417 simon 331 EUB : {
332 : dlist_head *queue;
333 : dlist_iter iter;
334 :
4093 simon 335 GIC 34 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
81 andres 336 GNC 34 : queue = &WalSndCtl->SyncRepQueue[mode];
337 :
338 34 : dlist_reverse_foreach(iter, queue)
4417 simon 339 EUB : {
81 andres 340 UNC 0 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
341 :
342 : /*
343 : * Stop at the queue element that we should insert after to ensure the
344 : * queue is ordered by LSN.
345 : */
3754 alvherre 346 UIC 0 : if (proc->waitLSN < MyProc->waitLSN)
347 : {
81 andres 348 UNC 0 : dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
349 0 : return;
350 : }
351 : }
352 :
353 : /*
354 : * If we get here, the list was either empty, or this process needs to be
355 : * at the head.
356 : */
81 andres 357 GNC 34 : dlist_push_head(queue, &MyProc->syncRepLinks);
4417 simon 358 EUB : }
359 :
4406 rhaas 360 : /*
361 : * Acquire SyncRepLock and cancel any wait currently in progress.
362 : */
363 : static void
4406 rhaas 364 UIC 0 : SyncRepCancelWait(void)
365 : {
4406 rhaas 366 LBC 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
81 andres 367 UNC 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
368 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
4406 rhaas 369 UIC 0 : MyProc->syncRepState = SYNC_REP_NOT_WAITING;
370 0 : LWLockRelease(SyncRepLock);
371 0 : }
4406 rhaas 372 ECB :
373 : void
4260 tgl 374 GBC 11507 : SyncRepCleanupAtProcExit(void)
375 : {
376 : /*
1255 michael 377 EUB : * First check if we are removed from the queue without the lock to not
378 : * slow down backend exit.
379 : */
81 andres 380 GNC 11507 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
381 : {
4417 simon 382 LBC 0 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
383 :
384 : /* maybe we have just been removed, so recheck */
81 andres 385 UNC 0 : if (!dlist_node_is_detached(&MyProc->syncRepLinks))
386 0 : dlist_delete_thoroughly(&MyProc->syncRepLinks);
387 :
4417 simon 388 UIC 0 : LWLockRelease(SyncRepLock);
389 : }
4417 simon 390 GIC 11507 : }
391 :
392 : /*
393 : * ===========================================================
394 : * Synchronous Replication functions for wal sender processes
4417 simon 395 ECB : * ===========================================================
396 : */
397 :
398 : /*
399 : * Take any action required to initialise sync rep state from config
400 : * data. Called at WALSender startup and after each SIGHUP.
401 : */
402 : void
4417 simon 403 CBC 509 : SyncRepInitConfig(void)
4417 simon 404 ECB : {
405 : int priority;
406 :
407 : /*
408 : * Determine if we are a potential sync standby and remember the result
409 : * for handling replies from standby.
410 : */
4417 simon 411 GIC 509 : priority = SyncRepGetStandbyPriority();
412 509 : if (MyWalSnd->sync_standby_priority != priority)
413 : {
1086 tgl 414 CBC 20 : SpinLockAcquire(&MyWalSnd->mutex);
4417 simon 415 GIC 20 : MyWalSnd->sync_standby_priority = priority;
1086 tgl 416 20 : SpinLockRelease(&MyWalSnd->mutex);
417 :
4417 simon 418 20 : ereport(DEBUG1,
419 : (errmsg_internal("standby \"%s\" now has synchronous standby priority %u",
420 : application_name, priority)));
421 : }
422 509 : }
423 :
4417 simon 424 ECB : /*
425 : * Update the LSNs on each queue based upon our latest state. This
2559 fujii 426 : * implements a simple policy of first-valid-sync-standby-releases-waiter.
427 : *
428 : * Other policies are possible, which would change what we do here and
429 : * perhaps also which information we store as well.
430 : */
431 : void
4417 simon 432 CBC 90688 : SyncRepReleaseWaiters(void)
4417 simon 433 ECB : {
4417 simon 434 CBC 90688 : volatile WalSndCtlData *walsndctl = WalSndCtl;
435 : XLogRecPtr writePtr;
436 : XLogRecPtr flushPtr;
437 : XLogRecPtr applyPtr;
438 : bool got_recptr;
439 : bool am_sync;
4093 simon 440 GIC 90688 : int numwrite = 0;
441 90688 : int numflush = 0;
2567 rhaas 442 90688 : int numapply = 0;
4417 simon 443 ECB :
444 : /*
445 : * If this WALSender is serving a standby that is not on the list of
2596 alvherre 446 : * potential sync standbys then we have nothing to do. If we are still
447 : * starting up, still running base backup or the current flush position is
1592 michael 448 : * still invalid, then leave quickly also. Streaming or stopping WAL
449 : * senders are allowed to release waiters.
450 : */
4417 simon 451 GIC 90688 : if (MyWalSnd->sync_standby_priority == 0 ||
1592 michael 452 127 : (MyWalSnd->state != WALSNDSTATE_STREAMING &&
453 46 : MyWalSnd->state != WALSNDSTATE_STOPPING) ||
3931 magnus 454 102 : XLogRecPtrIsInvalid(MyWalSnd->flush))
455 : {
2559 fujii 456 CBC 90586 : announce_next_takeover = true;
4417 simon 457 GIC 90586 : return;
458 : }
459 :
460 : /*
461 : * We're a potential sync standby. Release waiters if there are enough
462 : * sync standbys and we are considered as sync.
463 : */
464 102 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
465 :
2559 fujii 466 ECB : /*
467 : * Check whether we are a sync standby or not, and calculate the synced
468 : * positions among all sync standbys. (Note: although this step does not
469 : * of itself require holding SyncRepLock, it seems like a good idea to do
470 : * it after acquiring the lock. This ensures that the WAL pointers we use
471 : * to release waiters are newer than any previous execution of this
1086 tgl 472 : * routine used.)
473 : */
2302 fujii 474 CBC 102 : got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
475 :
2559 fujii 476 ECB : /*
2495 rhaas 477 : * If we are managing a sync standby, though we weren't prior to this,
478 : * then announce we are now a sync standby.
479 : */
2559 fujii 480 GIC 102 : if (announce_next_takeover && am_sync)
2559 fujii 481 EUB : {
2559 fujii 482 GIC 8 : announce_next_takeover = false;
483 :
2302 484 8 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
485 8 : ereport(LOG,
486 : (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
487 : application_name, MyWalSnd->sync_standby_priority)));
488 : else
2302 fujii 489 UIC 0 : ereport(LOG,
2302 fujii 490 ECB : (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
491 : application_name)));
2559 fujii 492 EUB : }
4417 simon 493 :
494 : /*
495 : * If the number of sync standbys is less than requested or we aren't
496 : * managing a sync standby then just leave.
497 : */
2302 fujii 498 GIC 102 : if (!got_recptr || !am_sync)
499 : {
4417 simon 500 UIC 0 : LWLockRelease(SyncRepLock);
2559 fujii 501 LBC 0 : announce_next_takeover = !am_sync;
4417 simon 502 UIC 0 : return;
4417 simon 503 ECB : }
504 :
505 : /*
3955 bruce 506 : * Set the lsn first so that when we wake backends they will release up to
507 : * this location.
4093 simon 508 : */
2559 fujii 509 CBC 102 : if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
510 : {
511 36 : walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
4093 simon 512 GIC 36 : numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
4093 simon 513 ECB : }
2559 fujii 514 CBC 102 : if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
515 : {
2559 fujii 516 GIC 41 : walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
4093 simon 517 CBC 41 : numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
518 : }
2559 fujii 519 102 : if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
520 : {
2559 fujii 521 GIC 41 : walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
2567 rhaas 522 41 : numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
523 : }
524 :
4417 simon 525 102 : LWLockRelease(SyncRepLock);
526 :
2559 fujii 527 102 : elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
528 : numwrite, LSN_FORMAT_ARGS(writePtr),
529 : numflush, LSN_FORMAT_ARGS(flushPtr),
530 : numapply, LSN_FORMAT_ARGS(applyPtr));
531 : }
532 :
533 : /*
534 : * Calculate the synced Write, Flush and Apply positions among sync standbys.
535 : *
2559 fujii 536 ECB : * Return false if the number of sync standbys is less than
537 : * synchronous_standby_names specifies. Otherwise return true and
538 : * store the positions into *writePtr, *flushPtr and *applyPtr.
539 : *
540 : * On return, *am_sync is set to true if this walsender is connecting to
541 : * sync standby. Otherwise it's set to false.
542 : */
543 : static bool
2302 fujii 544 CBC 102 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
2153 bruce 545 ECB : XLogRecPtr *applyPtr, bool *am_sync)
2559 fujii 546 : {
1086 tgl 547 : SyncRepStandbyData *sync_standbys;
548 : int num_standbys;
549 : int i;
1255 michael 550 :
1086 tgl 551 EUB : /* Initialize default results */
2559 fujii 552 GIC 102 : *writePtr = InvalidXLogRecPtr;
553 102 : *flushPtr = InvalidXLogRecPtr;
2559 fujii 554 CBC 102 : *applyPtr = InvalidXLogRecPtr;
2559 fujii 555 GIC 102 : *am_sync = false;
556 :
1086 tgl 557 ECB : /* Quick out if not even configured to be synchronous */
1086 tgl 558 GIC 102 : if (SyncRepConfig == NULL)
1086 tgl 559 LBC 0 : return false;
560 :
2559 fujii 561 ECB : /* Get standbys that are considered as synchronous at this moment */
1086 tgl 562 CBC 102 : num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
563 :
564 : /* Am I among the candidate sync standbys? */
1086 tgl 565 GIC 102 : for (i = 0; i < num_standbys; i++)
566 : {
567 102 : if (sync_standbys[i].is_me)
568 : {
569 102 : *am_sync = true;
1086 tgl 570 CBC 102 : break;
1086 tgl 571 ECB : }
572 : }
4417 simon 573 EUB :
574 : /*
575 : * Nothing more to do if we are not managing a sync standby or there are
576 : * not enough synchronous standbys.
577 : */
2538 tgl 578 GIC 102 : if (!(*am_sync) ||
1086 579 102 : num_standbys < SyncRepConfig->num_sync)
580 : {
1086 tgl 581 UIC 0 : pfree(sync_standbys);
2559 fujii 582 0 : return false;
583 : }
584 :
585 : /*
586 : * In a priority-based sync replication, the synced positions are the
587 : * oldest ones among sync standbys. In a quorum-based, they are the Nth
588 : * latest ones.
589 : *
2153 bruce 590 ECB : * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
591 : * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
592 : * because it's a bit more efficient.
593 : *
594 : * XXX If the numbers of current and requested sync standbys are the same,
595 : * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
596 : * positions even in a quorum-based sync replication.
2559 fujii 597 EUB : */
2302 fujii 598 GIC 102 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
2559 fujii 599 EUB : {
2302 fujii 600 GIC 102 : SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
601 : sync_standbys, num_standbys);
2302 fujii 602 ECB : }
603 : else
604 : {
2302 fujii 605 UIC 0 : SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
606 : sync_standbys, num_standbys,
1086 tgl 607 0 : SyncRepConfig->num_sync);
608 : }
609 :
1086 tgl 610 CBC 102 : pfree(sync_standbys);
2302 fujii 611 GIC 102 : return true;
612 : }
613 :
614 : /*
615 : * Calculate the oldest Write, Flush and Apply positions among sync standbys.
616 : */
617 : static void
1086 tgl 618 102 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
619 : XLogRecPtr *flushPtr,
620 : XLogRecPtr *applyPtr,
621 : SyncRepStandbyData *sync_standbys,
622 : int num_standbys)
2302 fujii 623 ECB : {
624 : int i;
625 :
626 : /*
2153 bruce 627 : * Scan through all sync standbys and calculate the oldest Write, Flush
628 : * and Apply positions. We assume *writePtr et al were initialized to
1086 tgl 629 : * InvalidXLogRecPtr.
2302 fujii 630 : */
1086 tgl 631 CBC 204 : for (i = 0; i < num_standbys; i++)
2302 fujii 632 ECB : {
1086 tgl 633 CBC 102 : XLogRecPtr write = sync_standbys[i].write;
634 102 : XLogRecPtr flush = sync_standbys[i].flush;
1086 tgl 635 GIC 102 : XLogRecPtr apply = sync_standbys[i].apply;
2559 fujii 636 ECB :
2559 fujii 637 GIC 102 : if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
638 102 : *writePtr = write;
639 102 : if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
640 102 : *flushPtr = flush;
641 102 : if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
642 102 : *applyPtr = apply;
2559 fujii 643 EUB : }
2302 fujii 644 GIC 102 : }
645 :
646 : /*
647 : * Calculate the Nth latest Write, Flush and Apply positions among sync
648 : * standbys.
649 : */
650 : static void
1086 tgl 651 UIC 0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
652 : XLogRecPtr *flushPtr,
653 : XLogRecPtr *applyPtr,
654 : SyncRepStandbyData *sync_standbys,
655 : int num_standbys,
1086 tgl 656 EUB : uint8 nth)
657 : {
2153 bruce 658 : XLogRecPtr *write_array;
659 : XLogRecPtr *flush_array;
660 : XLogRecPtr *apply_array;
661 : int i;
2302 fujii 662 :
663 : /* Should have enough candidates, or somebody messed up */
1086 tgl 664 UBC 0 : Assert(nth > 0 && nth <= num_standbys);
2302 fujii 665 EUB :
1086 tgl 666 UBC 0 : write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
1086 tgl 667 UIC 0 : flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
668 0 : apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
669 :
1086 tgl 670 UBC 0 : for (i = 0; i < num_standbys; i++)
1086 tgl 671 EUB : {
1086 tgl 672 UBC 0 : write_array[i] = sync_standbys[i].write;
1086 tgl 673 UIC 0 : flush_array[i] = sync_standbys[i].flush;
674 0 : apply_array[i] = sync_standbys[i].apply;
2302 fujii 675 EUB : }
676 :
2181 677 : /* Sort each array in descending order */
1086 tgl 678 UIC 0 : qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
1086 tgl 679 UBC 0 : qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
680 0 : qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
2302 fujii 681 EUB :
682 : /* Get Nth latest Write, Flush, Apply positions */
2302 fujii 683 UIC 0 : *writePtr = write_array[nth - 1];
684 0 : *flushPtr = flush_array[nth - 1];
685 0 : *applyPtr = apply_array[nth - 1];
686 :
687 0 : pfree(write_array);
2302 fujii 688 UBC 0 : pfree(flush_array);
2302 fujii 689 UIC 0 : pfree(apply_array);
2302 fujii 690 UBC 0 : }
2302 fujii 691 EUB :
692 : /*
693 : * Compare lsn in order to sort array in descending order.
694 : */
695 : static int
2302 fujii 696 UBC 0 : cmp_lsn(const void *a, const void *b)
697 : {
2153 bruce 698 0 : XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
2153 bruce 699 UIC 0 : XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
700 :
2302 fujii 701 0 : if (lsn1 > lsn2)
702 0 : return -1;
703 0 : else if (lsn1 == lsn2)
704 0 : return 0;
705 : else
706 0 : return 1;
707 : }
708 :
2559 fujii 709 ECB : /*
710 : * Return data about walsenders that are candidates to be sync standbys.
711 : *
712 : * *standbys is set to a palloc'd array of structs of per-walsender data,
713 : * and the number of valid entries (candidate sync senders) is returned.
714 : * (This might be more or fewer than num_sync; caller must check.)
715 : */
1086 tgl 716 : int
1086 tgl 717 GIC 655 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
718 : {
1086 tgl 719 ECB : int i;
720 : int n;
721 :
722 : /* Create result array */
1086 tgl 723 CBC 655 : *standbys = (SyncRepStandbyData *)
724 655 : palloc(max_wal_senders * sizeof(SyncRepStandbyData));
725 :
726 : /* Quick exit if sync replication is not requested */
2302 fujii 727 GIC 655 : if (SyncRepConfig == NULL)
1086 tgl 728 536 : return 0;
729 :
730 : /* Collect raw data from shared memory */
1086 tgl 731 CBC 119 : n = 0;
2302 fujii 732 1309 : for (i = 0; i < max_wal_senders; i++)
733 : {
1086 tgl 734 ECB : volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
735 : * rearrangement */
736 : SyncRepStandbyData *stby;
737 : WalSndState state; /* not included in SyncRepStandbyData */
2109 alvherre 738 :
2302 fujii 739 CBC 1190 : walsnd = &WalSndCtl->walsnds[i];
1086 tgl 740 1190 : stby = *standbys + n;
2302 fujii 741 ECB :
2109 alvherre 742 GIC 1190 : SpinLockAcquire(&walsnd->mutex);
1086 tgl 743 1190 : stby->pid = walsnd->pid;
2109 alvherre 744 CBC 1190 : state = walsnd->state;
1086 tgl 745 1190 : stby->write = walsnd->write;
1086 tgl 746 GIC 1190 : stby->flush = walsnd->flush;
747 1190 : stby->apply = walsnd->apply;
1086 tgl 748 CBC 1190 : stby->sync_standby_priority = walsnd->sync_standby_priority;
2109 alvherre 749 GIC 1190 : SpinLockRelease(&walsnd->mutex);
2109 alvherre 750 EUB :
751 : /* Must be active */
1086 tgl 752 GIC 1190 : if (stby->pid == 0)
2302 fujii 753 CBC 1043 : continue;
2302 fujii 754 ECB :
755 : /* Must be streaming or stopping */
1592 michael 756 GIC 147 : if (state != WALSNDSTATE_STREAMING &&
1592 michael 757 ECB : state != WALSNDSTATE_STOPPING)
2302 fujii 758 UBC 0 : continue;
759 :
760 : /* Must be synchronous */
1086 tgl 761 CBC 147 : if (stby->sync_standby_priority == 0)
2302 fujii 762 7 : continue;
2302 fujii 763 ECB :
764 : /* Must have a valid flush position */
1086 tgl 765 GIC 140 : if (XLogRecPtrIsInvalid(stby->flush))
2302 fujii 766 UIC 0 : continue;
767 :
768 : /* OK, it's a candidate */
1086 tgl 769 GIC 140 : stby->walsnd_index = i;
770 140 : stby->is_me = (walsnd == MyWalSnd);
1086 tgl 771 CBC 140 : n++;
2302 fujii 772 ECB : }
773 :
774 : /*
1086 tgl 775 : * In quorum mode, we return all the candidates. In priority mode, if we
776 : * have too many candidates then return only the num_sync ones of highest
777 : * priority.
2559 fujii 778 : */
1086 tgl 779 GIC 119 : if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
780 118 : n > SyncRepConfig->num_sync)
2559 fujii 781 ECB : {
782 : /* Sort by priority ... */
1086 tgl 783 GIC 8 : qsort(*standbys, n, sizeof(SyncRepStandbyData),
784 : standby_priority_comparator);
785 : /* ... then report just the first num_sync ones */
786 8 : n = SyncRepConfig->num_sync;
787 : }
2559 fujii 788 ECB :
1086 tgl 789 GIC 119 : return n;
1086 tgl 790 ECB : }
2559 fujii 791 :
792 : /*
793 : * qsort comparator to sort SyncRepStandbyData entries by priority
1086 tgl 794 : */
795 : static int
1086 tgl 796 GIC 19 : standby_priority_comparator(const void *a, const void *b)
797 : {
798 19 : const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
799 19 : const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
800 :
801 : /* First, sort by increasing priority value */
1086 tgl 802 CBC 19 : if (sa->sync_standby_priority != sb->sync_standby_priority)
1086 tgl 803 GIC 9 : return sa->sync_standby_priority - sb->sync_standby_priority;
804 :
805 : /*
806 : * We might have equal priority values; arbitrarily break ties by position
807 : * in the WALSnd array. (This is utterly bogus, since that is arrival
808 : * order dependent, but there are regression tests that rely on it.)
809 : */
810 10 : return sa->walsnd_index - sb->walsnd_index;
811 : }
812 :
813 :
814 : /*
4417 simon 815 ECB : * Check if we are in the list of sync standbys, and if so, determine
816 : * priority sequence. Return priority if set, or zero to indicate that
817 : * we are not a potential sync standby.
818 : *
819 : * Compare the parameter SyncRepStandbyNames against the application_name
820 : * for this WALSender, or allow any name if we find a wildcard "*".
821 : */
822 : static int
4417 simon 823 GIC 509 : SyncRepGetStandbyPriority(void)
824 : {
2538 tgl 825 ECB : const char *standby_name;
826 : int priority;
4417 simon 827 GIC 509 : bool found = false;
4417 simon 828 ECB :
4282 829 : /*
830 : * Since synchronous cascade replication is not allowed, we always set the
3955 bruce 831 : * priority of cascading walsender to zero.
4282 simon 832 : */
4282 simon 833 GIC 509 : if (am_cascading_walsender)
4282 simon 834 CBC 33 : return 0;
4282 simon 835 ECB :
2538 tgl 836 GIC 476 : if (!SyncStandbysDefined() || SyncRepConfig == NULL)
4417 simon 837 CBC 453 : return 0;
4417 simon 838 ECB :
2538 tgl 839 GIC 23 : standby_name = SyncRepConfig->member_names;
2538 tgl 840 CBC 31 : for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
841 : {
4417 simon 842 GIC 30 : if (pg_strcasecmp(standby_name, application_name) == 0 ||
2538 tgl 843 CBC 18 : strcmp(standby_name, "*") == 0)
4417 simon 844 ECB : {
4417 simon 845 GIC 22 : found = true;
846 22 : break;
847 : }
2538 tgl 848 8 : standby_name += strlen(standby_name) + 1;
849 : }
4417 simon 850 ECB :
2174 fujii 851 GIC 23 : if (!found)
852 1 : return 0;
853 :
854 : /*
855 : * In quorum-based sync replication, all the standbys in the list have the
856 : * same priority, one.
857 : */
858 22 : return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
859 : }
860 :
861 : /*
3260 bruce 862 ECB : * Walk the specified queue from head. Set the state of any backends that
863 : * need to be woken, remove them from the queue, and then wake them.
4093 simon 864 : * Pass all = true to wake whole queue; otherwise, just wake up to
865 : * the walsender's LSN.
866 : *
867 : * The caller must hold SyncRepLock in exclusive mode.
4417 868 : */
2936 ishii 869 : static int
4093 simon 870 CBC 118 : SyncRepWakeQueue(bool all, int mode)
871 : {
4417 872 118 : volatile WalSndCtlData *walsndctl = WalSndCtl;
4382 bruce 873 GIC 118 : int numprocs = 0;
874 : dlist_mutable_iter iter;
875 :
4093 simon 876 118 : Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
1255 michael 877 118 : Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
4093 simon 878 CBC 118 : Assert(SyncRepQueueIsOrderedByLSN(mode));
4417 simon 879 ECB :
81 andres 880 GNC 132 : dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
881 : {
882 19 : PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
883 :
884 : /*
885 : * Assume the queue is ordered by LSN
886 : */
3754 alvherre 887 GIC 19 : if (!all && walsndctl->lsn[mode] < proc->waitLSN)
4417 simon 888 5 : return numprocs;
4417 simon 889 ECB :
890 : /*
891 : * Remove from queue.
892 : */
81 andres 893 GNC 14 : dlist_delete_thoroughly(&proc->syncRepLinks);
894 :
895 : /*
2097 heikki.linnakangas 896 ECB : * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
897 : * make sure that it sees the queue link being removed before the
898 : * syncRepState change.
899 : */
2097 heikki.linnakangas 900 GIC 14 : pg_write_barrier();
901 :
902 : /*
903 : * Set state to complete; see SyncRepWaitForLSN() for discussion of
904 : * the various states.
905 : */
81 andres 906 GNC 14 : proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
4417 simon 907 ECB :
908 : /*
909 : * Wake only when we have set state and removed from queue.
910 : */
81 andres 911 GNC 14 : SetLatch(&(proc->procLatch));
912 :
4417 simon 913 CBC 14 : numprocs++;
914 : }
915 :
4417 simon 916 GIC 113 : return numprocs;
917 : }
918 :
919 : /*
4087 simon 920 ECB : * The checkpointer calls this as needed to update the shared
921 : * sync_standbys_defined flag, so that backends don't remain permanently wedged
922 : * if synchronous_standby_names is unset. It's safe to check the current value
923 : * without the lock, because it's only ever updated by one process. But we
4406 rhaas 924 EUB : * must take the lock to change it.
925 : */
926 : void
4406 rhaas 927 GIC 391 : SyncRepUpdateSyncStandbysDefined(void)
928 : {
929 391 : bool sync_standbys_defined = SyncStandbysDefined();
930 :
931 391 : if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
932 : {
933 8 : LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
934 :
4406 rhaas 935 ECB : /*
936 : * If synchronous_standby_names has been reset to empty, it's futile
1256 michael 937 : * for backends to continue waiting. Since the user no longer wants
938 : * synchronous replication, we'd better wake them up.
4406 rhaas 939 : */
4406 rhaas 940 GIC 8 : if (!sync_standbys_defined)
941 : {
942 : int i;
4093 simon 943 ECB :
4093 simon 944 UIC 0 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
945 0 : SyncRepWakeQueue(true, i);
946 : }
947 :
4406 rhaas 948 ECB : /*
949 : * Only allow people to join the queue when there are synchronous
950 : * standbys defined. Without this interlock, there's a race
951 : * condition: we might wake up all the current waiters; then, some
952 : * backend that hasn't yet reloaded its config might go to sleep on
953 : * the queue (and never wake up). This prevents that.
954 : */
4406 rhaas 955 GIC 8 : WalSndCtl->sync_standbys_defined = sync_standbys_defined;
956 :
957 8 : LWLockRelease(SyncRepLock);
958 : }
959 391 : }
4406 rhaas 960 ECB :
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper: report whether the SyncRepQueue for the given mode is
 * in strictly increasing waitLSN order.
 *
 * Returns false as soon as an entry's waitLSN is not greater than that of
 * the entry before it (or not greater than zero, for the first entry);
 * returns true for an empty or correctly ordered queue.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
	XLogRecPtr	prev_lsn = 0;
	dlist_iter	iter;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
	{
		PGPROC	   *waiter = dlist_container(PGPROC, syncRepLinks, iter.cur);

		/*
		 * The queue must be ordered by LSN, and no two procs may share the
		 * same LSN, hence "<=" rather than "<".
		 */
		if (waiter->waitLSN <= prev_lsn)
			return false;

		prev_lsn = waiter->waitLSN;
	}

	return true;
}
#endif
4417 simon 989 EUB :
990 : /*
991 : * ===========================================================
992 : * Synchronous Replication functions executed by any process
993 : * ===========================================================
994 : */
995 :
996 : bool
4385 tgl 997 CBC 1926 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
998 : {
2559 fujii 999 GBC 1926 : if (*newval != NULL && (*newval)[0] != '\0')
4417 simon 1000 67 : {
2538 tgl 1001 EUB : int parse_rc;
1002 : SyncRepConfigData *pconf;
1003 :
1004 : /* Reset communication variables to ensure a fresh start */
2538 tgl 1005 GIC 67 : syncrep_parse_result = NULL;
2538 tgl 1006 CBC 67 : syncrep_parse_error_msg = NULL;
2538 tgl 1007 ECB :
2538 tgl 1008 EUB : /* Parse the synchronous_standby_names string */
2559 fujii 1009 CBC 67 : syncrep_scanner_init(*newval);
2559 fujii 1010 GIC 67 : parse_rc = syncrep_yyparse();
2559 fujii 1011 CBC 67 : syncrep_scanner_finish();
1012 :
2538 tgl 1013 GIC 67 : if (parse_rc != 0 || syncrep_parse_result == NULL)
1014 : {
2559 fujii 1015 UIC 0 : GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
2538 tgl 1016 0 : if (syncrep_parse_error_msg)
1017 0 : GUC_check_errdetail("%s", syncrep_parse_error_msg);
1018 : else
1019 0 : GUC_check_errdetail("synchronous_standby_names parser failed");
2559 fujii 1020 0 : return false;
1021 : }
2559 fujii 1022 ECB :
2304 fujii 1023 GIC 67 : if (syncrep_parse_result->num_sync <= 0)
2304 fujii 1024 ECB : {
2304 fujii 1025 UIC 0 : GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1026 0 : syncrep_parse_result->num_sync);
1027 0 : return false;
2304 fujii 1028 ECB : }
1029 :
1030 : /* GUC extra value must be guc_malloc'd, not palloc'd */
2538 tgl 1031 : pconf = (SyncRepConfigData *)
177 tgl 1032 GNC 67 : guc_malloc(LOG, syncrep_parse_result->config_size);
2538 tgl 1033 GIC 67 : if (pconf == NULL)
2538 tgl 1034 LBC 0 : return false;
2538 tgl 1035 GIC 67 : memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
2538 tgl 1036 ECB :
2538 tgl 1037 GIC 67 : *extra = (void *) pconf;
2538 tgl 1038 EUB :
2559 fujii 1039 : /*
2538 tgl 1040 : * We need not explicitly clean up syncrep_parse_result. It, and any
2538 tgl 1041 ECB : * other cruft generated during parsing, will be freed when the
1042 : * current memory context is deleted. (This code is generally run in
1043 : * a short-lived context used for config file processing, so that will
2538 tgl 1044 EUB : * not be very long.)
2559 fujii 1045 : */
1046 : }
2538 tgl 1047 ECB : else
2538 tgl 1048 CBC 1859 : *extra = NULL;
4417 simon 1049 ECB :
4385 tgl 1050 GIC 1926 : return true;
4417 simon 1051 ECB : }
1052 :
1053 : void
2538 tgl 1054 GIC 1916 : assign_synchronous_standby_names(const char *newval, void *extra)
1055 : {
1056 1916 : SyncRepConfig = (SyncRepConfigData *) extra;
1057 1916 : }
1058 :
1059 : void
4093 simon 1060 2335 : assign_synchronous_commit(int newval, void *extra)
1061 : {
1062 2335 : switch (newval)
1063 : {
4093 simon 1064 UIC 0 : case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1065 0 : SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1066 0 : break;
4093 simon 1067 GIC 1976 : case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1068 1976 : SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1069 1976 : break;
2567 rhaas 1070 UIC 0 : case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1071 0 : SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1072 0 : break;
4093 simon 1073 GIC 359 : default:
1074 359 : SyncRepWaitMode = SYNC_REP_NO_WAIT;
1075 359 : break;
1076 : }
1077 2335 : }
|