Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * sinval.c
4 : * POSTGRES shared cache invalidation communication code.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/storage/ipc/sinval.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "access/xact.h"
18 : #include "commands/async.h"
19 : #include "miscadmin.h"
20 : #include "storage/ipc.h"
21 : #include "storage/proc.h"
22 : #include "storage/sinvaladt.h"
23 : #include "utils/inval.h"
24 :
25 :
26 : uint64 SharedInvalidMessageCounter;
27 :
28 :
/*
 * Backends that sit idle do not read sinval events, so one that falls too
 * far behind needs a prod to make it catch up before the sinval queue
 * overflows and forces it through a cache reset.  That prod is
 * PROCSIG_CATCHUP_INTERRUPT, sent to any backend that lags too much.
 *
 * The signal handler does no real work: it sets the pending flag below and
 * sets the process's latch.  Whenever the backend starts to read from the
 * client, or is interrupted while doing so, ProcessClientReadInterrupt()
 * will call ProcessCatchupInterrupt() to act on the flag.
 */
volatile sig_atomic_t catchupInterruptPending = false;
42 :
43 :
44 : /*
45 : * SendSharedInvalidMessages
46 : * Add shared-cache-invalidation message(s) to the global SI message queue.
47 : */
48 : void
5407 tgl 49 CBC 584302 : SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
50 : {
51 584302 : SIInsertDataEntries(msgs, n);
9770 scrappy 52 584302 : }
53 :
54 : /*
55 : * ReceiveSharedInvalidMessages
56 : * Process shared-cache-invalidation messages waiting for this backend
57 : *
58 : * We guarantee to process all messages that had been queued before the
59 : * routine was entered. It is of course possible for more messages to get
60 : * queued right after our last SIGetDataEntries call.
61 : *
62 : * NOTE: it is entirely possible for this routine to be invoked recursively
63 : * as a consequence of processing inside the invalFunction or resetFunction.
64 : * Furthermore, such a recursive call must guarantee that all outstanding
65 : * inval messages have been processed before it exits. This is the reason
66 : * for the strange-looking choice to use a statically allocated buffer array
67 : * and counters; it's so that a recursive call can process messages already
68 : * sucked out of sinvaladt.c.
69 : */
70 : void
1348 andres 71 26426969 : ReceiveSharedInvalidMessages(void (*invalFunction) (SharedInvalidationMessage *msg),
72 : void (*resetFunction) (void))
73 : {
74 : #define MAXINVALMSGS 32
75 : static SharedInvalidationMessage messages[MAXINVALMSGS];
76 :
77 : /*
78 : * We use volatile here to prevent bugs if a compiler doesn't realize that
79 : * recursion is a possibility ...
80 : */
81 : static volatile int nextmsg = 0;
82 : static volatile int nummsgs = 0;
83 :
84 : /* Deal with any messages still pending from an outer recursion */
5407 tgl 85 26427018 : while (nextmsg < nummsgs)
86 : {
3261 87 49 : SharedInvalidationMessage msg = messages[nextmsg++];
88 :
4293 rhaas 89 49 : SharedInvalidMessageCounter++;
3261 tgl 90 49 : invalFunction(&msg);
91 : }
92 :
93 : do
94 : {
95 : int getResult;
96 :
5407 97 26828383 : nextmsg = nummsgs = 0;
98 :
99 : /* Try to get some more messages */
100 26828383 : getResult = SIGetDataEntries(messages, MAXINVALMSGS);
101 :
8618 102 26828383 : if (getResult < 0)
103 : {
104 : /* got a reset message */
7199 105 217 : elog(DEBUG4, "cache state reset");
4293 rhaas 106 217 : SharedInvalidMessageCounter++;
8618 tgl 107 217 : resetFunction();
5407 108 217 : break; /* nothing more to do */
109 : }
110 :
111 : /* Process them, being wary that a recursive call might eat some */
112 26828166 : nextmsg = 0;
113 26828166 : nummsgs = getResult;
114 :
115 42773521 : while (nextmsg < nummsgs)
116 : {
3261 117 15945355 : SharedInvalidationMessage msg = messages[nextmsg++];
118 :
4293 rhaas 119 15945355 : SharedInvalidMessageCounter++;
3261 tgl 120 15945355 : invalFunction(&msg);
121 : }
122 :
123 : /*
124 : * We only need to loop if the last SIGetDataEntries call (which might
125 : * have been within a recursive call) returned a full buffer.
126 : */
5407 127 26828166 : } while (nummsgs == MAXINVALMSGS);
128 :
129 : /*
130 : * We are now caught up. If we received a catchup signal, reset that
131 : * flag, and call SICleanupQueue(). This is not so much because we need
132 : * to flush dead messages right now, as that we want to pass on the
133 : * catchup signal to the next slowest backend. "Daisy chaining" the
134 : * catchup signal this way avoids creating spikes in system load for what
135 : * should be just a background maintenance activity.
136 : */
2987 andres 137 26426969 : if (catchupInterruptPending)
138 : {
139 2648 : catchupInterruptPending = false;
5407 tgl 140 2648 : elog(DEBUG4, "sinval catchup complete, cleaning queue");
141 2648 : SICleanupQueue(false, 0);
142 : }
9770 scrappy 143 26426969 : }
144 :
145 :
146 : /*
147 : * HandleCatchupInterrupt
148 : *
149 : * This is called when PROCSIG_CATCHUP_INTERRUPT is received.
150 : *
151 : * We used to directly call ProcessCatchupEvent directly when idle. These days
152 : * we just set a flag to do it later and notify the process of that fact by
153 : * setting the process's latch.
154 : */
155 : void
5000 tgl 156 2651 : HandleCatchupInterrupt(void)
157 : {
158 : /*
159 : * Note: this is called by a SIGNAL HANDLER. You must be very wary what
160 : * you do here.
161 : */
162 :
2987 andres 163 2651 : catchupInterruptPending = true;
164 :
165 : /* make sure the event is processed in due course */
166 2651 : SetLatch(MyLatch);
6895 tgl 167 2651 : }
168 :
169 : /*
170 : * ProcessCatchupInterrupt
171 : *
172 : * The portion of catchup interrupt handling that runs outside of the signal
173 : * handler, which allows it to actually process pending invalidations.
174 : */
175 : void
2987 andres 176 1865 : ProcessCatchupInterrupt(void)
177 : {
178 3331 : while (catchupInterruptPending)
179 : {
180 : /*
181 : * What we need to do here is cause ReceiveSharedInvalidMessages() to
182 : * run, which will do the necessary work and also reset the
183 : * catchupInterruptPending flag. If we are inside a transaction we
184 : * can just call AcceptInvalidationMessages() to do this. If we
185 : * aren't, we start and immediately end a transaction; the call to
186 : * AcceptInvalidationMessages() happens down inside transaction start.
187 : *
188 : * It is awfully tempting to just call AcceptInvalidationMessages()
189 : * without the rest of the xact start/stop overhead, and I think that
190 : * would actually work in the normal case; but I am not sure that
191 : * things would clean up nicely if we got an error partway through.
192 : */
193 1466 : if (IsTransactionOrTransactionBlock())
194 : {
195 21 : elog(DEBUG4, "ProcessCatchupEvent inside transaction");
196 21 : AcceptInvalidationMessages();
197 : }
198 : else
199 : {
200 1445 : elog(DEBUG4, "ProcessCatchupEvent outside transaction");
201 1445 : StartTransactionCommand();
202 1445 : CommitTransactionCommand();
203 : }
204 : }
6895 tgl 205 1865 : }
|