LCOV - differential code coverage report
Current view: top level - src/backend/replication - syncrep.c (source / functions)
Current: Differential Code Coverage HEAD vs 15
Current Date: 2023-04-08 17:13:01
Baseline: 15
Baseline Date: 2023-04-08 15:09:40

Columns: Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Lines: 73.1 % 308 225 7 5 59 12 5 143 18 59 62 148 4 13
Functions: 84.2 % 19 16 3 16 2 16 1

Line coverage date bins:
[..60] days: 100.0 % 4 4 4
(60,120] days: 65.0 % 20 13 7 13
(120,180] days: 100.0 % 1 1 1
(240..) days: 73.1 % 283 207 5 59 12 5 143 59 62 148
Function coverage date bins:
(240..) days: 43.2 % 37 16 3 16 2 16

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * syncrep.c
                                  4                 :  *
                                  5                 :  * Synchronous replication is new as of PostgreSQL 9.1.
                                  6                 :  *
                                  7                 :  * If requested, transaction commits wait until their commit LSNs are
                                  8                 :  * acknowledged by the synchronous standbys.
                                  9                 :  *
                                 10                 :  * This module contains the code for waiting and release of backends.
                                 11                 :  * All code in this module executes on the primary. The core streaming
                                 12                 :  * replication transport remains within WALreceiver/WALsender modules.
                                 13                 :  *
                                 14                 :  * The essence of this design is that it isolates all logic about
                                 15                 :  * waiting/releasing onto the primary. The primary defines which standbys
                                 16                 :  * it wishes to wait for. The standbys are completely unaware of the
                                 17                 :  * durability requirements of transactions on the primary, reducing the
                                 18                 :  * complexity of the code and streamlining both standby operations and
                                 19                 :  * network bandwidth because there is no requirement to ship
                                 20                 :  * per-transaction state information.
                                 21                 :  *
                                 22                 :  * Replication is either synchronous or not synchronous (async). If it is
                                 23                 :  * async, we just fastpath out of here. If it is sync, then we wait for
                                 24                 :  * the write, flush or apply location on the standby before releasing
                                 25                 :  * the waiting backend. Further complexity in that interaction is
                                 26                 :  * expected in later releases.
                                 27                 :  *
                                 28                 :  * The best performing way to manage the waiting backends is to have a
                                 29                 :  * single ordered queue of waiting backends, so that we can avoid
                                 30                 :  * searching through all waiters each time we receive a reply.
                                 31                 :  *
                                 32                 :  * In 9.5 and earlier, only a single standby could be considered
                                 33                 :  * synchronous. 9.6 added support for multiple priority-based synchronous
                                 34                 :  * standbys, and 10.0 added support for multiple quorum-based synchronous
                                 35                 :  * standbys. The number of synchronous standbys that transactions
                                 36                 :  * must wait for replies from is specified in synchronous_standby_names.
                                 37                 :  * This parameter also specifies a list of standby names and the method
                                 38                 :  * (FIRST or ANY) to choose synchronous standbys from the listed ones.
                                 39                 :  *
                                 40                 :  * The method FIRST specifies a priority-based synchronous replication
                                 41                 :  * and makes transaction commits wait until their WAL records are
                                 42                 :  * replicated to the requested number of synchronous standbys chosen based
                                 43                 :  * on their priorities. The standbys whose names appear earlier in the list
                                 44                 :  * are given higher priority and will be considered as synchronous.
                                 45                 :  * Other standby servers appearing later in this list represent potential
                                 46                 :  * synchronous standbys. If any of the current synchronous standbys
                                 47                 :  * disconnects for whatever reason, it will be replaced immediately with
                                 48                 :  * the next-highest-priority standby.
                                 49                 :  *
                                 50                 :  * The method ANY specifies a quorum-based synchronous replication
                                 51                 :  * and makes transaction commits wait until their WAL records are
                                 52                 :  * replicated to at least the requested number of synchronous standbys
                                 53                 :  * in the list. All the standbys appearing in the list are considered as
                                 54                 :  * candidates for quorum synchronous standbys.
                                 55                 :  *
                                 56                 :  * If neither FIRST nor ANY is specified, FIRST is used as the method.
                                 57                 :  * This is for backward compatibility with 9.6 and earlier, where only
                                 58                 :  * priority-based sync replication was supported.
                                 59                 :  *
                                 60                 :  * Before the standbys chosen from synchronous_standby_names can
                                 61                 :  * become the synchronous standbys they must have caught up with
                                 62                 :  * the primary; that may take some time. Once caught up,
                                 63                 :  * the standbys which are considered as synchronous at that moment
                                 64                 :  * will release waiters from the queue.
                                 65                 :  *
                                 66                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
                                 67                 :  *
                                 68                 :  * IDENTIFICATION
                                 69                 :  *    src/backend/replication/syncrep.c
                                 70                 :  *
                                 71                 :  *-------------------------------------------------------------------------
                                 72                 :  */
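
The FIRST and ANY methods described in the header comment above can be illustrated with a minimal standalone sketch. It is not the module's implementation: the type Lsn and the functions first_n_released_lsn() and any_n_released_lsn() are hypothetical names, and the sketch assumes every listed standby is connected and caught up. Under those assumptions, FIRST n can release commits up to the oldest confirmed position among the n highest-priority standbys, while ANY n can release commits up to the n-th latest confirmed position among all listed standbys.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint64_t Lsn;           /* hypothetical stand-in for XLogRecPtr */

/* qsort comparator: sorts LSNs in descending order */
static int
cmp_lsn_desc(const void *a, const void *b)
{
    Lsn         la = *(const Lsn *) a;
    Lsn         lb = *(const Lsn *) b;

    if (la > lb)
        return -1;
    if (la < lb)
        return 1;
    return 0;
}

/*
 * FIRST n: confirmed[] is ordered by priority (index 0 = highest).
 * A commit is released only once all of the first n standbys have
 * confirmed it, so the releasable LSN is the oldest among them.
 */
Lsn
first_n_released_lsn(const Lsn *confirmed, int count, int n)
{
    Lsn         oldest;

    assert(n >= 1 && n <= count);
    oldest = confirmed[0];
    for (int i = 1; i < n; i++)
    {
        if (confirmed[i] < oldest)
            oldest = confirmed[i];
    }
    return oldest;
}

/*
 * ANY n: any n of the listed standbys suffice, so the releasable LSN
 * is the n-th latest confirmed position among all of them.
 */
Lsn
any_n_released_lsn(const Lsn *confirmed, int count, int n)
{
    Lsn        *sorted;
    Lsn         result;

    assert(n >= 1 && n <= count);
    sorted = malloc(sizeof(Lsn) * (size_t) count);
    assert(sorted != NULL);
    memcpy(sorted, confirmed, sizeof(Lsn) * (size_t) count);
    qsort(sorted, (size_t) count, sizeof(Lsn), cmp_lsn_desc);
    result = sorted[n - 1];
    free(sorted);
    return result;
}
```

With confirmed positions {100, 300, 200} listed in priority order and n = 2, the FIRST sketch yields 100 (the slower of the two highest-priority standbys), while the ANY sketch yields 200 (two standbys have reached at least that position).
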
                                 73                 : #include "postgres.h"
                                 74                 : 
                                 75                 : #include <unistd.h>
                                 76                 : 
                                 77                 : #include "access/xact.h"
                                 78                 : #include "miscadmin.h"
                                 79                 : #include "pgstat.h"
                                 80                 : #include "replication/syncrep.h"
                                 81                 : #include "replication/walsender.h"
                                 82                 : #include "replication/walsender_private.h"
                                 83                 : #include "storage/pmsignal.h"
                                 84                 : #include "storage/proc.h"
                                 85                 : #include "tcop/tcopprot.h"
                                 86                 : #include "utils/builtins.h"
                                 87                 : #include "utils/guc_hooks.h"
                                 88                 : #include "utils/ps_status.h"
                                 89                 : 
                                 90                 : /* User-settable parameters for sync rep */
                                 91                 : char       *SyncRepStandbyNames;
                                 92                 : 
                                 93                 : #define SyncStandbysDefined() \
                                 94                 :     (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
                                 95                 : 
                                 96                 : static bool announce_next_takeover = true;
                                 97                 : 
                                 98                 : SyncRepConfigData *SyncRepConfig = NULL;
                                 99                 : static int  SyncRepWaitMode = SYNC_REP_NO_WAIT;
                                100                 : 
                                101                 : static void SyncRepQueueInsert(int mode);
                                102                 : static void SyncRepCancelWait(void);
                                103                 : static int  SyncRepWakeQueue(bool all, int mode);
                                104                 : 
                                105                 : static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
                                106                 :                                  XLogRecPtr *flushPtr,
                                107                 :                                  XLogRecPtr *applyPtr,
                                108                 :                                  bool *am_sync);
                                109                 : static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
                                110                 :                                        XLogRecPtr *flushPtr,
                                111                 :                                        XLogRecPtr *applyPtr,
                                112                 :                                        SyncRepStandbyData *sync_standbys,
                                113                 :                                        int num_standbys);
                                114                 : static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
                                115                 :                                           XLogRecPtr *flushPtr,
                                116                 :                                           XLogRecPtr *applyPtr,
                                117                 :                                           SyncRepStandbyData *sync_standbys,
                                118                 :                                           int num_standbys,
                                119                 :                                           uint8 nth);
                                120                 : static int  SyncRepGetStandbyPriority(void);
                                121                 : static int  standby_priority_comparator(const void *a, const void *b);
                                122                 : static int  cmp_lsn(const void *a, const void *b);
                                123                 : 
                                124                 : #ifdef USE_ASSERT_CHECKING
                                125                 : static bool SyncRepQueueIsOrderedByLSN(int mode);
                                126                 : #endif
                                127                 : 
                                128                 : /*
                                129                 :  * ===========================================================
                                130                 :  * Synchronous Replication functions for normal user backends
                                131                 :  * ===========================================================
                                132                 :  */
                                133                 : 
                                134                 : /*
                                135                 :  * Wait for synchronous replication, if requested by user.
                                136                 :  *
                                137                 :  * A backend starts in state SYNC_REP_NOT_WAITING and then changes
                                138                 :  * that state to SYNC_REP_WAITING before adding itself
                                139                 :  * to the wait queue. During SyncRepWakeQueue() a WALSender changes
                                140                 :  * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
                                141                 :  * This backend then resets its state to SYNC_REP_NOT_WAITING.
                                142                 :  *
                                143                 :  * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
                                144                 :  * represents a commit record.  If it doesn't, then we wait only for the WAL
                                145                 :  * to be flushed if synchronous_commit is set to the higher level of
                                146                 :  * remote_apply, because only commit records provide apply feedback.
                                147                 :  */
                                148                 : void
 2567 rhaas                     149 GIC      290343 : SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 4417 simon                     150 ECB             : {
                                151                 :     int         mode;
                                152                 : 
                                153                 :     /*
                                154                 :      * This should be called while holding interrupts during a transaction
                                155                 :      * commit to prevent the follow-up shared memory queue cleanups from
                                156                 :      * being influenced by external interruptions.
 1255 michael                   157                 :      */
 1255 michael                   158 GIC      290343 :     Assert(InterruptHoldoffCount > 0);
                                159                 : 
                                160                 :     /*
                                161                 :      * Fast exit if user has not requested sync replication, or there are no
                                162                 :      * sync replication standby names defined.
                                163                 :      *
                                164                 :      * Since this routine gets called every commit time, it's important to
                                165                 :      * exit quickly if sync replication is not requested. So we check
                                166                 :      * WalSndCtl->sync_standbys_defined flag without the lock and exit
                                167                 :      * immediately if it's false. If it's true, we need to check it again
                                168                 :      * later while holding the lock, to check the flag and operate the sync
                                169                 :      * rep queue atomically. This is necessary to avoid the race condition
                                170                 :      * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
                                171                 :      * it's false, the lock is not necessary because we don't touch the queue.
  949 fujii                     172 ECB             :      */
  949 fujii                     173 CBC      290343 :     if (!SyncRepRequested() ||
                                174          266442 :         !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
  949 fujii                     175 GIC      290299 :         return;
                                176                 : 
 2567 rhaas                     177 ECB             :     /* Cap the level for anything other than commit to remote flush only. */
 2567 rhaas                     178 CBC          44 :     if (commit)
 2567 rhaas                     179 GIC          22 :         mode = SyncRepWaitMode;
 2567 rhaas                     180 ECB             :     else
 2567 rhaas                     181 GIC          22 :         mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
 4417 simon                     182 ECB             : 
   81 andres                    183 GNC          44 :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
 4406 rhaas                     184 GIC          44 :     Assert(WalSndCtl != NULL);
 4406 rhaas                     185 ECB             : 
 4406 rhaas                     186 CBC          44 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 4406 rhaas                     187 GIC          44 :     Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
                                188                 : 
                                189                 :     /*
                                190                 :      * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
                                191                 :      * set.  See SyncRepUpdateSyncStandbysDefined.
                                192                 :      *
                                193                 :      * Also check that the standby hasn't already replied. Unlikely race
                                194                 :      * condition but we'll be fetching that cache line anyway so it's likely
                                195                 :      * to be a low cost check.
 4397 simon                     196 ECB             :      */
 4397 simon                     197 CBC          44 :     if (!WalSndCtl->sync_standbys_defined ||
 2567 rhaas                     198 GIC          44 :         lsn <= WalSndCtl->lsn[mode])
 4406 rhaas                     199 ECB             :     {
 4406 rhaas                     200 CBC          10 :         LWLockRelease(SyncRepLock);
 4406 rhaas                     201 GIC          10 :         return;
                                202                 :     }
                                203                 : 
                                204                 :     /*
                                205                 :      * Set our waitLSN so WALSender will know when to wake us, and add
                                206                 :      * ourselves to the queue.
 4397 simon                     207 ECB             :      */
 2567 rhaas                     208 CBC          34 :     MyProc->waitLSN = lsn;
 4406                           209              34 :     MyProc->syncRepState = SYNC_REP_WAITING;
 4087 simon                     210              34 :     SyncRepQueueInsert(mode);
                                211              34 :     Assert(SyncRepQueueIsOrderedByLSN(mode));
 4406 rhaas                     212 GIC          34 :     LWLockRelease(SyncRepLock);
                                213                 : 
 4406 rhaas                     214 ECB             :     /* Alter ps display to show waiting for sync rep. */
 4406 rhaas                     215 GIC          34 :     if (update_process_title)
                                216                 :     {
                                217                 :         char        buffer[32];
 4406 rhaas                     218 ECB             : 
   48 drowley                   219 GNC          34 :         sprintf(buffer, "waiting for %X/%X", LSN_FORMAT_ARGS(lsn));
                                220              34 :         set_ps_display_suffix(buffer);
                                221                 :     }
                                222                 : 
                                223                 :     /*
 4417 simon                     224 ECB             :      * Wait for specified LSN to be confirmed.
                                225                 :      *
                                226                 :      * Each proc has its own wait latch, so we perform a normal latch
                                227                 :      * check/wait loop here.
                                228                 :      */
                                229                 :     for (;;)
 4417 simon                     230 GIC          34 :     {
                                231                 :         int         rc;
                                232                 : 
                                233                 :         /* Must reset the latch before testing state. */
 3007 andres                    234              68 :         ResetLatch(MyLatch);
                                235                 : 
                                236                 :         /*
 2428 tgl                       237 ECB             :          * Acquiring the lock is not needed, the latch ensures proper
                                238                 :          * barriers. If it looks like we're done, we must really be done,
                                239                 :          * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
                                240                 :          * it will never update it again, so we can't be seeing a stale value
                                241                 :          * in that case.
                                242                 :          */
 2431 simon                     243 GIC          68 :         if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
 4406 rhaas                     244              34 :             break;
                                245                 : 
                                246                 :         /*
                                247                 :          * If a wait for synchronous replication is pending, we can neither
                                248                 :          * acknowledge the commit nor raise ERROR or FATAL.  The latter would
                                249                 :          * lead the client to believe that the transaction aborted, which is
                                250                 :          * not true: it's already committed locally. The former is no good
                                251                 :          * either: the client has requested synchronous replication, and is
 4382 bruce                     252 ECB             :          * entitled to assume that an acknowledged commit is also replicated,
                                253                 :          * which might not be true. So in this case we issue a WARNING (which
 4382 bruce                     254 EUB             :          * some clients may be able to interpret) and shut off further output.
                                255                 :          * We do NOT reset ProcDiePending, so that the process will die after
                                256                 :          * the commit is cleaned up.
                                257                 :          */
 4406 rhaas                     258 GBC          34 :         if (ProcDiePending)
 4406 rhaas                     259 EUB             :         {
 4406 rhaas                     260 UBC           0 :             ereport(WARNING,
                                261                 :                     (errcode(ERRCODE_ADMIN_SHUTDOWN),
                                262                 :                      errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
                                263                 :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
 4406 rhaas                     264 UIC           0 :             whereToSendOutput = DestNone;
                                265               0 :             SyncRepCancelWait();
                                266               0 :             break;
                                267                 :         }
                                268                 : 
 4406 rhaas                     269 ECB             :         /*
                                270                 :          * It's unclear what to do if a query cancel interrupt arrives.  We
 4406 rhaas                     271 EUB             :          * can't actually abort at this point, but ignoring the interrupt
 4382 bruce                     272                 :          * altogether is not helpful, so we just terminate the wait with a
                                273                 :          * suitable warning.
                                274                 :          */
 4406 rhaas                     275 GBC          34 :         if (QueryCancelPending)
 4406 rhaas                     276 EUB             :         {
 4406 rhaas                     277 UIC           0 :             QueryCancelPending = false;
                                278               0 :             ereport(WARNING,
                                279                 :                     (errmsg("canceling wait for synchronous replication due to user request"),
                                280                 :                      errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
                                281               0 :             SyncRepCancelWait();
                                282               0 :             break;
 4406 rhaas                     283 ECB             :         }
                                284                 : 
                                285                 :         /*
                                286                 :          * Wait on latch.  Any condition that should wake us up will set the
                                287                 :          * latch, so no need for timeout.
                                288                 :          */
 1598 tmunro                    289 GIC          34 :         rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
 1598 tmunro                    290 ECB             :                        WAIT_EVENT_SYNC_REP);
                                291                 : 
 4406 rhaas                     292 EUB             :         /*
 1418 tgl                       293                 :          * If the postmaster dies, we'll probably never get an acknowledgment,
                                294                 :          * because all the wal sender processes will exit. So just bail out.
 4406 rhaas                     295                 :          */
 1598 tmunro                    296 GIC          34 :         if (rc & WL_POSTMASTER_DEATH)
                                297                 :         {
 4406 rhaas                     298 UIC           0 :             ProcDiePending = true;
                                299               0 :             whereToSendOutput = DestNone;
                                300               0 :             SyncRepCancelWait();
                                301               0 :             break;
                                302                 :         }
                                303                 :     }
                                304                 : 
                                305                 :     /*
                                306                 :      * WalSender has checked our LSN and has removed us from queue. Clean up
 4406 rhaas                     307 ECB             :      * state and leave.  It's OK to reset these shared memory fields without
                                308                 :      * holding SyncRepLock, because any walsenders will ignore us anyway when
 2064 tgl                       309                 :      * we're not on the queue.  We need a read barrier to make sure we see the
                                310                 :      * changes to the queue link (this might be unnecessary without
                                311                 :      * assertions, but better safe than sorry).
                                312                 :      */
 2097 heikki.linnakangas        313 CBC          34 :     pg_read_barrier();
   81 andres                    314 GNC          34 :     Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
 4406 rhaas                     315 GIC          34 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
 3941 heikki.linnakangas        316              34 :     MyProc->waitLSN = 0;
                                317                 : 
                                318                 :     /* reset ps display to remove the suffix */
   48 drowley                   319 GNC          34 :     if (update_process_title)
                                320              34 :         set_ps_display_remove_suffix();
 4417 simon                     321 ECB             : }
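
The decision made at the top of SyncRepWaitForLSN() (cap the wait level for non-commit records, then skip the wait if no sync standbys are defined or the LSN has already been confirmed) can be sketched in isolation. This is a simplified model rather than the function itself: SharedCtl, the WAIT_* constants, effective_mode() and must_wait() are hypothetical names, the mode ordering write < flush < apply is an assumption of the sketch, and locking is omitted entirely.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;           /* hypothetical stand-in for XLogRecPtr */

/* Hypothetical wait modes; the sketch relies on write < flush < apply. */
enum
{
    WAIT_WRITE = 0,
    WAIT_FLUSH = 1,
    WAIT_APPLY = 2,
    NUM_WAIT_MODES = 3
};

/* Hypothetical model of the shared state consulted under the lock. */
typedef struct SharedCtl
{
    bool        sync_standbys_defined;
    Lsn         lsn[NUM_WAIT_MODES];    /* latest LSN confirmed per mode */
} SharedCtl;

/* Non-commit records never wait for more than remote flush. */
int
effective_mode(int wait_mode, bool commit)
{
    assert(wait_mode >= 0 && wait_mode < NUM_WAIT_MODES);
    if (commit)
        return wait_mode;
    return (wait_mode < WAIT_FLUSH) ? wait_mode : WAIT_FLUSH;
}

/*
 * Returns true if the caller would have to enqueue itself and sleep:
 * sync standbys are defined and the requested LSN has not yet been
 * confirmed for the effective mode.
 */
bool
must_wait(const SharedCtl *ctl, Lsn lsn, int wait_mode, bool commit)
{
    int         mode = effective_mode(wait_mode, commit);

    if (!ctl->sync_standbys_defined)
        return false;
    if (lsn <= ctl->lsn[mode])
        return false;
    return true;
}
```

In the listing above, the same two conditions are evaluated while holding SyncRepLock, so that the flag check and the queue insertion happen atomically; the sketch leaves that part out.
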
                                322                 : 
                                323                 : /*
                                324                 :  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
                                325                 :  *
 4413 rhaas                     326                 :  * Usually we will go at tail of queue, though it's possible that we arrive
 4417 simon                     327                 :  * here out of order, so start at tail and work back to insertion point.
                                328                 :  */
                                329                 : static void
 4093 simon                     330 GIC          34 : SyncRepQueueInsert(int mode)
 4417 simon                     331 EUB             : {
                                332                 :     dlist_head *queue;
                                333                 :     dlist_iter iter;
                                334                 : 
 4093 simon                     335 GIC          34 :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
   81 andres                    336 GNC          34 :     queue = &WalSndCtl->SyncRepQueue[mode];
                                337                 : 
                                338              34 :     dlist_reverse_foreach(iter, queue)
 4417 simon                     339 EUB             :     {
   81 andres                    340 UNC           0 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
                                341                 : 
                                342                 :         /*
                                343                 :          * Stop at the queue element that we should insert after to ensure the
                                344                 :          * queue is ordered by LSN.
                                345                 :          */
 3754 alvherre                  346 UIC           0 :         if (proc->waitLSN < MyProc->waitLSN)
                                347                 :         {
   81 andres                    348 UNC           0 :             dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
                                349               0 :             return;
                                350                 :         }
                                351                 :     }
                                352                 : 
                                353                 :     /*
                                354                 :      * If we get here, the list was either empty, or this process needs to be
                                355                 :      * at the head.
                                356                 :      */
   81 andres                    357 GNC          34 :     dlist_push_head(queue, &MyProc->syncRepLinks);
 4417 simon                     358 EUB             : }
                                359                 : 
 4406 rhaas                     360                 : /*
                                361                 :  * Acquire SyncRepLock and cancel any wait currently in progress.
                                362                 :  */
                                363                 : static void
 4406 rhaas                     364 UIC           0 : SyncRepCancelWait(void)
                                365                 : {
 4406 rhaas                     366 LBC           0 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
   81 andres                    367 UNC           0 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
                                368               0 :         dlist_delete_thoroughly(&MyProc->syncRepLinks);
 4406 rhaas                     369 UIC           0 :     MyProc->syncRepState = SYNC_REP_NOT_WAITING;
                                370               0 :     LWLockRelease(SyncRepLock);
                                371               0 : }
 4406 rhaas                     372 ECB             : 
                                373                 : void
 4260 tgl                       374 GBC       11507 : SyncRepCleanupAtProcExit(void)
                                375                 : {
                                376                 :     /*
 1255 michael                   377 EUB             :      * First check without the lock whether we have already been removed
                                378                 :      * from the queue, so as not to slow down backend exit.
                                379                 :      */
   81 andres                    380 GNC       11507 :     if (!dlist_node_is_detached(&MyProc->syncRepLinks))
                                381                 :     {
 4417 simon                     382 LBC           0 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
                                383                 : 
                                384                 :         /* maybe we have just been removed, so recheck */
   81 andres                    385 UNC           0 :         if (!dlist_node_is_detached(&MyProc->syncRepLinks))
                                386               0 :             dlist_delete_thoroughly(&MyProc->syncRepLinks);
                                387                 : 
 4417 simon                     388 UIC           0 :         LWLockRelease(SyncRepLock);
                                389                 :     }
 4417 simon                     390 GIC       11507 : }
                                391                 : 
                                392                 : /*
                                393                 :  * ===========================================================
                                394                 :  * Synchronous Replication functions for wal sender processes
 4417 simon                     395 ECB             :  * ===========================================================
                                396                 :  */
                                397                 : 
                                398                 : /*
                                399                 :  * Take any action required to initialise sync rep state from config
                                400                 :  * data. Called at WALSender startup and after each SIGHUP.
                                401                 :  */
                                402                 : void
 4417 simon                     403 CBC         509 : SyncRepInitConfig(void)
 4417 simon                     404 ECB             : {
                                405                 :     int         priority;
                                406                 : 
                                407                 :     /*
                                408                 :      * Determine if we are a potential sync standby and remember the result
                                409                 :      * for handling replies from standby.
                                410                 :      */
 4417 simon                     411 GIC         509 :     priority = SyncRepGetStandbyPriority();
                                412             509 :     if (MyWalSnd->sync_standby_priority != priority)
                                413                 :     {
 1086 tgl                       414 CBC          20 :         SpinLockAcquire(&MyWalSnd->mutex);
 4417 simon                     415 GIC          20 :         MyWalSnd->sync_standby_priority = priority;
 1086 tgl                       416              20 :         SpinLockRelease(&MyWalSnd->mutex);
                                417                 : 
 4417 simon                     418              20 :         ereport(DEBUG1,
                                419                 :                 (errmsg_internal("standby \"%s\" now has synchronous standby priority %u",
                                420                 :                                  application_name, priority)));
                                421                 :     }
                                422             509 : }
                                423                 : 
 4417 simon                     424 ECB             : /*
                                425                 :  * Update the LSNs on each queue based upon our latest state. This
 2559 fujii                     426                 :  * implements a simple policy of first-valid-sync-standby-releases-waiter.
                                427                 :  *
                                428                 :  * Other policies are possible, which would change what we do here and
                                429                 :  * perhaps also which information we store as well.
                                430                 :  */
                                431                 : void
 4417 simon                     432 CBC       90688 : SyncRepReleaseWaiters(void)
 4417 simon                     433 ECB             : {
 4417 simon                     434 CBC       90688 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
                                435                 :     XLogRecPtr  writePtr;
                                436                 :     XLogRecPtr  flushPtr;
                                437                 :     XLogRecPtr  applyPtr;
                                438                 :     bool        got_recptr;
                                439                 :     bool        am_sync;
 4093 simon                     440 GIC       90688 :     int         numwrite = 0;
                                441           90688 :     int         numflush = 0;
 2567 rhaas                     442           90688 :     int         numapply = 0;
 4417 simon                     443 ECB             : 
                                444                 :     /*
                                445                 :      * If this WALSender is serving a standby that is not on the list of
 2596 alvherre                  446                 :      * potential sync standbys then we have nothing to do. If we are still
                                447                 :      * starting up, still running base backup or the current flush position is
 1592 michael                   448                 :      * still invalid, then leave quickly also.  Streaming or stopping WAL
                                449                 :      * senders are allowed to release waiters.
                                450                 :      */
 4417 simon                     451 GIC       90688 :     if (MyWalSnd->sync_standby_priority == 0 ||
 1592 michael                   452             127 :         (MyWalSnd->state != WALSNDSTATE_STREAMING &&
                                453              46 :          MyWalSnd->state != WALSNDSTATE_STOPPING) ||
 3931 magnus                    454             102 :         XLogRecPtrIsInvalid(MyWalSnd->flush))
                                455                 :     {
 2559 fujii                     456 CBC       90586 :         announce_next_takeover = true;
 4417 simon                     457 GIC       90586 :         return;
                                458                 :     }
                                459                 : 
                                460                 :     /*
                                461                 :      * We're a potential sync standby. Release waiters if there are enough
                                462                 :      * sync standbys and we are considered as sync.
                                463                 :      */
                                464             102 :     LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
                                465                 : 
 2559 fujii                     466 ECB             :     /*
                                467                 :      * Check whether we are a sync standby or not, and calculate the synced
                                468                 :      * positions among all sync standbys.  (Note: although this step does not
                                469                 :      * of itself require holding SyncRepLock, it seems like a good idea to do
                                470                 :      * it after acquiring the lock.  This ensures that the WAL pointers we use
                                471                 :      * to release waiters are newer than any previous execution of this
 1086 tgl                       472                 :      * routine used.)
                                473                 :      */
 2302 fujii                     474 CBC         102 :     got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
                                475                 : 
 2559 fujii                     476 ECB             :     /*
 2495 rhaas                     477                 :      * If we are managing a sync standby, though we weren't prior to this,
                                478                 :      * then announce we are now a sync standby.
                                479                 :      */
 2559 fujii                     480 GIC         102 :     if (announce_next_takeover && am_sync)
 2559 fujii                     481 EUB             :     {
 2559 fujii                     482 GIC           8 :         announce_next_takeover = false;
                                483                 : 
 2302                           484               8 :         if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
                                485               8 :             ereport(LOG,
                                486                 :                     (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
                                487                 :                             application_name, MyWalSnd->sync_standby_priority)));
                                488                 :         else
 2302 fujii                     489 UIC           0 :             ereport(LOG,
 2302 fujii                     490 ECB             :                     (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
                                491                 :                             application_name)));
 2559 fujii                     492 EUB             :     }
 4417 simon                     493                 : 
                                494                 :     /*
                                495                 :      * If the number of sync standbys is less than requested or we aren't
                                496                 :      * managing a sync standby then just leave.
                                497                 :      */
 2302 fujii                     498 GIC         102 :     if (!got_recptr || !am_sync)
                                499                 :     {
 4417 simon                     500 UIC           0 :         LWLockRelease(SyncRepLock);
 2559 fujii                     501 LBC           0 :         announce_next_takeover = !am_sync;
 4417 simon                     502 UIC           0 :         return;
 4417 simon                     503 ECB             :     }
                                504                 : 
                                505                 :     /*
 3955 bruce                     506                 :      * Set the lsn first so that when we wake backends they will release up to
                                507                 :      * this location.
 4093 simon                     508                 :      */
 2559 fujii                     509 CBC         102 :     if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
                                510                 :     {
                                511              36 :         walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
 4093 simon                     512 GIC          36 :         numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
 4093 simon                     513 ECB             :     }
 2559 fujii                     514 CBC         102 :     if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
                                515                 :     {
 2559 fujii                     516 GIC          41 :         walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
 4093 simon                     517 CBC          41 :         numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
                                518                 :     }
 2559 fujii                     519             102 :     if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
                                520                 :     {
 2559 fujii                     521 GIC          41 :         walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
 2567 rhaas                     522              41 :         numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
                                523                 :     }
                                524                 : 
 4417 simon                     525             102 :     LWLockRelease(SyncRepLock);
                                526                 : 
 2559 fujii                     527             102 :     elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
                                528                 :          numwrite, LSN_FORMAT_ARGS(writePtr),
                                529                 :          numflush, LSN_FORMAT_ARGS(flushPtr),
                                530                 :          numapply, LSN_FORMAT_ARGS(applyPtr));
                                531                 : }
                                532                 : 
                                533                 : /*
                                534                 :  * Calculate the synced Write, Flush and Apply positions among sync standbys.
                                535                 :  *
 2559 fujii                     536 ECB             :  * Return false if the number of sync standbys is less than what
                                537                 :  * synchronous_standby_names specifies. Otherwise return true and
                                538                 :  * store the positions into *writePtr, *flushPtr and *applyPtr.
                                539                 :  *
                                540                 :  * On return, *am_sync is set to true if this walsender is connected to
                                541                 :  * a sync standby. Otherwise it's set to false.
                                542                 :  */
                                543                 : static bool
 2302 fujii                     544 CBC         102 : SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 2153 bruce                     545 ECB             :                      XLogRecPtr *applyPtr, bool *am_sync)
 2559 fujii                     546                 : {
 1086 tgl                       547                 :     SyncRepStandbyData *sync_standbys;
                                548                 :     int         num_standbys;
                                549                 :     int         i;
 1255 michael                   550                 : 
 1086 tgl                       551 EUB             :     /* Initialize default results */
 2559 fujii                     552 GIC         102 :     *writePtr = InvalidXLogRecPtr;
                                553             102 :     *flushPtr = InvalidXLogRecPtr;
 2559 fujii                     554 CBC         102 :     *applyPtr = InvalidXLogRecPtr;
 2559 fujii                     555 GIC         102 :     *am_sync = false;
                                556                 : 
 1086 tgl                       557 ECB             :     /* Quick out if not even configured to be synchronous */
 1086 tgl                       558 GIC         102 :     if (SyncRepConfig == NULL)
 1086 tgl                       559 LBC           0 :         return false;
                                560                 : 
 2559 fujii                     561 ECB             :     /* Get standbys that are considered as synchronous at this moment */
 1086 tgl                       562 CBC         102 :     num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
                                563                 : 
                                564                 :     /* Am I among the candidate sync standbys? */
 1086 tgl                       565 GIC         102 :     for (i = 0; i < num_standbys; i++)
                                566                 :     {
                                567             102 :         if (sync_standbys[i].is_me)
                                568                 :         {
                                569             102 :             *am_sync = true;
 1086 tgl                       570 CBC         102 :             break;
 1086 tgl                       571 ECB             :         }
                                572                 :     }
 4417 simon                     573 EUB             : 
                                574                 :     /*
                                575                 :      * Nothing more to do if we are not managing a sync standby or there are
                                576                 :      * not enough synchronous standbys.
                                577                 :      */
 2538 tgl                       578 GIC         102 :     if (!(*am_sync) ||
 1086                           579             102 :         num_standbys < SyncRepConfig->num_sync)
                                580                 :     {
 1086 tgl                       581 UIC           0 :         pfree(sync_standbys);
 2559 fujii                     582               0 :         return false;
                                583                 :     }
                                584                 : 
                                585                 :     /*
                                586                 :      * In priority-based sync replication, the synced positions are the
                                587                 :      * oldest ones among sync standbys. In quorum-based sync replication,
                                588                 :      * they are the Nth latest ones.
                                589                 :      *
 2153 bruce                     590 ECB             :      * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
                                591                 :      * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
                                592                 :      * because it's a bit more efficient.
                                593                 :      *
                                594                 :      * XXX If the numbers of current and requested sync standbys are the same,
                                595                 :      * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
                                596                 :      * positions even in a quorum-based sync replication.
 2559 fujii                     597 EUB             :      */
 2302 fujii                     598 GIC         102 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
 2559 fujii                     599 EUB             :     {
 2302 fujii                     600 GIC         102 :         SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
                                601                 :                                    sync_standbys, num_standbys);
 2302 fujii                     602 ECB             :     }
                                603                 :     else
                                604                 :     {
 2302 fujii                     605 UIC           0 :         SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
                                606                 :                                       sync_standbys, num_standbys,
 1086 tgl                       607               0 :                                       SyncRepConfig->num_sync);
                                608                 :     }
                                609                 : 
 1086 tgl                       610 CBC         102 :     pfree(sync_standbys);
 2302 fujii                     611 GIC         102 :     return true;
                                612                 : }
                                613                 : 
                                614                 : /*
                                615                 :  * Calculate the oldest Write, Flush and Apply positions among sync standbys.
                                616                 :  */
                                617                 : static void
 1086 tgl                       618             102 : SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
                                619                 :                            XLogRecPtr *flushPtr,
                                620                 :                            XLogRecPtr *applyPtr,
                                621                 :                            SyncRepStandbyData *sync_standbys,
                                622                 :                            int num_standbys)
 2302 fujii                     623 ECB             : {
                                624                 :     int         i;
                                625                 : 
                                626                 :     /*
 2153 bruce                     627                 :      * Scan through all sync standbys and calculate the oldest Write, Flush
                                628                 :      * and Apply positions.  We assume *writePtr et al were initialized to
 1086 tgl                       629                 :      * InvalidXLogRecPtr.
 2302 fujii                     630                 :      */
 1086 tgl                       631 CBC         204 :     for (i = 0; i < num_standbys; i++)
 2302 fujii                     632 ECB             :     {
 1086 tgl                       633 CBC         102 :         XLogRecPtr  write = sync_standbys[i].write;
                                634             102 :         XLogRecPtr  flush = sync_standbys[i].flush;
 1086 tgl                       635 GIC         102 :         XLogRecPtr  apply = sync_standbys[i].apply;
 2559 fujii                     636 ECB             : 
 2559 fujii                     637 GIC         102 :         if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
                                638             102 :             *writePtr = write;
                                639             102 :         if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
                                640             102 :             *flushPtr = flush;
                                641             102 :         if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
                                642             102 :             *applyPtr = apply;
 2559 fujii                     643 EUB             :     }
 2302 fujii                     644 GIC         102 : }
                                645                 : 
                                646                 : /*
                                647                 :  * Calculate the Nth latest Write, Flush and Apply positions among sync
                                648                 :  * standbys.
                                649                 :  */
                                650                 : static void
 1086 tgl                       651 UIC           0 : SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
                                652                 :                               XLogRecPtr *flushPtr,
                                653                 :                               XLogRecPtr *applyPtr,
                                654                 :                               SyncRepStandbyData *sync_standbys,
                                655                 :                               int num_standbys,
 1086 tgl                       656 EUB             :                               uint8 nth)
                                657                 : {
 2153 bruce                     658                 :     XLogRecPtr *write_array;
                                659                 :     XLogRecPtr *flush_array;
                                660                 :     XLogRecPtr *apply_array;
                                661                 :     int         i;
 2302 fujii                     662                 : 
                                663                 :     /* Should have enough candidates, or somebody messed up */
 1086 tgl                       664 UBC           0 :     Assert(nth > 0 && nth <= num_standbys);
 2302 fujii                     665 EUB             : 
 1086 tgl                       666 UBC           0 :     write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
 1086 tgl                       667 UIC           0 :     flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
                                668               0 :     apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
                                669                 : 
 1086 tgl                       670 UBC           0 :     for (i = 0; i < num_standbys; i++)
 1086 tgl                       671 EUB             :     {
 1086 tgl                       672 UBC           0 :         write_array[i] = sync_standbys[i].write;
 1086 tgl                       673 UIC           0 :         flush_array[i] = sync_standbys[i].flush;
                                674               0 :         apply_array[i] = sync_standbys[i].apply;
 2302 fujii                     675 EUB             :     }
                                676                 : 
 2181                           677                 :     /* Sort each array in descending order */
 1086 tgl                       678 UIC           0 :     qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
 1086 tgl                       679 UBC           0 :     qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
                                680               0 :     qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
 2302 fujii                     681 EUB             : 
                                682                 :     /* Get Nth latest Write, Flush, Apply positions */
 2302 fujii                     683 UIC           0 :     *writePtr = write_array[nth - 1];
                                684               0 :     *flushPtr = flush_array[nth - 1];
                                685               0 :     *applyPtr = apply_array[nth - 1];
                                686                 : 
                                687               0 :     pfree(write_array);
 2302 fujii                     688 UBC           0 :     pfree(flush_array);
 2302 fujii                     689 UIC           0 :     pfree(apply_array);
 2302 fujii                     690 UBC           0 : }
 2302 fujii                     691 EUB             : 
                                692                 : /*
                                693                 :  * Compare lsn in order to sort array in descending order.
                                694                 :  */
                                695                 : static int
 2302 fujii                     696 UBC           0 : cmp_lsn(const void *a, const void *b)
                                697                 : {
 2153 bruce                     698               0 :     XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
 2153 bruce                     699 UIC           0 :     XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);
                                700                 : 
 2302 fujii                     701               0 :     if (lsn1 > lsn2)
                                702               0 :         return -1;
                                703               0 :     else if (lsn1 == lsn2)
                                704               0 :         return 0;
                                705                 :     else
                                706               0 :         return 1;
                                707                 : }
                                708                 : 
 2559 fujii                     709 ECB             : /*
                                710                 :  * Return data about walsenders that are candidates to be sync standbys.
                                711                 :  *
                                712                 :  * *standbys is set to a palloc'd array of structs of per-walsender data,
                                713                 :  * and the number of valid entries (candidate sync senders) is returned.
                                714                 :  * (This might be more or fewer than num_sync; caller must check.)
                                715                 :  */
 1086 tgl                       716                 : int
 1086 tgl                       717 GIC         655 : SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
                                718                 : {
 1086 tgl                       719 ECB             :     int         i;
                                720                 :     int         n;
                                721                 : 
                                722                 :     /* Create result array */
 1086 tgl                       723 CBC         655 :     *standbys = (SyncRepStandbyData *)
                                724             655 :         palloc(max_wal_senders * sizeof(SyncRepStandbyData));
                                725                 : 
                                726                 :     /* Quick exit if sync replication is not requested */
 2302 fujii                     727 GIC         655 :     if (SyncRepConfig == NULL)
 1086 tgl                       728             536 :         return 0;
                                729                 : 
                                730                 :     /* Collect raw data from shared memory */
 1086 tgl                       731 CBC         119 :     n = 0;
 2302 fujii                     732            1309 :     for (i = 0; i < max_wal_senders; i++)
                                733                 :     {
 1086 tgl                       734 ECB             :         volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
                                735                 :                                      * rearrangement */
                                736                 :         SyncRepStandbyData *stby;
                                737                 :         WalSndState state;      /* not included in SyncRepStandbyData */
 2109 alvherre                  738                 : 
 2302 fujii                     739 CBC        1190 :         walsnd = &WalSndCtl->walsnds[i];
 1086 tgl                       740            1190 :         stby = *standbys + n;
 2302 fujii                     741 ECB             : 
 2109 alvherre                  742 GIC        1190 :         SpinLockAcquire(&walsnd->mutex);
 1086 tgl                       743            1190 :         stby->pid = walsnd->pid;
 2109 alvherre                  744 CBC        1190 :         state = walsnd->state;
 1086 tgl                       745            1190 :         stby->write = walsnd->write;
 1086 tgl                       746 GIC        1190 :         stby->flush = walsnd->flush;
                                747            1190 :         stby->apply = walsnd->apply;
 1086 tgl                       748 CBC        1190 :         stby->sync_standby_priority = walsnd->sync_standby_priority;
 2109 alvherre                  749 GIC        1190 :         SpinLockRelease(&walsnd->mutex);
 2109 alvherre                  750 EUB             : 
                                751                 :         /* Must be active */
 1086 tgl                       752 GIC        1190 :         if (stby->pid == 0)
 2302 fujii                     753 CBC        1043 :             continue;
 2302 fujii                     754 ECB             : 
                                755                 :         /* Must be streaming or stopping */
 1592 michael                   756 GIC         147 :         if (state != WALSNDSTATE_STREAMING &&
 1592 michael                   757 ECB             :             state != WALSNDSTATE_STOPPING)
 2302 fujii                     758 UBC           0 :             continue;
                                759                 : 
                                760                 :         /* Must be synchronous */
 1086 tgl                       761 CBC         147 :         if (stby->sync_standby_priority == 0)
 2302 fujii                     762               7 :             continue;
 2302 fujii                     763 ECB             : 
                                764                 :         /* Must have a valid flush position */
 1086 tgl                       765 GIC         140 :         if (XLogRecPtrIsInvalid(stby->flush))
 2302 fujii                     766 UIC           0 :             continue;
                                767                 : 
                                768                 :         /* OK, it's a candidate */
 1086 tgl                       769 GIC         140 :         stby->walsnd_index = i;
                                770             140 :         stby->is_me = (walsnd == MyWalSnd);
 1086 tgl                       771 CBC         140 :         n++;
 2302 fujii                     772 ECB             :     }
                                773                 : 
                                774                 :     /*
 1086 tgl                       775                 :      * In quorum mode, we return all the candidates.  In priority mode, if we
                                776                 :      * have too many candidates then return only the num_sync ones of highest
                                777                 :      * priority.
 2559 fujii                     778                 :      */
 1086 tgl                       779 GIC         119 :     if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
                                780             118 :         n > SyncRepConfig->num_sync)
 2559 fujii                     781 ECB             :     {
                                782                 :         /* Sort by priority ... */
 1086 tgl                       783 GIC           8 :         qsort(*standbys, n, sizeof(SyncRepStandbyData),
                                784                 :               standby_priority_comparator);
                                785                 :         /* ... then report just the first num_sync ones */
                                786               8 :         n = SyncRepConfig->num_sync;
                                787                 :     }
 2559 fujii                     788 ECB             : 
 1086 tgl                       789 GIC         119 :     return n;
 1086 tgl                       790 ECB             : }
 2559 fujii                     791                 : 
                                792                 : /*
                                793                 :  * qsort comparator to sort SyncRepStandbyData entries by priority
 1086 tgl                       794                 :  */
                                795                 : static int
 1086 tgl                       796 GIC          19 : standby_priority_comparator(const void *a, const void *b)
                                797                 : {
                                798              19 :     const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
                                799              19 :     const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
                                800                 : 
                                801                 :     /* First, sort by increasing priority value */
 1086 tgl                       802 CBC          19 :     if (sa->sync_standby_priority != sb->sync_standby_priority)
 1086 tgl                       803 GIC           9 :         return sa->sync_standby_priority - sb->sync_standby_priority;
                                804                 : 
                                805                 :     /*
                                806                 :      * We might have equal priority values; arbitrarily break ties by position
                                807                 :      * in the WALSnd array.  (This is utterly bogus, since that is arrival
                                808                 :      * order dependent, but there are regression tests that rely on it.)
                                809                 :      */
                                810              10 :     return sa->walsnd_index - sb->walsnd_index;
                                811                 : }
                                812                 : 
                                813                 : 
                                814                 : /*
 4417 simon                     815 ECB             :  * Check if we are in the list of sync standbys, and if so, determine
                                816                 :  * priority sequence. Return priority if set, or zero to indicate that
                                817                 :  * we are not a potential sync standby.
                                818                 :  *
                                819                 :  * Compare the parameter SyncRepStandbyNames against the application_name
                                820                 :  * for this WALSender, or allow any name if we find a wildcard "*".
                                821                 :  */
                                822                 : static int
 4417 simon                     823 GIC         509 : SyncRepGetStandbyPriority(void)
                                824                 : {
 2538 tgl                       825 ECB             :     const char *standby_name;
                                826                 :     int         priority;
 4417 simon                     827 GIC         509 :     bool        found = false;
 4417 simon                     828 ECB             : 
 4282                           829                 :     /*
                                830                 :      * Since synchronous cascade replication is not allowed, we always set the
 3955 bruce                     831                 :      * priority of cascading walsender to zero.
 4282 simon                     832                 :      */
 4282 simon                     833 GIC         509 :     if (am_cascading_walsender)
 4282 simon                     834 CBC          33 :         return 0;
 4282 simon                     835 ECB             : 
 2538 tgl                       836 GIC         476 :     if (!SyncStandbysDefined() || SyncRepConfig == NULL)
 4417 simon                     837 CBC         453 :         return 0;
 4417 simon                     838 ECB             : 
 2538 tgl                       839 GIC          23 :     standby_name = SyncRepConfig->member_names;
 2538 tgl                       840 CBC          31 :     for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
                                841                 :     {
 4417 simon                     842 GIC          30 :         if (pg_strcasecmp(standby_name, application_name) == 0 ||
 2538 tgl                       843 CBC          18 :             strcmp(standby_name, "*") == 0)
 4417 simon                     844 ECB             :         {
 4417 simon                     845 GIC          22 :             found = true;
                                846              22 :             break;
                                847                 :         }
 2538 tgl                       848               8 :         standby_name += strlen(standby_name) + 1;
                                849                 :     }
 4417 simon                     850 ECB             : 
 2174 fujii                     851 GIC          23 :     if (!found)
                                852               1 :         return 0;
                                853                 : 
                                854                 :     /*
                                855                 :      * In quorum-based sync replication, all the standbys in the list have the
                                856                 :      * same priority, one.
                                857                 :      */
                                858              22 :     return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
                                859                 : }
                                860                 : 
                                861                 : /*
 3260 bruce                     862 ECB             :  * Walk the specified queue from head.  Set the state of any backends that
                                863                 :  * need to be woken, remove them from the queue, and then wake them.
 4093 simon                     864                 :  * Pass all = true to wake whole queue; otherwise, just wake up to
                                865                 :  * the walsender's LSN.
                                866                 :  *
                                867                 :  * The caller must hold SyncRepLock in exclusive mode.
 4417                           868                 :  */
 2936 ishii                     869                 : static int
 4093 simon                     870 CBC         118 : SyncRepWakeQueue(bool all, int mode)
                                871                 : {
 4417                           872             118 :     volatile WalSndCtlData *walsndctl = WalSndCtl;
 4382 bruce                     873 GIC         118 :     int         numprocs = 0;
                                874                 :     dlist_mutable_iter iter;
                                875                 : 
 4093 simon                     876             118 :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
 1255 michael                   877             118 :     Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
 4093 simon                     878 CBC         118 :     Assert(SyncRepQueueIsOrderedByLSN(mode));
 4417 simon                     879 ECB             : 
   81 andres                    880 GNC         132 :     dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
                                881                 :     {
                                882              19 :         PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
                                883                 : 
                                884                 :         /*
                                885                 :          * Assume the queue is ordered by LSN
                                886                 :          */
 3754 alvherre                  887 GIC          19 :         if (!all && walsndctl->lsn[mode] < proc->waitLSN)
 4417 simon                     888               5 :             return numprocs;
 4417 simon                     889 ECB             : 
                                890                 :         /*
                                891                 :          * Remove from queue.
                                892                 :          */
   81 andres                    893 GNC          14 :         dlist_delete_thoroughly(&proc->syncRepLinks);
                                894                 : 
                                895                 :         /*
 2097 heikki.linnakangas        896 ECB             :          * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
                                897                 :          * make sure that it sees the queue link being removed before the
                                898                 :          * syncRepState change.
                                899                 :          */
 2097 heikki.linnakangas        900 GIC          14 :         pg_write_barrier();
                                901                 : 
                                902                 :         /*
                                903                 :          * Set state to complete; see SyncRepWaitForLSN() for discussion of
                                904                 :          * the various states.
                                905                 :          */
   81 andres                    906 GNC          14 :         proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
 4417 simon                     907 ECB             : 
                                908                 :         /*
                                909                 :          * Wake only when we have set state and removed from queue.
                                910                 :          */
   81 andres                    911 GNC          14 :         SetLatch(&(proc->procLatch));
                                912                 : 
 4417 simon                     913 CBC          14 :         numprocs++;
                                914                 :     }
                                915                 : 
 4417 simon                     916 GIC         113 :     return numprocs;
                                917                 : }
                                918                 : 
                                919                 : /*
 4087 simon                     920 ECB             :  * The checkpointer calls this as needed to update the shared
                                921                 :  * sync_standbys_defined flag, so that backends don't remain permanently wedged
                                922                 :  * if synchronous_standby_names is unset.  It's safe to check the current value
                                923                 :  * without the lock, because it's only ever updated by one process.  But we
 4406 rhaas                     924 EUB             :  * must take the lock to change it.
                                925                 :  */
                                926                 : void
 4406 rhaas                     927 GIC         391 : SyncRepUpdateSyncStandbysDefined(void)
                                928                 : {
                                929             391 :     bool        sync_standbys_defined = SyncStandbysDefined();
                                930                 : 
                                931             391 :     if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
                                932                 :     {
                                933               8 :         LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
                                934                 : 
 4406 rhaas                     935 ECB             :         /*
                                936                 :          * If synchronous_standby_names has been reset to empty, it's futile
 1256 michael                   937                 :          * for backends to continue waiting.  Since the user no longer wants
                                938                 :          * synchronous replication, we'd better wake them up.
 4406 rhaas                     939                 :          */
 4406 rhaas                     940 GIC           8 :         if (!sync_standbys_defined)
                                941                 :         {
                                942                 :             int         i;
 4093 simon                     943 ECB             : 
 4093 simon                     944 UIC           0 :             for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
                                945               0 :                 SyncRepWakeQueue(true, i);
                                946                 :         }
                                947                 : 
 4406 rhaas                     948 ECB             :         /*
                                949                 :          * Only allow people to join the queue when there are synchronous
                                950                 :          * standbys defined.  Without this interlock, there's a race
                                951                 :          * condition: we might wake up all the current waiters; then, some
                                952                 :          * backend that hasn't yet reloaded its config might go to sleep on
                                953                 :          * the queue (and never wake up).  This prevents that.
                                954                 :          */
 4406 rhaas                     955 GIC           8 :         WalSndCtl->sync_standbys_defined = sync_standbys_defined;
                                956                 : 
                                957               8 :         LWLockRelease(SyncRepLock);
                                958                 :     }
                                959             391 : }
 4406 rhaas                     960 ECB             : 
 4417 simon                     961 EUB             : #ifdef USE_ASSERT_CHECKING
                                962                 : static bool
 4093 simon                     963 CBC         152 : SyncRepQueueIsOrderedByLSN(int mode)
                                964                 : {
 4382 bruce                     965 ECB             :     XLogRecPtr  lastLSN;
                                966                 :     dlist_iter  iter;
                                967                 : 
 4093 simon                     968 GIC         152 :     Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
                                969                 : 
 3941 heikki.linnakangas        970             152 :     lastLSN = 0;
                                971                 : 
   81 andres                    972 GNC         205 :     dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
 4417 simon                     973 ECB             :     {
   81 andres                    974 GNC          53 :         PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
                                975                 : 
                                976                 :         /*
 4382 bruce                     977 ECB             :          * Check the queue is ordered by LSN and that multiple procs don't
                                978                 :          * have matching LSNs
                                979                 :          */
 3754 alvherre                  980 GIC          53 :         if (proc->waitLSN <= lastLSN)
 4417 simon                     981 UIC           0 :             return false;
                                982                 : 
 4417 simon                     983 CBC          53 :         lastLSN = proc->waitLSN;
 4417 simon                     984 ECB             :     }
                                985                 : 
 4417 simon                     986 GIC         152 :     return true;
 4417 simon                     987 ECB             : }
                                988                 : #endif
 4417 simon                     989 EUB             : 
                                990                 : /*
                                991                 :  * ===========================================================
                                992                 :  * Synchronous Replication functions executed by any process
                                993                 :  * ===========================================================
                                994                 :  */
                                995                 : 
                                996                 : bool
 4385 tgl                       997 CBC        1926 : check_synchronous_standby_names(char **newval, void **extra, GucSource source)
                                998                 : {
 2559 fujii                     999 GBC        1926 :     if (*newval != NULL && (*newval)[0] != '\0')
 4417 simon                    1000              67 :     {
 2538 tgl                      1001 EUB             :         int         parse_rc;
                               1002                 :         SyncRepConfigData *pconf;
                               1003                 : 
                               1004                 :         /* Reset communication variables to ensure a fresh start */
 2538 tgl                      1005 GIC          67 :         syncrep_parse_result = NULL;
 2538 tgl                      1006 CBC          67 :         syncrep_parse_error_msg = NULL;
 2538 tgl                      1007 ECB             : 
 2538 tgl                      1008 EUB             :         /* Parse the synchronous_standby_names string */
 2559 fujii                    1009 CBC          67 :         syncrep_scanner_init(*newval);
 2559 fujii                    1010 GIC          67 :         parse_rc = syncrep_yyparse();
 2559 fujii                    1011 CBC          67 :         syncrep_scanner_finish();
                               1012                 : 
 2538 tgl                      1013 GIC          67 :         if (parse_rc != 0 || syncrep_parse_result == NULL)
                               1014                 :         {
 2559 fujii                    1015 UIC           0 :             GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
 2538 tgl                      1016               0 :             if (syncrep_parse_error_msg)
                               1017               0 :                 GUC_check_errdetail("%s", syncrep_parse_error_msg);
                               1018                 :             else
                               1019               0 :                 GUC_check_errdetail("synchronous_standby_names parser failed");
 2559 fujii                    1020               0 :             return false;
                               1021                 :         }
 2559 fujii                    1022 ECB             : 
 2304 fujii                    1023 GIC          67 :         if (syncrep_parse_result->num_sync <= 0)
 2304 fujii                    1024 ECB             :         {
 2304 fujii                    1025 UIC           0 :             GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
                               1026               0 :                              syncrep_parse_result->num_sync);
                               1027               0 :             return false;
 2304 fujii                    1028 ECB             :         }
                               1029                 : 
                               1030                 :         /* GUC extra value must be guc_malloc'd, not palloc'd */
 2538 tgl                      1031                 :         pconf = (SyncRepConfigData *)
  177 tgl                      1032 GNC          67 :             guc_malloc(LOG, syncrep_parse_result->config_size);
 2538 tgl                      1033 GIC          67 :         if (pconf == NULL)
 2538 tgl                      1034 LBC           0 :             return false;
 2538 tgl                      1035 GIC          67 :         memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
 2538 tgl                      1036 ECB             : 
 2538 tgl                      1037 GIC          67 :         *extra = (void *) pconf;
 2538 tgl                      1038 EUB             : 
 2559 fujii                    1039                 :         /*
 2538 tgl                      1040                 :          * We need not explicitly clean up syncrep_parse_result.  It, and any
 2538 tgl                      1041 ECB             :          * other cruft generated during parsing, will be freed when the
                               1042                 :          * current memory context is deleted.  (This code is generally run in
                               1043                 :          * a short-lived context used for config file processing, so that will
 2538 tgl                      1044 EUB             :          * not be very long.)
 2559 fujii                    1045                 :          */
                               1046                 :     }
 2538 tgl                      1047 ECB             :     else
 2538 tgl                      1048 CBC        1859 :         *extra = NULL;
 4417 simon                    1049 ECB             : 
 4385 tgl                      1050 GIC        1926 :     return true;
 4417 simon                    1051 ECB             : }
                               1052                 : 
                               1053                 : void
 2538 tgl                      1054 GIC        1916 : assign_synchronous_standby_names(const char *newval, void *extra)
                               1055                 : {
                               1056            1916 :     SyncRepConfig = (SyncRepConfigData *) extra;
                               1057            1916 : }
                               1058                 : 
                               1059                 : void
 4093 simon                    1060            2335 : assign_synchronous_commit(int newval, void *extra)
                               1061                 : {
                               1062            2335 :     switch (newval)
                               1063                 :     {
 4093 simon                    1064 UIC           0 :         case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
                               1065               0 :             SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
                               1066               0 :             break;
 4093 simon                    1067 GIC        1976 :         case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
                               1068            1976 :             SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
                               1069            1976 :             break;
 2567 rhaas                    1070 UIC           0 :         case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
                               1071               0 :             SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
                               1072               0 :             break;
 4093 simon                    1073 GIC         359 :         default:
                               1074             359 :             SyncRepWaitMode = SYNC_REP_NO_WAIT;
                               1075             359 :             break;
                               1076                 :     }
                               1077            2335 : }
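
Note on the uncovered "Nth latest" path (source lines 679-706 in the listing above): the report shows cmp_lsn and its qsort callers with zero hits, so this run never exercised the descending sort or the [nth - 1] selection. The following is a minimal standalone sketch of that technique, not the PostgreSQL implementation. It assumes XLogRecPtr can be modelled as a plain 64-bit unsigned integer, and nth_latest_lsn is a hypothetical helper name introduced here for illustration only.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Assumption: stand-in for PostgreSQL's XLogRecPtr (a 64-bit WAL position). */
typedef uint64_t XLogRecPtr;

/*
 * Comparator with the same shape as cmp_lsn in the listing: larger LSNs
 * sort first, so qsort() produces a descending array.
 */
static int
cmp_lsn_desc(const void *a, const void *b)
{
    XLogRecPtr  lsn1 = *((const XLogRecPtr *) a);
    XLogRecPtr  lsn2 = *((const XLogRecPtr *) b);

    if (lsn1 > lsn2)
        return -1;
    else if (lsn1 == lsn2)
        return 0;
    else
        return 1;
}

/*
 * Hypothetical helper (not in syncrep.c): sort the array in place in
 * descending order and return the nth-largest entry, with nth counted
 * from 1 as in the listing's "array[nth - 1]" indexing.
 */
static XLogRecPtr
nth_latest_lsn(XLogRecPtr *lsns, size_t count, size_t nth)
{
    assert(nth >= 1 && nth <= count);

    qsort(lsns, count, sizeof(XLogRecPtr), cmp_lsn_desc);
    return lsns[nth - 1];
}
```

With positions {300, 100, 200, 200}, nth = 1 selects 300 and nth = 2 selects 200. In other words, the value returned is one that at least nth of the entries have reached, which is why a descending sort followed by indexing at nth - 1 suffices. A test that drives this path would presumably need a configuration that reaches the sorting branch, which the zero hit counts suggest the current test run does not have.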
        

Generated by: LCOV version v1.16-55-g56c0a2a