TLA Line data Source code
1 : /*------------------------------------------------------------------------- 2 : * 3 : * sinval.c 4 : * POSTGRES shared cache invalidation communication code. 5 : * 6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group 7 : * Portions Copyright (c) 1994, Regents of the University of California 8 : * 9 : * 10 : * IDENTIFICATION 11 : * src/backend/storage/ipc/sinval.c 12 : * 13 : *------------------------------------------------------------------------- 14 : */ 15 : #include "postgres.h" 16 : 17 : #include "access/xact.h" 18 : #include "commands/async.h" 19 : #include "miscadmin.h" 20 : #include "storage/ipc.h" 21 : #include "storage/proc.h" 22 : #include "storage/sinvaladt.h" 23 : #include "utils/inval.h" 24 : 25 : 26 : uint64 SharedInvalidMessageCounter; 27 : 28 : 29 : /* 30 : * Because backends sitting idle will not be reading sinval events, we 31 : * need a way to give an idle backend a swift kick in the rear and make 32 : * it catch up before the sinval queue overflows and forces it to go 33 : * through a cache reset exercise. This is done by sending 34 : * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind. 35 : * 36 : * The signal handler will set an interrupt pending flag and will set the 37 : * processes latch. Whenever starting to read from the client, or when 38 : * interrupted while doing so, ProcessClientReadInterrupt() will call 39 : * ProcessCatchupEvent(). 40 : */ 41 : volatile sig_atomic_t catchupInterruptPending = false; 42 : 43 : 44 : /* 45 : * SendSharedInvalidMessages 46 : * Add shared-cache-invalidation message(s) to the global SI message queue. 47 : */ 48 : void 49 CBC 584302 : SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n) 50 : { 51 584302 : SIInsertDataEntries(msgs, n); 52 584302 : } 53 : 54 : /* 55 : * ReceiveSharedInvalidMessages 56 : * Process shared-cache-invalidation messages waiting for this backend 57 : * 58 : * We guarantee to process all messages that had been queued before the 59 : * routine was entered. It is of course possible for more messages to get 60 : * queued right after our last SIGetDataEntries call. 61 : * 62 : * NOTE: it is entirely possible for this routine to be invoked recursively 63 : * as a consequence of processing inside the invalFunction or resetFunction. 64 : * Furthermore, such a recursive call must guarantee that all outstanding 65 : * inval messages have been processed before it exits. This is the reason 66 : * for the strange-looking choice to use a statically allocated buffer array 67 : * and counters; it's so that a recursive call can process messages already 68 : * sucked out of sinvaladt.c. 69 : */ 70 : void 71 26426969 : ReceiveSharedInvalidMessages(void (*invalFunction) (SharedInvalidationMessage *msg), 72 : void (*resetFunction) (void)) 73 : { 74 : #define MAXINVALMSGS 32 75 : static SharedInvalidationMessage messages[MAXINVALMSGS]; 76 : 77 : /* 78 : * We use volatile here to prevent bugs if a compiler doesn't realize that 79 : * recursion is a possibility ... 80 : */ 81 : static volatile int nextmsg = 0; 82 : static volatile int nummsgs = 0; 83 : 84 : /* Deal with any messages still pending from an outer recursion */ 85 26427018 : while (nextmsg < nummsgs) 86 : { 87 49 : SharedInvalidationMessage msg = messages[nextmsg++]; 88 : 89 49 : SharedInvalidMessageCounter++; 90 49 : invalFunction(&msg); 91 : } 92 : 93 : do 94 : { 95 : int getResult; 96 : 97 26828383 : nextmsg = nummsgs = 0; 98 : 99 : /* Try to get some more messages */ 100 26828383 : getResult = SIGetDataEntries(messages, MAXINVALMSGS); 101 : 102 26828383 : if (getResult < 0) 103 : { 104 : /* got a reset message */ 105 217 : elog(DEBUG4, "cache state reset"); 106 217 : SharedInvalidMessageCounter++; 107 217 : resetFunction(); 108 217 : break; /* nothing more to do */ 109 : } 110 : 111 : /* Process them, being wary that a recursive call might eat some */ 112 26828166 : nextmsg = 0; 113 26828166 : nummsgs = getResult; 114 : 115 42773521 : while (nextmsg < nummsgs) 116 : { 117 15945355 : SharedInvalidationMessage msg = messages[nextmsg++]; 118 : 119 15945355 : SharedInvalidMessageCounter++; 120 15945355 : invalFunction(&msg); 121 : } 122 : 123 : /* 124 : * We only need to loop if the last SIGetDataEntries call (which might 125 : * have been within a recursive call) returned a full buffer. 126 : */ 127 26828166 : } while (nummsgs == MAXINVALMSGS); 128 : 129 : /* 130 : * We are now caught up. If we received a catchup signal, reset that 131 : * flag, and call SICleanupQueue(). This is not so much because we need 132 : * to flush dead messages right now, as that we want to pass on the 133 : * catchup signal to the next slowest backend. "Daisy chaining" the 134 : * catchup signal this way avoids creating spikes in system load for what 135 : * should be just a background maintenance activity. 136 : */ 137 26426969 : if (catchupInterruptPending) 138 : { 139 2648 : catchupInterruptPending = false; 140 2648 : elog(DEBUG4, "sinval catchup complete, cleaning queue"); 141 2648 : SICleanupQueue(false, 0); 142 : } 143 26426969 : } 144 : 145 : 146 : /* 147 : * HandleCatchupInterrupt 148 : * 149 : * This is called when PROCSIG_CATCHUP_INTERRUPT is received. 150 : * 151 : * We used to directly call ProcessCatchupEvent directly when idle. These days 152 : * we just set a flag to do it later and notify the process of that fact by 153 : * setting the process's latch. 154 : */ 155 : void 156 2651 : HandleCatchupInterrupt(void) 157 : { 158 : /* 159 : * Note: this is called by a SIGNAL HANDLER. You must be very wary what 160 : * you do here. 161 : */ 162 : 163 2651 : catchupInterruptPending = true; 164 : 165 : /* make sure the event is processed in due course */ 166 2651 : SetLatch(MyLatch); 167 2651 : } 168 : 169 : /* 170 : * ProcessCatchupInterrupt 171 : * 172 : * The portion of catchup interrupt handling that runs outside of the signal 173 : * handler, which allows it to actually process pending invalidations. 174 : */ 175 : void 176 1865 : ProcessCatchupInterrupt(void) 177 : { 178 3331 : while (catchupInterruptPending) 179 : { 180 : /* 181 : * What we need to do here is cause ReceiveSharedInvalidMessages() to 182 : * run, which will do the necessary work and also reset the 183 : * catchupInterruptPending flag. If we are inside a transaction we 184 : * can just call AcceptInvalidationMessages() to do this. If we 185 : * aren't, we start and immediately end a transaction; the call to 186 : * AcceptInvalidationMessages() happens down inside transaction start. 187 : * 188 : * It is awfully tempting to just call AcceptInvalidationMessages() 189 : * without the rest of the xact start/stop overhead, and I think that 190 : * would actually work in the normal case; but I am not sure that 191 : * things would clean up nicely if we got an error partway through. 192 : */ 193 1466 : if (IsTransactionOrTransactionBlock()) 194 : { 195 21 : elog(DEBUG4, "ProcessCatchupEvent inside transaction"); 196 21 : AcceptInvalidationMessages(); 197 : } 198 : else 199 : { 200 1445 : elog(DEBUG4, "ProcessCatchupEvent outside transaction"); 201 1445 : StartTransactionCommand(); 202 1445 : CommitTransactionCommand(); 203 : } 204 : } 205 1865 : }