LCOV - differential code coverage report
Current view: top level - src/backend/storage/ipc - sinvaladt.c (source / functions)
Current: Differential Code Coverage HEAD vs 15
Current Date: 2023-04-08 17:13:01
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

Coverage Total Hit UIC UBC GIC GNC CBC EUB ECB DCB
Lines: 95.9 % 193 185 3 5 77 5 103 3 81 1
Functions: 100.0 % 10 10 4 1 5 4 1

Line coverage date bins:
(60,120] days: 100.0 % 5 5 5
(240..) days: 95.7 % 188 180 3 5 77 103 3 78

Function coverage date bins:
(60,120] days: 100.0 % 1 1 1
(240..) days: 69.2 % 13 9 4 5 4

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * sinvaladt.c
                                  4                 :  *    POSTGRES shared cache invalidation data manager.
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  7                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                  8                 :  *
                                  9                 :  *
                                 10                 :  * IDENTIFICATION
                                 11                 :  *    src/backend/storage/ipc/sinvaladt.c
                                 12                 :  *
                                 13                 :  *-------------------------------------------------------------------------
                                 14                 :  */
                                 15                 : #include "postgres.h"
                                 16                 : 
                                 17                 : #include <signal.h>
                                 18                 : #include <unistd.h>
                                 19                 : 
                                 20                 : #include "access/transam.h"
                                 21                 : #include "miscadmin.h"
                                 22                 : #include "storage/backendid.h"
                                 23                 : #include "storage/ipc.h"
                                 24                 : #include "storage/proc.h"
                                 25                 : #include "storage/procsignal.h"
                                 26                 : #include "storage/shmem.h"
                                 27                 : #include "storage/sinvaladt.h"
                                 28                 : #include "storage/spin.h"
                                 29                 : 
                                 30                 : /*
                                 31                 :  * Conceptually, the shared cache invalidation messages are stored in an
                                 32                 :  * infinite array, where maxMsgNum is the next array subscript to store a
                                 33                 :  * submitted message in, minMsgNum is the smallest array subscript containing
                                 34                 :  * a message not yet read by all backends, and we always have maxMsgNum >=
                                 35                 :  * minMsgNum.  (They are equal when there are no messages pending.)  For each
                                 36                 :  * active backend, there is a nextMsgNum pointer indicating the next message it
                                 37                 :  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
                                 38                 :  * backend.
                                 39                 :  *
                                 40                 :  * (In the current implementation, minMsgNum is a lower bound for the
                                 41                 :  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
                                 42                 :  * smallest nextMsgNum --- it may lag behind.  We only update it when
                                 43                 :  * SICleanupQueue is called, and we try not to do that often.)
                                 44                 :  *
                                 45                 :  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
                                 46                 :  * entries.  We translate MsgNum values into circular-buffer indexes by
                                 47                 :  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
                                 48                 :  * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
                                 49                 :  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
                                 50                 :  * in the buffer.  If the buffer does overflow, we recover by setting the
                                 51                 :  * "reset" flag for each backend that has fallen too far behind.  A backend
                                 52                 :  * that is in "reset" state is ignored while determining minMsgNum.  When
                                 53                 :  * it does finally attempt to receive inval messages, it must discard all
                                 54                 :  * its invalidatable state, since it won't know what it missed.
                                 55                 :  *
                                 56                 :  * To reduce the probability of needing resets, we send a "catchup" interrupt
                                 57                 :  * to any backend that seems to be falling unreasonably far behind.  The
                                 58                 :  * normal behavior is that at most one such interrupt is in flight at a time;
                                 59                 :  * when a backend completes processing a catchup interrupt, it executes
                                 60                 :  * SICleanupQueue, which will signal the next-furthest-behind backend if
                                 61                 :  * needed.  This avoids undue contention from multiple backends all trying
                                 62                 :  * to catch up at once.  However, the furthest-back backend might be stuck
                                 63                 :  * in a state where it can't catch up.  Eventually it will get reset, so it
                                 64                 :  * won't cause any more problems for anyone but itself.  But we don't want
                                 65                 :  * to find that a bunch of other backends are now too close to the reset
                                 66                 :  * threshold to be saved.  So SICleanupQueue is designed to occasionally
                                 67                 :  * send extra catchup interrupts as the queue gets fuller, to backends that
                                 68                 :  * are far behind and haven't gotten one yet.  As long as there aren't a lot
                                 69                 :  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
                                 70                 :  * that aren't stuck will propagate their interrupts to the next guy.
                                 71                 :  *
                                 72                 :  * We would have problems if the MsgNum values overflow an integer, so
                                 73                 :  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
                                 74                 :  * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
                                 75                 :  * large so that we don't need to do this often.  It must be a multiple of
                                 76                 :  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
                                 77                 :  * to be moved when we do it.
                                 78                 :  *
                                 79                 :  * Access to the shared sinval array is protected by two locks, SInvalReadLock
                                 80                 :  * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
                                 81                 :  * authorizes them to modify their own ProcState but not to modify or even
                                 82                 :  * look at anyone else's.  When we need to perform array-wide updates,
                                 83                 :  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
                                 84                 :  * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
                                 85                 :  * mode) to serialize adding messages to the queue.  Note that a writer
                                 86                 :  * can operate in parallel with one or more readers, because the writer
                                 87                 :  * has no need to touch anyone's ProcState, except in the infrequent cases
                                 88                 :  * when SICleanupQueue is needed.  The only point of overlap is that
                                 89                 :  * the writer wants to change maxMsgNum while readers need to read it.
                                 90                 :  * We deal with that by having a spinlock that readers must take for just
                                 91                 :  * long enough to read maxMsgNum, while writers take it for just long enough
                                 92                 :  * to write maxMsgNum.  (The exact rule is that you need the spinlock to
                                 93                 :  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
                                 94                 :  * spinlock to write maxMsgNum unless you are holding both locks.)
                                 95                 :  *
                                 96                 :  * Note: since maxMsgNum is an int and hence presumably atomically readable/
                                 97                 :  * writable, the spinlock might seem unnecessary.  The reason it is needed
                                 98                 :  * is to provide a memory barrier: we need to be sure that messages written
                                 99                 :  * to the array are actually there before maxMsgNum is increased, and that
                                100                 :  * readers will see that data after fetching maxMsgNum.  Multiprocessors
                                101                 :  * that have weak memory-ordering guarantees can fail without the memory
                                102                 :  * barrier instructions that are included in the spinlock sequences.
                                103                 :  */
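The index mapping and the wraparound rule described in the comment above can be sketched in a simplified, single-process model. ToyQueue, msg_slot(), has_room() and wrap_counters() are hypothetical names that are not part of sinvaladt.c; only the two constants come from the listing. The sketch shows that subtracting a multiple of MAXNUMMESSAGES from every counter leaves each message in the same buffer slot, so nothing needs to move.

```c
#include <assert.h>
#include <stdbool.h>

/* Values copied from the #defines further down in sinvaladt.c. */
#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)

/* Hypothetical stand-in for the counters described above (not the real SISeg). */
typedef struct ToyQueue
{
    int minMsgNum;      /* oldest message still needed */
    int maxMsgNum;      /* next message number to assign */
    int nextMsgNum[2];  /* read positions of two toy readers */
} ToyQueue;

/* Map an unbounded MsgNum onto a slot of the circular buffer. */
static int
msg_slot(int msgnum)
{
    /* With a power-of-2 size the compiler can turn % into a bit mask. */
    return msgnum % MAXNUMMESSAGES;
}

/* Can n more messages be stored without overwriting unread ones? */
static bool
has_room(const ToyQueue *q, int n)
{
    return q->maxMsgNum - q->minMsgNum + n <= MAXNUMMESSAGES;
}

/*
 * Once minMsgNum reaches MSGNUMWRAPAROUND, subtract it from every counter
 * at once.  Because it is a multiple of MAXNUMMESSAGES, msg_slot() returns
 * the same slot before and after the adjustment.
 */
static void
wrap_counters(ToyQueue *q)
{
    if (q->minMsgNum < MSGNUMWRAPAROUND)
        return;
    q->minMsgNum -= MSGNUMWRAPAROUND;
    q->maxMsgNum -= MSGNUMWRAPAROUND;
    for (int i = 0; i < 2; i++)
        q->nextMsgNum[i] -= MSGNUMWRAPAROUND;
}
```

For example, with minMsgNum = 10 and maxMsgNum = 100 the queue holds 90 messages, leaving room for 4096 - 90 = 4006 more.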
                                104                 : 
                                105                 : 
                                106                 : /*
                                107                 :  * Configurable parameters.
                                108                 :  *
                                109                 :  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
                                110                 :  * Must be a power of 2 for speed.
                                111                 :  *
                                112                 :  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
                                113                 :  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
                                114                 :  *
                                115                 :  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
                                116                 :  * before we bother to call SICleanupQueue.
                                117                 :  *
                                118                 :  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
                                119                 :  * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
                                120                 :  *
                                121                 :  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
                                122                 :  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
                                123                 :  *
                                124                 :  * WRITE_QUANTUM: the max number of messages to push into the buffer per
                                125                 :  * iteration of SIInsertDataEntries.  Noncritical but should be less than
                                126                 :  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
                                127                 :  * per iteration.
                                128                 :  */
                                129                 : 
                                130                 : #define MAXNUMMESSAGES 4096
                                131                 : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
                                132                 : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
                                133                 : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
                                134                 : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
                                135                 : #define WRITE_QUANTUM 64
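Worked out, the defaults give MSGNUMWRAPAROUND = 4096 * 262144 = 2^30 = 1,073,741,824, CLEANUP_MIN = SIG_THRESHOLD = 2048, and CLEANUP_QUANTUM = 256. The sketch below checks the constraints stated in the comment at compile time and shows one plausible reading of how CLEANUP_MIN and CLEANUP_QUANTUM could pace cleanups. next_cleanup_threshold() and write_batches() are hypothetical helpers; SICleanupQueue and SIInsertDataEntries themselves are not part of this excerpt.

```c
#include <assert.h>
#include <limits.h>

/* Copied from the listing above. */
#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64

/* The constraints stated in the comment, checked at compile time. */
_Static_assert((MAXNUMMESSAGES & (MAXNUMMESSAGES - 1)) == 0,
               "MAXNUMMESSAGES must be a power of 2");
_Static_assert(MSGNUMWRAPAROUND % MAXNUMMESSAGES == 0,
               "MSGNUMWRAPAROUND must be a multiple of MAXNUMMESSAGES");
_Static_assert((CLEANUP_QUANTUM & (CLEANUP_QUANTUM - 1)) == 0,
               "CLEANUP_QUANTUM should be a power of 2");
_Static_assert(WRITE_QUANTUM < CLEANUP_QUANTUM,
               "WRITE_QUANTUM should be less than CLEANUP_QUANTUM");
/* Extra check for this sketch: a full buffer of headroom above the wrap point. */
_Static_assert(MSGNUMWRAPAROUND <= INT_MAX - MAXNUMMESSAGES,
               "MsgNum arithmetic must not overflow int");

/*
 * Hypothetical: where nextThreshold could be set after a cleanup pass that
 * left numMsgs messages queued.  Wait until CLEANUP_MIN messages are
 * buffered, then fire again at the next multiple of CLEANUP_QUANTUM.
 */
static int
next_cleanup_threshold(int numMsgs)
{
    if (numMsgs < CLEANUP_MIN)
        return CLEANUP_MIN;
    return (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
}

/* Hypothetical: number of WRITE_QUANTUM-sized batches needed for n messages. */
static int
write_batches(int n)
{
    return (n + WRITE_QUANTUM - 1) / WRITE_QUANTUM;
}
```

Because WRITE_QUANTUM (64) is smaller than CLEANUP_QUANTUM (256), one batch can cross at most one multiple of CLEANUP_QUANTUM, which is presumably why checking for cleanup once per iteration is enough.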
                                136                 : 
                                137                 : /* Per-backend state in shared invalidation structure */
                                138                 : typedef struct ProcState
                                139                 : {
                                140                 :     /* procPid is zero in an inactive ProcState array entry. */
                                141                 :     pid_t       procPid;        /* PID of backend, for signaling */
                                142                 :     PGPROC     *proc;           /* PGPROC of backend */
                                143                 :     /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
                                144                 :     int         nextMsgNum;     /* next message number to read */
                                145                 :     bool        resetState;     /* backend needs to reset its state */
                                146                 :     bool        signaled;       /* backend has been sent catchup signal */
                                147                 :     bool        hasMessages;    /* backend has unread messages */
                                148                 : 
                                149                 :     /*
                                150                 :      * Backend only sends invalidations, never receives them. This only makes
                                151                 :      * sense for Startup process during recovery because it doesn't maintain a
                                152                 :      * relcache, yet it fires inval messages to allow query backends to see
                                153                 :      * schema changes.
                                154                 :      */
                                155                 :     bool        sendOnly;       /* backend only sends, never receives */
                                156                 : 
                                157                 :     /*
                                158                 :      * Next LocalTransactionId to use for each idle backend slot.  We keep
                                159                 :      * this here because it is indexed by BackendId and it is convenient to
                                160                 :      * copy the value to and from local memory when MyBackendId is set. It's
                                161                 :      * meaningless in an active ProcState entry.
                                162                 :      */
                                163                 :     LocalTransactionId nextLXID;
                                164                 : } ProcState;
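The reset and catchup rules from the design comment map onto the ProcState fields roughly as follows. ToyProcState, LagAction and classify_reader() are hypothetical, and the overflow test (unread messages plus the n being inserted exceeding MAXNUMMESSAGES) is a simplification of "fallen too far behind"; the real decision is made in SICleanupQueue, which this excerpt does not show.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>

#define MAXNUMMESSAGES 4096
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)

/* Subset of the ProcState fields shown above. */
typedef struct ToyProcState
{
    pid_t procPid;      /* 0 = inactive slot */
    int   nextMsgNum;   /* meaningless if inactive or resetState */
    bool  resetState;
    bool  signaled;
} ToyProcState;

typedef enum { LAG_OK, LAG_CATCHUP, LAG_RESET } LagAction;

/*
 * Hypothetical classification of one reader before n new messages are
 * appended at maxMsgNum.  A reader whose unread messages would be
 * overwritten is put into reset state; one that is merely far behind and
 * not yet signaled becomes a candidate for a catchup interrupt.
 */
static LagAction
classify_reader(ToyProcState *p, int maxMsgNum, int n)
{
    /* Inactive and already-reset readers are ignored. */
    if (p->procPid == 0 || p->resetState)
        return LAG_OK;

    int lag = maxMsgNum - p->nextMsgNum;

    if (lag + n > MAXNUMMESSAGES)
    {
        p->resetState = true;   /* it must discard its invalidatable state */
        return LAG_RESET;
    }
    if (lag > SIG_THRESHOLD && !p->signaled)
    {
        p->signaled = true;     /* remember that a signal was sent */
        return LAG_CATCHUP;
    }
    return LAG_OK;
}
```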
                                165                 : 
                                166                 : /* Shared cache invalidation memory segment */
                                167                 : typedef struct SISeg
                                168                 : {
                                169                 :     /*
                                170                 :      * General state information
                                171                 :      */
                                172                 :     int         minMsgNum;      /* oldest message still needed */
                                173                 :     int         maxMsgNum;      /* next message number to be assigned */
                                174                 :     int         nextThreshold;  /* # of messages to call SICleanupQueue */
                                175                 :     int         lastBackend;    /* index of last active procState entry, +1 */
                                176                 :     int         maxBackends;    /* size of procState array */
                                177                 : 
                                178                 :     slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */
                                179                 : 
                                180                 :     /*
                                181                 :      * Circular buffer holding shared-inval messages
                                182                 :      */
                                183                 :     SharedInvalidationMessage buffer[MAXNUMMESSAGES];
                                184                 : 
                                185                 :     /*
                                186                 :      * Per-backend invalidation state info (has MaxBackends entries).
                                187                 :      */
                                188                 :     ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
                                189                 : } SISeg;
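The maxMsgNum protocol from the header comment (write the buffer entries, then advance maxMsgNum; readers fetch maxMsgNum, then read the entries) can be sketched with C11 release/acquire atomics in place of the msgnumLock spinlock. This is a swapped-in technique that gives the ordering guarantee the comment asks of the spinlock; it is not what sinvaladt.c does. ToySeg, publish() and read_new() are hypothetical, and the writer is assumed to already hold something equivalent to SInvalWriteLock.

```c
#include <assert.h>
#include <stdatomic.h>

#define MAXNUMMESSAGES 4096

typedef struct ToySeg
{
    int        buffer[MAXNUMMESSAGES];  /* stand-in for SharedInvalidationMessage */
    atomic_int maxMsgNum;               /* published end of the queue */
} ToySeg;

/*
 * Writer: fill the slots first, then publish the new maxMsgNum with a
 * release store, so any reader that sees the new count also sees the slots.
 * The relaxed load is enough because only the (locked) writer changes it.
 */
static void
publish(ToySeg *seg, const int *msgs, int n)
{
    int max = atomic_load_explicit(&seg->maxMsgNum, memory_order_relaxed);

    for (int i = 0; i < n; i++)
        seg->buffer[(max + i) % MAXNUMMESSAGES] = msgs[i];
    atomic_store_explicit(&seg->maxMsgNum, max + n, memory_order_release);
}

/*
 * Reader: acquire-load maxMsgNum; every slot below it is then safe to read.
 * Copies at most `room` messages starting at *next and advances *next.
 */
static int
read_new(ToySeg *seg, int *next, int *out, int room)
{
    int max = atomic_load_explicit(&seg->maxMsgNum, memory_order_acquire);
    int got = 0;

    while (*next < max && got < room)
        out[got++] = seg->buffer[(*next)++ % MAXNUMMESSAGES];
    return got;
}
```

A single-threaded run cannot show the reordering this prevents; the release/acquire pair only matters when publish() and read_new() run concurrently on a weakly ordered multiprocessor, which is exactly the case the comment warns about.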
                                190                 : 
                                191                 : static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
                                192                 : 
                                193                 : 
                                194                 : static LocalTransactionId nextLocalTransactionId;
                                195                 : 
                                196                 : static void CleanupInvalidationState(int status, Datum arg);
                                197                 : 
                                198                 : 
                                199                 : /*
                                200                 :  * SInvalShmemSize --- return shared-memory space needed
                                201                 :  */
                                202                 : Size
 6441 tgl                       203 CBC        4564 : SInvalShmemSize(void)
                                204                 : {
                                205                 :     Size        size;
                                206                 : 
                                207            4564 :     size = offsetof(SISeg, procState);
                                208                 : 
                                209                 :     /*
                                210                 :      * In Hot Standby mode, the startup process requests a procState array
                                211                 :      * slot using InitRecoveryTransactionEnvironment(). Even though
                                212                 :      * MaxBackends doesn't account for the startup process, it is guaranteed
                                213                 :      * to get a free slot. This is because the autovacuum launcher and worker
                                214                 :      * processes, which are included in MaxBackends, are not started in Hot
                                215                 :      * Standby mode.
                                216                 :      */
  362 rhaas                     217            4564 :     size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
                                218                 : 
 6441 tgl                       219            4564 :     return size;
                                220                 : }
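SInvalShmemSize sizes the segment as the fixed header up to procState plus one ProcState per backend. offsetof() gives the exact byte offset where the flexible array starts, whereas sizeof() on such a struct may also count trailing padding. PostgreSQL's add_size() and mul_size() raise an error on overflow; this sketch instead uses hypothetical checked_add() and checked_mul() helpers that return false, with a toy ToySeg standing in for SISeg.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Toy per-slot and segment types with a flexible array member, like SISeg. */
typedef struct ToySlot { int pid; int nextMsgNum; } ToySlot;
typedef struct ToySeg
{
    int     maxBackends;
    ToySlot slots[];            /* FLEXIBLE_ARRAY_MEMBER */
} ToySeg;

/* Overflow-checked helpers in the spirit of add_size()/mul_size(). */
static bool
checked_add(size_t a, size_t b, size_t *out)
{
    if (a > SIZE_MAX - b)
        return false;
    *out = a + b;
    return true;
}

static bool
checked_mul(size_t a, size_t b, size_t *out)
{
    if (a != 0 && b > SIZE_MAX / a)
        return false;
    *out = a * b;
    return true;
}

/* Header up to the array, plus one slot per backend; false on overflow. */
static bool
toy_seg_size(size_t nbackends, size_t *out)
{
    size_t arr;

    return checked_mul(sizeof(ToySlot), nbackends, &arr) &&
           checked_add(offsetof(ToySeg, slots), arr, out);
}
```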
                                221                 : 
                                222                 : /*
                                223                 :  * CreateSharedInvalidationState
                                224                 :  *      Create and initialize the SI message buffer
                                225                 :  */
                                226                 : void
 5502 alvherre                  227            1826 : CreateSharedInvalidationState(void)
                                228                 : {
                                229                 :     int         i;
                                230                 :     bool        found;
                                231                 : 
                                232                 :     /* Allocate space in shared memory */
                                233            1826 :     shmInvalBuffer = (SISeg *)
 2969 tgl                       234            1826 :         ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
 7050 bruce                     235            1826 :     if (found)
 7050 bruce                     236 UBC           0 :         return;
                                237                 : 
                                238                 :     /* Clear message counters, save size of procState array, init spinlock */
 5502 alvherre                  239 CBC        1826 :     shmInvalBuffer->minMsgNum = 0;
                                240            1826 :     shmInvalBuffer->maxMsgNum = 0;
 5407 tgl                       241            1826 :     shmInvalBuffer->nextThreshold = CLEANUP_MIN;
 5502 alvherre                  242            1826 :     shmInvalBuffer->lastBackend = 0;
  362 rhaas                     243            1826 :     shmInvalBuffer->maxBackends = MaxBackends;
 5406 tgl                       244            1826 :     SpinLockInit(&shmInvalBuffer->msgnumLock);
                                245                 : 
                                246                 :     /* The buffer[] array is initially all unused, so we need not fill it */
                                247                 : 
                                248                 :     /* Mark all backends inactive, and initialize nextLXID */
 5502 alvherre                  249          193203 :     for (i = 0; i < shmInvalBuffer->maxBackends; i++)
                                250                 :     {
 2118 tgl                       251          191377 :         shmInvalBuffer->procState[i].procPid = 0;    /* inactive */
 4266 rhaas                     252          191377 :         shmInvalBuffer->procState[i].proc = NULL;
 5050 bruce                     253          191377 :         shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
 5502 alvherre                  254          191377 :         shmInvalBuffer->procState[i].resetState = false;
 5407 tgl                       255          191377 :         shmInvalBuffer->procState[i].signaled = false;
 4272 rhaas                     256          191377 :         shmInvalBuffer->procState[i].hasMessages = false;
 5407 tgl                       257          191377 :         shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
                                258                 :     }
                                259                 : }
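The `if (found) return;` guard means initialization runs only for the caller that actually creates the structure; a caller that finds it already present must not reset counters other processes may be using. The coverage columns show that branch was not taken in this run (UBC, 0 hits on line 236). A minimal sketch of the create-or-attach pattern, with a hypothetical one-entry toy_init_struct() standing in for ShmemInitStruct() and ordinary heap memory standing in for shared memory:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical one-entry stand-in for a named shared-memory index. */
static const char *region_name;
static void *region_ptr;

static void *
toy_init_struct(const char *name, size_t size, bool *found)
{
    if (region_ptr != NULL && strcmp(region_name, name) == 0)
    {
        *found = true;          /* already created: just attach */
        return region_ptr;
    }
    region_ptr = calloc(1, size);
    region_name = name;
    *found = false;
    return region_ptr;
}

typedef struct ToySeg { int maxMsgNum; int lastBackend; int pids[8]; } ToySeg;

/* Initialize only on first creation, mirroring `if (found) return;`. */
static ToySeg *
toy_create_state(void)
{
    bool    found;
    ToySeg *seg = toy_init_struct("shmInvalBuffer", sizeof(ToySeg), &found);

    if (seg == NULL || found)
        return seg;
    seg->maxMsgNum = 0;
    seg->lastBackend = 0;
    for (int i = 0; i < 8; i++)
        seg->pids[i] = 0;       /* mark every slot inactive */
    return seg;
}
```

A second call returns the same structure with its counters untouched, which is the behavior the guard protects.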
                                260                 : 
                                261                 : /*
                                262                 :  * SharedInvalBackendInit
                                263                 :  *      Initialize a new backend to operate on the sinval buffer
                                264                 :  */
                                265                 : void
 4859 simon                     266           11571 : SharedInvalBackendInit(bool sendOnly)
                                267                 : {
                                268                 :     int         index;
 8397 bruce                     269           11571 :     ProcState  *stateP = NULL;
 5502 alvherre                  270           11571 :     SISeg      *segP = shmInvalBuffer;
                                271                 : 
                                272                 :     /*
                                273                 :      * This can run in parallel with read operations, but not with write
                                274                 :      * operations, since SIInsertDataEntries relies on lastBackend to set
                                275                 :      * hasMessages appropriately.
                                276                 :      */
 5407 tgl                       277           11571 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
                                278                 : 
                                279                 :     /* Look for a free entry in the procState array */
 8183                           280           44280 :     for (index = 0; index < segP->lastBackend; index++)
                                281                 :     {
 2118                           282           34031 :         if (segP->procState[index].procPid == 0) /* inactive slot? */
                                283                 :         {
 8616                           284            1322 :             stateP = &segP->procState[index];
                                285            1322 :             break;
                                286                 :         }
                                287                 :     }
                                288                 : 
                                289           11571 :     if (stateP == NULL)
                                290                 :     {
 7444                           291           10249 :         if (segP->lastBackend < segP->maxBackends)
                                292                 :         {
                                293           10249 :             stateP = &segP->procState[segP->lastBackend];
 5234 heikki.linnakangas        294           10249 :             Assert(stateP->procPid == 0);
 7444 tgl                       295           10249 :             segP->lastBackend++;
                                296                 :         }
                                297                 :         else
                                298                 :         {
                                299                 :             /*
                                300                 :              * out of procState slots: MaxBackends exceeded -- report normally
                                301                 :              */
 7444 tgl                       302 UBC           0 :             MyBackendId = InvalidBackendId;
 5407                           303               0 :             LWLockRelease(SInvalWriteLock);
 5502 alvherre                  304               0 :             ereport(FATAL,
                                305                 :                     (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                                306                 :                      errmsg("sorry, too many clients already")));
                                307                 :         }
                                308                 :     }
                                309                 : 
 8616 tgl                       310 CBC       11571 :     MyBackendId = (stateP - &segP->procState[0]) + 1;
                                311                 : 
                                312                 :     /* Advertise assigned backend ID in MyProc */
 5695                           313           11571 :     MyProc->backendId = MyBackendId;
                                314                 : 
                                315                 :     /* Fetch next local transaction ID into local memory */
 5407                           316           11571 :     nextLocalTransactionId = stateP->nextLXID;
                                317                 : 
                                318                 :     /* mark myself active, with all extant messages already read */
 5234 heikki.linnakangas        319           11571 :     stateP->procPid = MyProcPid;
 4266 rhaas                     320           11571 :     stateP->proc = MyProc;
 8616 tgl                       321           11571 :     stateP->nextMsgNum = segP->maxMsgNum;
 8598                           322           11571 :     stateP->resetState = false;
 5407                           323           11571 :     stateP->signaled = false;
 4272 rhaas                     324           11571 :     stateP->hasMessages = false;
 4859 simon                     325           11571 :     stateP->sendOnly = sendOnly;
                                326                 : 
 5407 tgl                       327           11571 :     LWLockRelease(SInvalWriteLock);
                                328                 : 
                                329                 :     /* register exit routine to mark my entry inactive at exit */
 8224 peter_e                   330           11571 :     on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
                                331                 : 
 4312                           332           11571 :     elog(DEBUG4, "my backend ID is %d", MyBackendId);
 9345 bruce                     333           11571 : }
                                334                 : 
                                335                 : /*
                                336                 :  * CleanupInvalidationState
                                337                 :  *      Mark the current backend as no longer active.
                                338                 :  *
                                339                 :  * This function is called via on_shmem_exit() during backend shutdown.
                                340                 :  *
                                341                 :  * arg is really of type "SISeg*".
                                342                 :  */
                                343                 : static void
 8224 peter_e                   344           11571 : CleanupInvalidationState(int status, Datum arg)
                                345                 : {
 8183 tgl                       346           11571 :     SISeg      *segP = (SISeg *) DatumGetPointer(arg);
                                347                 :     ProcState  *stateP;
                                348                 :     int         i;
                                349                 : 
 8616                           350           11571 :     Assert(PointerIsValid(segP));
                                351                 : 
 5407                           352           11571 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
                                353                 : 
                                354           11571 :     stateP = &segP->procState[MyBackendId - 1];
                                355                 : 
                                356                 :     /* Update next local transaction ID for next holder of this backendID */
                                357           11571 :     stateP->nextLXID = nextLocalTransactionId;
                                358                 : 
                                359                 :     /* Mark myself inactive */
 5234 heikki.linnakangas        360           11571 :     stateP->procPid = 0;
 4266 rhaas                     361           11571 :     stateP->proc = NULL;
 5407 tgl                       362           11571 :     stateP->nextMsgNum = 0;
                                363           11571 :     stateP->resetState = false;
                                364           11571 :     stateP->signaled = false;
                                365                 : 
                                366                 :     /* Recompute index of last active backend */
 8183                           367           21788 :     for (i = segP->lastBackend; i > 0; i--)
                                368                 :     {
 5234 heikki.linnakangas        369           20763 :         if (segP->procState[i - 1].procPid != 0)
 8183 tgl                       370           10546 :             break;
                                371                 :     }
                                372           11571 :     segP->lastBackend = i;
                                373                 : 
 5407                           374           11571 :     LWLockRelease(SInvalWriteLock);
 8616                           375           11571 : }
                                376                 : 
                                377                 : /*
                                378                 :  * BackendIdGetProc
                                379                 :  *      Get the PGPROC structure for a backend, given the backend ID.
                                380                 :  *      The result may be out of date arbitrarily quickly, so the caller
                                381                 :  *      must be careful about how this information is used.  NULL is
                                382                 :  *      returned if the backend is not active.
                                383                 :  */
                                384                 : PGPROC *
 4266 rhaas                     385             619 : BackendIdGetProc(int backendID)
                                386                 : {
                                387             619 :     PGPROC     *result = NULL;
 5395 tgl                       388             619 :     SISeg      *segP = shmInvalBuffer;
                                389                 : 
                                390                 :     /* Need to lock out additions/removals of backends */
                                391             619 :     LWLockAcquire(SInvalWriteLock, LW_SHARED);
                                392                 : 
                                393             619 :     if (backendID > 0 && backendID <= segP->lastBackend)
                                394                 :     {
                                395             615 :         ProcState  *stateP = &segP->procState[backendID - 1];
                                396                 : 
 4266 rhaas                     397             615 :         result = stateP->proc;
                                398                 :     }
                                399                 : 
 5395 tgl                       400             619 :     LWLockRelease(SInvalWriteLock);
                                401                 : 
                                402             619 :     return result;
                                403                 : }
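Illustrative sketch, not part of sinvaladt.c. CleanupInvalidationState and BackendIdGetProc together maintain a slot array indexed by a 1-based backend ID. `lastBackend` is kept equal to the highest slot still in use, so lookups and the writer's "kick everyone" loop can stop early. The sketch below models only that bookkeeping. `SlotState`, `SlotTable`, `MAX_SLOTS`, `slot_release` and `slot_get_proc` are hypothetical simplified stand-ins for ProcState and SISeg. It does no locking, whereas the real code does all of this under SInvalWriteLock.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical size; the real array is sized from MaxBackends. */
#define MAX_SLOTS 8

typedef struct
{
    int     procPid;        /* 0 means the slot is unused */
    void   *proc;           /* stands in for PGPROC * */
} SlotState;

typedef struct
{
    int       lastBackend;  /* highest 1-based slot in use; 0 if none */
    SlotState slots[MAX_SLOTS];
} SlotTable;

/* Mark backend `id` (1-based) inactive, then walk lastBackend down past
 * trailing inactive slots, as CleanupInvalidationState does. */
static void
slot_release(SlotTable *t, int id)
{
    int     i;

    assert(id > 0 && id <= MAX_SLOTS);
    t->slots[id - 1].procPid = 0;
    t->slots[id - 1].proc = NULL;

    for (i = t->lastBackend; i > 0; i--)
    {
        if (t->slots[i - 1].procPid != 0)
            break;
    }
    t->lastBackend = i;
}

/* Return the proc pointer for a 1-based id, or NULL when the id is outside
 * 1..lastBackend, mirroring the bounds test in BackendIdGetProc.  An
 * inactive slot below lastBackend also yields NULL because its proc is NULL. */
static void *
slot_get_proc(const SlotTable *t, int id)
{
    if (id > 0 && id <= t->lastBackend)
        return t->slots[id - 1].proc;
    return NULL;
}
```

Because IDs are 1-based, `id - 1` is the array index everywhere. Releasing the highest active slot can shrink `lastBackend` by more than one when inactive slots sit just below it.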
                                404                 : 
                                405                 : /*
                                406                 :  * BackendIdGetTransactionIds
                                407                 :  *      Get the xid, xmin, nsubxid and overflow status of the backend. The
                                408                 :  *      result may be out of date arbitrarily quickly, so the caller must be
                                409                 :  *      careful about how this information is used.
                                410                 :  */
                                411                 : void
  111 rhaas                     412 GNC        4284 : BackendIdGetTransactionIds(int backendID, TransactionId *xid,
                                413                 :                            TransactionId *xmin, int *nsubxid, bool *overflowed)
                                414                 : {
 3330 rhaas                     415 GIC        4284 :     SISeg      *segP = shmInvalBuffer;
 3330 rhaas                     416 ECB             : 
 3330 rhaas                     417 GIC        4284 :     *xid = InvalidTransactionId;
 3330 rhaas                     418 CBC        4284 :     *xmin = InvalidTransactionId;
  111 rhaas                     419 GNC        4284 :     *nsubxid = 0;
                                420            4284 :     *overflowed = false;
 3330 rhaas                     421 ECB             : 
                                422                 :     /* Need to lock out additions/removals of backends */
 3330 rhaas                     423 CBC        4284 :     LWLockAcquire(SInvalWriteLock, LW_SHARED);
                                424                 : 
 3330 rhaas                     425 GIC        4284 :     if (backendID > 0 && backendID <= segP->lastBackend)
 3330 rhaas                     426 ECB             :     {
 2932 tgl                       427 GIC        2509 :         ProcState  *stateP = &segP->procState[backendID - 1];
 2932 tgl                       428 CBC        2509 :         PGPROC     *proc = stateP->proc;
                                429                 : 
                                430            2509 :         if (proc != NULL)
 2932 tgl                       431 ECB             :         {
  968 andres                    432 GIC        2509 :             *xid = proc->xid;
  969 andres                    433 CBC        2509 :             *xmin = proc->xmin;
  111 rhaas                     434 GNC        2509 :             *nsubxid = proc->subxidStatus.count;
                                435            2509 :             *overflowed = proc->subxidStatus.overflowed;
                                436                 :         }
 3330 rhaas                     437 ECB             :     }
                                438                 : 
 3330 rhaas                     439 CBC        4284 :     LWLockRelease(SInvalWriteLock);
                                440            4284 : }
                                441                 : 
                                442                 : /*
                                443                 :  * SIInsertDataEntries
 5407 tgl                       444 ECB             :  *      Add new invalidation message(s) to the buffer.
 8616                           445                 :  */
                                446                 : void
 5407 tgl                       447 GIC      584302 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
                                448                 : {
                                449          584302 :     SISeg      *segP = shmInvalBuffer;
                                450                 : 
                                451                 :     /*
 3260 bruce                     452 ECB             :      * N can be arbitrarily large.  We divide the work into groups of no more
                                453                 :      * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
 5407 tgl                       454                 :      * an unreasonably long time.  (This is not so much because we care about
                                455                 :      * letting in other writers, as that some just-caught-up backend might be
                                456                 :      * trying to do SICleanupQueue to pass on its signal, and we don't want it
                                457                 :      * to have to wait a long time.)  Also, we need to consider calling
                                458                 :      * SICleanupQueue every so often.
                                459                 :      */
 5407 tgl                       460 GIC     1208774 :     while (n > 0)
                                461                 :     {
 5050 bruce                     462          624472 :         int         nthistime = Min(n, WRITE_QUANTUM);
                                463                 :         int         numMsgs;
                                464                 :         int         max;
 4272 rhaas                     465 ECB             :         int         i;
                                466                 : 
 5407 tgl                       467 CBC      624472 :         n -= nthistime;
                                468                 : 
 5407 tgl                       469 GIC      624472 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
                                470                 : 
                                471                 :         /*
 5407 tgl                       472 ECB             :          * If the buffer is full, we *must* acquire some space.  Clean the
                                473                 :          * queue and reset anyone who is preventing space from being freed.
                                474                 :          * Otherwise, clean the queue only when it's exceeded the next
                                475                 :          * fullness threshold.  We have to loop and recheck the buffer state
                                476                 :          * after any call of SICleanupQueue.
                                477                 :          */
                                478                 :         for (;;)
                                479                 :         {
 5378 tgl                       480 GIC      631644 :             numMsgs = segP->maxMsgNum - segP->minMsgNum;
                                481          631644 :             if (numMsgs + nthistime > MAXNUMMESSAGES ||
                                482          631462 :                 numMsgs >= segP->nextThreshold)
                                483            7172 :                 SICleanupQueue(true, nthistime);
                                484                 :             else
 5378 tgl                       485 ECB             :                 break;
 8613                           486                 :         }
 8616                           487                 : 
 5407                           488                 :         /*
                                489                 :          * Insert new message(s) into proper slot of circular buffer
                                490                 :          */
 5406 tgl                       491 GIC      624472 :         max = segP->maxMsgNum;
 5407                           492         6868815 :         while (nthistime-- > 0)
                                493                 :         {
 5406                           494         6244343 :             segP->buffer[max % MAXNUMMESSAGES] = *data++;
                                495         6244343 :             max++;
 5406 tgl                       496 ECB             :         }
                                497                 : 
                                498                 :         /* Update current value of maxMsgNum using spinlock */
 2732 rhaas                     499 CBC      624472 :         SpinLockAcquire(&segP->msgnumLock);
                                500          624472 :         segP->maxMsgNum = max;
 2732 rhaas                     501 GIC      624472 :         SpinLockRelease(&segP->msgnumLock);
                                502                 : 
                                503                 :         /*
 3955 bruce                     504 ECB             :          * Now that the maxMsgNum change is globally visible, we give everyone
                                505                 :          * a swift kick to make sure they read the newly added messages.
                                506                 :          * Releasing SInvalWriteLock will enforce a full memory barrier, so
                                507                 :          * these (unlocked) changes will be committed to memory before we exit
                                508                 :          * the function.
                                509                 :          */
 4272 rhaas                     510 GIC     2608254 :         for (i = 0; i < segP->lastBackend; i++)
                                511                 :         {
                                512         1983782 :             ProcState  *stateP = &segP->procState[i];
                                513                 : 
 4269                           514         1983782 :             stateP->hasMessages = true;
 4272 rhaas                     515 ECB             :         }
                                516                 : 
 5407 tgl                       517 CBC      624472 :         LWLockRelease(SInvalWriteLock);
                                518                 :     }
 9770 scrappy                   519          584302 : }
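Illustrative sketch, not part of sinvaladt.c. SIInsertDataEntries never wraps its message counters when it writes. `maxMsgNum` and `minMsgNum` only grow, the physical slot is `counter % MAXNUMMESSAGES`, and the number of queued messages is simply `maxMsgNum - minMsgNum`. Work is split into batches of at most WRITE_QUANTUM messages. The sketch below reproduces that indexing scheme under simplifying assumptions. `RING_SIZE`, `QUANTUM`, `Ring` and `ring_insert` are hypothetical, and the sizes are deliberately tiny. A full buffer simply stops the insert instead of calling SICleanupQueue. No locks or spinlocks are modelled.

```c
#include <assert.h>

/* Hypothetical small sizes for illustration only. */
#define RING_SIZE 16
#define QUANTUM   4

typedef struct
{
    int minMsgNum;          /* oldest message some reader still needs */
    int maxMsgNum;          /* number to assign to the next message */
    int buffer[RING_SIZE];  /* stands in for SharedInvalidationMessage */
} Ring;

/* Append up to n messages in batches of at most QUANTUM.  Returns how many
 * were stored; stops early if a batch would overwrite unread slots (the
 * real code would clean the queue and reset laggards at that point). */
static int
ring_insert(Ring *r, const int *data, int n)
{
    int stored = 0;

    while (n > 0)
    {
        int nthistime = n < QUANTUM ? n : QUANTUM;
        int max;

        /* same space test as the loop around SICleanupQueue */
        if (r->maxMsgNum - r->minMsgNum + nthistime > RING_SIZE)
            break;

        n -= nthistime;
        max = r->maxMsgNum;
        while (nthistime-- > 0)
        {
            r->buffer[max % RING_SIZE] = data[stored++];
            max++;
        }
        r->maxMsgNum = max;     /* publish the whole batch at once */
    }
    return stored;
}
```

Publishing `maxMsgNum` only after the whole batch is copied is what lets readers trust every slot below it. In the real function, that store is additionally wrapped in `msgnumLock`.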
                                520                 : 
                                521                 : /*
 5407 tgl                       522 ECB             :  * SIGetDataEntries
                                523                 :  *      get next SI message(s) for current backend, if there are any
 8616                           524                 :  *
                                525                 :  * Possible return values:
                                526                 :  *  0:   no SI message available
                                527                 :  *  n>0: next n SI messages have been extracted into data[]
                                528                 :  * -1:   SI reset message extracted
                                529                 :  *
                                530                 :  * If the return value is less than the array size "datasize", the caller
                                531                 :  * can assume that there are no more SI messages after the one(s) returned.
                                532                 :  * Otherwise, another call is needed to collect more messages.
                                533                 :  *
                                534                 :  * NB: this can run in parallel with other instances of SIGetDataEntries
                                535                 :  * executing on behalf of other backends, since each instance will modify only
                                536                 :  * fields of its own backend's ProcState, and no instance will look at fields
                                537                 :  * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
                                538                 :  * in shared mode.  Note that this is not exactly the normal (read-only)
                                539                 :  * interpretation of a shared lock! Look closely at the interactions before
                                540                 :  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
                                541                 :  *
                                542                 :  * NB: this can also run in parallel with SIInsertDataEntries.  It is not
                                543                 :  * guaranteed that we will return any messages added after the routine is
                                544                 :  * entered.
                                545                 :  *
                                546                 :  * Note: we assume that "datasize" is not so large that it might be important
                                547                 :  * to break our hold on SInvalReadLock into segments.
                                548                 :  */
                                549                 : int
 5407 tgl                       550 GIC    26828383 : SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
                                551                 : {
                                552                 :     SISeg      *segP;
                                553                 :     ProcState  *stateP;
                                554                 :     int         max;
 5407 tgl                       555 ECB             :     int         n;
                                556                 : 
 5502 alvherre                  557 GIC    26828383 :     segP = shmInvalBuffer;
 5407 tgl                       558        26828383 :     stateP = &segP->procState[MyBackendId - 1];
                                559                 : 
                                560                 :     /*
                                561                 :      * Before starting to take locks, do a quick, unlocked test to see whether
 3260 bruce                     562 ECB             :      * there can possibly be anything to read.  On a multiprocessor system,
 3955                           563                 :      * it's possible that this load could migrate backwards and occur before
                                564                 :      * we actually enter this function, so we might miss a sinval message that
                                565                 :      * was just added by some other processor.  But they can't migrate
                                566                 :      * backwards over a preceding lock acquisition, so it should be OK.  If we
                                567                 :      * haven't acquired a lock preventing against further relevant
                                568                 :      * invalidations, any such occurrence is not much different than if the
                                569                 :      * invalidation had arrived slightly later in the first place.
                                570                 :      */
 4272 rhaas                     571 GIC    26828383 :     if (!stateP->hasMessages)
                                572        26026336 :         return 0;
                                573                 : 
                                574          802047 :     LWLockAcquire(SInvalReadLock, LW_SHARED);
                                575                 : 
 4272 rhaas                     576 ECB             :     /*
                                577                 :      * We must reset hasMessages before determining how many messages we're
                                578                 :      * going to read.  That way, if new messages arrive after we have
                                579                 :      * determined how many we're reading, the flag will get reset and we'll
                                580                 :      * notice those messages part-way through.
                                581                 :      *
                                582                 :      * Note that, if we don't end up reading all of the messages, we had
                                583                 :      * better be certain to reset this flag before exiting!
                                584                 :      */
 4269 rhaas                     585 GIC      802047 :     stateP->hasMessages = false;
                                586                 : 
                                587                 :     /* Fetch current value of maxMsgNum using spinlock */
 2732                           588          802047 :     SpinLockAcquire(&segP->msgnumLock);
                                589          802047 :     max = segP->maxMsgNum;
 2732 rhaas                     590 CBC      802047 :     SpinLockRelease(&segP->msgnumLock);
                                591                 : 
 8616 tgl                       592 GIC      802047 :     if (stateP->resetState)
 9345 bruce                     593 ECB             :     {
 8397                           594                 :         /*
                                595                 :          * Force reset.  We can say we have dealt with any messages added
                                596                 :          * since the reset, as well; and that means we should clear the
 5407 tgl                       597                 :          * signaled flag, too.
                                598                 :          */
 5406 tgl                       599 GIC         217 :         stateP->nextMsgNum = max;
 5407                           600             217 :         stateP->resetState = false;
                                601             217 :         stateP->signaled = false;
                                602             217 :         LWLockRelease(SInvalReadLock);
 8618                           603             217 :         return -1;
 9345 bruce                     604 ECB             :     }
 8618 tgl                       605                 : 
 8616                           606                 :     /*
 5407                           607                 :      * Retrieve messages and advance backend's counter, until data array is
                                608                 :      * full or there are no more messages.
                                609                 :      *
                                610                 :      * There may be other backends that haven't read the message(s), so we
                                611                 :      * cannot delete them here.  SICleanupQueue() will eventually remove them
                                612                 :      * from the queue.
                                613                 :      */
 5407 tgl                       614 GIC      801830 :     n = 0;
 5406                           615        16747234 :     while (n < datasize && stateP->nextMsgNum < max)
                                616                 :     {
 5407                           617        15945404 :         data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
                                618        15945404 :         stateP->nextMsgNum++;
 5407 tgl                       619 ECB             :     }
 8618                           620                 : 
                                621                 :     /*
 4272 rhaas                     622                 :      * If we have caught up completely, reset our "signaled" flag so that
                                623                 :      * we'll get another signal if we fall behind again.
                                624                 :      *
                                625                 :      * If we haven't caught up completely, reset the hasMessages flag so that
                                626                 :      * we see the remaining messages next time.
                                627                 :      */
 5406 tgl                       628 GIC      801830 :     if (stateP->nextMsgNum >= max)
 5407                           629          402408 :         stateP->signaled = false;
                                630                 :     else
 4269 rhaas                     631          399422 :         stateP->hasMessages = true;
                                632                 : 
 5407 tgl                       633 CBC      801830 :     LWLockRelease(SInvalReadLock);
                                634          801830 :     return n;
                                635                 : }
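Illustrative sketch, not part of sinvaladt.c. SIGetDataEntries returns 0 when nothing is pending, -1 when the backend was reset, and otherwise the number of messages copied. When the array fills before the reader catches up, it leaves `hasMessages` set so the next call continues where this one stopped. The single-threaded model below follows that control flow. `Reader`, `Ring`, `RING_SIZE` and `reader_fetch` are hypothetical simplifications of ProcState and SISeg. SInvalReadLock, the spinlock, and the memory-ordering concerns discussed in the comments above are all omitted.

```c
#include <assert.h>
#include <stdbool.h>

#define RING_SIZE 16            /* hypothetical small buffer size */

typedef struct
{
    int  nextMsgNum;            /* next message this reader will consume */
    bool resetState;            /* reader fell too far behind */
    bool signaled;              /* catchup signal already sent */
    bool hasMessages;           /* writer added something since last read */
} Reader;

typedef struct
{
    int maxMsgNum;
    int buffer[RING_SIZE];
} Ring;

/* Returns 0 (nothing new), -1 (reset: discard all cached state),
 * or the number of messages copied into data[]. */
static int
reader_fetch(const Ring *r, Reader *rd, int *data, int datasize)
{
    int max;
    int n = 0;

    if (!rd->hasMessages)
        return 0;               /* the cheap unlocked test in the real code */

    rd->hasMessages = false;    /* clear before sampling maxMsgNum */
    max = r->maxMsgNum;

    if (rd->resetState)
    {
        /* skip everything queued; the caller rebuilds its caches */
        rd->nextMsgNum = max;
        rd->resetState = false;
        rd->signaled = false;
        return -1;
    }

    while (n < datasize && rd->nextMsgNum < max)
    {
        data[n++] = r->buffer[rd->nextMsgNum % RING_SIZE];
        rd->nextMsgNum++;
    }

    if (rd->nextMsgNum >= max)
        rd->signaled = false;   /* caught up: a future signal is allowed */
    else
        rd->hasMessages = true; /* more remain: look again next call */

    return n;
}
```

A caller would loop while the result equals `datasize`, since a shorter (or zero) result means the queue was drained as of the moment `maxMsgNum` was read.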
 9770 scrappy                   636 ECB             : 
                                637                 : /*
 5407 tgl                       638                 :  * SICleanupQueue
 8616                           639                 :  *      Remove messages that have been consumed by all active backends
                                640                 :  *
                                641                 :  * callerHasWriteLock is true if caller is holding SInvalWriteLock.
                                642                 :  * minFree is the minimum number of message slots to make free.
                                643                 :  *
                                644                 :  * Possible side effects of this routine include marking one or more
                                645                 :  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
                                646                 :  * to some backend that seems to be getting too far behind.  We signal at
                                647                 :  * most one backend at a time, for reasons explained at the top of the file.
                                648                 :  *
                                649                 :  * Caution: because we transiently release write lock when we have to signal
                                650                 :  * some other backend, it is NOT guaranteed that there are still minFree
                                651                 :  * free message slots at exit.  Caller must recheck and perhaps retry.
                                652                 :  */
                                653                 : void
 5407 tgl                       654 GIC        9820 : SICleanupQueue(bool callerHasWriteLock, int minFree)
                                655                 : {
 5502 alvherre                  656            9820 :     SISeg      *segP = shmInvalBuffer;
                                657                 :     int         min,
                                658                 :                 minsig,
 5407 tgl                       659 ECB             :                 lowbound,
                                660                 :                 numMsgs,
                                661                 :                 i;
 5407 tgl                       662 GIC        9820 :     ProcState  *needSig = NULL;
                                663                 : 
                                664                 :     /* Lock out all writers and readers */
                                665            9820 :     if (!callerHasWriteLock)
                                666            2648 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 5407 tgl                       667 CBC        9820 :     LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
                                668                 : 
                                669                 :     /*
 5050 bruce                     670 ECB             :      * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
                                671                 :      * furthest-back backend that needs signaling (if any), and reset any
 3260                           672                 :      * backends that are too far back.  Note that because we ignore sendOnly
                                673                 :      * backends here it is possible for them to keep sending messages without
                                674                 :      * a problem even when they are the only active backend.
                                675                 :      */
 8616 tgl                       676 GIC        9820 :     min = segP->maxMsgNum;
 5407                           677            9820 :     minsig = min - SIG_THRESHOLD;
                                678            9820 :     lowbound = min - MAXNUMMESSAGES + minFree;
                                679                 : 
 8183                           680           75566 :     for (i = 0; i < segP->lastBackend; i++)
 9345 bruce                     681 ECB             :     {
 5407 tgl                       682 CBC       65746 :         ProcState  *stateP = &segP->procState[i];
 5050 bruce                     683           65746 :         int         n = stateP->nextMsgNum;
                                684                 : 
 5407 tgl                       685 ECB             :         /* Ignore if inactive or already in reset state */
 4859 simon                     686 GIC       65746 :         if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
 5407 tgl                       687 CBC       20521 :             continue;
 5407 tgl                       688 ECB             : 
                                689                 :         /*
                                690                 :          * If we must free some space and this backend is preventing it, force
 5050 bruce                     691                 :          * him into reset state and then ignore until he catches up.
 5407 tgl                       692                 :          */
 5407 tgl                       693 GIC       45225 :         if (n < lowbound)
                                694                 :         {
                                695             218 :             stateP->resetState = true;
                                696                 :             /* no point in signaling him ... */
                                697             218 :             continue;
 5407 tgl                       698 ECB             :         }
                                699                 : 
                                700                 :         /* Track the global minimum nextMsgNum */
 5407 tgl                       701 GIC       45007 :         if (n < min)
 5407 tgl                       702 CBC       13470 :             min = n;
                                703                 : 
                                704                 :         /* Also see who's furthest back of the unsignaled backends */
 5407 tgl                       705 GIC       45007 :         if (n < minsig && !stateP->signaled)
 5407 tgl                       706 ECB             :         {
 5407 tgl                       707 CBC        2706 :             minsig = n;
 5407 tgl                       708 GIC        2706 :             needSig = stateP;
                                709                 :         }
 9345 bruce                     710 ECB             :     }
 8616 tgl                       711 GIC        9820 :     segP->minMsgNum = min;
 9345 bruce                     712 ECB             : 
 8397                           713                 :     /*
                                714                 :      * When minMsgNum gets really large, decrement all message counters so as
                                715                 :      * to forestall overflow of the counters.  This happens seldom enough that
 5050                           716                 :      * folding it into the previous loop would be a loser.
                                717                 :      */
 8616 tgl                       718 GIC        9820 :     if (min >= MSGNUMWRAPAROUND)
                                719                 :     {
 8616 tgl                       720 UIC           0 :         segP->minMsgNum -= MSGNUMWRAPAROUND;
                                721               0 :         segP->maxMsgNum -= MSGNUMWRAPAROUND;
 8183                           722               0 :         for (i = 0; i < segP->lastBackend; i++)
 9345 bruce                     723 ECB             :         {
                                724                 :             /* we don't bother skipping inactive entries here */
 5407 tgl                       725 UBC           0 :             segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
 9345 bruce                     726 EUB             :         }
                                727                 :     }
                                728                 : 
                                729                 :     /*
 5407 tgl                       730                 :      * Determine how many messages are still in the queue, and set the
                                731                 :      * threshold at which we should repeat SICleanupQueue().
                                732                 :      */
 5407 tgl                       733 GIC        9820 :     numMsgs = segP->maxMsgNum - segP->minMsgNum;
                                734            9820 :     if (numMsgs < CLEANUP_MIN)
                                735            3429 :         segP->nextThreshold = CLEANUP_MIN;
                                736                 :     else
                                737            6391 :         segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
 5407 tgl                       738 ECB             : 
                                739                 :     /*
 5000                           740                 :      * Lastly, signal anyone who needs a catchup interrupt.  Since
                                741                 :      * SendProcSignal() might not be fast, we don't want to hold locks while
                                742                 :      * executing it.
                                743                 :      */
 5407 tgl                       744 GIC        9820 :     if (needSig)
                                745                 :     {
 5050 bruce                     746            2660 :         pid_t       his_pid = needSig->procPid;
 5000 tgl                       747            2660 :         BackendId   his_backendId = (needSig - &segP->procState[0]) + 1;
                                748                 : 
 5407 tgl                       749 CBC        2660 :         needSig->signaled = true;
 5407 tgl                       750 GIC        2660 :         LWLockRelease(SInvalReadLock);
 5407 tgl                       751 CBC        2660 :         LWLockRelease(SInvalWriteLock);
 5234 heikki.linnakangas        752            2660 :         elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
 5000 tgl                       753 GIC        2660 :         SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
 5407 tgl                       754 CBC        2660 :         if (callerHasWriteLock)
                                755            1980 :             LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
 5407 tgl                       756 ECB             :     }
                                757                 :     else
                                758                 :     {
 5407 tgl                       759 CBC        7160 :         LWLockRelease(SInvalReadLock);
                                760            7160 :         if (!callerHasWriteLock)
 5407 tgl                       761 GIC        1968 :             LWLockRelease(SInvalWriteLock);
                                762                 :     }
 9345 bruce                     763            9820 : }
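The per-backend scan and threshold logic of SICleanupQueue() shown above can be sketched as a standalone C function. This is a simplified sketch under stated assumptions, not PostgreSQL's implementation. SketchProcState, sketch_cleanup_queue() and the two SKETCH_ constants are hypothetical names. The constant values are assumptions chosen only for illustration. lowbound, minsig and the starting value of min are passed in or assumed, because their computation falls outside this excerpt. Locking, the MSGNUMWRAPAROUND adjustment and SendProcSignal() are omitted.

```c
#include <stdbool.h>

/*
 * Hypothetical sizing constants; their values are assumptions for
 * illustration, since the excerpt does not show the real definitions.
 */
#define SKETCH_CLEANUP_MIN      2048
#define SKETCH_CLEANUP_QUANTUM  256

/* Hypothetical, pared-down stand-in for a per-backend ProcState entry. */
typedef struct
{
    int         procPid;        /* 0 means the slot is unused */
    int         nextMsgNum;     /* next message number this backend reads */
    bool        resetState;     /* backend must discard all cached state */
    bool        sendOnly;       /* backend only sends, never reads */
    bool        signaled;       /* catchup interrupt already requested */
} SketchProcState;

/*
 * One cleanup pass.  Backends below lowbound are forced into reset state.
 * The minimum nextMsgNum of the remaining readers is stored in *minMsgNum.
 * The furthest-back unsignaled backend below minsig is marked signaled and
 * its index returned (-1 if none).  *nextThreshold receives the backlog
 * size at which the next cleanup should run.
 */
static int
sketch_cleanup_queue(SketchProcState *procs, int nprocs, int maxMsgNum,
                     int lowbound, int minsig,
                     int *minMsgNum, int *nextThreshold)
{
    int         min = maxMsgNum;    /* assumed starting point */
    int         needSig = -1;
    int         numMsgs;

    for (int i = 0; i < nprocs; i++)
    {
        SketchProcState *p = &procs[i];
        int         n = p->nextMsgNum;

        /* Skip unused slots, backends already reset, and send-only ones. */
        if (p->procPid == 0 || p->resetState || p->sendOnly)
            continue;

        /* Too far behind: force a reset instead of signaling. */
        if (n < lowbound)
        {
            p->resetState = true;
            continue;
        }

        if (n < min)
            min = n;

        /* Remember the furthest-back backend not yet signaled. */
        if (n < minsig && !p->signaled)
        {
            minsig = n;
            needSig = i;
        }
    }
    *minMsgNum = min;

    /* Smallest multiple of the quantum strictly greater than numMsgs. */
    numMsgs = maxMsgNum - min;
    if (numMsgs < SKETCH_CLEANUP_MIN)
        *nextThreshold = SKETCH_CLEANUP_MIN;
    else
        *nextThreshold = (numMsgs / SKETCH_CLEANUP_QUANTUM + 1) *
            SKETCH_CLEANUP_QUANTUM;

    /* Only mark here; the real code sends the signal after unlocking. */
    if (needSig >= 0)
        procs[needSig].signaled = true;
    return needSig;
}
```

With these assumed constants, a backlog of 3000 messages yields a next threshold of (3000 / 256 + 1) * 256 = 3072. Signaling only one backend per pass, the one furthest behind, mirrors the listing's needSig logic. It keeps each cleanup cheap, while backends that fall past lowbound are simply reset rather than signaled.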
 5695 tgl                       764 ECB             : 
                                765                 : 
                                766                 : /*
                                767                 :  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
                                768                 :  *
                                769                 :  * We split VirtualTransactionIds into two parts so that it is possible
                                770                 :  * to allocate a new one without any contention for shared memory, except
                                771                 :  * for a bit of additional overhead during backend startup/shutdown.
                                772                 :  * The high-order part of a VirtualTransactionId is a BackendId, and the
                                773                 :  * low-order part is a LocalTransactionId, which we assign from a local
                                774                 :  * counter.  To avoid the risk of a VirtualTransactionId being reused
                                775                 :  * within a short interval, successive procs occupying the same backend ID
                                776                 :  * slot should use a consecutive sequence of local IDs, which is implemented
                                777                 :  * by copying nextLocalTransactionId as seen above.
                                778                 :  */
                                779                 : LocalTransactionId
 5695 tgl                       780 GIC      486240 : GetNextLocalTransactionId(void)
                                781                 : {
                                782                 :     LocalTransactionId result;
                                783                 : 
                                784                 :     /* loop to avoid returning InvalidLocalTransactionId at wraparound */
 5624 bruce                     785 ECB             :     do
                                786                 :     {
 5695 tgl                       787 GIC      488487 :         result = nextLocalTransactionId++;
                                788          488487 :     } while (!LocalTransactionIdIsValid(result));
                                789                 : 
                                790          486240 :     return result;
                                791                 : }
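The split of a VirtualTransactionId into a BackendId part and a locally counted part can be sketched in C as follows. All type and function names here are hypothetical stand-ins, not PostgreSQL's. The sketch assumes a 32-bit unsigned local counter in which 0 is the invalid value, which is consistent with the wraparound loop above but is stated here as an assumption. The point it shows is that advancing the counter touches only backend-local state, and that the do/while loop skips the invalid value when the counter wraps.

```c
#include <stdint.h>

/* Hypothetical: 32-bit unsigned local counter, 0 assumed invalid. */
typedef uint32_t SketchLocalXid;
#define SKETCH_INVALID_LXID ((SketchLocalXid) 0)

/* Hypothetical two-part virtual transaction ID. */
typedef struct
{
    int             backendId;  /* high-order part: backend slot */
    SketchLocalXid  localXid;   /* low-order part: backend-local counter */
} SketchVirtualXid;

/* Per-backend counter; advancing it needs no shared-memory access. */
static SketchLocalXid sketch_next_lxid = 1;

static SketchLocalXid
sketch_get_next_lxid(void)
{
    SketchLocalXid result;

    /* Unsigned overflow wraps to 0, which is skipped as invalid. */
    do
    {
        result = sketch_next_lxid++;
    } while (result == SKETCH_INVALID_LXID);

    return result;
}

/* Combine this backend's slot number with a freshly allocated local ID. */
static SketchVirtualXid
sketch_new_vxid(int myBackendId)
{
    SketchVirtualXid vxid = {myBackendId, sketch_get_next_lxid()};

    return vxid;
}
```

For example, if the counter stands at UINT32_MAX, the next two calls to sketch_get_next_lxid() return UINT32_MAX and then 1; the value 0 produced by the wrap is consumed by the loop and never handed out.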
        

Generated by: LCOV version v1.16-55-g56c0a2a