/*-------------------------------------------------------------------------
 *
 * shm_mq.c
 *    single-reader, single-writer shared memory message queue
 *
 * Both the sender and the receiver must have a PGPROC; their respective
 * process latches are used for synchronization. Only the sender may send,
 * and only the receiver may receive. This is intended to allow a user
 * backend to communicate with worker backends that it has registered.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/storage/ipc/shm_mq.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bitutils.h"
#include "postmaster/bgworker.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"
#include "utils/memutils.h"

/*
 * This structure represents the actual queue, stored in shared memory.
 *
 * Some notes on synchronization:
 *
 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
 * mq_sender and mq_bytes_written can only be changed by the sender.
 * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
 * they cannot change once set, and thus may be read without a lock once this
 * is known to be the case.
 *
 * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
 * they are written atomically using 8 byte loads and stores. Memory barriers
 * must be carefully used to synchronize reads and writes of these values with
 * reads and writes of the actual data in mq_ring.
 *
 * mq_detached needs no locking. It can be set by either the sender or the
 * receiver, but only ever from false to true, so redundant writes don't
 * matter. It is important that if we set mq_detached and then set the
 * counterparty's latch, the counterparty must be certain to see the change
 * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
 * ends with one, this should be OK.
 *
 * mq_ring_size and mq_ring_offset never change after initialization, and
 * can therefore be read without the lock.
 *
 * Importantly, mq_ring can be safely read and written without a lock.
 * At any given time, the difference between mq_bytes_read and
 * mq_bytes_written defines the number of bytes within mq_ring that contain
 * unread data, and mq_bytes_read defines the position where those bytes
 * begin. The sender can increase the number of unread bytes at any time,
 * but only the receiver can give license to overwrite those bytes, by
 * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
 * the unread bytes it knows to be present without the lock. Conversely,
 * the sender can write to the unused portion of the ring buffer without
 * the lock, because nobody else can be reading or writing those bytes. The
 * receiver could be making more bytes unused by incrementing mq_bytes_read,
 * but that's OK. Note that it would be unsafe for the receiver to read any
 * data it's already marked as read, or to write any data; and it would be
 * unsafe for the sender to reread any data after incrementing
 * mq_bytes_written, but fortunately there's no need for any of that.
 */
struct shm_mq
{
    slock_t     mq_mutex;
    PGPROC     *mq_receiver;
    PGPROC     *mq_sender;
    pg_atomic_uint64 mq_bytes_read;
    pg_atomic_uint64 mq_bytes_written;
    Size        mq_ring_size;
    bool        mq_detached;
    uint8       mq_ring_offset;
    char        mq_ring[FLEXIBLE_ARRAY_MEMBER];
};
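
/*
 * As a concrete illustration of the scheme above (a sketch, not compiled
 * code): the two counters are 64-bit and increase monotonically, so they
 * are reduced modulo the ring size only when computing a position within
 * mq_ring.
 *
 *		uint64	rb = pg_atomic_read_u64(&mq->mq_bytes_read);
 *		uint64	wb = pg_atomic_read_u64(&mq->mq_bytes_written);
 *		Size	used = wb - rb;					(unread bytes)
 *		Size	avail = mq->mq_ring_size - used;	(writable bytes)
 *		Size	start = rb % mq->mq_ring_size;	(where unread data begins)
 *
 * For example, with an 8192-byte ring, rb = 10000 and wb = 12048 describe
 * 2048 unread bytes beginning at ring offset 10000 % 8192 = 1808.
 */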

/*
 * This structure is a backend-private handle for access to a queue.
 *
 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
 * an optional pointer to the dynamic shared memory segment that contains it.
 * (If mqh_segment is provided, we register an on_dsm_detach callback to
 * make sure we detach from the queue before detaching from DSM.)
 *
 * If this queue is intended to connect the current process with a background
 * worker that started it, the user can pass a pointer to the worker handle
 * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
 * is to allow us to begin sending to or receiving from that queue before the
 * process we'll be communicating with has even been started. If it fails
 * to start, the handle will allow us to notice that and fail cleanly, rather
 * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
 * simple cases - e.g. where there are just 2 processes communicating; in
 * more complex scenarios, every process may not have a BackgroundWorkerHandle
 * available, or may need to watch for the failure of more than one other
 * process at a time.
 *
 * When a message exists as a contiguous chunk of bytes in the queue - that is,
 * it is smaller than the size of the ring buffer and does not wrap around
 * the end - we return the message to the caller as a pointer into the buffer.
 * For messages that are larger or happen to wrap, we reassemble the message
 * locally by copying the chunks into a backend-local buffer. mqh_buffer is
 * the buffer, and mqh_buflen is the number of bytes allocated for it.
 *
 * mqh_send_pending is the number of bytes that have been written to the
 * queue but not yet reflected in the shared memory counter. We don't update
 * mq_bytes_written until the pending data amounts to more than 1/4th of the
 * ring size or the queue is full. This prevents frequent CPU cache misses,
 * and it also avoids frequent SetLatch() calls, which are quite expensive.
 *
 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
 * are used to track the state of non-blocking operations. When the caller
 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
 * are expected to retry the call at a later time with the same argument;
 * we need to retain enough state to pick up where we left off.
 * mqh_length_word_complete tracks whether we are done sending or receiving
 * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
 * the number of bytes read or written for either the length word or the
 * message itself, and mqh_expected_bytes - which is used only for reads -
 * tracks the expected total size of the payload.
 *
 * mqh_counterparty_attached tracks whether we know the counterparty to have
 * attached to the queue at some previous point. This lets us avoid some
 * mutex acquisitions.
 *
 * mqh_context is the memory context in effect at the time we attached to
 * the shm_mq. The shm_mq_handle itself is allocated in this context, and
 * we make sure any other allocations we do happen in this context as well,
 * to avoid nasty surprises.
 */
struct shm_mq_handle
{
    shm_mq     *mqh_queue;
    dsm_segment *mqh_segment;
    BackgroundWorkerHandle *mqh_handle;
    char       *mqh_buffer;
    Size        mqh_buflen;
    Size        mqh_consume_pending;
    Size        mqh_send_pending;
    Size        mqh_partial_bytes;
    Size        mqh_expected_bytes;
    bool        mqh_length_word_complete;
    bool        mqh_counterparty_attached;
    MemoryContext mqh_context;
};

static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
                                       const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
                                          Size bytes_needed, bool nowait, Size *nbytesp,
                                          void **datap);
static bool shm_mq_counterparty_gone(shm_mq *mq,
                                     BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
                                 BackgroundWorkerHandle *handle);
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);

/* Minimum queue size is enough for header and at least one chunk of data. */
const Size shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;

#define MQH_INITIAL_BUFSIZE 8192

/*
 * Initialize a new shared message queue.
 */
shm_mq *
shm_mq_create(void *address, Size size)
{
    shm_mq     *mq = address;
    Size        data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));

    /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
    size = MAXALIGN_DOWN(size);

    /* Queue size must be large enough to hold some data. */
    Assert(size > data_offset);

    /* Initialize queue header. */
    SpinLockInit(&mq->mq_mutex);
    mq->mq_receiver = NULL;
    mq->mq_sender = NULL;
    pg_atomic_init_u64(&mq->mq_bytes_read, 0);
    pg_atomic_init_u64(&mq->mq_bytes_written, 0);
    mq->mq_ring_size = size - data_offset;
    mq->mq_detached = false;
    mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);

    return mq;
}
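
/*
 * A minimal setup sketch, with error handling omitted: one backend creates
 * a DSM segment, lays the queue out across the whole segment, and declares
 * itself the receiver. (MY_QUEUE_SIZE is a hypothetical constant, not part
 * of this module.)
 *
 *		dsm_segment *seg = dsm_create(MY_QUEUE_SIZE, 0);
 *		shm_mq	   *mq = shm_mq_create(dsm_segment_address(seg),
 *									   MY_QUEUE_SIZE);
 *
 *		shm_mq_set_receiver(mq, MyProc);
 *
 * The background worker that will write to the queue maps the same segment
 * and calls shm_mq_set_sender(mq, MyProc) from its own process.
 */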

/*
 * Set the identity of the process that will receive from a shared message
 * queue.
 */
void
shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
{
    PGPROC     *sender;

    SpinLockAcquire(&mq->mq_mutex);
    Assert(mq->mq_receiver == NULL);
    mq->mq_receiver = proc;
    sender = mq->mq_sender;
    SpinLockRelease(&mq->mq_mutex);

    if (sender != NULL)
        SetLatch(&sender->procLatch);
}

/*
 * Set the identity of the process that will send to a shared message queue.
 */
void
shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
{
    PGPROC     *receiver;

    SpinLockAcquire(&mq->mq_mutex);
    Assert(mq->mq_sender == NULL);
    mq->mq_sender = proc;
    receiver = mq->mq_receiver;
    SpinLockRelease(&mq->mq_mutex);

    if (receiver != NULL)
        SetLatch(&receiver->procLatch);
}

/*
 * Get the configured receiver.
 */
PGPROC *
shm_mq_get_receiver(shm_mq *mq)
{
    PGPROC     *receiver;

    SpinLockAcquire(&mq->mq_mutex);
    receiver = mq->mq_receiver;
    SpinLockRelease(&mq->mq_mutex);

    return receiver;
}

/*
 * Get the configured sender.
 */
PGPROC *
shm_mq_get_sender(shm_mq *mq)
{
    PGPROC     *sender;

    SpinLockAcquire(&mq->mq_mutex);
    sender = mq->mq_sender;
    SpinLockRelease(&mq->mq_mutex);

    return sender;
}

/*
 * Attach to a shared message queue so we can send or receive messages.
 *
 * The memory context in effect at the time this function is called should
 * be one which will last for at least as long as the message queue itself.
 * We'll allocate the handle in that context, and future allocations that
 * are needed to buffer incoming data will happen in that context as well.
 *
 * If seg != NULL, the queue will be automatically detached when that dynamic
 * shared memory segment is detached.
 *
 * If handle != NULL, the queue can be read or written even before the
 * other process has attached. We'll wait for it to do so if needed. The
 * handle must be for a background worker initialized with bgw_notify_pid
 * equal to our PID.
 *
 * shm_mq_detach() should be called when done. This will free the
 * shm_mq_handle and mark the queue itself as detached, so that our
 * counterpart won't get stuck waiting for us to fill or drain the queue
 * after we've already lost interest.
 */
shm_mq_handle *
shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
{
    shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));

    Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
    mqh->mqh_queue = mq;
    mqh->mqh_segment = seg;
    mqh->mqh_handle = handle;
    mqh->mqh_buffer = NULL;
    mqh->mqh_buflen = 0;
    mqh->mqh_consume_pending = 0;
    mqh->mqh_send_pending = 0;
    mqh->mqh_partial_bytes = 0;
    mqh->mqh_expected_bytes = 0;
    mqh->mqh_length_word_complete = false;
    mqh->mqh_counterparty_attached = false;
    mqh->mqh_context = CurrentMemoryContext;

    if (seg != NULL)
        on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));

    return mqh;
}
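
/*
 * Continuing the setup sketch: after registering the worker, the creating
 * backend attaches and passes the worker handle so that a worker which
 * fails to start is detected instead of being waited on forever. (worker
 * is a hypothetical, already-initialized BackgroundWorker.)
 *
 *		BackgroundWorkerHandle *wh;
 *		shm_mq_handle *mqh;
 *
 *		if (!RegisterDynamicBackgroundWorker(&worker, &wh))
 *			ereport(ERROR,
 *					(errmsg("could not register background worker")));
 *		mqh = shm_mq_attach(mq, seg, wh);
 */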

/*
 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
 * been passed to shm_mq_attach.
 */
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
    Assert(mqh->mqh_handle == NULL);
    mqh->mqh_handle = handle;
}

/*
 * Write a message into a shared message queue.
 */
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
            bool force_flush)
{
    shm_mq_iovec iov;

    iov.data = data;
    iov.len = nbytes;

    return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
}
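
/*
 * For instance, sending a NUL-terminated string as one message, blocking
 * while the ring is full (msg is hypothetical):
 *
 *		shm_mq_result res;
 *
 *		res = shm_mq_send(mqh, strlen(msg) + 1, msg, false, true);
 *		if (res == SHM_MQ_DETACHED)
 *			ereport(ERROR,
 *					(errmsg("receiver has detached from the message queue")));
 *
 * With nowait = true, the call may instead return SHM_MQ_WOULD_BLOCK and
 * must then be retried later with exactly the same arguments.
 */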

/*
 * Write a message into a shared message queue, gathered from multiple
 * addresses.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * fills up, and then continue writing once the receiver has drained some data.
 * The process latch is reset after each wait.
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
 * this case, the caller should call this function again, with the same
 * arguments, each time the process latch is set. (Once begun, the sending
 * of a message cannot be aborted except by detaching from the queue; changing
 * the length or payload will corrupt the queue.)
 *
 * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
 * and notify the receiver (if it is already attached). Otherwise, we don't
 * update it until we have written an amount of data greater than 1/4th of the
 * ring size.
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
             bool force_flush)
{
    shm_mq_result res;
    shm_mq     *mq = mqh->mqh_queue;
    PGPROC     *receiver;
    Size        nbytes = 0;
    Size        bytes_written;
    int         i;
    int         which_iov = 0;
    Size        offset;

    Assert(mq->mq_sender == MyProc);

    /* Compute total size of write. */
    for (i = 0; i < iovcnt; ++i)
        nbytes += iov[i].len;

    /* Prevent writing messages that would overwhelm the receiver. */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot send a message of size %zu via shared memory queue",
                        nbytes)));

    /* Try to write, or finish writing, the length word into the buffer. */
    while (!mqh->mqh_length_word_complete)
    {
        Assert(mqh->mqh_partial_bytes < sizeof(Size));
        res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                ((char *) &nbytes) + mqh->mqh_partial_bytes,
                                nowait, &bytes_written);

        if (res == SHM_MQ_DETACHED)
        {
            /* Reset state in case caller tries to send another message. */
            mqh->mqh_partial_bytes = 0;
            mqh->mqh_length_word_complete = false;
            return res;
        }
        mqh->mqh_partial_bytes += bytes_written;

        if (mqh->mqh_partial_bytes >= sizeof(Size))
        {
            Assert(mqh->mqh_partial_bytes == sizeof(Size));

            mqh->mqh_partial_bytes = 0;
            mqh->mqh_length_word_complete = true;
        }

        if (res != SHM_MQ_SUCCESS)
            return res;

        /* Length word can't be split unless bigger than required alignment. */
        Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
    }

    /* Write the actual data bytes into the buffer. */
    Assert(mqh->mqh_partial_bytes <= nbytes);
    offset = mqh->mqh_partial_bytes;
    do
    {
        Size        chunksize;

        /* Figure out which bytes need to be sent next. */
        if (offset >= iov[which_iov].len)
        {
            offset -= iov[which_iov].len;
            ++which_iov;
            if (which_iov >= iovcnt)
                break;
            continue;
        }

        /*
         * We want to avoid copying the data if at all possible, but every
         * chunk of bytes we write into the queue has to be MAXALIGN'd, except
         * the last. Thus, if a chunk other than the last one ends on a
         * non-MAXALIGN'd boundary, we have to combine the tail end of its
         * data with data from one or more following chunks until we either
         * reach the last chunk or accumulate a number of bytes which is
         * MAXALIGN'd.
         */
        if (which_iov + 1 < iovcnt &&
            offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
        {
            char        tmpbuf[MAXIMUM_ALIGNOF];
            int         j = 0;

            for (;;)
            {
                if (offset < iov[which_iov].len)
                {
                    tmpbuf[j] = iov[which_iov].data[offset];
                    j++;
                    offset++;
                    if (j == MAXIMUM_ALIGNOF)
                        break;
                }
                else
                {
                    offset -= iov[which_iov].len;
                    which_iov++;
                    if (which_iov >= iovcnt)
                        break;
                }
            }

            res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

            if (res == SHM_MQ_DETACHED)
            {
                /* Reset state in case caller tries to send another message. */
                mqh->mqh_partial_bytes = 0;
                mqh->mqh_length_word_complete = false;
                return res;
            }

            mqh->mqh_partial_bytes += bytes_written;
            if (res != SHM_MQ_SUCCESS)
                return res;
            continue;
        }

        /*
         * If this is the last chunk, we can write all the data, even if it
         * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
         * MAXALIGN_DOWN the write size.
         */
        chunksize = iov[which_iov].len - offset;
        if (which_iov + 1 < iovcnt)
            chunksize = MAXALIGN_DOWN(chunksize);
        res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
                                nowait, &bytes_written);

        if (res == SHM_MQ_DETACHED)
        {
            /* Reset state in case caller tries to send another message. */
            mqh->mqh_length_word_complete = false;
            mqh->mqh_partial_bytes = 0;
            return res;
        }

        mqh->mqh_partial_bytes += bytes_written;
        offset += bytes_written;
        if (res != SHM_MQ_SUCCESS)
            return res;
    } while (mqh->mqh_partial_bytes < nbytes);

    /* Reset for next message. */
    mqh->mqh_partial_bytes = 0;
    mqh->mqh_length_word_complete = false;

    /* If queue has been detached, let caller know. */
    if (mq->mq_detached)
        return SHM_MQ_DETACHED;

    /*
     * If the counterparty is known to have attached, we can read mq_receiver
     * without acquiring the spinlock. Otherwise, more caution is needed.
     */
    if (mqh->mqh_counterparty_attached)
        receiver = mq->mq_receiver;
    else
    {
        SpinLockAcquire(&mq->mq_mutex);
        receiver = mq->mq_receiver;
        SpinLockRelease(&mq->mq_mutex);
        if (receiver != NULL)
            mqh->mqh_counterparty_attached = true;
    }

    /*
     * If the caller has requested force flush or we have written more than
     * 1/4 of the ring size, mark it as written in shared memory and notify
     * the receiver.
     */
    if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
    {
        shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
        if (receiver != NULL)
            SetLatch(&receiver->procLatch);
        mqh->mqh_send_pending = 0;
    }

    return SHM_MQ_SUCCESS;
}
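
/*
 * As an illustration, a fixed header plus a variable-length body can be
 * sent as a single message without first copying the two parts into one
 * buffer (hdr, body, and body_len are hypothetical):
 *
 *		shm_mq_iovec iov[2];
 *
 *		iov[0].data = (char *) &hdr;
 *		iov[0].len = sizeof(hdr);
 *		iov[1].data = body;
 *		iov[1].len = body_len;
 *
 *		res = shm_mq_sendv(mqh, iov, 2, false, true);
 *
 * The receiver still sees one message: a single length word followed by
 * sizeof(hdr) + body_len payload bytes.
 */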

/*
 * Receive a message from a shared message queue.
 *
 * We set *nbytes to the message length and *data to point to the message
 * payload. If the entire message exists in the queue as a single,
 * contiguous chunk, *data will point directly into shared memory; otherwise,
 * it will point to a temporary buffer. This mostly avoids data copying in
 * the hoped-for case where messages are short compared to the buffer size,
 * while still allowing longer messages. In either case, the return value
 * remains valid until the next receive operation is performed on the queue.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * is empty and we have not yet received a full message. The sender will
 * set our process latch after more data has been written, and we'll resume
 * processing. Each call will therefore return a complete message
 * (unless the sender detaches the queue).
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, whenever the buffer is empty and we need to read from it, we
 * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
 * function again after the process latch has been set.
 */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
    shm_mq     *mq = mqh->mqh_queue;
    shm_mq_result res;
    Size        rb = 0;
    Size        nbytes;
    void       *rawdata;

    Assert(mq->mq_receiver == MyProc);

    /* We can't receive data until the sender has attached. */
    if (!mqh->mqh_counterparty_attached)
    {
        if (nowait)
        {
            int         counterparty_gone;

            /*
             * We shouldn't return at this point at all unless the sender
             * hasn't attached yet. However, the correct return value depends
             * on whether the sender is still attached. If we first test
             * whether the sender has ever attached and then test whether the
             * sender has detached, there's a race condition: a sender that
             * attaches and detaches very quickly might fool us into thinking
             * the sender never attached at all. So, test whether our
             * counterparty is definitively gone first, and only afterwards
             * check whether the sender ever attached in the first place.
             */
            counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
            if (shm_mq_get_sender(mq) == NULL)
            {
                if (counterparty_gone)
                    return SHM_MQ_DETACHED;
                else
                    return SHM_MQ_WOULD_BLOCK;
            }
        }
        else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
                 && shm_mq_get_sender(mq) == NULL)
        {
            mq->mq_detached = true;
            return SHM_MQ_DETACHED;
        }
        mqh->mqh_counterparty_attached = true;
    }

    /*
     * If we've consumed an amount of data greater than 1/4th of the ring
     * size, mark it consumed in shared memory. We try to avoid doing this
     * unnecessarily when only a small amount of data has been consumed,
     * because SetLatch() is fairly expensive and we don't want to do it too
     * often.
     */
    if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
    {
        shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
        mqh->mqh_consume_pending = 0;
    }

    /* Try to read, or finish reading, the length word from the buffer. */
    while (!mqh->mqh_length_word_complete)
    {
        /* Try to receive the message length word. */
        Assert(mqh->mqh_partial_bytes < sizeof(Size));
        res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                   nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;

        /*
         * Hopefully, we'll receive the entire message length word at once.
         * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
         * multiple reads.
         */
        if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
        {
            Size        needed;

            nbytes = *(Size *) rawdata;

            /* If we've already got the whole message, we're done. */
            needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
            if (rb >= needed)
            {
                mqh->mqh_consume_pending += needed;
                *nbytesp = nbytes;
                *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
                return SHM_MQ_SUCCESS;
            }

            /*
             * We don't have the whole message, but we at least have the whole
             * length word.
             */
            mqh->mqh_expected_bytes = nbytes;
            mqh->mqh_length_word_complete = true;
            mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
            rb -= MAXALIGN(sizeof(Size));
        }
        else
        {
            Size        lengthbytes;

            /* Can't be split unless bigger than required alignment. */
            Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

            /* Message word is split; need buffer to reassemble. */
            if (mqh->mqh_buffer == NULL)
            {
                mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
                                                     MQH_INITIAL_BUFSIZE);
                mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
            }
            Assert(mqh->mqh_buflen >= sizeof(Size));

            /* Copy partial length word; remember to consume it. */
            if (mqh->mqh_partial_bytes + rb > sizeof(Size))
                lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
            else
                lengthbytes = rb;
            memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
                   lengthbytes);
            mqh->mqh_partial_bytes += lengthbytes;
            mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
            rb -= lengthbytes;

            /* If we now have the whole word, we're ready to read payload. */
            if (mqh->mqh_partial_bytes >= sizeof(Size))
            {
                Assert(mqh->mqh_partial_bytes == sizeof(Size));
                mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
                mqh->mqh_length_word_complete = true;
                mqh->mqh_partial_bytes = 0;
            }
        }
    }
    nbytes = mqh->mqh_expected_bytes;

    /*
     * Should be disallowed on the sending side already, but better check and
     * error out on the receiver side as well rather than trying to read a
     * prohibitively large message.
     */
    if (nbytes > MaxAllocSize)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("invalid message size %zu in shared memory queue",
                        nbytes)));

    if (mqh->mqh_partial_bytes == 0)
    {
        /*
         * Try to obtain the whole message in a single chunk. If this works,
         * we need not copy the data and can return a pointer directly into
         * shared memory.
         */
        res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
        if (rb >= nbytes)
        {
            mqh->mqh_length_word_complete = false;
            mqh->mqh_consume_pending += MAXALIGN(nbytes);
            *nbytesp = nbytes;
            *datap = rawdata;
            return SHM_MQ_SUCCESS;
        }

        /*
         * The message has wrapped the buffer. We'll need to copy it in order
         * to return it to the client in one chunk. First, make sure we have
         * a large enough buffer available.
         */
        if (mqh->mqh_buflen < nbytes)
        {
            Size        newbuflen;

            /*
             * Increase size to the next power of 2 that's >= nbytes, but
             * limit to MaxAllocSize.
             */
            newbuflen = pg_nextpower2_size_t(nbytes);
            newbuflen = Min(newbuflen, MaxAllocSize);

            if (mqh->mqh_buffer != NULL)
            {
                pfree(mqh->mqh_buffer);
                mqh->mqh_buffer = NULL;
                mqh->mqh_buflen = 0;
            }
            mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
            mqh->mqh_buflen = newbuflen;
        }
    }

    /* Loop until we've copied the entire message. */
    for (;;)
    {
        Size        still_needed;

        /* Copy as much as we can. */
        Assert(mqh->mqh_partial_bytes + rb <= nbytes);
        if (rb > 0)
        {
            memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
            mqh->mqh_partial_bytes += rb;
        }

        /*
         * Update count of bytes that can be consumed, accounting for
         * alignment padding. Note that this will never actually insert any
         * padding except at the end of a message, because the buffer size is
         * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
         */
        Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
        mqh->mqh_consume_pending += MAXALIGN(rb);

        /* If we got all the data, exit the loop. */
        if (mqh->mqh_partial_bytes >= nbytes)
            break;

        /* Wait for some more data. */
        still_needed = nbytes - mqh->mqh_partial_bytes;
        res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
        if (res != SHM_MQ_SUCCESS)
            return res;
        if (rb > still_needed)
            rb = still_needed;
    }

    /* Return the complete message, and reset for next message. */
    *nbytesp = nbytes;
    *datap = mqh->mqh_buffer;
    mqh->mqh_length_word_complete = false;
    mqh->mqh_partial_bytes = 0;
    return SHM_MQ_SUCCESS;
}
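
/*
 * A sketch of a typical non-blocking receive loop, assuming a surrounding
 * latch-driven event loop and a hypothetical process_message() callback:
 *
 *		Size		len;
 *		void	   *data;
 *		shm_mq_result res;
 *
 *		for (;;)
 *		{
 *			res = shm_mq_receive(mqh, &len, &data, true);
 *			if (res != SHM_MQ_SUCCESS)
 *				break;
 *			process_message(data, len);
 *		}
 *
 * On SHM_MQ_WOULD_BLOCK, wait on the process latch and retry; on
 * SHM_MQ_DETACHED, the sender is gone and no further data will arrive.
 * Note that data points into the queue or into mqh_buffer, so it is valid
 * only until the next receive operation on this handle.
 */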

/*
 * Wait for the other process that's supposed to use this queue to attach
 * to it.
 *
 * The return value is SHM_MQ_DETACHED if the worker has already detached or
 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
 * Note that we will only be able to detect that the worker has died before
 * attaching if a background worker handle was passed to shm_mq_attach().
 */
shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle *mqh)
{
    shm_mq     *mq = mqh->mqh_queue;
    PGPROC    **victim;

    if (shm_mq_get_receiver(mq) == MyProc)
        victim = &mq->mq_sender;
    else
    {
        Assert(shm_mq_get_sender(mq) == MyProc);
        victim = &mq->mq_receiver;
    }

    if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
        return SHM_MQ_SUCCESS;
    else
        return SHM_MQ_DETACHED;
}

/*
 * Detach from a shared message queue, and destroy the shm_mq_handle.
 */
void
shm_mq_detach(shm_mq_handle *mqh)
{
    /* Before detaching, notify the receiver about any already-written data. */
    if (mqh->mqh_send_pending > 0)
    {
        shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
        mqh->mqh_send_pending = 0;
    }

    /* Notify counterparty that we're outta here. */
    shm_mq_detach_internal(mqh->mqh_queue);

    /* Cancel on_dsm_detach callback, if any. */
    if (mqh->mqh_segment)
        cancel_on_dsm_detach(mqh->mqh_segment,
                             shm_mq_detach_callback,
                             PointerGetDatum(mqh->mqh_queue));

    /* Release local memory associated with handle. */
    if (mqh->mqh_buffer != NULL)
        pfree(mqh->mqh_buffer);
    pfree(mqh);
}
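
/*
 * Teardown, continuing the sketch: detach explicitly once the conversation
 * is over, then drop the segment.
 *
 *		shm_mq_detach(mqh);
 *		dsm_detach(seg);
 *
 * If the backend errors out instead, the on_dsm_detach callback registered
 * by shm_mq_attach() marks the queue detached during resource cleanup, so
 * the counterparty is unblocked either way.
 */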

/*
 * Notify counterparty that we're detaching from shared message queue.
 *
 * The purpose of this function is to make sure that the process
 * with which we're communicating doesn't block forever waiting for us to
 * fill or drain the queue once we've lost interest. When the sender
 * detaches, the receiver can read any messages remaining in the queue;
 * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
 *
 * This is separated out from shm_mq_detach() because if the on_dsm_detach
 * callback fires, we only want to do this much. We do not try to touch
 * the local shm_mq_handle, as it may have been pfree'd already.
 */
static void
shm_mq_detach_internal(shm_mq *mq)
{
    PGPROC     *victim;

    SpinLockAcquire(&mq->mq_mutex);
    if (mq->mq_sender == MyProc)
        victim = mq->mq_receiver;
    else
    {
        Assert(mq->mq_receiver == MyProc);
        victim = mq->mq_sender;
    }
    mq->mq_detached = true;
    SpinLockRelease(&mq->mq_mutex);

    if (victim != NULL)
        SetLatch(&victim->procLatch);
}

/*
 * Get the shm_mq from handle.
 */
shm_mq *
shm_mq_get_queue(shm_mq_handle *mqh)
{
    return mqh->mqh_queue;
}

/*
 * Write bytes into a shared message queue.
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                  bool nowait, Size *bytes_written)
{
    shm_mq     *mq = mqh->mqh_queue;
    Size        sent = 0;
    uint64      used;
    Size        ringsize = mq->mq_ring_size;
    Size        available;

    while (sent < nbytes)
    {
        uint64      rb;
        uint64      wb;

        /* Compute number of ring buffer bytes used and available. */
        rb = pg_atomic_read_u64(&mq->mq_bytes_read);
        wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
        Assert(wb >= rb);
        used = wb - rb;
        Assert(used <= ringsize);
        available = Min(ringsize - used, nbytes - sent);

        /*
         * Bail out if the queue has been detached. Note that we would be in
         * trouble if the compiler decided to cache the value of
         * mq->mq_detached in a register or on the stack across loop
         * iterations. It probably shouldn't do that anyway since we'll
         * always return, call an external function that performs a system
         * call, or reach a memory barrier at some point later in the loop,
         * but just to be sure, insert a compiler barrier here.
         */
        pg_compiler_barrier();
        if (mq->mq_detached)
        {
            *bytes_written = sent;
            return SHM_MQ_DETACHED;
        }

        if (available == 0 && !mqh->mqh_counterparty_attached)
        {
            /*
             * The queue is full, so if the receiver isn't yet known to be
             * attached, we must wait for that to happen.
             */
            if (nowait)
            {
                if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
                {
                    *bytes_written = sent;
                    return SHM_MQ_DETACHED;
                }
                if (shm_mq_get_receiver(mq) == NULL)
                {
                    *bytes_written = sent;
                    return SHM_MQ_WOULD_BLOCK;
                }
            }
            else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
                                           mqh->mqh_handle))
            {
                mq->mq_detached = true;
                *bytes_written = sent;
                return SHM_MQ_DETACHED;
            }
            mqh->mqh_counterparty_attached = true;

            /*
             * The receiver may have read some data after attaching, so we
             * must not wait without rechecking the queue state.
             */
        }
        else if (available == 0)
        {
            /* Update the pending send bytes in the shared memory. */
            shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);

            /*
             * Since mqh->mqh_counterparty_attached is known to be true at this
             * point, mq_receiver has been set, and it can't change once set.
             * Therefore, we can read it without acquiring the spinlock.
             */
            Assert(mqh->mqh_counterparty_attached);
            SetLatch(&mq->mq_receiver->procLatch);

            /*
             * We have just updated the mqh_send_pending bytes in the shared
             * memory so reset it.
             */
            mqh->mqh_send_pending = 0;

            /* Skip manipulation of our latch if nowait = true. */
            if (nowait)
            {
                *bytes_written = sent;
                return SHM_MQ_WOULD_BLOCK;
            }

            /*
             * Wait for our latch to be set. It might already be set for some
             * unrelated reason, but that'll just result in one extra trip
             * through the loop. It's worth it to avoid resetting the latch
             * at top of loop, because setting an already-set latch is much
             * cheaper than setting one that has been reset.
             */
            (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                             WAIT_EVENT_MESSAGE_QUEUE_SEND);

            /* Reset the latch so we don't spin. */
            ResetLatch(MyLatch);

            /* An interrupt may have occurred while we were waiting. */
            CHECK_FOR_INTERRUPTS();
        }
        else
        {
            Size        offset;
            Size        sendnow;

            offset = wb % (uint64) ringsize;
            sendnow = Min(available, ringsize - offset);

            /*
             * Write as much data as we can via a single memcpy(). Make sure
             * these writes happen after the read of mq_bytes_read, above.
             * This barrier pairs with the one in shm_mq_inc_bytes_read.
             * (Since we're separating the read of mq_bytes_read from a
             * subsequent write to mq_ring, we need a full barrier here.)
             */
            pg_memory_barrier();
            memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
                   (char *) data + sent, sendnow);
            sent += sendnow;

            /*
             * Update count of bytes written, with alignment padding. Note
             * that this will never actually insert any padding except at the
             * end of a run of bytes, because the buffer size is a multiple of
             * MAXIMUM_ALIGNOF, and each read is as well.
             */
            Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));

            /*
             * For efficiency, we don't update the bytes written in the shared
             * memory and also don't set the reader's latch here. Refer to
             * the comments atop the shm_mq_handle structure for more
             * information.
             */
            mqh->mqh_send_pending += MAXALIGN(sendnow);
        }
    }

    *bytes_written = sent;
    return SHM_MQ_SUCCESS;
}

/*
 * Wait until at least *nbytesp bytes are available to be read from the
 * shared message queue, or until the buffer wraps around. If the queue is
 * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
 * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
 * to the location at which data bytes can be read, *nbytesp is set to the
 * number of bytes which can be read at that address, and the return value
 * is SHM_MQ_SUCCESS.
 */
static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
                     Size *nbytesp, void **datap)
{
    shm_mq     *mq = mqh->mqh_queue;
    Size        ringsize = mq->mq_ring_size;
    uint64      used;
    uint64      written;

    for (;;)
    {
        Size        offset;
        uint64      read;

        /* Get bytes written, so we can compute what's available to read. */
        written = pg_atomic_read_u64(&mq->mq_bytes_written);

        /*
         * Get bytes read. Include bytes we could consume but have not yet
         * consumed.
         */
        read = pg_atomic_read_u64(&mq->mq_bytes_read) +
            mqh->mqh_consume_pending;
        used = written - read;
        Assert(used <= ringsize);
        offset = read % (uint64) ringsize;

        /* If we have enough data or buffer has wrapped, we're done. */
        if (used >= bytes_needed || offset + used >= ringsize)
        {
            *nbytesp = Min(used, ringsize - offset);
            *datap = &mq->mq_ring[mq->mq_ring_offset + offset];

            /*
             * Separate the read of mq_bytes_written, above, from caller's
             * attempt to read the data itself. Pairs with the barrier in
             * shm_mq_inc_bytes_written.
             */
            pg_read_barrier();
            return SHM_MQ_SUCCESS;
        }

        /*
         * Fall out before waiting if the queue has been detached.
         *
         * Note that we don't check for this until *after* considering whether
         * the data already available is enough, since the receiver can finish
         * receiving a message stored in the buffer even after the sender has
         * detached.
         */
        if (mq->mq_detached)
        {
            /*
             * If the writer advanced mq_bytes_written and then set
             * mq_detached, we might not have read the final value of
             * mq_bytes_written above. Insert a read barrier and then check
             * again if mq_bytes_written has advanced.
             */
            pg_read_barrier();
            if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
                continue;

            return SHM_MQ_DETACHED;
        }

        /*
         * We didn't get enough data to satisfy the request, so mark any data
         * previously-consumed as read to make more buffer space.
         */
        if (mqh->mqh_consume_pending > 0)
        {
            shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
            mqh->mqh_consume_pending = 0;
        }

        /* Skip manipulation of our latch if nowait = true. */
        if (nowait)
            return SHM_MQ_WOULD_BLOCK;

        /*
         * Wait for our latch to be set. It might already be set for some
         * unrelated reason, but that'll just result in one extra trip through
         * the loop. It's worth it to avoid resetting the latch at top of
         * loop, because setting an already-set latch is much cheaper than
         * setting one that has been reset.
         */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                         WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);

        /* Reset the latch so we don't spin. */
        ResetLatch(MyLatch);

        /* An interrupt may have occurred while we were waiting. */
        CHECK_FOR_INTERRUPTS();
    }
}

/*
 * Test whether a counterparty who may not even be alive yet is definitely gone.
 */
static bool
shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
{
    pid_t       pid;

    /* If the queue has been detached, counterparty is definitely gone. */
    if (mq->mq_detached)
        return true;

    /* If there's a handle, check worker status. */
    if (handle != NULL)
    {
        BgwHandleStatus status;

        /* Check for unexpected worker death. */
        status = GetBackgroundWorkerPid(handle, &pid);
        if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
        {
            /* Mark it detached, just to make it official. */
            mq->mq_detached = true;
            return true;
        }
    }

    /* Counterparty is not definitively gone. */
    return false;
}

/*
 * This is used when a process is waiting for its counterpart to attach to the
 * queue. We exit when the other process attaches as expected, or, if
 * handle != NULL, when the referenced background process or the postmaster
 * dies. Note that if handle == NULL, and the process fails to attach, we'll
 * potentially get stuck here forever waiting for a process that may never
 * start. We do check for interrupts, though.
 *
 * ptr is a pointer to the memory address that we're expecting to become
 * non-NULL when our counterpart attaches to the queue.
 */
static bool
shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
{
    bool        result = false;

    for (;;)
    {
        BgwHandleStatus status;
        pid_t       pid;

        /* Acquire the lock just long enough to check the pointer. */
        SpinLockAcquire(&mq->mq_mutex);
        result = (*ptr != NULL);
        SpinLockRelease(&mq->mq_mutex);

        /* Fail if detached; else succeed if initialized. */
        if (mq->mq_detached)
        {
            result = false;
            break;
        }
        if (result)
            break;

        if (handle != NULL)
        {
            /* Check for unexpected worker death. */
            status = GetBackgroundWorkerPid(handle, &pid);
            if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
            {
                result = false;
                break;
            }
        }

        /* Wait to be signaled. */
        (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                         WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);

        /* Reset the latch so we don't spin. */
        ResetLatch(MyLatch);

        /* An interrupt may have occurred while we were waiting. */
        CHECK_FOR_INTERRUPTS();
    }

    return result;
}

/*
 * Increment the number of bytes read.
 */
static void
shm_mq_inc_bytes_read(shm_mq *mq, Size n)
{
    PGPROC     *sender;

    /*
     * Separate prior reads of mq_ring from the increment of mq_bytes_read
     * which follows. This pairs with the full barrier in
     * shm_mq_send_bytes(). We only need a read barrier here because the
     * increment of mq_bytes_read is actually a read followed by a dependent
     * write.
     */
    pg_read_barrier();

    /*
     * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
     * else can be changing this value. This method should be cheaper.
     */
    pg_atomic_write_u64(&mq->mq_bytes_read,
                        pg_atomic_read_u64(&mq->mq_bytes_read) + n);

    /*
     * We shouldn't have any bytes to read without a sender, so we can read
     * mq_sender here without a lock. Once it's initialized, it can't change.
     */
    sender = mq->mq_sender;
    Assert(sender != NULL);
    SetLatch(&sender->procLatch);
}

/*
 * Increment the number of bytes written.
 */
static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n)
{
    /*
     * Separate prior writes to mq_ring from the write of mq_bytes_written
     * which we're about to do. Pairs with the read barrier found in
     * shm_mq_receive_bytes.
     */
    pg_write_barrier();

    /*
     * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
     * else can be changing this value. This method avoids taking the bus
     * lock unnecessarily.
     */
    pg_atomic_write_u64(&mq->mq_bytes_written,
                        pg_atomic_read_u64(&mq->mq_bytes_written) + n);
}

/* Shim for on_dsm_detach callback. */
static void
shm_mq_detach_callback(dsm_segment *seg, Datum arg)
{
    shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);

    shm_mq_detach_internal(mq);
}