LCOV - differential code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 73.1 % 308 225 7 5 59 12 5 143 18 59 62 148 4 13
Current Date: 2023-04-08 15:15:32 Functions: 84.2 % 19 16 3 16 2 16 1
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * syncrep.c
       4                 :  *
       5                 :  * Synchronous replication is new as of PostgreSQL 9.1.
       6                 :  *
       7                 :  * If requested, transaction commits wait until their commit LSN are
       8                 :  * acknowledged by the synchronous standbys.
       9                 :  *
      10                 :  * This module contains the code for waiting and release of backends.
      11                 :  * All code in this module executes on the primary. The core streaming
      12                 :  * replication transport remains within WALreceiver/WALsender modules.
      13                 :  *
      14                 :  * The essence of this design is that it isolates all logic about
      15                 :  * waiting/releasing onto the primary. The primary defines which standbys
      16                 :  * it wishes to wait for. The standbys are completely unaware of the
      17                 :  * durability requirements of transactions on the primary, reducing the
      18                 :  * complexity of the code and streamlining both standby operations and
      19                 :  * network bandwidth because there is no requirement to ship
      20                 :  * per-transaction state information.
      21                 :  *
      22                 :  * Replication is either synchronous or not synchronous (async). If it is
      23                 :  * async, we just fastpath out of here. If it is sync, then we wait for
      24                 :  * the write, flush or apply location on the standby before releasing
      25                 :  * the waiting backend. Further complexity in that interaction is
      26                 :  * expected in later releases.
      27                 :  *
      28                 :  * The best performing way to manage the waiting backends is to have a
      29                 :  * single ordered queue of waiting backends, so that we can avoid
      30                 :  * searching the through all waiters each time we receive a reply.
      31                 :  *
      32                 :  * In 9.5 or before only a single standby could be considered as
      33                 :  * synchronous. In 9.6 we support a priority-based multiple synchronous
      34                 :  * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
      35                 :  * supported. The number of synchronous standbys that transactions
      36                 :  * must wait for replies from is specified in synchronous_standby_names.
      37                 :  * This parameter also specifies a list of standby names and the method
      38                 :  * (FIRST and ANY) to choose synchronous standbys from the listed ones.
      39                 :  *
      40                 :  * The method FIRST specifies a priority-based synchronous replication
      41                 :  * and makes transaction commits wait until their WAL records are
      42                 :  * replicated to the requested number of synchronous standbys chosen based
      43                 :  * on their priorities. The standbys whose names appear earlier in the list
      44                 :  * are given higher priority and will be considered as synchronous.
      45                 :  * Other standby servers appearing later in this list represent potential
      46                 :  * synchronous standbys. If any of the current synchronous standbys
      47                 :  * disconnects for whatever reason, it will be replaced immediately with
      48                 :  * the next-highest-priority standby.
      49                 :  *
      50                 :  * The method ANY specifies a quorum-based synchronous replication
      51                 :  * and makes transaction commits wait until their WAL records are
      52                 :  * replicated to at least the requested number of synchronous standbys
      53                 :  * in the list. All the standbys appearing in the list are considered as
      54                 :  * candidates for quorum synchronous standbys.
      55                 :  *
      56                 :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
      57                 :  * This is for backward compatibility with 9.6 or before where only a
      58                 :  * priority-based sync replication was supported.
      59                 :  *
      60                 :  * Before the standbys chosen from synchronous_standby_names can
      61                 :  * become the synchronous standbys they must have caught up with
      62                 :  * the primary; that may take some time. Once caught up,
      63                 :  * the standbys which are considered as synchronous at that moment
      64                 :  * will release waiters from the queue.
      65                 :  *
      66                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
      67                 :  *
      68                 :  * IDENTIFICATION
      69                 :  *    src/backend/replication/syncrep.c
      70                 :  *
      71                 :  *-------------------------------------------------------------------------
      72                 :  */
      73                 : #include "postgres.h"
      74                 : 
      75                 : #include <unistd.h>
      76                 : 
      77                 : #include "access/xact.h"
      78                 : #include "miscadmin.h"
      79                 : #include "pgstat.h"
      80                 : #include "replication/syncrep.h"
      81                 : #include "replication/walsender.h"
      82                 : #include "replication/walsender_private.h"
      83                 : #include "storage/pmsignal.h"
      84                 : #include "storage/proc.h"
      85                 : #include "tcop/tcopprot.h"
      86                 : #include "utils/builtins.h"
      87                 : #include "utils/guc_hooks.h"
      88                 : #include "utils/ps_status.h"
      89                 : 
      90                 : /* User-settable parameters for sync rep */
      91                 : char       *SyncRepStandbyNames;
      92                 : 
      93                 : #define SyncStandbysDefined() \
      94                 :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
      95                 : 
      96                 : static bool announce_next_takeover = true;
      97                 : 
      98                 : SyncRepConfigData *SyncRepConfig = NULL;
      99                 : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
     100                 : 
     101                 : static void SyncRepQueueInsert(int mode);
     102                 : static void SyncRepCancelWait(void);
     103                 : static int  SyncRepWakeQueue(bool all, int mode);
     104                 : 
     105                 : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
     106                 :                                  XLogRecPtr *flushPtr,
     107                 :                                  XLogRecPtr *applyPtr,
     108                 :                                  bool *am_sync);
     109                 : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     110                 :                                        XLogRecPtr *flushPtr,
     111                 :                                        XLogRecPtr *applyPtr,
     112                 :                                        SyncRepStandbyData *sync_standbys,
     113                 :                                        int num_standbys);
     114                 : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     115                 :                                           XLogRecPtr *flushPtr,
     116                 :                                           XLogRecPtr *applyPtr,
     117                 :                                           SyncRepStandbyData *sync_standbys,
     118                 :                                           int num_standbys,
     119                 :                                           uint8 nth);
     120                 : static int  SyncRepGetStandbyPriority(void);
     121                 : static int  standby_priority_comparator(const void *a, const void *b);
     122                 : static int  cmp_lsn(const void *a, const void *b);
     123                 : 
     124                 : #ifdef USE_ASSERT_CHECKING
     125                 : static bool SyncRepQueueIsOrderedByLSN(int mode);
     126                 : #endif
     127                 : 
     128                 : /*
     129                 :  * ===========================================================
     130                 :  * Synchronous Replication functions for normal user backends
     131                 :  * ===========================================================
     132                 :  */
     133                 : 
     134                 : /*
     135                 :  * Wait for synchronous replication, if requested by user.
     136                 :  *
     137                 :  * Initially backends start in state SYNC_REP_NOT_WAITING and then
     138                 :  * change that state to SYNC_REP_WAITING before adding ourselves
     139                 :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
     140                 :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
     141                 :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
     142                 :  *
     143                 :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
     144                 :  * represents a commit record.  If it doesn't, then we wait only for the WAL
     145                 :  * to be flushed if synchronous_commit is set to the higher level of
     146                 :  * remote_apply, because only commit records provide apply feedback.
     147                 :  */
     148                 : void
     149 GIC      290343 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     150 ECB             : {
     151                 :     int         mode;
     152                 : 
     153                 :     /*
     154                 :      * This should be called while holding interrupts during a transaction
     155                 :      * commit to prevent the follow-up shared memory queue cleanups to be
     156                 :      * influenced by external interruptions.
     157                 :      */
     158 GIC      290343 :     Assert(InterruptHoldoffCount > 0);
     159                 : 
     160                 :     /*
     161                 :      * Fast exit if user has not requested sync replication, or there are no
     162                 :      * sync replication standby names defined.
     163                 :      *
     164                 :      * Since this routine gets called every commit time, it's important to
     165                 :      * exit quickly if sync replication is not requested. So we check
     166                 :      * WalSndCtl->sync_standbys_defined flag without the lock and exit
     167                 :      * immediately if it's false. If it's true, we need to check it again
     168                 :      * later while holding the lock, to check the flag and operate the sync
     169                 :      * rep queue atomically. This is necessary to avoid the race condition
     170                 :      * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
     171                 :      * it's false, the lock is not necessary because we don't touch the queue.
     172 ECB             :      */
     173 CBC      290343 :     if (!SyncRepRequested() ||
     174          266442 :         !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
     175 GIC      290299 :         return;
     176                 : 
     177 ECB             :     /* Cap the level for anything other than commit to remote flush only. */
     178 CBC          44 :     if (commit)
     179 GIC          22 :         mode = SyncRepWaitMode;
     180 ECB             :     else
     181 GIC          22 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
     182 ECB             : 
     183 GNC          44 :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     184 GIC          44 :     Assert(WalSndCtl != NULL);
     185 ECB             : 
     186 CBC          44 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     187 GIC          44 :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
     188                 : 
     189                 :     /*
     190                 :      * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     191                 :      * set.  See SyncRepUpdateSyncStandbysDefined.
     192                 :      *
     193                 :      * Also check that the standby hasn't already replied. Unlikely race
     194                 :      * condition but we'll be fetching that cache line anyway so it's likely
     195                 :      * to be a low cost check.
     196 ECB             :      */
     197 CBC          44 :     if (!WalSndCtl->sync_standbys_defined ||
     198 GIC          44 :         lsn <= WalSndCtl->lsn[mode])
     199 ECB             :     {
     200 CBC          10 :         LWLockRelease(SyncRepLock);
     201 GIC          10 :         return;
     202                 :     }
     203                 : 
     204                 :     /*
     205                 :      * Set our waitLSN so WALSender will know when to wake us, and add
     206                 :      * ourselves to the queue.
     207 ECB             :      */
     208 CBC          34 :     MyProc->waitLSN = lsn;
     209              34 :     MyProc->syncRepState = SYNC_REP_WAITING;
     210              34 :     SyncRepQueueInsert(mode);
     211              34 :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     212 GIC          34 :     LWLockRelease(SyncRepLock);
     213                 : 
     214 ECB             :     /* Alter ps display to show waiting for sync rep. */
     215 GIC          34 :     if (update_process_title)
     216                 :     {
     217                 :         char        buffer[32];
     218 ECB             : 
     219 GNC          34 :         sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
     220              34 :         set_ps_display_suffix(buffer);
     221                 :     }
     222                 : 
     223                 :     /*
     224 ECB             :      * Wait for specified LSN to be confirmed.
     225                 :      *
     226                 :      * Each proc has its own wait latch, so we perform a normal latch
     227                 :      * check/wait loop here.
     228                 :      */
     229                 :     for (;;)
     230 GIC          34 :     {
     231                 :         int         rc;
     232                 : 
     233                 :         /* Must reset the latch before testing state. */
     234              68 :         ResetLatch(MyLatch);
     235                 : 
     236                 :         /*
     237 ECB             :          * Acquiring the lock is not needed, the latch ensures proper
     238                 :          * barriers. If it looks like we're done, we must really be done,
     239                 :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
     240                 :          * it will never update it again, so we can't be seeing a stale value
     241                 :          * in that case.
     242                 :          */
     243 GIC          68 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
     244              34 :             break;
     245                 : 
     246                 :         /*
     247                 :          * If a wait for synchronous replication is pending, we can neither
     248                 :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
     249                 :          * lead the client to believe that the transaction aborted, which is
     250                 :          * not true: it's already committed locally. The former is no good
     251                 :          * either: the client has requested synchronous replication, and is
     252 ECB             :          * entitled to assume that an acknowledged commit is also replicated,
     253                 :          * which might not be true. So in this case we issue a WARNING (which
     254 EUB             :          * some clients may be able to interpret) and shut off further output.
     255                 :          * We do NOT reset ProcDiePending, so that the process will die after
     256                 :          * the commit is cleaned up.
     257                 :          */
     258 GBC          34 :         if (ProcDiePending)
     259 EUB             :         {
     260 UBC           0 :             ereport(WARNING,
     261                 :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
     262                 :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
     263                 :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     264 UIC           0 :             whereToSendOutput = DestNone;
     265               0 :             SyncRepCancelWait();
     266               0 :             break;
     267                 :         }
     268                 : 
     269 ECB             :         /*
     270                 :          * It's unclear what to do if a query cancel interrupt arrives.  We
     271 EUB             :          * can't actually abort at this point, but ignoring the interrupt
     272                 :          * altogether is not helpful, so we just terminate the wait with a
     273                 :          * suitable warning.
     274                 :          */
     275 GBC          34 :         if (QueryCancelPending)
     276 EUB             :         {
     277 UIC           0 :             QueryCancelPending = false;
     278               0 :             ereport(WARNING,
     279                 :                     (errmsg("canceling wait for synchronous replication due to user request"),
     280                 :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
     281               0 :             SyncRepCancelWait();
     282               0 :             break;
     283 ECB             :         }
     284                 : 
     285                 :         /*
     286                 :          * Wait on latch.  Any condition that should wake us up will set the
     287                 :          * latch, so no need for timeout.
     288                 :          */
     289 GIC          34 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
     290 ECB             :                        WAIT_EVENT_SYNC_REP);
     291                 : 
     292 EUB             :         /*
     293                 :          * If the postmaster dies, we'll probably never get an acknowledgment,
     294                 :          * because all the wal sender processes will exit. So just bail out.
     295                 :          */
     296 GIC          34 :         if (rc & WL_POSTMASTER_DEATH)
     297                 :         {
     298 UIC           0 :             ProcDiePending = true;
     299               0 :             whereToSendOutput = DestNone;
     300               0 :             SyncRepCancelWait();
     301               0 :             break;
     302                 :         }
     303                 :     }
     304                 : 
     305                 :     /*
     306                 :      * WalSender has checked our LSN and has removed us from queue. Clean up
     307 ECB             :      * state and leave.  It's OK to reset these shared memory fields without
     308                 :      * holding SyncRepLock, because any walsenders will ignore us anyway when
     309                 :      * we're not on the queue.  We need a read barrier to make sure we see the
     310                 :      * changes to the queue link (this might be unnecessary without
     311                 :      * assertions, but better safe than sorry).
     312                 :      */
     313 CBC          34 :     pg_read_barrier();
     314 GNC          34 :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
     315 GIC          34 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     316              34 :     MyProc->waitLSN = 0;
     317                 : 
     318                 :     /* reset ps display to remove the suffix */
     319 GNC          34 :     if (update_process_title)
     320              34 :         set_ps_display_remove_suffix();
     321 ECB             : }
     322                 : 
     323                 : /*
     324                 :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
     325                 :  *
     326                 :  * Usually we will go at tail of queue, though it's possible that we arrive
     327                 :  * here out of order, so start at tail and work back to insertion point.
     328                 :  */
     329                 : static void
     330 GIC          34 : SyncRepQueueInsert(int mode)
     331 EUB             : {
     332                 :     dlist_head *queue;
     333                 :     dlist_iter iter;
     334                 : 
     335 GIC          34 :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     336 GNC          34 :     queue = &WalSndCtl->SyncRepQueue[mode];
     337                 : 
     338              34 :     dlist_reverse_foreach(iter, queue)
     339 EUB             :     {
     340 UNC           0 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     341                 : 
     342                 :         /*
     343                 :          * Stop at the queue element that we should insert after to ensure the
     344                 :          * queue is ordered by LSN.
     345                 :          */
     346 UIC           0 :         if (proc->waitLSN < MyProc->waitLSN)
     347                 :         {
     348 UNC           0 :             dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
     349               0 :             return;
     350                 :         }
     351                 :     }
     352                 : 
     353                 :     /*
     354                 :      * If we get here, the list was either empty, or this process needs to be
     355                 :      * at the head.
     356                 :      */
     357 GNC          34 :     dlist_push_head(queue, &MyProc->syncRepLinks);
     358 EUB             : }
     359                 : 
     360                 : /*
     361                 :  * Acquire SyncRepLock and cancel any wait currently in progress.
     362                 :  */
     363                 : static void
     364 UIC           0 : SyncRepCancelWait(void)
     365                 : {
     366 LBC           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     367 UNC           0 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     368               0 :         dlist_delete_thoroughly(&MyProc->syncRepLinks);
     369 UIC           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
     370               0 :     LWLockRelease(SyncRepLock);
     371               0 : }
     372 ECB             : 
     373                 : void
     374 GBC       11507 : SyncRepCleanupAtProcExit(void)
     375                 : {
     376                 :     /*
     377 EUB             :      * First check if we are removed from the queue without the lock to not
     378                 :      * slow down backend exit.
     379                 :      */
     380 GNC       11507 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     381                 :     {
     382 LBC           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     383                 : 
     384                 :         /* maybe we have just been removed, so recheck */
     385 UNC           0 :         if (!dlist_node_is_detached(&MyProc->syncRepLinks))
     386               0 :             dlist_delete_thoroughly(&MyProc->syncRepLinks);
     387                 : 
     388 UIC           0 :         LWLockRelease(SyncRepLock);
     389                 :     }
     390 GIC       11507 : }
     391                 : 
     392                 : /*
     393                 :  * ===========================================================
     394                 :  * Synchronous Replication functions for wal sender processes
     395 ECB             :  * ===========================================================
     396                 :  */
     397                 : 
     398                 : /*
     399                 :  * Take any action required to initialise sync rep state from config
     400                 :  * data. Called at WALSender startup and after each SIGHUP.
     401                 :  */
     402                 : void
     403 CBC         509 : SyncRepInitConfig(void)
     404 ECB             : {
     405                 :     int         priority;
     406                 : 
     407                 :     /*
     408                 :      * Determine if we are a potential sync standby and remember the result
     409                 :      * for handling replies from standby.
     410                 :      */
     411 GIC         509 :     priority = SyncRepGetStandbyPriority();
     412             509 :     if (MyWalSnd->sync_standby_priority != priority)
     413                 :     {
     414 CBC          20 :         SpinLockAcquire(&MyWalSnd->mutex);
     415 GIC          20 :         MyWalSnd->sync_standby_priority = priority;
     416              20 :         SpinLockRelease(&MyWalSnd->mutex);
     417                 : 
     418              20 :         ereport(DEBUG1,
     419                 :                 (errmsg_internal("standby \"%s\" now has synchronous standby priority %u",
     420                 :                                  application_name, priority)));
     421                 :     }
     422             509 : }
     423                 : 
     424 ECB             : /*
     425                 :  * Update the LSNs on each queue based upon our latest state. This
     426                 :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
     427                 :  *
     428                 :  * Other policies are possible, which would change what we do here and
     429                 :  * perhaps also which information we store as well.
     430                 :  */
     431                 : void
     432 CBC       90688 : SyncRepReleaseWaiters(void)
     433 ECB             : {
     434 CBC       90688 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     435                 :     XLogRecPtr  writePtr;
     436                 :     XLogRecPtr  flushPtr;
     437                 :     XLogRecPtr  applyPtr;
     438                 :     bool        got_recptr;
     439                 :     bool        am_sync;
     440 GIC       90688 :     int         numwrite = 0;
     441           90688 :     int         numflush = 0;
     442           90688 :     int         numapply = 0;
     443 ECB             : 
     444                 :     /*
     445                 :      * If this WALSender is serving a standby that is not on the list of
     446                 :      * potential sync standbys then we have nothing to do. If we are still
     447                 :      * starting up, still running base backup or the current flush position is
     448                 :      * still invalid, then leave quickly also.  Streaming or stopping WAL
     449                 :      * senders are allowed to release waiters.
     450                 :      */
     451 GIC       90688 :     if (MyWalSnd->sync_standby_priority == 0 ||
     452             127 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
     453              46 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
     454             102 :         XLogRecPtrIsInvalid(MyWalSnd->flush))
     455                 :     {
     456 CBC       90586 :         announce_next_takeover = true;
     457 GIC       90586 :         return;
     458                 :     }
     459                 : 
     460                 :     /*
     461                 :      * We're a potential sync standby. Release waiters if there are enough
     462                 :      * sync standbys and we are considered as sync.
     463                 :      */
     464             102 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     465                 : 
     466 ECB             :     /*
     467                 :      * Check whether we are a sync standby or not, and calculate the synced
     468                 :      * positions among all sync standbys.  (Note: although this step does not
     469                 :      * of itself require holding SyncRepLock, it seems like a good idea to do
     470                 :      * it after acquiring the lock.  This ensures that the WAL pointers we use
     471                 :      * to release waiters are newer than any previous execution of this
     472                 :      * routine used.)
     473                 :      */
     474 CBC         102 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
     475                 : 
     476 ECB             :     /*
     477                 :      * If we are managing a sync standby, though we weren't prior to this,
     478                 :      * then announce we are now a sync standby.
     479                 :      */
     480 GIC         102 :     if (announce_next_takeover && am_sync)
     481 EUB             :     {
     482 GIC           8 :         announce_next_takeover = false;
     483                 : 
     484               8 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     485               8 :             ereport(LOG,
     486                 :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
     487                 :                             application_name, MyWalSnd->sync_standby_priority)));
     488                 :         else
     489 UIC           0 :             ereport(LOG,
     490 ECB             :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
     491                 :                             application_name)));
     492 EUB             :     }
     493                 : 
     494                 :     /*
     495                 :      * If the number of sync standbys is less than requested or we aren't
     496                 :      * managing a sync standby then just leave.
     497                 :      */
     498 GIC         102 :     if (!got_recptr || !am_sync)
     499                 :     {
     500 UIC           0 :         LWLockRelease(SyncRepLock);
     501 LBC           0 :         announce_next_takeover = !am_sync;
     502 UIC           0 :         return;
     503 ECB             :     }
     504                 : 
     505                 :     /*
     506                 :      * Set the lsn first so that when we wake backends they will release up to
     507                 :      * this location.
     508                 :      */
     509 CBC         102 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
     510                 :     {
     511              36 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
     512 GIC          36 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
     513 ECB             :     }
     514 CBC         102 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
     515                 :     {
     516 GIC          41 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
     517 CBC          41 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
     518                 :     }
     519             102 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
     520                 :     {
     521 GIC          41 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
     522              41 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
     523                 :     }
     524                 : 
     525             102 :     LWLockRelease(SyncRepLock);
     526                 : 
     527             102 :     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
     528                 :          numwrite, LSN_FORMAT_ARGS(writePtr),
     529                 :          numflush, LSN_FORMAT_ARGS(flushPtr),
     530                 :          numapply, LSN_FORMAT_ARGS(applyPtr));
     531                 : }
     532                 : 
     533                 : /*
     534                 :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
     535                 :  *
     536 ECB             :  * Return false if the number of sync standbys is less than
     537                 :  * synchronous_standby_names specifies. Otherwise return true and
     538                 :  * store the positions into *writePtr, *flushPtr and *applyPtr.
     539                 :  *
     540                 :  * On return, *am_sync is set to true if this walsender is connecting to
     541                 :  * sync standby. Otherwise it's set to false.
     542                 :  */
     543                 : static bool
     544 CBC         102 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
     545 ECB             :                      XLogRecPtr *applyPtr, bool *am_sync)
     546                 : {
     547                 :     SyncRepStandbyData *sync_standbys;
     548                 :     int         num_standbys;
     549                 :     int         i;
     550                 : 
     551 EUB             :     /* Initialize default results */
     552 GIC         102 :     *writePtr = InvalidXLogRecPtr;
     553             102 :     *flushPtr = InvalidXLogRecPtr;
     554 CBC         102 :     *applyPtr = InvalidXLogRecPtr;
     555 GIC         102 :     *am_sync = false;
     556                 : 
     557 ECB             :     /* Quick out if not even configured to be synchronous */
     558 GIC         102 :     if (SyncRepConfig == NULL)
     559 LBC           0 :         return false;
     560                 : 
     561 ECB             :     /* Get standbys that are considered as synchronous at this moment */
     562 CBC         102 :     num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
     563                 : 
     564                 :     /* Am I among the candidate sync standbys? */
     565 GIC         102 :     for (i = 0; i < num_standbys; i++)
     566                 :     {
     567             102 :         if (sync_standbys[i].is_me)
     568                 :         {
     569             102 :             *am_sync = true;
     570 CBC         102 :             break;
     571 ECB             :         }
     572                 :     }
     573 EUB             : 
     574                 :     /*
     575                 :      * Nothing more to do if we are not managing a sync standby or there are
     576                 :      * not enough synchronous standbys.
     577                 :      */
     578 GIC         102 :     if (!(*am_sync) ||
     579             102 :         num_standbys < SyncRepConfig->num_sync)
     580                 :     {
     581 UIC           0 :         pfree(sync_standbys);
     582               0 :         return false;
     583                 :     }
     584                 : 
     585                 :     /*
     586                 :      * In a priority-based sync replication, the synced positions are the
     587                 :      * oldest ones among sync standbys. In a quorum-based, they are the Nth
     588                 :      * latest ones.
     589                 :      *
     590 ECB             :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
     591                 :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
     592                 :      * because it's a bit more efficient.
     593                 :      *
     594                 :      * XXX If the numbers of current and requested sync standbys are the same,
     595                 :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
     596                 :      * positions even in a quorum-based sync replication.
     597 EUB             :      */
     598 GIC         102 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
     599 EUB             :     {
     600 GIC         102 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
     601                 :                                    sync_standbys, num_standbys);
     602 ECB             :     }
     603                 :     else
     604                 :     {
     605 UIC           0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
     606                 :                                       sync_standbys, num_standbys,
     607               0 :                                       SyncRepConfig->num_sync);
     608                 :     }
     609                 : 
     610 CBC         102 :     pfree(sync_standbys);
     611 GIC         102 :     return true;
     612                 : }
     613                 : 
     614                 : /*
     615                 :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
     616                 :  */
     617                 : static void
     618             102 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
     619                 :                            XLogRecPtr *flushPtr,
     620                 :                            XLogRecPtr *applyPtr,
     621                 :                            SyncRepStandbyData *sync_standbys,
     622                 :                            int num_standbys)
     623 ECB             : {
     624                 :     int         i;
     625                 : 
     626                 :     /*
     627                 :      * Scan through all sync standbys and calculate the oldest Write, Flush
     628                 :      * and Apply positions.  We assume *writePtr et al were initialized to
     629                 :      * InvalidXLogRecPtr.
     630                 :      */
     631 CBC         204 :     for (i = 0; i < num_standbys; i++)
     632 ECB             :     {
     633 CBC         102 :         XLogRecPtr  write = sync_standbys[i].write;
     634             102 :         XLogRecPtr  flush = sync_standbys[i].flush;
     635 GIC         102 :         XLogRecPtr  apply = sync_standbys[i].apply;
     636 ECB             : 
     637 GIC         102 :         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
     638             102 :             *writePtr = write;
     639             102 :         if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
     640             102 :             *flushPtr = flush;
     641             102 :         if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
     642             102 :             *applyPtr = apply;
     643 EUB             :     }
     644 GIC         102 : }
     645                 : 
     646                 : /*
     647                 :  * Calculate the Nth latest Write, Flush and Apply positions among sync
     648                 :  * standbys.
     649                 :  */
     650                 : static void
     651 UIC           0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
     652                 :                               XLogRecPtr *flushPtr,
     653                 :                               XLogRecPtr *applyPtr,
     654                 :                               SyncRepStandbyData *sync_standbys,
     655                 :                               int num_standbys,
     656 EUB             :                               uint8 nth)
     657                 : {
     658                 :     XLogRecPtr *write_array;
     659                 :     XLogRecPtr *flush_array;
     660                 :     XLogRecPtr *apply_array;
     661                 :     int         i;
     662                 : 
     663                 :     /* Should have enough candidates, or somebody messed up */
     664 UBC           0 :     Assert(nth > 0 && nth <= num_standbys);
     665 EUB             : 
     666 UBC           0 :     write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     667 UIC           0 :     flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     668               0 :     apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
     669                 : 
     670 UBC           0 :     for (i = 0; i < num_standbys; i++)
     671 EUB             :     {
     672 UBC           0 :         write_array[i] = sync_standbys[i].write;
     673 UIC           0 :         flush_array[i] = sync_standbys[i].flush;
     674               0 :         apply_array[i] = sync_standbys[i].apply;
     675 EUB             :     }
     676                 : 
     677                 :     /* Sort each array in descending order */
     678 UIC           0 :     qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     679 UBC           0 :     qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     680               0 :     qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
     681 EUB             : 
     682                 :     /* Get Nth latest Write, Flush, Apply positions */
     683 UIC           0 :     *writePtr = write_array[nth - 1];
     684               0 :     *flushPtr = flush_array[nth - 1];
     685               0 :     *applyPtr = apply_array[nth - 1];
     686                 : 
     687               0 :     pfree(write_array);
     688 UBC           0 :     pfree(flush_array);
     689 UIC           0 :     pfree(apply_array);
     690 UBC           0 : }
     691 EUB             : 
     692                 : /*
     693                 :  * Compare lsn in order to sort array in descending order.
     694                 :  */
     695                 : static int
     696 UBC           0 : cmp_lsn(const void *a, const void *b)
     697                 : {
     698               0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
     699 UIC           0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
     700                 : 
     701               0 :     if (lsn1 > lsn2)
     702               0 :         return -1;
     703               0 :     else if (lsn1 == lsn2)
     704               0 :         return 0;
     705                 :     else
     706               0 :         return 1;
     707                 : }
     708                 : 
     709 ECB             : /*
     710                 :  * Return data about walsenders that are candidates to be sync standbys.
     711                 :  *
     712                 :  * *standbys is set to a palloc'd array of structs of per-walsender data,
     713                 :  * and the number of valid entries (candidate sync senders) is returned.
     714                 :  * (This might be more or fewer than num_sync; caller must check.)
     715                 :  */
     716                 : int
     717 GIC         655 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
     718                 : {
     719 ECB             :     int         i;
     720                 :     int         n;
     721                 : 
     722                 :     /* Create result array */
     723 CBC         655 :     *standbys = (SyncRepStandbyData *)
     724             655 :         palloc(max_wal_senders * sizeof(SyncRepStandbyData));
     725                 : 
     726                 :     /* Quick exit if sync replication is not requested */
     727 GIC         655 :     if (SyncRepConfig == NULL)
     728             536 :         return 0;
     729                 : 
     730                 :     /* Collect raw data from shared memory */
     731 CBC         119 :     n = 0;
     732            1309 :     for (i = 0; i < max_wal_senders; i++)
     733                 :     {
     734 ECB             :         volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
     735                 :                                      * rearrangement */
     736                 :         SyncRepStandbyData *stby;
     737                 :         WalSndState state;      /* not included in SyncRepStandbyData */
     738                 : 
     739 CBC        1190 :         walsnd = &WalSndCtl->walsnds[i];
     740            1190 :         stby = *standbys + n;
     741 ECB             : 
     742 GIC        1190 :         SpinLockAcquire(&walsnd->mutex);
     743            1190 :         stby->pid = walsnd->pid;
     744 CBC        1190 :         state = walsnd->state;
     745            1190 :         stby->write = walsnd->write;
     746 GIC        1190 :         stby->flush = walsnd->flush;
     747            1190 :         stby->apply = walsnd->apply;
     748 CBC        1190 :         stby->sync_standby_priority = walsnd->sync_standby_priority;
     749 GIC        1190 :         SpinLockRelease(&walsnd->mutex);
     750 EUB             : 
     751                 :         /* Must be active */
     752 GIC        1190 :         if (stby->pid == 0)
     753 CBC        1043 :             continue;
     754 ECB             : 
     755                 :         /* Must be streaming or stopping */
     756 GIC         147 :         if (state != WALSNDSTATE_STREAMING &&
     757 ECB             :             state != WALSNDSTATE_STOPPING)
     758 UBC           0 :             continue;
     759                 : 
     760                 :         /* Must be synchronous */
     761 CBC         147 :         if (stby->sync_standby_priority == 0)
     762               7 :             continue;
     763 ECB             : 
     764                 :         /* Must have a valid flush position */
     765 GIC         140 :         if (XLogRecPtrIsInvalid(stby->flush))
     766 UIC           0 :             continue;
     767                 : 
     768                 :         /* OK, it's a candidate */
     769 GIC         140 :         stby->walsnd_index = i;
     770             140 :         stby->is_me = (walsnd == MyWalSnd);
     771 CBC         140 :         n++;
     772 ECB             :     }
     773                 : 
     774                 :     /*
     775                 :      * In quorum mode, we return all the candidates.  In priority mode, if we
     776                 :      * have too many candidates then return only the num_sync ones of highest
     777                 :      * priority.
     778                 :      */
     779 GIC         119 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
     780             118 :         n > SyncRepConfig->num_sync)
     781 ECB             :     {
     782                 :         /* Sort by priority ... */
     783 GIC           8 :         qsort(*standbys, n, sizeof(SyncRepStandbyData),
     784                 :               standby_priority_comparator);
     785                 :         /* ... then report just the first num_sync ones */
     786               8 :         n = SyncRepConfig->num_sync;
     787                 :     }
     788 ECB             : 
     789 GIC         119 :     return n;
     790 ECB             : }
     791                 : 
     792                 : /*
     793                 :  * qsort comparator to sort SyncRepStandbyData entries by priority
     794                 :  */
     795                 : static int
     796 GIC          19 : standby_priority_comparator(const void *a, const void *b)
     797                 : {
     798              19 :     const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
     799              19 :     const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
     800                 : 
     801                 :     /* First, sort by increasing priority value */
     802 CBC          19 :     if (sa->sync_standby_priority != sb->sync_standby_priority)
     803 GIC           9 :         return sa->sync_standby_priority - sb->sync_standby_priority;
     804                 : 
     805                 :     /*
     806                 :      * We might have equal priority values; arbitrarily break ties by position
     807                 :      * in the WALSnd array.  (This is utterly bogus, since that is arrival
     808                 :      * order dependent, but there are regression tests that rely on it.)
     809                 :      */
     810              10 :     return sa->walsnd_index - sb->walsnd_index;
     811                 : }
     812                 : 
     813                 : 
     814                 : /*
     815 ECB             :  * Check if we are in the list of sync standbys, and if so, determine
     816                 :  * priority sequence. Return priority if set, or zero to indicate that
     817                 :  * we are not a potential sync standby.
     818                 :  *
     819                 :  * Compare the parameter SyncRepStandbyNames against the application_name
     820                 :  * for this WALSender, or allow any name if we find a wildcard "*".
     821                 :  */
     822                 : static int
     823 GIC         509 : SyncRepGetStandbyPriority(void)
     824                 : {
     825 ECB             :     const char *standby_name;
     826                 :     int         priority;
     827 GIC         509 :     bool        found = false;
     828 ECB             : 
     829                 :     /*
     830                 :      * Since synchronous cascade replication is not allowed, we always set the
     831                 :      * priority of cascading walsender to zero.
     832                 :      */
     833 GIC         509 :     if (am_cascading_walsender)
     834 CBC          33 :         return 0;
     835 ECB             : 
     836 GIC         476 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
     837 CBC         453 :         return 0;
     838 ECB             : 
     839 GIC          23 :     standby_name = SyncRepConfig->member_names;
     840 CBC          31 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
     841                 :     {
     842 GIC          30 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
     843 CBC          18 :             strcmp(standby_name, "*") == 0)
     844 ECB             :         {
     845 GIC          22 :             found = true;
     846              22 :             break;
     847                 :         }
     848               8 :         standby_name += strlen(standby_name) + 1;
     849                 :     }
     850 ECB             : 
     851 GIC          23 :     if (!found)
     852               1 :         return 0;
     853                 : 
     854                 :     /*
     855                 :      * In quorum-based sync replication, all the standbys in the list have the
     856                 :      * same priority, one.
     857                 :      */
     858              22 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
     859                 : }
     860                 : 
     861                 : /*
     862 ECB             :  * Walk the specified queue from head.  Set the state of any backends that
     863                 :  * need to be woken, remove them from the queue, and then wake them.
     864                 :  * Pass all = true to wake whole queue; otherwise, just wake up to
     865                 :  * the walsender's LSN.
     866                 :  *
     867                 :  * The caller must hold SyncRepLock in exclusive mode.
     868                 :  */
     869                 : static int
     870 CBC         118 : SyncRepWakeQueue(bool all, int mode)
     871                 : {
     872             118 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
     873 GIC         118 :     int         numprocs = 0;
     874                 :     dlist_mutable_iter iter;
     875                 : 
     876             118 :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     877             118 :     Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
     878 CBC         118 :     Assert(SyncRepQueueIsOrderedByLSN(mode));
     879 ECB             : 
     880 GNC         132 :     dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
     881                 :     {
     882              19 :         PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     883                 : 
     884                 :         /*
     885                 :          * Assume the queue is ordered by LSN
     886                 :          */
     887 GIC          19 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
     888               5 :             return numprocs;
     889 ECB             : 
     890                 :         /*
     891                 :          * Remove from queue.
     892                 :          */
     893 GNC          14 :         dlist_delete_thoroughly(&proc->syncRepLinks);
     894                 : 
     895                 :         /*
     896 ECB             :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
     897                 :          * make sure that it sees the queue link being removed before the
     898                 :          * syncRepState change.
     899                 :          */
     900 GIC          14 :         pg_write_barrier();
     901                 : 
     902                 :         /*
     903                 :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
     904                 :          * the various states.
     905                 :          */
     906 GNC          14 :         proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
     907 ECB             : 
     908                 :         /*
     909                 :          * Wake only when we have set state and removed from queue.
     910                 :          */
     911 GNC          14 :         SetLatch(&(proc->procLatch));
     912                 : 
     913 CBC          14 :         numprocs++;
     914                 :     }
     915                 : 
     916 GIC         113 :     return numprocs;
     917                 : }
     918                 : 
     919                 : /*
     920 ECB             :  * The checkpointer calls this as needed to update the shared
     921                 :  * sync_standbys_defined flag, so that backends don't remain permanently wedged
     922                 :  * if synchronous_standby_names is unset.  It's safe to check the current value
     923                 :  * without the lock, because it's only ever updated by one process.  But we
     924 EUB             :  * must take the lock to change it.
     925                 :  */
     926                 : void
     927 GIC         391 : SyncRepUpdateSyncStandbysDefined(void)
     928                 : {
     929             391 :     bool        sync_standbys_defined = SyncStandbysDefined();
     930                 : 
     931             391 :     if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
     932                 :     {
     933               8 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
     934                 : 
     935 ECB             :         /*
     936                 :          * If synchronous_standby_names has been reset to empty, it's futile
     937                 :          * for backends to continue waiting.  Since the user no longer wants
     938                 :          * synchronous replication, we'd better wake them up.
     939                 :          */
     940 GIC           8 :         if (!sync_standbys_defined)
     941                 :         {
     942                 :             int         i;
     943 ECB             : 
     944 UIC           0 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
     945               0 :                 SyncRepWakeQueue(true, i);
     946                 :         }
     947                 : 
     948 ECB             :         /*
     949                 :          * Only allow people to join the queue when there are synchronous
     950                 :          * standbys defined.  Without this interlock, there's a race
     951                 :          * condition: we might wake up all the current waiters; then, some
     952                 :          * backend that hasn't yet reloaded its config might go to sleep on
     953                 :          * the queue (and never wake up).  This prevents that.
     954                 :          */
     955 GIC           8 :         WalSndCtl->sync_standbys_defined = sync_standbys_defined;
     956                 : 
     957               8 :         LWLockRelease(SyncRepLock);
     958                 :     }
     959             391 : }
     960 ECB             : 
     961 EUB             : #ifdef USE_ASSERT_CHECKING
     962                 : static bool
     963 CBC         152 : SyncRepQueueIsOrderedByLSN(int mode)
     964                 : {
     965 ECB             :     XLogRecPtr  lastLSN;
     966                 :     dlist_iter  iter;
     967                 : 
     968 GIC         152 :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
     969                 : 
     970             152 :     lastLSN = 0;
     971                 : 
     972 GNC         205 :     dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
     973 ECB             :     {
     974 GNC          53 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
     975                 : 
     976                 :         /*
     977 ECB             :          * Check the queue is ordered by LSN and that multiple procs don't
     978                 :          * have matching LSNs
     979                 :          */
     980 GIC          53 :         if (proc->waitLSN <= lastLSN)
     981 UIC           0 :             return false;
     982                 : 
     983 CBC          53 :         lastLSN = proc->waitLSN;
     984 ECB             :     }
     985                 : 
     986 GIC         152 :     return true;
     987 ECB             : }
     988                 : #endif
     989 EUB             : 
     990                 : /*
     991                 :  * ===========================================================
     992                 :  * Synchronous Replication functions executed by any process
     993                 :  * ===========================================================
     994                 :  */
     995                 : 
     996                 : bool
     997 CBC        1926 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
     998                 : {
     999 GBC        1926 :     if (*newval != NULL && (*newval)[0] != '\0')
    1000              67 :     {
    1001 EUB             :         int         parse_rc;
    1002                 :         SyncRepConfigData *pconf;
    1003                 : 
    1004                 :         /* Reset communication variables to ensure a fresh start */
    1005 GIC          67 :         syncrep_parse_result = NULL;
    1006 CBC          67 :         syncrep_parse_error_msg = NULL;
    1007 ECB             : 
    1008 EUB             :         /* Parse the synchronous_standby_names string */
    1009 CBC          67 :         syncrep_scanner_init(*newval);
    1010 GIC          67 :         parse_rc = syncrep_yyparse();
    1011 CBC          67 :         syncrep_scanner_finish();
    1012                 : 
    1013 GIC          67 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
    1014                 :         {
    1015 UIC           0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
    1016               0 :             if (syncrep_parse_error_msg)
    1017               0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
    1018                 :             else
    1019               0 :                 GUC_check_errdetail("synchronous_standby_names parser failed");
    1020               0 :             return false;
    1021                 :         }
    1022 ECB             : 
    1023 GIC          67 :         if (syncrep_parse_result->num_sync <= 0)
    1024 ECB             :         {
    1025 UIC           0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
    1026               0 :                              syncrep_parse_result->num_sync);
    1027               0 :             return false;
    1028 ECB             :         }
    1029                 : 
    1030                 :         /* GUC extra value must be guc_malloc'd, not palloc'd */
    1031                 :         pconf = (SyncRepConfigData *)
    1032 GNC          67 :             guc_malloc(LOG, syncrep_parse_result->config_size);
    1033 GIC          67 :         if (pconf == NULL)
    1034 LBC           0 :             return false;
    1035 GIC          67 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
    1036 ECB             : 
    1037 GIC          67 :         *extra = (void *) pconf;
    1038 EUB             : 
    1039                 :         /*
    1040                 :          * We need not explicitly clean up syncrep_parse_result.  It, and any
    1041 ECB             :          * other cruft generated during parsing, will be freed when the
    1042                 :          * current memory context is deleted.  (This code is generally run in
    1043                 :          * a short-lived context used for config file processing, so that will
    1044 EUB             :          * not be very long.)
    1045                 :          */
    1046                 :     }
    1047 ECB             :     else
    1048 CBC        1859 :         *extra = NULL;
    1049 ECB             : 
    1050 GIC        1926 :     return true;
    1051 ECB             : }
    1052                 : 
    1053                 : void
    1054 GIC        1916 : assign_synchronous_standby_names(const char *newval, void *extra)
    1055                 : {
    1056            1916 :     SyncRepConfig = (SyncRepConfigData *) extra;
    1057            1916 : }
    1058                 : 
    1059                 : void
    1060            2335 : assign_synchronous_commit(int newval, void *extra)
    1061                 : {
    1062            2335 :     switch (newval)
    1063                 :     {
    1064 UIC           0 :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
    1065               0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    1066               0 :             break;
    1067 GIC        1976 :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
    1068            1976 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    1069            1976 :             break;
    1070 UIC           0 :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
    1071               0 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    1072               0 :             break;
    1073 GIC         359 :         default:
    1074             359 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
    1075             359 :             break;
    1076                 :     }
    1077            2335 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a