Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * sinvaladt.c
4 : : * POSTGRES shared cache invalidation data manager.
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/storage/ipc/sinvaladt.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : : #include "postgres.h"
16 : :
17 : : #include <signal.h>
18 : : #include <unistd.h>
19 : :
20 : : #include "access/transam.h"
21 : : #include "miscadmin.h"
22 : : #include "storage/ipc.h"
23 : : #include "storage/proc.h"
24 : : #include "storage/procnumber.h"
25 : : #include "storage/procsignal.h"
26 : : #include "storage/shmem.h"
27 : : #include "storage/sinvaladt.h"
28 : : #include "storage/spin.h"
29 : :
30 : : /*
31 : : * Conceptually, the shared cache invalidation messages are stored in an
32 : : * infinite array, where maxMsgNum is the next array subscript to store a
33 : : * submitted message in, minMsgNum is the smallest array subscript containing
34 : : * a message not yet read by all backends, and we always have maxMsgNum >=
35 : : * minMsgNum. (They are equal when there are no messages pending.) For each
36 : : * active backend, there is a nextMsgNum pointer indicating the next message it
37 : : * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
38 : : * backend.
39 : : *
40 : : * (In the current implementation, minMsgNum is a lower bound for the
41 : : * per-process nextMsgNum values, but it isn't rigorously kept equal to the
42 : : * smallest nextMsgNum --- it may lag behind. We only update it when
43 : : * SICleanupQueue is called, and we try not to do that often.)
44 : : *
45 : : * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
46 : : * entries. We translate MsgNum values into circular-buffer indexes by
47 : : * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
48 : : * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
49 : : * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
50 : : * in the buffer. If the buffer does overflow, we recover by setting the
51 : : * "reset" flag for each backend that has fallen too far behind. A backend
52 : : * that is in "reset" state is ignored while determining minMsgNum. When
53 : : * it does finally attempt to receive inval messages, it must discard all
54 : : * its invalidatable state, since it won't know what it missed.
55 : : *
56 : : * To reduce the probability of needing resets, we send a "catchup" interrupt
57 : : * to any backend that seems to be falling unreasonably far behind. The
58 : : * normal behavior is that at most one such interrupt is in flight at a time;
59 : : * when a backend completes processing a catchup interrupt, it executes
60 : : * SICleanupQueue, which will signal the next-furthest-behind backend if
61 : : * needed. This avoids undue contention from multiple backends all trying
62 : : * to catch up at once. However, the furthest-back backend might be stuck
63 : : * in a state where it can't catch up. Eventually it will get reset, so it
64 : : * won't cause any more problems for anyone but itself. But we don't want
65 : : * to find that a bunch of other backends are now too close to the reset
66 : : * threshold to be saved. So SICleanupQueue is designed to occasionally
67 : : * send extra catchup interrupts as the queue gets fuller, to backends that
68 : : * are far behind and haven't gotten one yet. As long as there aren't a lot
69 : : * of "stuck" backends, we won't need a lot of extra interrupts, since ones
70 : : * that aren't stuck will propagate their interrupts to the next guy.
71 : : *
72 : : * We would have problems if the MsgNum values overflow an integer, so
73 : : * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
74 : : * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
75 : : * large so that we don't need to do this often. It must be a multiple of
76 : : * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
77 : : * to be moved when we do it.
78 : : *
79 : : * Access to the shared sinval array is protected by two locks, SInvalReadLock
80 : : * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
81 : : * authorizes them to modify their own ProcState but not to modify or even
82 : : * look at anyone else's. When we need to perform array-wide updates,
83 : : * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
84 : : * lock out all readers. Writers take SInvalWriteLock (always in exclusive
85 : : * mode) to serialize adding messages to the queue. Note that a writer
86 : : * can operate in parallel with one or more readers, because the writer
87 : : * has no need to touch anyone's ProcState, except in the infrequent cases
88 : : * when SICleanupQueue is needed. The only point of overlap is that
89 : : * the writer wants to change maxMsgNum while readers need to read it.
90 : : * We deal with that by having a spinlock that readers must take for just
91 : : * long enough to read maxMsgNum, while writers take it for just long enough
92 : : * to write maxMsgNum. (The exact rule is that you need the spinlock to
93 : : * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
94 : : * spinlock to write maxMsgNum unless you are holding both locks.)
95 : : *
96 : : * Note: since maxMsgNum is an int and hence presumably atomically readable/
97 : : * writable, the spinlock might seem unnecessary. The reason it is needed
98 : : * is to provide a memory barrier: we need to be sure that messages written
99 : : * to the array are actually there before maxMsgNum is increased, and that
100 : : * readers will see that data after fetching maxMsgNum. Multiprocessors
101 : : * that have weak memory-ordering guarantees can fail without the memory
102 : : * barrier instructions that are included in the spinlock sequences.
103 : : */
104 : :
105 : :
/*
 * Tunable constants for the shared invalidation queue.
 *
 * MAXNUMMESSAGES: capacity of the circular message buffer.  Keep it a
 *		power of 2 so that "MsgNum % MAXNUMMESSAGES" stays cheap.
 *
 * MSGNUMWRAPAROUND: once minMsgNum reaches this value, SICleanupQueue
 *		subtracts it from every MsgNum counter so none can overflow an int.
 *		It must be a multiple of MAXNUMMESSAGES (so each message keeps its
 *		buffer slot across the subtraction) and should be large (so the
 *		adjustment is rare).
 *
 * CLEANUP_MIN: SICleanupQueue is not worth running until at least this
 *		many messages are sitting in the buffer.
 *
 * CLEANUP_QUANTUM: beyond CLEANUP_MIN, SICleanupQueue runs again each time
 *		the queue grows by this many more messages.  Should be a power of 2.
 *
 * SIG_THRESHOLD: how far behind (in messages) a backend must be before it
 *		is sent PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM: most messages SIInsertDataEntries adds per batch.  Not
 *		critical, but keep it below CLEANUP_QUANTUM: the cleanup threshold
 *		is only checked before each batch is inserted.
 */

#define MAXNUMMESSAGES		4096
#define MSGNUMWRAPAROUND	(262144 * MAXNUMMESSAGES)
#define CLEANUP_MIN			(MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM		(MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD		(MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM		64
136 : :
/*
 * Per-backend state in shared invalidation structure.
 *
 * SISeg->procState[] holds one of these for every slot, indexed by
 * ProcNumber.  An entry is active while procPid is nonzero; active entries
 * are also listed in SISeg->pgprocnos.
 */
typedef struct ProcState
{
	/* procPid is zero in an inactive ProcState array entry. */
	pid_t		procPid;		/* PID of backend, for signaling */
	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
	int			nextMsgNum;		/* next message number to read */

	/*
	 * resetState is set by SICleanupQueue when this backend has fallen too
	 * far behind for its unread messages to be kept; the owning backend
	 * clears it in SIGetDataEntries, which then reports a reset (-1).
	 */
	bool		resetState;		/* backend needs to reset its state */

	/*
	 * signaled is set by SICleanupQueue just before it sends this backend a
	 * catchup interrupt, and cleared by the owner once it has caught up or
	 * been reset, so that it can be signaled again later.
	 */
	bool		signaled;		/* backend has been sent catchup signal */

	/*
	 * hasMessages is set for every registered backend by SIInsertDataEntries;
	 * when it is false, SIGetDataEntries returns without taking any lock.
	 */
	bool		hasMessages;	/* backend has unread messages */

	/*
	 * Backend only sends invalidations, never receives them. This only makes
	 * sense for Startup process during recovery because it doesn't maintain a
	 * relcache, yet it fires inval messages to allow query backends to see
	 * schema changes.  SICleanupQueue skips such backends.
	 */
	bool		sendOnly;		/* backend only sends, never receives */

	/*
	 * Next LocalTransactionId to use for each idle backend slot. We keep
	 * this here because it is indexed by ProcNumber and it is convenient to
	 * copy the value to and from local memory when MyProcNumber is set. It's
	 * meaningless in an active ProcState entry.
	 */
	LocalTransactionId nextLXID;
} ProcState;
164 : :
/* Shared cache invalidation memory segment */
typedef struct SISeg
{
	/*
	 * General state information
	 */
	int			minMsgNum;		/* oldest message still needed */

	/*
	 * maxMsgNum may only be read under msgnumLock unless the reader holds
	 * SInvalWriteLock, and written under msgnumLock unless the writer holds
	 * both SInvalWriteLock and SInvalReadLock (see file header comment).
	 */
	int			maxMsgNum;		/* next message number to be assigned */

	/*
	 * Queue length at which SIInsertDataEntries calls SICleanupQueue;
	 * recomputed by SICleanupQueue itself.
	 */
	int			nextThreshold;	/* # of messages to call SICleanupQueue */

	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */

	/*
	 * Circular buffer holding shared-inval messages.  Message number m lives
	 * in buffer[m % MAXNUMMESSAGES].
	 */
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];

	/*
	 * Per-backend invalidation state info.
	 *
	 * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
	 * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
	 * a dense array of their indexes, to speed up scanning all in-use slots.
	 * The pgprocnos storage sits in the same shared allocation, directly
	 * after procState[]; numProcs and pgprocnos are changed only while
	 * holding SInvalWriteLock.
	 *
	 * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
	 * having our separate copy avoids contention on ProcArrayLock, and allows
	 * us to track only the processes that participate in shared cache
	 * invalidations.
	 */
	int			numProcs;
	int		   *pgprocnos;
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
198 : :
/*
 * We reserve a slot for each possible ProcNumber, plus one for each
 * possible auxiliary process type.  (This scheme assumes there is not
 * more than one of any auxiliary process type at a time.)
 */
#define NumProcStateSlots	(MaxBackends + NUM_AUXILIARY_PROCS)

/* Assigned by CreateSharedInvalidationState. */
static SISeg *shmInvalBuffer;	/* pointer to the shared inval buffer */


/*
 * This backend's LocalTransactionId counter.  SharedInvalBackendInit loads
 * it from our slot's nextLXID, GetNextLocalTransactionId advances it, and
 * CleanupInvalidationState stores it back into the slot at exit.
 */
static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);
212 : :
213 : :
214 : : /*
215 : : * SInvalShmemSize --- return shared-memory space needed
216 : : */
217 : : Size
6812 tgl@sss.pgh.pa.us 218 :CBC 2577 : SInvalShmemSize(void)
219 : : {
220 : : Size size;
221 : :
222 : 2577 : size = offsetof(SISeg, procState);
42 heikki.linnakangas@i 223 :GNC 2577 : size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
224 : 2577 : size = add_size(size, mul_size(sizeof(int), NumProcStateSlots)); /* pgprocnos */
225 : :
6812 tgl@sss.pgh.pa.us 226 :CBC 2577 : return size;
227 : : }
228 : :
229 : : /*
230 : : * CreateSharedInvalidationState
231 : : * Create and initialize the SI message buffer
232 : : */
233 : : void
5873 alvherre@alvh.no-ip. 234 : 898 : CreateSharedInvalidationState(void)
235 : : {
236 : : int i;
237 : : bool found;
238 : :
239 : : /* Allocate space in shared memory */
240 : 898 : shmInvalBuffer = (SISeg *)
3340 tgl@sss.pgh.pa.us 241 : 898 : ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
7421 bruce@momjian.us 242 [ - + ]: 898 : if (found)
7421 bruce@momjian.us 243 :UBC 0 : return;
244 : :
245 : : /* Clear message counters, save size of procState array, init spinlock */
5873 alvherre@alvh.no-ip. 246 :CBC 898 : shmInvalBuffer->minMsgNum = 0;
247 : 898 : shmInvalBuffer->maxMsgNum = 0;
5778 tgl@sss.pgh.pa.us 248 : 898 : shmInvalBuffer->nextThreshold = CLEANUP_MIN;
5777 249 : 898 : SpinLockInit(&shmInvalBuffer->msgnumLock);
250 : :
251 : : /* The buffer[] array is initially all unused, so we need not fill it */
252 : :
253 : : /* Mark all backends inactive, and initialize nextLXID */
42 heikki.linnakangas@i 254 [ + + ]:GNC 80720 : for (i = 0; i < NumProcStateSlots; i++)
255 : : {
2489 tgl@sss.pgh.pa.us 256 :CBC 79822 : shmInvalBuffer->procState[i].procPid = 0; /* inactive */
5421 bruce@momjian.us 257 : 79822 : shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
5873 alvherre@alvh.no-ip. 258 : 79822 : shmInvalBuffer->procState[i].resetState = false;
5778 tgl@sss.pgh.pa.us 259 : 79822 : shmInvalBuffer->procState[i].signaled = false;
4643 rhaas@postgresql.org 260 : 79822 : shmInvalBuffer->procState[i].hasMessages = false;
5778 tgl@sss.pgh.pa.us 261 : 79822 : shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
262 : : }
42 heikki.linnakangas@i 263 :GNC 898 : shmInvalBuffer->numProcs = 0;
264 : 898 : shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
265 : : }
266 : :
267 : : /*
268 : : * SharedInvalBackendInit
269 : : * Initialize a new backend to operate on the sinval buffer
270 : : */
271 : : void
5230 simon@2ndQuadrant.co 272 :CBC 16481 : SharedInvalBackendInit(bool sendOnly)
273 : : {
274 : : ProcState *stateP;
275 : : pid_t oldPid;
5873 alvherre@alvh.no-ip. 276 : 16481 : SISeg *segP = shmInvalBuffer;
277 : :
42 heikki.linnakangas@i 278 [ - + ]:GNC 16481 : if (MyProcNumber < 0)
42 heikki.linnakangas@i 279 [ # # ]:UNC 0 : elog(ERROR, "MyProcNumber not set");
42 heikki.linnakangas@i 280 [ - + ]:GNC 16481 : if (MyProcNumber >= NumProcStateSlots)
42 heikki.linnakangas@i 281 [ # # ]:UNC 0 : elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
282 : : MyProcNumber, NumProcStateSlots);
42 heikki.linnakangas@i 283 :GNC 16481 : stateP = &segP->procState[MyProcNumber];
284 : :
285 : : /*
286 : : * This can run in parallel with read operations, but not with write
287 : : * operations, since SIInsertDataEntries relies on the pgprocnos array to
288 : : * set hasMessages appropriately.
289 : : */
5778 tgl@sss.pgh.pa.us 290 :CBC 16481 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
291 : :
42 heikki.linnakangas@i 292 :GNC 16481 : oldPid = stateP->procPid;
293 [ - + ]: 16481 : if (oldPid != 0)
294 : : {
42 heikki.linnakangas@i 295 :UNC 0 : LWLockRelease(SInvalWriteLock);
296 [ # # ]: 0 : elog(ERROR, "sinval slot for backend %d is already in use by process %d",
297 : : MyProcNumber, (int) oldPid);
298 : : }
299 : :
42 heikki.linnakangas@i 300 :GNC 16481 : shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;
301 : :
302 : : /* Fetch next local transaction ID into local memory */
5778 tgl@sss.pgh.pa.us 303 :CBC 16481 : nextLocalTransactionId = stateP->nextLXID;
304 : :
305 : : /* mark myself active, with all extant messages already read */
5605 heikki.linnakangas@i 306 : 16481 : stateP->procPid = MyProcPid;
8987 tgl@sss.pgh.pa.us 307 : 16481 : stateP->nextMsgNum = segP->maxMsgNum;
8969 308 : 16481 : stateP->resetState = false;
5778 309 : 16481 : stateP->signaled = false;
4643 rhaas@postgresql.org 310 : 16481 : stateP->hasMessages = false;
5230 simon@2ndQuadrant.co 311 : 16481 : stateP->sendOnly = sendOnly;
312 : :
5778 tgl@sss.pgh.pa.us 313 : 16481 : LWLockRelease(SInvalWriteLock);
314 : :
315 : : /* register exit routine to mark my entry inactive at exit */
8595 peter_e@gmx.net 316 : 16481 : on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
9716 bruce@momjian.us 317 : 16481 : }
318 : :
319 : : /*
320 : : * CleanupInvalidationState
321 : : * Mark the current backend as no longer active.
322 : : *
323 : : * This function is called via on_shmem_exit() during backend shutdown.
324 : : *
325 : : * arg is really of type "SISeg*".
326 : : */
327 : : static void
8595 peter_e@gmx.net 328 : 15880 : CleanupInvalidationState(int status, Datum arg)
329 : : {
8554 tgl@sss.pgh.pa.us 330 : 15880 : SISeg *segP = (SISeg *) DatumGetPointer(arg);
331 : : ProcState *stateP;
332 : : int i;
333 : :
8987 334 [ - + ]: 15880 : Assert(PointerIsValid(segP));
335 : :
5778 336 : 15880 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
337 : :
42 heikki.linnakangas@i 338 :GNC 15880 : stateP = &segP->procState[MyProcNumber];
339 : :
340 : : /* Update next local transaction ID for next holder of this proc number */
5778 tgl@sss.pgh.pa.us 341 :CBC 15880 : stateP->nextLXID = nextLocalTransactionId;
342 : :
343 : : /* Mark myself inactive */
5605 heikki.linnakangas@i 344 : 15880 : stateP->procPid = 0;
5778 tgl@sss.pgh.pa.us 345 : 15880 : stateP->nextMsgNum = 0;
346 : 15880 : stateP->resetState = false;
347 : 15880 : stateP->signaled = false;
348 : :
42 heikki.linnakangas@i 349 [ + - ]:GNC 21730 : for (i = segP->numProcs - 1; i >= 0; i--)
350 : : {
351 [ + + ]: 21730 : if (segP->pgprocnos[i] == MyProcNumber)
352 : : {
353 [ + + ]: 15880 : if (i != segP->numProcs - 1)
354 : 2673 : segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
355 : 15880 : break;
356 : : }
357 : : }
358 [ - + ]: 15880 : if (i < 0)
42 heikki.linnakangas@i 359 [ # # ]:UNC 0 : elog(PANIC, "could not find entry in sinval array");
42 heikki.linnakangas@i 360 :GNC 15880 : segP->numProcs--;
361 : :
3701 rhaas@postgresql.org 362 :CBC 15880 : LWLockRelease(SInvalWriteLock);
363 : 15880 : }
364 : :
/*
 * SIInsertDataEntries
 *		Add new invalidation message(s) to the buffer.
 *
 * data points to n messages to append.  They are copied in batches of at
 * most WRITE_QUANTUM, each batch under its own acquisition of
 * SInvalWriteLock.  Before a batch is copied, SICleanupQueue is called
 * (possibly more than once) if the batch would not fit or the queue has
 * reached nextThreshold; that may reset lagging backends and send a catchup
 * interrupt.  After each batch, every registered backend's hasMessages flag
 * is set.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * N can be arbitrarily large.  We divide the work into groups of no more
	 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
	 * an unreasonably long time.  (This is not so much because we care about
	 * letting in other writers, as that some just-caught-up backend might be
	 * trying to do SICleanupQueue to pass on its signal, and we don't want it
	 * to have to wait a long time.)  Also, we need to consider calling
	 * SICleanupQueue every so often.
	 */
	while (n > 0)
	{
		int			nthistime = Min(n, WRITE_QUANTUM);
		int			numMsgs;
		int			max;
		int			i;

		n -= nthistime;

		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

		/*
		 * If the buffer is full, we *must* acquire some space.  Clean the
		 * queue and reset anyone who is preventing space from being freed.
		 * Otherwise, clean the queue only when it's exceeded the next
		 * fullness threshold.  We have to loop and recheck the buffer state
		 * after any call of SICleanupQueue.
		 *
		 * (SICleanupQueue may release and re-acquire SInvalWriteLock while
		 * signaling a backend, which is why one call is not enough.  Reading
		 * maxMsgNum without the spinlock is fine because we hold
		 * SInvalWriteLock.)
		 */
		for (;;)
		{
			numMsgs = segP->maxMsgNum - segP->minMsgNum;
			if (numMsgs + nthistime > MAXNUMMESSAGES ||
				numMsgs >= segP->nextThreshold)
				SICleanupQueue(true, nthistime);
			else
				break;
		}

		/*
		 * Insert new message(s) into proper slot of circular buffer.  Readers
		 * cannot see these slots yet, because maxMsgNum has not been advanced.
		 */
		max = segP->maxMsgNum;
		while (nthistime-- > 0)
		{
			segP->buffer[max % MAXNUMMESSAGES] = *data++;
			max++;
		}

		/*
		 * Update current value of maxMsgNum using spinlock.  We hold only
		 * SInvalWriteLock, not SInvalReadLock, so the spinlock is required;
		 * it also orders the buffer writes above before the new maxMsgNum.
		 */
		SpinLockAcquire(&segP->msgnumLock);
		segP->maxMsgNum = max;
		SpinLockRelease(&segP->msgnumLock);

		/*
		 * Now that the maxMsgNum change is globally visible, we give everyone
		 * a swift kick to make sure they read the newly added messages.
		 * Releasing SInvalWriteLock will enforce a full memory barrier, so
		 * these (unlocked) changes will be committed to memory before we exit
		 * the function.
		 */
		for (i = 0; i < segP->numProcs; i++)
		{
			ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];

			stateP->hasMessages = true;
		}

		LWLockRelease(SInvalWriteLock);
	}
}
443 : :
/*
 * SIGetDataEntries
 *		get next SI message(s) for current backend, if there are any
 *
 * data must have room for datasize messages; at most that many are copied.
 *
 * Possible return values:
 *	0:	 no SI message available
 *	n>0: next n SI messages have been extracted into data[]
 * -1:	 SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
 * in shared mode.  Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock! Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
	SISeg	   *segP;
	ProcState  *stateP;
	int			max;
	int			n;

	segP = shmInvalBuffer;
	stateP = &segP->procState[MyProcNumber];

	/*
	 * Before starting to take locks, do a quick, unlocked test to see whether
	 * there can possibly be anything to read.  On a multiprocessor system,
	 * it's possible that this load could migrate backwards and occur before
	 * we actually enter this function, so we might miss a sinval message that
	 * was just added by some other processor.  But they can't migrate
	 * backwards over a preceding lock acquisition, so it should be OK.  If we
	 * haven't acquired a lock preventing against further relevant
	 * invalidations, any such occurrence is not much different than if the
	 * invalidation had arrived slightly later in the first place.
	 */
	if (!stateP->hasMessages)
		return 0;

	LWLockAcquire(SInvalReadLock, LW_SHARED);

	/*
	 * We must reset hasMessages before determining how many messages we're
	 * going to read.  That way, if new messages arrive after we have
	 * determined how many we're reading, the flag will get reset and we'll
	 * notice those messages part-way through.
	 *
	 * Note that, if we don't end up reading all of the messages, we had
	 * better be certain to reset this flag before exiting!
	 */
	stateP->hasMessages = false;

	/*
	 * Fetch current value of maxMsgNum using spinlock.  We don't hold
	 * SInvalWriteLock, so the spinlock is required; it also ensures we see
	 * the buffer contents written before maxMsgNum was advanced.
	 */
	SpinLockAcquire(&segP->msgnumLock);
	max = segP->maxMsgNum;
	SpinLockRelease(&segP->msgnumLock);

	if (stateP->resetState)
	{
		/*
		 * Force reset.  We can say we have dealt with any messages added
		 * since the reset, as well; and that means we should clear the
		 * signaled flag, too.
		 */
		stateP->nextMsgNum = max;
		stateP->resetState = false;
		stateP->signaled = false;
		LWLockRelease(SInvalReadLock);
		return -1;
	}

	/*
	 * Retrieve messages and advance backend's counter, until data array is
	 * full or there are no more messages.
	 *
	 * There may be other backends that haven't read the message(s), so we
	 * cannot delete them here.  SICleanupQueue() will eventually remove them
	 * from the queue.
	 */
	n = 0;
	while (n < datasize && stateP->nextMsgNum < max)
	{
		data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
		stateP->nextMsgNum++;
	}

	/*
	 * If we have caught up completely, reset our "signaled" flag so that
	 * we'll get another signal if we fall behind again.
	 *
	 * If we haven't caught up completely, reset the hasMessages flag so that
	 * we see the remaining messages next time.
	 */
	if (stateP->nextMsgNum >= max)
		stateP->signaled = false;
	else
		stateP->hasMessages = true;

	LWLockRelease(SInvalReadLock);
	return n;
}
559 : :
/*
 * SICleanupQueue
 *		Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * On return, SInvalWriteLock is held if and only if it was held on entry;
 * SInvalReadLock is always released.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
	SISeg	   *segP = shmInvalBuffer;
	int			min,
				minsig,
				lowbound,
				numMsgs,
				i;
	ProcState  *needSig = NULL;

	/* Lock out all writers and readers */
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
	 * furthest-back backend that needs signaling (if any), and reset any
	 * backends that are too far back.  Note that because we ignore sendOnly
	 * backends here it is possible for them to keep sending messages without
	 * a problem even when they are the only active backend.
	 *
	 * min starts at maxMsgNum, so if no backend holds it back the queue
	 * becomes empty.  A backend with nextMsgNum below minsig is more than
	 * SIG_THRESHOLD messages behind.  A backend with nextMsgNum below
	 * lowbound would leave fewer than minFree free slots, so it gets reset.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;
	lowbound = min - MAXNUMMESSAGES + minFree;

	for (i = 0; i < segP->numProcs; i++)
	{
		ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
		int			n = stateP->nextMsgNum;

		/* Ignore if already in reset state */
		Assert(stateP->procPid != 0);
		if (stateP->resetState || stateP->sendOnly)
			continue;

		/*
		 * If we must free some space and this backend is preventing it, force
		 * him into reset state and then ignore until he catches up.
		 */
		if (n < lowbound)
		{
			stateP->resetState = true;
			/* no point in signaling him ... */
			continue;
		}

		/* Track the global minimum nextMsgNum */
		if (n < min)
			min = n;

		/* Also see who's furthest back of the unsignaled backends */
		if (n < minsig && !stateP->signaled)
		{
			minsig = n;
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	/*
	 * When minMsgNum gets really large, decrement all message counters so as
	 * to forestall overflow of the counters.  This happens seldom enough that
	 * folding it into the previous loop would be a loser.
	 *
	 * (Writing maxMsgNum without the spinlock is allowed here because we hold
	 * both SInvalWriteLock and SInvalReadLock.)
	 */
	if (min >= MSGNUMWRAPAROUND)
	{
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->numProcs; i++)
			segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
	}

	/*
	 * Determine how many messages are still in the queue, and set the
	 * threshold at which we should repeat SICleanupQueue().  Above
	 * CLEANUP_MIN the threshold is the next multiple of CLEANUP_QUANTUM
	 * strictly greater than the current queue length.
	 */
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)
		segP->nextThreshold = CLEANUP_MIN;
	else
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

	/*
	 * Lastly, signal anyone who needs a catchup interrupt.  Since
	 * SendProcSignal() might not be fast, we don't want to hold locks while
	 * executing it.
	 */
	if (needSig)
	{
		pid_t		his_pid = needSig->procPid;
		ProcNumber	his_procNumber = (needSig - &segP->procState[0]);

		/* Mark him signaled while we still hold the locks */
		needSig->signaled = true;
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
		SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
		/* Restore the caller's lock state */
		if (callerHasWriteLock)
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else
	{
		LWLockRelease(SInvalReadLock);
		if (!callerHasWriteLock)
			LWLockRelease(SInvalWriteLock);
	}
}
685 : :
686 : :
687 : : /*
688 : : * GetNextLocalTransactionId --- allocate a new LocalTransactionId
689 : : *
690 : : * We split VirtualTransactionIds into two parts so that it is possible
691 : : * to allocate a new one without any contention for shared memory, except
692 : : * for a bit of additional overhead during backend startup/shutdown.
693 : : * The high-order part of a VirtualTransactionId is a ProcNumber, and the
694 : : * low-order part is a LocalTransactionId, which we assign from a local
695 : : * counter. To avoid the risk of a VirtualTransactionId being reused
696 : : * within a short interval, successive procs occupying the same PGPROC slot
697 : : * should use a consecutive sequence of local IDs, which is implemented
698 : : * by copying nextLocalTransactionId as seen above.
699 : : */
700 : : LocalTransactionId
6066 tgl@sss.pgh.pa.us 701 : 433060 : GetNextLocalTransactionId(void)
702 : : {
703 : : LocalTransactionId result;
704 : :
705 : : /* loop to avoid returning InvalidLocalTransactionId at wraparound */
706 : : do
707 : : {
708 : 441753 : result = nextLocalTransactionId++;
709 [ + + ]: 441753 : } while (!LocalTransactionIdIsValid(result));
710 : :
711 : 433060 : return result;
712 : : }
|