LCOV - differential code coverage report
Current view: top level - src/backend/storage/ipc - sinvaladt.c (source / functions) Coverage Total Hit UIC UBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 95.9 % 193 185 3 5 77 5 103 3 81 1
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 10 10 4 1 5 4 1
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * sinvaladt.c
       4                 :  *    POSTGRES shared cache invalidation data manager.
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *    src/backend/storage/ipc/sinvaladt.c
      12                 :  *
      13                 :  *-------------------------------------------------------------------------
      14                 :  */
      15                 : #include "postgres.h"
      16                 : 
      17                 : #include <signal.h>
      18                 : #include <unistd.h>
      19                 : 
      20                 : #include "access/transam.h"
      21                 : #include "miscadmin.h"
      22                 : #include "storage/backendid.h"
      23                 : #include "storage/ipc.h"
      24                 : #include "storage/proc.h"
      25                 : #include "storage/procsignal.h"
      26                 : #include "storage/shmem.h"
      27                 : #include "storage/sinvaladt.h"
      28                 : #include "storage/spin.h"
      29                 : 
      30                 : /*
      31                 :  * Conceptually, the shared cache invalidation messages are stored in an
      32                 :  * infinite array, where maxMsgNum is the next array subscript to store a
      33                 :  * submitted message in, minMsgNum is the smallest array subscript containing
      34                 :  * a message not yet read by all backends, and we always have maxMsgNum >=
      35                 :  * minMsgNum.  (They are equal when there are no messages pending.)  For each
      36                 :  * active backend, there is a nextMsgNum pointer indicating the next message it
      37                 :  * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
      38                 :  * backend.
      39                 :  *
      40                 :  * (In the current implementation, minMsgNum is a lower bound for the
      41                 :  * per-process nextMsgNum values, but it isn't rigorously kept equal to the
      42                 :  * smallest nextMsgNum --- it may lag behind.  We only update it when
      43                 :  * SICleanupQueue is called, and we try not to do that often.)
      44                 :  *
      45                 :  * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
      46                 :  * entries.  We translate MsgNum values into circular-buffer indexes by
      47                 :  * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
      48                 :  * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
      49                 :  * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
      50                 :  * in the buffer.  If the buffer does overflow, we recover by setting the
      51                 :  * "reset" flag for each backend that has fallen too far behind.  A backend
      52                 :  * that is in "reset" state is ignored while determining minMsgNum.  When
      53                 :  * it does finally attempt to receive inval messages, it must discard all
      54                 :  * its invalidatable state, since it won't know what it missed.
      55                 :  *
      56                 :  * To reduce the probability of needing resets, we send a "catchup" interrupt
      57                 :  * to any backend that seems to be falling unreasonably far behind.  The
      58                 :  * normal behavior is that at most one such interrupt is in flight at a time;
      59                 :  * when a backend completes processing a catchup interrupt, it executes
      60                 :  * SICleanupQueue, which will signal the next-furthest-behind backend if
      61                 :  * needed.  This avoids undue contention from multiple backends all trying
      62                 :  * to catch up at once.  However, the furthest-back backend might be stuck
      63                 :  * in a state where it can't catch up.  Eventually it will get reset, so it
      64                 :  * won't cause any more problems for anyone but itself.  But we don't want
      65                 :  * to find that a bunch of other backends are now too close to the reset
      66                 :  * threshold to be saved.  So SICleanupQueue is designed to occasionally
      67                 :  * send extra catchup interrupts as the queue gets fuller, to backends that
      68                 :  * are far behind and haven't gotten one yet.  As long as there aren't a lot
      69                 :  * of "stuck" backends, we won't need a lot of extra interrupts, since ones
      70                 :  * that aren't stuck will propagate their interrupts to the next guy.
      71                 :  *
      72                 :  * We would have problems if the MsgNum values overflow an integer, so
      73                 :  * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
      74                 :  * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
      75                 :  * large so that we don't need to do this often.  It must be a multiple of
      76                 :  * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
      77                 :  * to be moved when we do it.
      78                 :  *
      79                 :  * Access to the shared sinval array is protected by two locks, SInvalReadLock
      80                 :  * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
      81                 :  * authorizes them to modify their own ProcState but not to modify or even
      82                 :  * look at anyone else's.  When we need to perform array-wide updates,
      83                 :  * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
      84                 :  * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
      85                 :  * mode) to serialize adding messages to the queue.  Note that a writer
      86                 :  * can operate in parallel with one or more readers, because the writer
      87                 :  * has no need to touch anyone's ProcState, except in the infrequent cases
      88                 :  * when SICleanupQueue is needed.  The only point of overlap is that
      89                 :  * the writer wants to change maxMsgNum while readers need to read it.
      90                 :  * We deal with that by having a spinlock that readers must take for just
      91                 :  * long enough to read maxMsgNum, while writers take it for just long enough
      92                 :  * to write maxMsgNum.  (The exact rule is that you need the spinlock to
      93                 :  * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
      94                 :  * spinlock to write maxMsgNum unless you are holding both locks.)
      95                 :  *
      96                 :  * Note: since maxMsgNum is an int and hence presumably atomically readable/
      97                 :  * writable, the spinlock might seem unnecessary.  The reason it is needed
      98                 :  * is to provide a memory barrier: we need to be sure that messages written
      99                 :  * to the array are actually there before maxMsgNum is increased, and that
     100                 :  * readers will see that data after fetching maxMsgNum.  Multiprocessors
     101                 :  * that have weak memory-ordering guarantees can fail without the memory
     102                 :  * barrier instructions that are included in the spinlock sequences.
     103                 :  */
     104                 : 
     105                 : 
     106                 : /*
     107                 :  * Configurable parameters.
     108                 :  *
     109                 :  * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
     110                 :  * Must be a power of 2 for speed.
     111                 :  *
     112                 :  * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
     113                 :  * Must be a multiple of MAXNUMMESSAGES.  Should be large.
     114                 :  *
     115                 :  * CLEANUP_MIN: the minimum number of messages that must be in the buffer
     116                 :  * before we bother to call SICleanupQueue.
     117                 :  *
     118                 :  * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
     119                 :  * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
     120                 :  *
     121                 :  * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
     122                 :  * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
     123                 :  *
     124                 :  * WRITE_QUANTUM: the max number of messages to push into the buffer per
     125                 :  * iteration of SIInsertDataEntries.  Noncritical but should be less than
     126                 :  * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
     127                 :  * per iteration.
     128                 :  */
     129                 : 
     130                 : #define MAXNUMMESSAGES 4096
     131                 : #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
     132                 : #define CLEANUP_MIN (MAXNUMMESSAGES / 2)
     133                 : #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
     134                 : #define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
     135                 : #define WRITE_QUANTUM 64
     136                 : 
     137                 : /* Per-backend state in shared invalidation structure */
     138                 : typedef struct ProcState
     139                 : {
     140                 :     /* procPid is zero in an inactive ProcState array entry. */
     141                 :     pid_t       procPid;        /* PID of backend, for signaling */
     142                 :     PGPROC     *proc;           /* PGPROC of backend */
     143                 :     /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
     144                 :     int         nextMsgNum;     /* next message number to read */
     145                 :     bool        resetState;     /* backend needs to reset its state */
     146                 :     bool        signaled;       /* backend has been sent catchup signal */
     147                 :     bool        hasMessages;    /* backend has unread messages */
     148                 : 
     149                 :     /*
     150                 :      * Backend only sends invalidations, never receives them. This only makes
     151                 :      * sense for Startup process during recovery because it doesn't maintain a
     152                 :      * relcache, yet it fires inval messages to allow query backends to see
     153                 :      * schema changes.
     154                 :      */
     155                 :     bool        sendOnly;       /* backend only sends, never receives */
     156                 : 
     157                 :     /*
     158                 :      * Next LocalTransactionId to use for each idle backend slot.  We keep
     159                 :      * this here because it is indexed by BackendId and it is convenient to
     160                 :      * copy the value to and from local memory when MyBackendId is set. It's
     161                 :      * meaningless in an active ProcState entry.
     162                 :      */
     163                 :     LocalTransactionId nextLXID;
     164                 : } ProcState;
     165                 : 
     166                 : /* Shared cache invalidation memory segment */
     167                 : typedef struct SISeg
     168                 : {
     169                 :     /*
     170                 :      * General state information
     171                 :      */
     172                 :     int         minMsgNum;      /* oldest message still needed */
     173                 :     int         maxMsgNum;      /* next message number to be assigned */
     174                 :     int         nextThreshold;  /* # of messages to call SICleanupQueue */
     175                 :     int         lastBackend;    /* index of last active procState entry, +1 */
     176                 :     int         maxBackends;    /* size of procState array */
     177                 : 
     178                 :     slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */
     179                 : 
     180                 :     /*
     181                 :      * Circular buffer holding shared-inval messages
     182                 :      */
     183                 :     SharedInvalidationMessage buffer[MAXNUMMESSAGES];
     184                 : 
     185                 :     /*
     186                 :      * Per-backend invalidation state info (has MaxBackends entries).
     187                 :      */
     188                 :     ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
     189                 : } SISeg;
     190                 : 
     191                 : static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */
     192                 : 
     193                 : 
     194                 : static LocalTransactionId nextLocalTransactionId;
     195                 : 
     196                 : static void CleanupInvalidationState(int status, Datum arg);
     197                 : 
     198                 : 
     199                 : /*
     200                 :  * SInvalShmemSize --- return shared-memory space needed
     201                 :  */
     202                 : Size
     203 CBC        4564 : SInvalShmemSize(void)
     204                 : {
     205                 :     Size        size;
     206                 : 
     207            4564 :     size = offsetof(SISeg, procState);
     208                 : 
     209                 :     /*
     210                 :      * In Hot Standby mode, the startup process requests a procState array
     211                 :      * slot using InitRecoveryTransactionEnvironment(). Even though
     212                 :      * MaxBackends doesn't account for the startup process, it is guaranteed
     213                 :      * to get a free slot. This is because the autovacuum launcher and worker
     214                 :      * processes, which are included in MaxBackends, are not started in Hot
     215                 :      * Standby mode.
     216                 :      */
     217            4564 :     size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
     218                 : 
     219            4564 :     return size;
     220                 : }
     221                 : 
     222                 : /*
     223                 :  * CreateSharedInvalidationState
     224                 :  *      Create and initialize the SI message buffer
     225                 :  */
     226                 : void
     227            1826 : CreateSharedInvalidationState(void)
     228                 : {
     229                 :     int         i;
     230                 :     bool        found;
     231                 : 
     232                 :     /* Allocate space in shared memory */
     233            1826 :     shmInvalBuffer = (SISeg *)
     234            1826 :         ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
     235            1826 :     if (found)
     236 UBC           0 :         return;
     237                 : 
     238                 :     /* Clear message counters, save size of procState array, init spinlock */
     239 CBC        1826 :     shmInvalBuffer->minMsgNum = 0;
     240            1826 :     shmInvalBuffer->maxMsgNum = 0;
     241            1826 :     shmInvalBuffer->nextThreshold = CLEANUP_MIN;
     242            1826 :     shmInvalBuffer->lastBackend = 0;
     243            1826 :     shmInvalBuffer->maxBackends = MaxBackends;
     244            1826 :     SpinLockInit(&shmInvalBuffer->msgnumLock);
     245                 : 
     246                 :     /* The buffer[] array is initially all unused, so we need not fill it */
     247                 : 
     248                 :     /* Mark all backends inactive, and initialize nextLXID */
     249          193203 :     for (i = 0; i < shmInvalBuffer->maxBackends; i++)
     250                 :     {
     251          191377 :         shmInvalBuffer->procState[i].procPid = 0;    /* inactive */
     252          191377 :         shmInvalBuffer->procState[i].proc = NULL;
     253          191377 :         shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
     254          191377 :         shmInvalBuffer->procState[i].resetState = false;
     255          191377 :         shmInvalBuffer->procState[i].signaled = false;
     256          191377 :         shmInvalBuffer->procState[i].hasMessages = false;
     257          191377 :         shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
     258                 :     }
     259                 : }
     260                 : 
     261                 : /*
     262                 :  * SharedInvalBackendInit
     263                 :  *      Initialize a new backend to operate on the sinval buffer
     264                 :  */
     265                 : void
     266           11571 : SharedInvalBackendInit(bool sendOnly)
     267                 : {
     268                 :     int         index;
     269           11571 :     ProcState  *stateP = NULL;
     270           11571 :     SISeg      *segP = shmInvalBuffer;
     271                 : 
     272                 :     /*
     273                 :      * This can run in parallel with read operations, but not with write
     274                 :      * operations, since SIInsertDataEntries relies on lastBackend to set
     275                 :      * hasMessages appropriately.
     276                 :      */
     277           11571 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     278                 : 
     279                 :     /* Look for a free entry in the procState array */
     280           44280 :     for (index = 0; index < segP->lastBackend; index++)
     281                 :     {
     282           34031 :         if (segP->procState[index].procPid == 0) /* inactive slot? */
     283                 :         {
     284            1322 :             stateP = &segP->procState[index];
     285            1322 :             break;
     286                 :         }
     287                 :     }
     288                 : 
     289           11571 :     if (stateP == NULL)
     290                 :     {
     291           10249 :         if (segP->lastBackend < segP->maxBackends)
     292                 :         {
     293           10249 :             stateP = &segP->procState[segP->lastBackend];
     294           10249 :             Assert(stateP->procPid == 0);
     295           10249 :             segP->lastBackend++;
     296                 :         }
     297                 :         else
     298                 :         {
     299                 :             /*
     300                 :              * out of procState slots: MaxBackends exceeded -- report normally
     301                 :              */
     302 UBC           0 :             MyBackendId = InvalidBackendId;
     303               0 :             LWLockRelease(SInvalWriteLock);
     304               0 :             ereport(FATAL,
     305                 :                     (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
     306                 :                      errmsg("sorry, too many clients already")));
     307                 :         }
     308                 :     }
     309                 : 
     310 CBC       11571 :     MyBackendId = (stateP - &segP->procState[0]) + 1;
     311                 : 
     312                 :     /* Advertise assigned backend ID in MyProc */
     313           11571 :     MyProc->backendId = MyBackendId;
     314                 : 
     315                 :     /* Fetch next local transaction ID into local memory */
     316           11571 :     nextLocalTransactionId = stateP->nextLXID;
     317                 : 
     318                 :     /* mark myself active, with all extant messages already read */
     319           11571 :     stateP->procPid = MyProcPid;
     320           11571 :     stateP->proc = MyProc;
     321           11571 :     stateP->nextMsgNum = segP->maxMsgNum;
     322           11571 :     stateP->resetState = false;
     323           11571 :     stateP->signaled = false;
     324           11571 :     stateP->hasMessages = false;
     325           11571 :     stateP->sendOnly = sendOnly;
     326                 : 
     327           11571 :     LWLockRelease(SInvalWriteLock);
     328                 : 
     329                 :     /* register exit routine to mark my entry inactive at exit */
     330           11571 :     on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
     331                 : 
     332           11571 :     elog(DEBUG4, "my backend ID is %d", MyBackendId);
     333           11571 : }
     334                 : 
     335                 : /*
     336                 :  * CleanupInvalidationState
     337                 :  *      Mark the current backend as no longer active.
     338                 :  *
     339                 :  * This function is called via on_shmem_exit() during backend shutdown.
     340                 :  *
     341                 :  * arg is really of type "SISeg*".
     342                 :  */
     343                 : static void
     344           11571 : CleanupInvalidationState(int status, Datum arg)
     345                 : {
     346           11571 :     SISeg      *segP = (SISeg *) DatumGetPointer(arg);
     347                 :     ProcState  *stateP;
     348                 :     int         i;
     349                 : 
     350           11571 :     Assert(PointerIsValid(segP));
     351                 : 
     352           11571 :     LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     353                 : 
     354           11571 :     stateP = &segP->procState[MyBackendId - 1];
     355                 : 
     356                 :     /* Update next local transaction ID for next holder of this backendID */
     357           11571 :     stateP->nextLXID = nextLocalTransactionId;
     358                 : 
     359                 :     /* Mark myself inactive */
     360           11571 :     stateP->procPid = 0;
     361           11571 :     stateP->proc = NULL;
     362           11571 :     stateP->nextMsgNum = 0;
     363           11571 :     stateP->resetState = false;
     364           11571 :     stateP->signaled = false;
     365                 : 
     366                 :     /* Recompute index of last active backend */
     367           21788 :     for (i = segP->lastBackend; i > 0; i--)
     368                 :     {
     369           20763 :         if (segP->procState[i - 1].procPid != 0)
     370           10546 :             break;
     371                 :     }
     372           11571 :     segP->lastBackend = i;
     373                 : 
     374           11571 :     LWLockRelease(SInvalWriteLock);
     375           11571 : }
     376                 : 
     377                 : /*
     378                 :  * BackendIdGetProc
     379                 :  *      Get the PGPROC structure for a backend, given the backend ID.
     380                 :  *      The result may be out of date arbitrarily quickly, so the caller
     381                 :  *      must be careful about how this information is used.  NULL is
     382                 :  *      returned if the backend is not active.
     383                 :  */
     384                 : PGPROC *
     385             619 : BackendIdGetProc(int backendID)
     386                 : {
     387             619 :     PGPROC     *result = NULL;
     388             619 :     SISeg      *segP = shmInvalBuffer;
     389                 : 
     390                 :     /* Need to lock out additions/removals of backends */
     391             619 :     LWLockAcquire(SInvalWriteLock, LW_SHARED);
     392                 : 
     393             619 :     if (backendID > 0 && backendID <= segP->lastBackend)
     394                 :     {
     395             615 :         ProcState  *stateP = &segP->procState[backendID - 1];
     396                 : 
     397             615 :         result = stateP->proc;
     398                 :     }
     399                 : 
     400             619 :     LWLockRelease(SInvalWriteLock);
     401                 : 
     402             619 :     return result;
     403                 : }
     404                 : 
     405                 : /*
     406                 :  * BackendIdGetTransactionIds
     407                 :  *      Get the xid, xmin, nsubxid and overflow status of the backend. The
     408                 :  *      result may be out of date arbitrarily quickly, so the caller must be
     409                 :  *      careful about how this information is used.
     410                 :  */
     411                 : void
     412 GNC        4284 : BackendIdGetTransactionIds(int backendID, TransactionId *xid,
     413                 :                            TransactionId *xmin, int *nsubxid, bool *overflowed)
     414                 : {
     415 GIC        4284 :     SISeg      *segP = shmInvalBuffer;
     416 ECB             : 
     417 GIC        4284 :     *xid = InvalidTransactionId;
     418 CBC        4284 :     *xmin = InvalidTransactionId;
     419 GNC        4284 :     *nsubxid = 0;
     420            4284 :     *overflowed = false;
     421 ECB             : 
     422                 :     /* Need to lock out additions/removals of backends */
     423 CBC        4284 :     LWLockAcquire(SInvalWriteLock, LW_SHARED);
     424                 : 
     425 GIC        4284 :     if (backendID > 0 && backendID <= segP->lastBackend)
     426 ECB             :     {
     427 GIC        2509 :         ProcState  *stateP = &segP->procState[backendID - 1];
     428 CBC        2509 :         PGPROC     *proc = stateP->proc;
     429                 : 
     430            2509 :         if (proc != NULL)
     431 ECB             :         {
     432 GIC        2509 :             *xid = proc->xid;
     433 CBC        2509 :             *xmin = proc->xmin;
     434 GNC        2509 :             *nsubxid = proc->subxidStatus.count;
     435            2509 :             *overflowed = proc->subxidStatus.overflowed;
     436                 :         }
     437 ECB             :     }
     438                 : 
     439 CBC        4284 :     LWLockRelease(SInvalWriteLock);
     440            4284 : }
     441                 : 
     442                 : /*
     443                 :  * SIInsertDataEntries
     444 ECB             :  *      Add new invalidation message(s) to the buffer.
     445                 :  */
     446                 : void
     447 GIC      584302 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
     448                 : {
     449          584302 :     SISeg      *segP = shmInvalBuffer;
     450                 : 
     451                 :     /*
     452 ECB             :      * N can be arbitrarily large.  We divide the work into groups of no more
     453                 :      * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     454                 :      * an unreasonably long time.  (This is not so much because we care about
     455                 :      * letting in other writers, as that some just-caught-up backend might be
     456                 :      * trying to do SICleanupQueue to pass on its signal, and we don't want it
     457                 :      * to have to wait a long time.)  Also, we need to consider calling
     458                 :      * SICleanupQueue every so often.
     459                 :      */
     460 GIC     1208774 :     while (n > 0)
     461                 :     {
     462          624472 :         int         nthistime = Min(n, WRITE_QUANTUM);
     463                 :         int         numMsgs;
     464                 :         int         max;
     465 ECB             :         int         i;
     466                 : 
     467 CBC      624472 :         n -= nthistime;
     468                 : 
     469 GIC      624472 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     470                 : 
     471                 :         /*
     472 ECB             :          * If the buffer is full, we *must* acquire some space.  Clean the
     473                 :          * queue and reset anyone who is preventing space from being freed.
     474                 :          * Otherwise, clean the queue only when it's exceeded the next
     475                 :          * fullness threshold.  We have to loop and recheck the buffer state
     476                 :          * after any call of SICleanupQueue.
     477                 :          */
     478                 :         for (;;)
     479                 :         {
     480 GIC      631644 :             numMsgs = segP->maxMsgNum - segP->minMsgNum;
     481          631644 :             if (numMsgs + nthistime > MAXNUMMESSAGES ||
     482          631462 :                 numMsgs >= segP->nextThreshold)
     483            7172 :                 SICleanupQueue(true, nthistime);
     484                 :             else
     485 ECB             :                 break;
     486                 :         }
     487                 : 
     488                 :         /*
     489                 :          * Insert new message(s) into proper slot of circular buffer
     490                 :          */
     491 GIC      624472 :         max = segP->maxMsgNum;
     492         6868815 :         while (nthistime-- > 0)
     493                 :         {
     494         6244343 :             segP->buffer[max % MAXNUMMESSAGES] = *data++;
     495         6244343 :             max++;
     496 ECB             :         }
     497                 : 
     498                 :         /* Update current value of maxMsgNum using spinlock */
     499 CBC      624472 :         SpinLockAcquire(&segP->msgnumLock);
     500          624472 :         segP->maxMsgNum = max;
     501 GIC      624472 :         SpinLockRelease(&segP->msgnumLock);
     502                 : 
     503                 :         /*
     504 ECB             :          * Now that the maxMsgNum change is globally visible, we give everyone
     505                 :          * a swift kick to make sure they read the newly added messages.
     506                 :          * Releasing SInvalWriteLock will enforce a full memory barrier, so
     507                 :          * these (unlocked) changes will be committed to memory before we exit
     508                 :          * the function.
     509                 :          */
     510 GIC     2608254 :         for (i = 0; i < segP->lastBackend; i++)
     511                 :         {
     512         1983782 :             ProcState  *stateP = &segP->procState[i];
     513                 : 
     514         1983782 :             stateP->hasMessages = true;
     515 ECB             :         }
     516                 : 
     517 CBC      624472 :         LWLockRelease(SInvalWriteLock);
     518                 :     }
     519          584302 : }
     520                 : 
     521                 : /*
     522 ECB             :  * SIGetDataEntries
     523                 :  *      get next SI message(s) for current backend, if there are any
     524                 :  *
     525                 :  * Possible return values:
     526                 :  *  0:   no SI message available
     527                 :  *  n>0: next n SI messages have been extracted into data[]
     528                 :  * -1:   SI reset message extracted
     529                 :  *
     530                 :  * If the return value is less than the array size "datasize", the caller
     531                 :  * can assume that there are no more SI messages after the one(s) returned.
     532                 :  * Otherwise, another call is needed to collect more messages.
     533                 :  *
     534                 :  * NB: this can run in parallel with other instances of SIGetDataEntries
     535                 :  * executing on behalf of other backends, since each instance will modify only
     536                 :  * fields of its own backend's ProcState, and no instance will look at fields
     537                 :  * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
     538                 :  * in shared mode.  Note that this is not exactly the normal (read-only)
     539                 :  * interpretation of a shared lock! Look closely at the interactions before
     540                 :  * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
     541                 :  *
     542                 :  * NB: this can also run in parallel with SIInsertDataEntries.  It is not
     543                 :  * guaranteed that we will return any messages added after the routine is
     544                 :  * entered.
     545                 :  *
     546                 :  * Note: we assume that "datasize" is not so large that it might be important
     547                 :  * to break our hold on SInvalReadLock into segments.
     548                 :  */
     549                 : int
     550 GIC    26828383 : SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
     551                 : {
     552                 :     SISeg      *segP;
     553                 :     ProcState  *stateP;
     554                 :     int         max;
     555 ECB             :     int         n;
     556                 : 
     557 GIC    26828383 :     segP = shmInvalBuffer;
     558        26828383 :     stateP = &segP->procState[MyBackendId - 1];
     559                 : 
     560                 :     /*
     561                 :      * Before starting to take locks, do a quick, unlocked test to see whether
     562 ECB             :      * there can possibly be anything to read.  On a multiprocessor system,
     563                 :      * it's possible that this load could migrate backwards and occur before
     564                 :      * we actually enter this function, so we might miss a sinval message that
     565                 :      * was just added by some other processor.  But they can't migrate
     566                 :      * backwards over a preceding lock acquisition, so it should be OK.  If we
     567                 :      * haven't acquired a lock preventing against further relevant
     568                 :      * invalidations, any such occurrence is not much different than if the
     569                 :      * invalidation had arrived slightly later in the first place.
     570                 :      */
     571 GIC    26828383 :     if (!stateP->hasMessages)
     572        26026336 :         return 0;
     573                 : 
     574          802047 :     LWLockAcquire(SInvalReadLock, LW_SHARED);
     575                 : 
     576 ECB             :     /*
     577                 :      * We must reset hasMessages before determining how many messages we're
     578                 :      * going to read.  That way, if new messages arrive after we have
     579                 :      * determined how many we're reading, the flag will get reset and we'll
     580                 :      * notice those messages part-way through.
     581                 :      *
     582                 :      * Note that, if we don't end up reading all of the messages, we had
     583                 :      * better be certain to reset this flag before exiting!
     584                 :      */
     585 GIC      802047 :     stateP->hasMessages = false;
     586                 : 
     587                 :     /* Fetch current value of maxMsgNum using spinlock */
     588          802047 :     SpinLockAcquire(&segP->msgnumLock);
     589          802047 :     max = segP->maxMsgNum;
     590 CBC      802047 :     SpinLockRelease(&segP->msgnumLock);
     591                 : 
     592 GIC      802047 :     if (stateP->resetState)
     593 ECB             :     {
     594                 :         /*
     595                 :          * Force reset.  We can say we have dealt with any messages added
     596                 :          * since the reset, as well; and that means we should clear the
     597                 :          * signaled flag, too.
     598                 :          */
     599 GIC         217 :         stateP->nextMsgNum = max;
     600             217 :         stateP->resetState = false;
     601             217 :         stateP->signaled = false;
     602             217 :         LWLockRelease(SInvalReadLock);
     603             217 :         return -1;
     604 ECB             :     }
     605                 : 
     606                 :     /*
     607                 :      * Retrieve messages and advance backend's counter, until data array is
     608                 :      * full or there are no more messages.
     609                 :      *
     610                 :      * There may be other backends that haven't read the message(s), so we
     611                 :      * cannot delete them here.  SICleanupQueue() will eventually remove them
     612                 :      * from the queue.
     613                 :      */
     614 GIC      801830 :     n = 0;
     615        16747234 :     while (n < datasize && stateP->nextMsgNum < max)
     616                 :     {
     617        15945404 :         data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
     618        15945404 :         stateP->nextMsgNum++;
     619 ECB             :     }
     620                 : 
     621                 :     /*
     622                 :      * If we have caught up completely, reset our "signaled" flag so that
     623                 :      * we'll get another signal if we fall behind again.
     624                 :      *
     625                 :      * If we haven't caught up completely, reset the hasMessages flag so that
     626                 :      * we see the remaining messages next time.
     627                 :      */
     628 GIC      801830 :     if (stateP->nextMsgNum >= max)
     629          402408 :         stateP->signaled = false;
     630                 :     else
     631          399422 :         stateP->hasMessages = true;
     632                 : 
     633 CBC      801830 :     LWLockRelease(SInvalReadLock);
     634          801830 :     return n;
     635                 : }
     636 ECB             : 
     637                 : /*
     638                 :  * SICleanupQueue
     639                 :  *      Remove messages that have been consumed by all active backends
     640                 :  *
     641                 :  * callerHasWriteLock is true if caller is holding SInvalWriteLock.
     642                 :  * minFree is the minimum number of message slots to make free.
     643                 :  *
     644                 :  * Possible side effects of this routine include marking one or more
     645                 :  * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
     646                 :  * to some backend that seems to be getting too far behind.  We signal at
     647                 :  * most one backend at a time, for reasons explained at the top of the file.
     648                 :  *
     649                 :  * Caution: because we transiently release write lock when we have to signal
     650                 :  * some other backend, it is NOT guaranteed that there are still minFree
     651                 :  * free message slots at exit.  Caller must recheck and perhaps retry.
     652                 :  */
     653                 : void
     654 GIC        9820 : SICleanupQueue(bool callerHasWriteLock, int minFree)
     655                 : {
     656            9820 :     SISeg      *segP = shmInvalBuffer;
     657                 :     int         min,
     658                 :                 minsig,
     659 ECB             :                 lowbound,
     660                 :                 numMsgs,
     661                 :                 i;
     662 GIC        9820 :     ProcState  *needSig = NULL;
     663                 : 
     664                 :     /* Lock out all writers and readers */
     665            9820 :     if (!callerHasWriteLock)
     666            2648 :         LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     667 CBC        9820 :     LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
     668                 : 
     669                 :     /*
     670 ECB             :      * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     671                 :      * furthest-back backend that needs signaling (if any), and reset any
     672                 :      * backends that are too far back.  Note that because we ignore sendOnly
     673                 :      * backends here it is possible for them to keep sending messages without
     674                 :      * a problem even when they are the only active backend.
     675                 :      */
     676 GIC        9820 :     min = segP->maxMsgNum;
     677            9820 :     minsig = min - SIG_THRESHOLD;
     678            9820 :     lowbound = min - MAXNUMMESSAGES + minFree;
     679                 : 
     680           75566 :     for (i = 0; i < segP->lastBackend; i++)
     681 ECB             :     {
     682 CBC       65746 :         ProcState  *stateP = &segP->procState[i];
     683           65746 :         int         n = stateP->nextMsgNum;
     684                 : 
     685 ECB             :         /* Ignore if inactive or already in reset state */
     686 GIC       65746 :         if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
     687 CBC       20521 :             continue;
     688 ECB             : 
     689                 :         /*
     690                 :          * If we must free some space and this backend is preventing it, force
     691                 :          * him into reset state and then ignore until he catches up.
     692                 :          */
     693 GIC       45225 :         if (n < lowbound)
     694                 :         {
     695             218 :             stateP->resetState = true;
     696                 :             /* no point in signaling him ... */
     697             218 :             continue;
     698 ECB             :         }
     699                 : 
     700                 :         /* Track the global minimum nextMsgNum */
     701 GIC       45007 :         if (n < min)
     702 CBC       13470 :             min = n;
     703                 : 
     704                 :         /* Also see who's furthest back of the unsignaled backends */
     705 GIC       45007 :         if (n < minsig && !stateP->signaled)
     706 ECB             :         {
     707 CBC        2706 :             minsig = n;
     708 GIC        2706 :             needSig = stateP;
     709                 :         }
     710 ECB             :     }
     711 GIC        9820 :     segP->minMsgNum = min;
     712 ECB             : 
     713                 :     /*
     714                 :      * When minMsgNum gets really large, decrement all message counters so as
     715                 :      * to forestall overflow of the counters.  This happens seldom enough that
     716                 :      * folding it into the previous loop would be a loser.
     717                 :      */
     718 GIC        9820 :     if (min >= MSGNUMWRAPAROUND)
     719                 :     {
     720 UIC           0 :         segP->minMsgNum -= MSGNUMWRAPAROUND;
     721               0 :         segP->maxMsgNum -= MSGNUMWRAPAROUND;
     722               0 :         for (i = 0; i < segP->lastBackend; i++)
     723 ECB             :         {
     724                 :             /* we don't bother skipping inactive entries here */
     725 UBC           0 :             segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
     726 EUB             :         }
     727                 :     }
     728                 : 
     729                 :     /*
     730                 :      * Determine how many messages are still in the queue, and set the
     731                 :      * threshold at which we should repeat SICleanupQueue().
     732                 :      */
     733 GIC        9820 :     numMsgs = segP->maxMsgNum - segP->minMsgNum;
     734            9820 :     if (numMsgs < CLEANUP_MIN)
     735            3429 :         segP->nextThreshold = CLEANUP_MIN;
     736                 :     else
     737            6391 :         segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
     738 ECB             : 
     739                 :     /*
     740                 :      * Lastly, signal anyone who needs a catchup interrupt.  Since
     741                 :      * SendProcSignal() might not be fast, we don't want to hold locks while
     742                 :      * executing it.
     743                 :      */
     744 GIC        9820 :     if (needSig)
     745                 :     {
     746            2660 :         pid_t       his_pid = needSig->procPid;
     747            2660 :         BackendId   his_backendId = (needSig - &segP->procState[0]) + 1;
     748                 : 
     749 CBC        2660 :         needSig->signaled = true;
     750 GIC        2660 :         LWLockRelease(SInvalReadLock);
     751 CBC        2660 :         LWLockRelease(SInvalWriteLock);
     752            2660 :         elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
     753 GIC        2660 :         SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
     754 CBC        2660 :         if (callerHasWriteLock)
     755            1980 :             LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
     756 ECB             :     }
     757                 :     else
     758                 :     {
     759 CBC        7160 :         LWLockRelease(SInvalReadLock);
     760            7160 :         if (!callerHasWriteLock)
     761 GIC        1968 :             LWLockRelease(SInvalWriteLock);
     762                 :     }
     763            9820 : }
     764 ECB             : 
     765                 : 
     766                 : /*
     767                 :  * GetNextLocalTransactionId --- allocate a new LocalTransactionId
     768                 :  *
     769                 :  * We split VirtualTransactionIds into two parts so that it is possible
     770                 :  * to allocate a new one without any contention for shared memory, except
     771                 :  * for a bit of additional overhead during backend startup/shutdown.
     772                 :  * The high-order part of a VirtualTransactionId is a BackendId, and the
     773                 :  * low-order part is a LocalTransactionId, which we assign from a local
     774                 :  * counter.  To avoid the risk of a VirtualTransactionId being reused
     775                 :  * within a short interval, successive procs occupying the same backend ID
     776                 :  * slot should use a consecutive sequence of local IDs, which is implemented
     777                 :  * by copying nextLocalTransactionId as seen above.
     778                 :  */
     779                 : LocalTransactionId
     780 GIC      486240 : GetNextLocalTransactionId(void)
     781                 : {
     782                 :     LocalTransactionId result;
     783                 : 
     784                 :     /* loop to avoid returning InvalidLocalTransactionId at wraparound */
     785 ECB             :     do
     786                 :     {
     787 GIC      488487 :         result = nextLocalTransactionId++;
     788          488487 :     } while (!LocalTransactionIdIsValid(result));
     789                 : 
     790          486240 :     return result;
     791                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a