Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * sinvaladt.c
4 : * POSTGRES shared cache invalidation data manager.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/storage/ipc/sinvaladt.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <signal.h>
18 : #include <unistd.h>
19 :
20 : #include "access/transam.h"
21 : #include "miscadmin.h"
22 : #include "storage/backendid.h"
23 : #include "storage/ipc.h"
24 : #include "storage/proc.h"
25 : #include "storage/procsignal.h"
26 : #include "storage/shmem.h"
27 : #include "storage/sinvaladt.h"
28 : #include "storage/spin.h"
29 :
30 : /*
31 : * Conceptually, the shared cache invalidation messages are stored in an
32 : * infinite array, where maxMsgNum is the next array subscript to store a
33 : * submitted message in, minMsgNum is the smallest array subscript containing
34 : * a message not yet read by all backends, and we always have maxMsgNum >=
35 : * minMsgNum. (They are equal when there are no messages pending.) For each
36 : * active backend, there is a nextMsgNum pointer indicating the next message it
37 : * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
38 : * backend.
39 : *
40 : * (In the current implementation, minMsgNum is a lower bound for the
41 : * per-process nextMsgNum values, but it isn't rigorously kept equal to the
42 : * smallest nextMsgNum --- it may lag behind. We only update it when
43 : * SICleanupQueue is called, and we try not to do that often.)
44 : *
45 : * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
46 : * entries. We translate MsgNum values into circular-buffer indexes by
47 : * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
48 : * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
49 : * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
50 : * in the buffer. If the buffer does overflow, we recover by setting the
51 : * "reset" flag for each backend that has fallen too far behind. A backend
52 : * that is in "reset" state is ignored while determining minMsgNum. When
53 : * it does finally attempt to receive inval messages, it must discard all
54 : * its invalidatable state, since it won't know what it missed.
55 : *
56 : * To reduce the probability of needing resets, we send a "catchup" interrupt
57 : * to any backend that seems to be falling unreasonably far behind. The
58 : * normal behavior is that at most one such interrupt is in flight at a time;
59 : * when a backend completes processing a catchup interrupt, it executes
60 : * SICleanupQueue, which will signal the next-furthest-behind backend if
61 : * needed. This avoids undue contention from multiple backends all trying
62 : * to catch up at once. However, the furthest-back backend might be stuck
63 : * in a state where it can't catch up. Eventually it will get reset, so it
64 : * won't cause any more problems for anyone but itself. But we don't want
65 : * to find that a bunch of other backends are now too close to the reset
66 : * threshold to be saved. So SICleanupQueue is designed to occasionally
67 : * send extra catchup interrupts as the queue gets fuller, to backends that
68 : * are far behind and haven't gotten one yet. As long as there aren't a lot
69 : * of "stuck" backends, we won't need a lot of extra interrupts, since ones
70 : * that aren't stuck will propagate their interrupts to the next guy.
71 : *
72 : * We would have problems if the MsgNum values overflow an integer, so
73 : * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
74 : * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
75 : * large so that we don't need to do this often. It must be a multiple of
76 : * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
77 : * to be moved when we do it.
78 : *
79 : * Access to the shared sinval array is protected by two locks, SInvalReadLock
80 : * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
81 : * authorizes them to modify their own ProcState but not to modify or even
82 : * look at anyone else's. When we need to perform array-wide updates,
83 : * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
84 : * lock out all readers. Writers take SInvalWriteLock (always in exclusive
85 : * mode) to serialize adding messages to the queue. Note that a writer
86 : * can operate in parallel with one or more readers, because the writer
87 : * has no need to touch anyone's ProcState, except in the infrequent cases
88 : * when SICleanupQueue is needed. The only point of overlap is that
89 : * the writer wants to change maxMsgNum while readers need to read it.
90 : * We deal with that by having a spinlock that readers must take for just
91 : * long enough to read maxMsgNum, while writers take it for just long enough
92 : * to write maxMsgNum. (The exact rule is that you need the spinlock to
93 : * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
94 : * spinlock to write maxMsgNum unless you are holding both locks.)
95 : *
96 : * Note: since maxMsgNum is an int and hence presumably atomically readable/
97 : * writable, the spinlock might seem unnecessary. The reason it is needed
98 : * is to provide a memory barrier: we need to be sure that messages written
99 : * to the array are actually there before maxMsgNum is increased, and that
100 : * readers will see that data after fetching maxMsgNum. Multiprocessors
101 : * that have weak memory-ordering guarantees can fail without the memory
102 : * barrier instructions that are included in the spinlock sequences.
103 : */
104 :
105 :
/*
 * Tunable constants for the shared-invalidation message queue.
 *
 * MAXNUMMESSAGES
 *		Capacity of the circular message buffer, in messages.  Must be a
 *		power of 2 so that "MsgNum % MAXNUMMESSAGES" is cheap.
 *
 * MSGNUMWRAPAROUND
 *		When minMsgNum reaches this value, every MsgNum counter is reduced
 *		by it so the counters cannot overflow.  Must be a multiple of
 *		MAXNUMMESSAGES; should be large so the adjustment is rare.
 *
 * CLEANUP_MIN
 *		Minimum number of buffered messages before we bother calling
 *		SICleanupQueue.
 *
 * CLEANUP_QUANTUM
 *		Once past CLEANUP_MIN, how often (in messages) SICleanupQueue is
 *		called.  Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD
 *		Minimum number of messages a backend must have fallen behind before
 *		it is sent PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM
 *		Maximum number of messages pushed into the buffer per iteration of
 *		SIInsertDataEntries.  Noncritical, but should be less than
 *		CLEANUP_QUANTUM because calling SICleanupQueue is considered only
 *		once per iteration.
 */

#define MAXNUMMESSAGES		4096
#define MSGNUMWRAPAROUND	(MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN			(MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM		(MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD		(MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM		64
136 :
137 : /* Per-backend state in shared invalidation structure */
138 : typedef struct ProcState
139 : {
140 : /* procPid is zero in an inactive ProcState array entry. */
141 : pid_t procPid; /* PID of backend, for signaling */
142 : PGPROC *proc; /* PGPROC of backend */
143 : /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
144 : int nextMsgNum; /* next message number to read */
145 : bool resetState; /* backend needs to reset its state */
146 : bool signaled; /* backend has been sent catchup signal */
147 : bool hasMessages; /* backend has unread messages */
148 :
149 : /*
150 : * Backend only sends invalidations, never receives them. This only makes
151 : * sense for Startup process during recovery because it doesn't maintain a
152 : * relcache, yet it fires inval messages to allow query backends to see
153 : * schema changes.
154 : */
155 : bool sendOnly; /* backend only sends, never receives */
156 :
157 : /*
158 : * Next LocalTransactionId to use for each idle backend slot. We keep
159 : * this here because it is indexed by BackendId and it is convenient to
160 : * copy the value to and from local memory when MyBackendId is set. It's
161 : * meaningless in an active ProcState entry.
162 : */
163 : LocalTransactionId nextLXID;
164 : } ProcState;
165 :
166 : /* Shared cache invalidation memory segment */
167 : typedef struct SISeg
168 : {
169 : /*
170 : * General state information
171 : */
172 : int minMsgNum; /* oldest message still needed */
173 : int maxMsgNum; /* next message number to be assigned */
174 : int nextThreshold; /* # of messages to call SICleanupQueue */
175 : int lastBackend; /* index of last active procState entry, +1 */
176 : int maxBackends; /* size of procState array */
177 :
178 : slock_t msgnumLock; /* spinlock protecting maxMsgNum */
179 :
180 : /*
181 : * Circular buffer holding shared-inval messages
182 : */
183 : SharedInvalidationMessage buffer[MAXNUMMESSAGES];
184 :
185 : /*
186 : * Per-backend invalidation state info (has MaxBackends entries).
187 : */
188 : ProcState procState[FLEXIBLE_ARRAY_MEMBER];
189 : } SISeg;
190 :
191 : static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
192 :
193 :
194 : static LocalTransactionId nextLocalTransactionId;
195 :
196 : static void CleanupInvalidationState(int status, Datum arg);
197 :
198 :
199 : /*
200 : * SInvalShmemSize --- return shared-memory space needed
201 : */
202 : Size
6441 tgl 203 CBC 4564 : SInvalShmemSize(void)
204 : {
205 : Size size;
206 :
207 4564 : size = offsetof(SISeg, procState);
208 :
209 : /*
210 : * In Hot Standby mode, the startup process requests a procState array
211 : * slot using InitRecoveryTransactionEnvironment(). Even though
212 : * MaxBackends doesn't account for the startup process, it is guaranteed
213 : * to get a free slot. This is because the autovacuum launcher and worker
214 : * processes, which are included in MaxBackends, are not started in Hot
215 : * Standby mode.
216 : */
362 rhaas 217 4564 : size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
218 :
6441 tgl 219 4564 : return size;
220 : }
221 :
222 : /*
223 : * CreateSharedInvalidationState
224 : * Create and initialize the SI message buffer
225 : */
226 : void
5502 alvherre 227 1826 : CreateSharedInvalidationState(void)
228 : {
229 : int i;
230 : bool found;
231 :
232 : /* Allocate space in shared memory */
233 1826 : shmInvalBuffer = (SISeg *)
2969 tgl 234 1826 : ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
7050 bruce 235 1826 : if (found)
7050 bruce 236 UBC 0 : return;
237 :
238 : /* Clear message counters, save size of procState array, init spinlock */
5502 alvherre 239 CBC 1826 : shmInvalBuffer->minMsgNum = 0;
240 1826 : shmInvalBuffer->maxMsgNum = 0;
5407 tgl 241 1826 : shmInvalBuffer->nextThreshold = CLEANUP_MIN;
5502 alvherre 242 1826 : shmInvalBuffer->lastBackend = 0;
362 rhaas 243 1826 : shmInvalBuffer->maxBackends = MaxBackends;
5406 tgl 244 1826 : SpinLockInit(&shmInvalBuffer->msgnumLock);
245 :
246 : /* The buffer[] array is initially all unused, so we need not fill it */
247 :
248 : /* Mark all backends inactive, and initialize nextLXID */
5502 alvherre 249 193203 : for (i = 0; i < shmInvalBuffer->maxBackends; i++)
250 : {
2118 tgl 251 191377 : shmInvalBuffer->procState[i].procPid = 0; /* inactive */
4266 rhaas 252 191377 : shmInvalBuffer->procState[i].proc = NULL;
5050 bruce 253 191377 : shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
5502 alvherre 254 191377 : shmInvalBuffer->procState[i].resetState = false;
5407 tgl 255 191377 : shmInvalBuffer->procState[i].signaled = false;
4272 rhaas 256 191377 : shmInvalBuffer->procState[i].hasMessages = false;
5407 tgl 257 191377 : shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
258 : }
259 : }
260 :
261 : /*
262 : * SharedInvalBackendInit
263 : * Initialize a new backend to operate on the sinval buffer
264 : */
265 : void
4859 simon 266 11571 : SharedInvalBackendInit(bool sendOnly)
267 : {
268 : int index;
8397 bruce 269 11571 : ProcState *stateP = NULL;
5502 alvherre 270 11571 : SISeg *segP = shmInvalBuffer;
271 :
272 : /*
273 : * This can run in parallel with read operations, but not with write
274 : * operations, since SIInsertDataEntries relies on lastBackend to set
275 : * hasMessages appropriately.
276 : */
5407 tgl 277 11571 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
278 :
279 : /* Look for a free entry in the procState array */
8183 280 44280 : for (index = 0; index < segP->lastBackend; index++)
281 : {
2118 282 34031 : if (segP->procState[index].procPid == 0) /* inactive slot? */
283 : {
8616 284 1322 : stateP = &segP->procState[index];
285 1322 : break;
286 : }
287 : }
288 :
289 11571 : if (stateP == NULL)
290 : {
7444 291 10249 : if (segP->lastBackend < segP->maxBackends)
292 : {
293 10249 : stateP = &segP->procState[segP->lastBackend];
5234 heikki.linnakangas 294 10249 : Assert(stateP->procPid == 0);
7444 tgl 295 10249 : segP->lastBackend++;
296 : }
297 : else
298 : {
299 : /*
300 : * out of procState slots: MaxBackends exceeded -- report normally
301 : */
7444 tgl 302 UBC 0 : MyBackendId = InvalidBackendId;
5407 303 0 : LWLockRelease(SInvalWriteLock);
5502 alvherre 304 0 : ereport(FATAL,
305 : (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
306 : errmsg("sorry, too many clients already")));
307 : }
308 : }
309 :
8616 tgl 310 CBC 11571 : MyBackendId = (stateP - &segP->procState[0]) + 1;
311 :
312 : /* Advertise assigned backend ID in MyProc */
5695 313 11571 : MyProc->backendId = MyBackendId;
314 :
315 : /* Fetch next local transaction ID into local memory */
5407 316 11571 : nextLocalTransactionId = stateP->nextLXID;
317 :
318 : /* mark myself active, with all extant messages already read */
5234 heikki.linnakangas 319 11571 : stateP->procPid = MyProcPid;
4266 rhaas 320 11571 : stateP->proc = MyProc;
8616 tgl 321 11571 : stateP->nextMsgNum = segP->maxMsgNum;
8598 322 11571 : stateP->resetState = false;
5407 323 11571 : stateP->signaled = false;
4272 rhaas 324 11571 : stateP->hasMessages = false;
4859 simon 325 11571 : stateP->sendOnly = sendOnly;
326 :
5407 tgl 327 11571 : LWLockRelease(SInvalWriteLock);
328 :
329 : /* register exit routine to mark my entry inactive at exit */
8224 peter_e 330 11571 : on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
331 :
4312 332 11571 : elog(DEBUG4, "my backend ID is %d", MyBackendId);
9345 bruce 333 11571 : }
334 :
335 : /*
336 : * CleanupInvalidationState
337 : * Mark the current backend as no longer active.
338 : *
339 : * This function is called via on_shmem_exit() during backend shutdown.
340 : *
341 : * arg is really of type "SISeg*".
342 : */
343 : static void
8224 peter_e 344 11571 : CleanupInvalidationState(int status, Datum arg)
345 : {
8183 tgl 346 11571 : SISeg *segP = (SISeg *) DatumGetPointer(arg);
347 : ProcState *stateP;
348 : int i;
349 :
8616 350 11571 : Assert(PointerIsValid(segP));
351 :
5407 352 11571 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
353 :
354 11571 : stateP = &segP->procState[MyBackendId - 1];
355 :
356 : /* Update next local transaction ID for next holder of this backendID */
357 11571 : stateP->nextLXID = nextLocalTransactionId;
358 :
359 : /* Mark myself inactive */
5234 heikki.linnakangas 360 11571 : stateP->procPid = 0;
4266 rhaas 361 11571 : stateP->proc = NULL;
5407 tgl 362 11571 : stateP->nextMsgNum = 0;
363 11571 : stateP->resetState = false;
364 11571 : stateP->signaled = false;
365 :
366 : /* Recompute index of last active backend */
8183 367 21788 : for (i = segP->lastBackend; i > 0; i--)
368 : {
5234 heikki.linnakangas 369 20763 : if (segP->procState[i - 1].procPid != 0)
8183 tgl 370 10546 : break;
371 : }
372 11571 : segP->lastBackend = i;
373 :
5407 374 11571 : LWLockRelease(SInvalWriteLock);
8616 375 11571 : }
376 :
377 : /*
378 : * BackendIdGetProc
379 : * Get the PGPROC structure for a backend, given the backend ID.
380 : * The result may be out of date arbitrarily quickly, so the caller
381 : * must be careful about how this information is used. NULL is
382 : * returned if the backend is not active.
383 : */
384 : PGPROC *
4266 rhaas 385 619 : BackendIdGetProc(int backendID)
386 : {
387 619 : PGPROC *result = NULL;
5395 tgl 388 619 : SISeg *segP = shmInvalBuffer;
389 :
390 : /* Need to lock out additions/removals of backends */
391 619 : LWLockAcquire(SInvalWriteLock, LW_SHARED);
392 :
393 619 : if (backendID > 0 && backendID <= segP->lastBackend)
394 : {
395 615 : ProcState *stateP = &segP->procState[backendID - 1];
396 :
4266 rhaas 397 615 : result = stateP->proc;
398 : }
399 :
5395 tgl 400 619 : LWLockRelease(SInvalWriteLock);
401 :
402 619 : return result;
403 : }
404 :
405 : /*
406 : * BackendIdGetTransactionIds
407 : * Get the xid, xmin, nsubxid and overflow status of the backend. The
408 : * result may be out of date arbitrarily quickly, so the caller must be
409 : * careful about how this information is used.
410 : */
411 : void
111 rhaas 412 GNC 4284 : BackendIdGetTransactionIds(int backendID, TransactionId *xid,
413 : TransactionId *xmin, int *nsubxid, bool *overflowed)
414 : {
3330 rhaas 415 GIC 4284 : SISeg *segP = shmInvalBuffer;
3330 rhaas 416 ECB :
3330 rhaas 417 GIC 4284 : *xid = InvalidTransactionId;
3330 rhaas 418 CBC 4284 : *xmin = InvalidTransactionId;
111 rhaas 419 GNC 4284 : *nsubxid = 0;
420 4284 : *overflowed = false;
3330 rhaas 421 ECB :
422 : /* Need to lock out additions/removals of backends */
3330 rhaas 423 CBC 4284 : LWLockAcquire(SInvalWriteLock, LW_SHARED);
424 :
3330 rhaas 425 GIC 4284 : if (backendID > 0 && backendID <= segP->lastBackend)
3330 rhaas 426 ECB : {
2932 tgl 427 GIC 2509 : ProcState *stateP = &segP->procState[backendID - 1];
2932 tgl 428 CBC 2509 : PGPROC *proc = stateP->proc;
429 :
430 2509 : if (proc != NULL)
2932 tgl 431 ECB : {
968 andres 432 GIC 2509 : *xid = proc->xid;
969 andres 433 CBC 2509 : *xmin = proc->xmin;
111 rhaas 434 GNC 2509 : *nsubxid = proc->subxidStatus.count;
435 2509 : *overflowed = proc->subxidStatus.overflowed;
436 : }
3330 rhaas 437 ECB : }
438 :
3330 rhaas 439 CBC 4284 : LWLockRelease(SInvalWriteLock);
440 4284 : }
441 :
442 : /*
443 : * SIInsertDataEntries
5407 tgl 444 ECB : * Add new invalidation message(s) to the buffer.
8616 445 : */
446 : void
5407 tgl 447 GIC 584302 : SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
448 : {
449 584302 : SISeg *segP = shmInvalBuffer;
450 :
451 : /*
3260 bruce 452 ECB : * N can be arbitrarily large. We divide the work into groups of no more
453 : * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
5407 tgl 454 : * an unreasonably long time. (This is not so much because we care about
455 : * letting in other writers, as that some just-caught-up backend might be
456 : * trying to do SICleanupQueue to pass on its signal, and we don't want it
457 : * to have to wait a long time.) Also, we need to consider calling
458 : * SICleanupQueue every so often.
459 : */
5407 tgl 460 GIC 1208774 : while (n > 0)
461 : {
5050 bruce 462 624472 : int nthistime = Min(n, WRITE_QUANTUM);
463 : int numMsgs;
464 : int max;
4272 rhaas 465 ECB : int i;
466 :
5407 tgl 467 CBC 624472 : n -= nthistime;
468 :
5407 tgl 469 GIC 624472 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
470 :
471 : /*
5407 tgl 472 ECB : * If the buffer is full, we *must* acquire some space. Clean the
473 : * queue and reset anyone who is preventing space from being freed.
474 : * Otherwise, clean the queue only when it's exceeded the next
475 : * fullness threshold. We have to loop and recheck the buffer state
476 : * after any call of SICleanupQueue.
477 : */
478 : for (;;)
479 : {
5378 tgl 480 GIC 631644 : numMsgs = segP->maxMsgNum - segP->minMsgNum;
481 631644 : if (numMsgs + nthistime > MAXNUMMESSAGES ||
482 631462 : numMsgs >= segP->nextThreshold)
483 7172 : SICleanupQueue(true, nthistime);
484 : else
5378 tgl 485 ECB : break;
8613 486 : }
8616 487 :
5407 488 : /*
489 : * Insert new message(s) into proper slot of circular buffer
490 : */
5406 tgl 491 GIC 624472 : max = segP->maxMsgNum;
5407 492 6868815 : while (nthistime-- > 0)
493 : {
5406 494 6244343 : segP->buffer[max % MAXNUMMESSAGES] = *data++;
495 6244343 : max++;
5406 tgl 496 ECB : }
497 :
498 : /* Update current value of maxMsgNum using spinlock */
2732 rhaas 499 CBC 624472 : SpinLockAcquire(&segP->msgnumLock);
500 624472 : segP->maxMsgNum = max;
2732 rhaas 501 GIC 624472 : SpinLockRelease(&segP->msgnumLock);
502 :
503 : /*
3955 bruce 504 ECB : * Now that the maxMsgNum change is globally visible, we give everyone
505 : * a swift kick to make sure they read the newly added messages.
506 : * Releasing SInvalWriteLock will enforce a full memory barrier, so
507 : * these (unlocked) changes will be committed to memory before we exit
508 : * the function.
509 : */
4272 rhaas 510 GIC 2608254 : for (i = 0; i < segP->lastBackend; i++)
511 : {
512 1983782 : ProcState *stateP = &segP->procState[i];
513 :
4269 514 1983782 : stateP->hasMessages = true;
4272 rhaas 515 ECB : }
516 :
5407 tgl 517 CBC 624472 : LWLockRelease(SInvalWriteLock);
518 : }
9770 scrappy 519 584302 : }
520 :
521 : /*
5407 tgl 522 ECB : * SIGetDataEntries
523 : * get next SI message(s) for current backend, if there are any
8616 524 : *
525 : * Possible return values:
526 : * 0: no SI message available
527 : * n>0: next n SI messages have been extracted into data[]
528 : * -1: SI reset message extracted
529 : *
530 : * If the return value is less than the array size "datasize", the caller
531 : * can assume that there are no more SI messages after the one(s) returned.
532 : * Otherwise, another call is needed to collect more messages.
533 : *
534 : * NB: this can run in parallel with other instances of SIGetDataEntries
535 : * executing on behalf of other backends, since each instance will modify only
536 : * fields of its own backend's ProcState, and no instance will look at fields
537 : * of other backends' ProcStates. We express this by grabbing SInvalReadLock
538 : * in shared mode. Note that this is not exactly the normal (read-only)
539 : * interpretation of a shared lock! Look closely at the interactions before
540 : * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
541 : *
542 : * NB: this can also run in parallel with SIInsertDataEntries. It is not
543 : * guaranteed that we will return any messages added after the routine is
544 : * entered.
545 : *
546 : * Note: we assume that "datasize" is not so large that it might be important
547 : * to break our hold on SInvalReadLock into segments.
548 : */
549 : int
5407 tgl 550 GIC 26828383 : SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
551 : {
552 : SISeg *segP;
553 : ProcState *stateP;
554 : int max;
5407 tgl 555 ECB : int n;
556 :
5502 alvherre 557 GIC 26828383 : segP = shmInvalBuffer;
5407 tgl 558 26828383 : stateP = &segP->procState[MyBackendId - 1];
559 :
560 : /*
561 : * Before starting to take locks, do a quick, unlocked test to see whether
3260 bruce 562 ECB : * there can possibly be anything to read. On a multiprocessor system,
3955 563 : * it's possible that this load could migrate backwards and occur before
564 : * we actually enter this function, so we might miss a sinval message that
565 : * was just added by some other processor. But they can't migrate
566 : * backwards over a preceding lock acquisition, so it should be OK. If we
567 : * haven't acquired a lock preventing against further relevant
568 : * invalidations, any such occurrence is not much different than if the
569 : * invalidation had arrived slightly later in the first place.
570 : */
4272 rhaas 571 GIC 26828383 : if (!stateP->hasMessages)
572 26026336 : return 0;
573 :
574 802047 : LWLockAcquire(SInvalReadLock, LW_SHARED);
575 :
4272 rhaas 576 ECB : /*
577 : * We must reset hasMessages before determining how many messages we're
578 : * going to read. That way, if new messages arrive after we have
579 : * determined how many we're reading, the flag will get reset and we'll
580 : * notice those messages part-way through.
581 : *
582 : * Note that, if we don't end up reading all of the messages, we had
583 : * better be certain to reset this flag before exiting!
584 : */
4269 rhaas 585 GIC 802047 : stateP->hasMessages = false;
586 :
587 : /* Fetch current value of maxMsgNum using spinlock */
2732 588 802047 : SpinLockAcquire(&segP->msgnumLock);
589 802047 : max = segP->maxMsgNum;
2732 rhaas 590 CBC 802047 : SpinLockRelease(&segP->msgnumLock);
591 :
8616 tgl 592 GIC 802047 : if (stateP->resetState)
9345 bruce 593 ECB : {
8397 594 : /*
595 : * Force reset. We can say we have dealt with any messages added
596 : * since the reset, as well; and that means we should clear the
5407 tgl 597 : * signaled flag, too.
598 : */
5406 tgl 599 GIC 217 : stateP->nextMsgNum = max;
5407 600 217 : stateP->resetState = false;
601 217 : stateP->signaled = false;
602 217 : LWLockRelease(SInvalReadLock);
8618 603 217 : return -1;
9345 bruce 604 ECB : }
8618 tgl 605 :
8616 606 : /*
5407 607 : * Retrieve messages and advance backend's counter, until data array is
608 : * full or there are no more messages.
609 : *
610 : * There may be other backends that haven't read the message(s), so we
611 : * cannot delete them here. SICleanupQueue() will eventually remove them
612 : * from the queue.
613 : */
5407 tgl 614 GIC 801830 : n = 0;
5406 615 16747234 : while (n < datasize && stateP->nextMsgNum < max)
616 : {
5407 617 15945404 : data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
618 15945404 : stateP->nextMsgNum++;
5407 tgl 619 ECB : }
8618 620 :
621 : /*
4272 rhaas 622 : * If we have caught up completely, reset our "signaled" flag so that
623 : * we'll get another signal if we fall behind again.
624 : *
625 : * If we haven't caught up completely, reset the hasMessages flag so that
626 : * we see the remaining messages next time.
627 : */
5406 tgl 628 GIC 801830 : if (stateP->nextMsgNum >= max)
5407 629 402408 : stateP->signaled = false;
630 : else
4269 rhaas 631 399422 : stateP->hasMessages = true;
632 :
5407 tgl 633 CBC 801830 : LWLockRelease(SInvalReadLock);
634 801830 : return n;
635 : }
9770 scrappy 636 ECB :
637 : /*
5407 tgl 638 : * SICleanupQueue
8616 639 : * Remove messages that have been consumed by all active backends
640 : *
641 : * callerHasWriteLock is true if caller is holding SInvalWriteLock.
642 : * minFree is the minimum number of message slots to make free.
643 : *
644 : * Possible side effects of this routine include marking one or more
645 : * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
646 : * to some backend that seems to be getting too far behind. We signal at
647 : * most one backend at a time, for reasons explained at the top of the file.
648 : *
649 : * Caution: because we transiently release write lock when we have to signal
650 : * some other backend, it is NOT guaranteed that there are still minFree
651 : * free message slots at exit. Caller must recheck and perhaps retry.
652 : */
653 : void
5407 tgl 654 GIC 9820 : SICleanupQueue(bool callerHasWriteLock, int minFree)
655 : {
5502 alvherre 656 9820 : SISeg *segP = shmInvalBuffer;
657 : int min,
658 : minsig,
5407 tgl 659 ECB : lowbound,
660 : numMsgs,
661 : i;
5407 tgl 662 GIC 9820 : ProcState *needSig = NULL;
663 :
664 : /* Lock out all writers and readers */
665 9820 : if (!callerHasWriteLock)
666 2648 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
5407 tgl 667 CBC 9820 : LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
668 :
669 : /*
5050 bruce 670 ECB : * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
671 : * furthest-back backend that needs signaling (if any), and reset any
3260 672 : * backends that are too far back. Note that because we ignore sendOnly
673 : * backends here it is possible for them to keep sending messages without
674 : * a problem even when they are the only active backend.
675 : */
8616 tgl 676 GIC 9820 : min = segP->maxMsgNum;
5407 677 9820 : minsig = min - SIG_THRESHOLD;
678 9820 : lowbound = min - MAXNUMMESSAGES + minFree;
679 :
8183 680 75566 : for (i = 0; i < segP->lastBackend; i++)
9345 bruce 681 ECB : {
5407 tgl 682 CBC 65746 : ProcState *stateP = &segP->procState[i];
5050 bruce 683 65746 : int n = stateP->nextMsgNum;
684 :
5407 tgl 685 ECB : /* Ignore if inactive or already in reset state */
4859 simon 686 GIC 65746 : if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
5407 tgl 687 CBC 20521 : continue;
5407 tgl 688 ECB :
689 : /*
690 : * If we must free some space and this backend is preventing it, force
5050 bruce 691 : * him into reset state and then ignore until he catches up.
5407 tgl 692 : */
5407 tgl 693 GIC 45225 : if (n < lowbound)
694 : {
695 218 : stateP->resetState = true;
696 : /* no point in signaling him ... */
697 218 : continue;
5407 tgl 698 ECB : }
699 :
700 : /* Track the global minimum nextMsgNum */
5407 tgl 701 GIC 45007 : if (n < min)
5407 tgl 702 CBC 13470 : min = n;
703 :
704 : /* Also see who's furthest back of the unsignaled backends */
5407 tgl 705 GIC 45007 : if (n < minsig && !stateP->signaled)
5407 tgl 706 ECB : {
5407 tgl 707 CBC 2706 : minsig = n;
5407 tgl 708 GIC 2706 : needSig = stateP;
709 : }
9345 bruce 710 ECB : }
8616 tgl 711 GIC 9820 : segP->minMsgNum = min;
9345 bruce 712 ECB :
8397 713 : /*
714 : * When minMsgNum gets really large, decrement all message counters so as
715 : * to forestall overflow of the counters. This happens seldom enough that
5050 716 : * folding it into the previous loop would be a loser.
717 : */
8616 tgl 718 GIC 9820 : if (min >= MSGNUMWRAPAROUND)
719 : {
8616 tgl 720 UIC 0 : segP->minMsgNum -= MSGNUMWRAPAROUND;
721 0 : segP->maxMsgNum -= MSGNUMWRAPAROUND;
8183 722 0 : for (i = 0; i < segP->lastBackend; i++)
9345 bruce 723 ECB : {
724 : /* we don't bother skipping inactive entries here */
5407 tgl 725 UBC 0 : segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
9345 bruce 726 EUB : }
727 : }
728 :
729 : /*
5407 tgl 730 : * Determine how many messages are still in the queue, and set the
731 : * threshold at which we should repeat SICleanupQueue().
732 : */
5407 tgl 733 GIC 9820 : numMsgs = segP->maxMsgNum - segP->minMsgNum;
734 9820 : if (numMsgs < CLEANUP_MIN)
735 3429 : segP->nextThreshold = CLEANUP_MIN;
736 : else
737 6391 : segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
5407 tgl 738 ECB :
739 : /*
5000 740 : * Lastly, signal anyone who needs a catchup interrupt. Since
741 : * SendProcSignal() might not be fast, we don't want to hold locks while
742 : * executing it.
743 : */
5407 tgl 744 GIC 9820 : if (needSig)
745 : {
5050 bruce 746 2660 : pid_t his_pid = needSig->procPid;
5000 tgl 747 2660 : BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
748 :
5407 tgl 749 CBC 2660 : needSig->signaled = true;
5407 tgl 750 GIC 2660 : LWLockRelease(SInvalReadLock);
5407 tgl 751 CBC 2660 : LWLockRelease(SInvalWriteLock);
5234 heikki.linnakangas 752 2660 : elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
5000 tgl 753 GIC 2660 : SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
5407 tgl 754 CBC 2660 : if (callerHasWriteLock)
755 1980 : LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
5407 tgl 756 ECB : }
757 : else
758 : {
5407 tgl 759 CBC 7160 : LWLockRelease(SInvalReadLock);
760 7160 : if (!callerHasWriteLock)
5407 tgl 761 GIC 1968 : LWLockRelease(SInvalWriteLock);
762 : }
9345 bruce 763 9820 : }
5695 tgl 764 ECB :
765 :
766 : /*
767 : * GetNextLocalTransactionId --- allocate a new LocalTransactionId
768 : *
769 : * We split VirtualTransactionIds into two parts so that it is possible
770 : * to allocate a new one without any contention for shared memory, except
771 : * for a bit of additional overhead during backend startup/shutdown.
772 : * The high-order part of a VirtualTransactionId is a BackendId, and the
773 : * low-order part is a LocalTransactionId, which we assign from a local
774 : * counter. To avoid the risk of a VirtualTransactionId being reused
775 : * within a short interval, successive procs occupying the same backend ID
776 : * slot should use a consecutive sequence of local IDs, which is implemented
777 : * by copying nextLocalTransactionId as seen above.
778 : */
779 : LocalTransactionId
5695 tgl 780 GIC 486240 : GetNextLocalTransactionId(void)
781 : {
782 : LocalTransactionId result;
783 :
784 : /* loop to avoid returning InvalidLocalTransactionId at wraparound */
5624 bruce 785 ECB : do
786 : {
5695 tgl 787 GIC 488487 : result = nextLocalTransactionId++;
788 488487 : } while (!LocalTransactionIdIsValid(result));
789 :
790 486240 : return result;
791 : }
|