Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * shm_mq.c
4 : * single-reader, single-writer shared memory message queue
5 : *
6 : * Both the sender and the receiver must have a PGPROC; their respective
7 : * process latches are used for synchronization. Only the sender may send,
8 : * and only the receiver may receive. This is intended to allow a user
9 : * backend to communicate with worker backends that it has registered.
10 : *
11 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : * src/backend/storage/ipc/shm_mq.c
15 : *
16 : *-------------------------------------------------------------------------
17 : */
18 :
19 : #include "postgres.h"
20 :
21 : #include "miscadmin.h"
22 : #include "pgstat.h"
23 : #include "port/pg_bitutils.h"
24 : #include "postmaster/bgworker.h"
25 : #include "storage/procsignal.h"
26 : #include "storage/shm_mq.h"
27 : #include "storage/spin.h"
28 : #include "utils/memutils.h"
29 :
30 : /*
31 : * This structure represents the actual queue, stored in shared memory.
32 : *
33 : * Some notes on synchronization:
34 : *
35 : * mq_receiver and mq_bytes_read can only be changed by the receiver; and
36 : * mq_sender and mq_bytes_written can only be changed by the sender.
37 : * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
38 : * they cannot change once set, and thus may be read without a lock once this
39 : * is known to be the case.
40 : *
41 : * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
42 : * they are written atomically using 8 byte loads and stores. Memory barriers
43 : * must be carefully used to synchronize reads and writes of these values with
44 : * reads and writes of the actual data in mq_ring.
45 : *
46 : * mq_detached needs no locking. It can be set by either the sender or the
47 : * receiver, but only ever from false to true, so redundant writes don't
48 : * matter. It is important that if we set mq_detached and then set the
49 : * counterparty's latch, the counterparty must be certain to see the change
50 : * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
51 : * ends with one, this should be OK.
52 : *
53 : * mq_ring_size and mq_ring_offset never change after initialization, and
54 : * can therefore be read without the lock.
55 : *
56 : * Importantly, mq_ring can be safely read and written without a lock.
57 : * At any given time, the difference between mq_bytes_read and
58 : * mq_bytes_written defines the number of bytes within mq_ring that contain
59 : * unread data, and mq_bytes_read defines the position where those bytes
60 : * begin. The sender can increase the number of unread bytes at any time,
61 : * but only the receiver can give license to overwrite those bytes, by
62 : * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
63 : * the unread bytes it knows to be present without the lock. Conversely,
64 : * the sender can write to the unused portion of the ring buffer without
65 : * the lock, because nobody else can be reading or writing those bytes. The
66 : * receiver could be making more bytes unused by incrementing mq_bytes_read,
67 : * but that's OK. Note that it would be unsafe for the receiver to read any
68 : * data it's already marked as read, or to write any data; and it would be
69 : * unsafe for the sender to reread any data after incrementing
70 : * mq_bytes_written, but fortunately there's no need for any of that.
71 : */
72 : struct shm_mq
73 : {
74 : slock_t mq_mutex;
75 : PGPROC *mq_receiver;
76 : PGPROC *mq_sender;
77 : pg_atomic_uint64 mq_bytes_read;
78 : pg_atomic_uint64 mq_bytes_written;
79 : Size mq_ring_size;
80 : bool mq_detached;
81 : uint8 mq_ring_offset;
82 : char mq_ring[FLEXIBLE_ARRAY_MEMBER];
83 : };
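/*
 * For illustration, the accounting described above reduces to modular
 * arithmetic on the two monotonically increasing counters.  Given local
 * copies "rb" and "wb" of mq_bytes_read and mq_bytes_written:
 *
 *     uint64  used   = wb - rb;                          (unread bytes)
 *     Size    offset = rb % (uint64) mq->mq_ring_size;   (start of unread data)
 *     Size    avail  = mq->mq_ring_size - used;          (space left to write)
 *
 * Only the sender grows "used" (by advancing wb after copying into mq_ring),
 * and only the receiver shrinks it (by advancing rb), which is what makes
 * lock-free access to mq_ring safe.  The same expressions appear in
 * shm_mq_send_bytes() and shm_mq_receive_bytes() below.
 */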
84 :
85 : /*
86 : * This structure is a backend-private handle for access to a queue.
87 : *
88 : * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
89 : * an optional pointer to the dynamic shared memory segment that contains it.
90 : * (If mqh_segment is provided, we register an on_dsm_detach callback to
91 : * make sure we detach from the queue before detaching from DSM.)
92 : *
93 : * If this queue is intended to connect the current process with a background
94 : * worker that started it, the user can pass a pointer to the worker handle
95 : * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
96 : * is to allow us to begin sending to or receiving from that queue before the
97 : * process we'll be communicating with has even been started. If it fails
98 : * to start, the handle will allow us to notice that and fail cleanly, rather
99 : * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
100 : * simple cases - e.g. where there are just 2 processes communicating; in
101 : * more complex scenarios, not every process may have a BackgroundWorkerHandle
102 : * available, or may need to watch for the failure of more than one other
103 : * process at a time.
104 : *
105 : * When a message exists as a contiguous chunk of bytes in the queue - that is,
106 : * it is smaller than the size of the ring buffer and does not wrap around
107 : * the end - we return the message to the caller as a pointer into the buffer.
108 : * For messages that are larger or happen to wrap, we reassemble the message
109 : * locally by copying the chunks into a backend-local buffer. mqh_buffer is
110 : * the buffer, and mqh_buflen is the number of bytes allocated for it.
111 : *
112 : * mqh_send_pending is the number of bytes already written into the queue
113 : * but not yet reflected in the shared mq_bytes_written counter. We defer
114 : * that update until the pending data reaches 1/4th of the ring size or the
115 : * queue becomes full. This reduces CPU cache misses and avoids frequent
116 : * SetLatch() calls, which are quite expensive.
117 : *
118 : * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
119 : * are used to track the state of non-blocking operations. When the caller
120 : * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
121 : * are expected to retry the call at a later time with the same argument;
122 : * we need to retain enough state to pick up where we left off.
123 : * mqh_length_word_complete tracks whether we are done sending or receiving
124 : * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
125 : * the number of bytes read or written for either the length word or the
126 : * message itself, and mqh_expected_bytes - which is used only for reads -
127 : * tracks the expected total size of the payload.
128 : *
129 : * mqh_counterparty_attached tracks whether we know the counterparty to have
130 : * attached to the queue at some previous point. This lets us avoid some
131 : * mutex acquisitions.
132 : *
133 : * mqh_context is the memory context in effect at the time we attached to
134 : * the shm_mq. The shm_mq_handle itself is allocated in this context, and
135 : * we make sure any other allocations we do happen in this context as well,
136 : * to avoid nasty surprises.
137 : */
138 : struct shm_mq_handle
139 : {
140 : shm_mq *mqh_queue;
141 : dsm_segment *mqh_segment;
142 : BackgroundWorkerHandle *mqh_handle;
143 : char *mqh_buffer;
144 : Size mqh_buflen;
145 : Size mqh_consume_pending;
146 : Size mqh_send_pending;
147 : Size mqh_partial_bytes;
148 : Size mqh_expected_bytes;
149 : bool mqh_length_word_complete;
150 : bool mqh_counterparty_attached;
151 : MemoryContext mqh_context;
152 : };
153 :
154 : static void shm_mq_detach_internal(shm_mq *mq);
155 : static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
156 : const void *data, bool nowait, Size *bytes_written);
157 : static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
158 : Size bytes_needed, bool nowait, Size *nbytesp,
159 : void **datap);
160 : static bool shm_mq_counterparty_gone(shm_mq *mq,
161 : BackgroundWorkerHandle *handle);
162 : static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
163 : BackgroundWorkerHandle *handle);
164 : static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
165 : static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
166 : static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
167 :
168 : /* Minimum queue size is enough for header and at least one chunk of data. */
169 : const Size shm_mq_minimum_size =
170 : MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
171 :
172 : #define MQH_INITIAL_BUFSIZE 8192
173 :
174 : /*
175 : * Initialize a new shared message queue.
176 : */
177 : shm_mq *
3372 rhaas 178 CBC 2618 : shm_mq_create(void *address, Size size)
179 : {
180 2618 : shm_mq *mq = address;
3309 181 2618 : Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
182 :
183 : /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
3372 184 2618 : size = MAXALIGN_DOWN(size);
185 :
186 : /* Queue size must be large enough to hold some data. */
187 2618 : Assert(size > data_offset);
188 :
189 : /* Initialize queue header. */
190 2618 : SpinLockInit(&mq->mq_mutex);
191 2618 : mq->mq_receiver = NULL;
192 2618 : mq->mq_sender = NULL;
1864 193 2618 : pg_atomic_init_u64(&mq->mq_bytes_read, 0);
194 2618 : pg_atomic_init_u64(&mq->mq_bytes_written, 0);
3372 195 2618 : mq->mq_ring_size = size - data_offset;
196 2618 : mq->mq_detached = false;
197 2618 : mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
198 :
199 2618 : return mq;
200 : }
201 :
202 : /*
203 : * Set the identity of the process that will receive from a shared message
204 : * queue.
205 : */
206 : void
207 2618 : shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
208 : {
209 : PGPROC *sender;
210 :
211 2618 : SpinLockAcquire(&mq->mq_mutex);
1865 andres 212 2618 : Assert(mq->mq_receiver == NULL);
213 2618 : mq->mq_receiver = proc;
214 2618 : sender = mq->mq_sender;
3372 rhaas 215 2618 : SpinLockRelease(&mq->mq_mutex);
216 :
217 2618 : if (sender != NULL)
218 17 : SetLatch(&sender->procLatch);
219 2618 : }
220 :
221 : /*
222 : * Set the identity of the process that will send to a shared message queue.
223 : */
224 : void
225 2540 : shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
226 : {
227 : PGPROC *receiver;
228 :
229 2540 : SpinLockAcquire(&mq->mq_mutex);
1865 andres 230 2540 : Assert(mq->mq_sender == NULL);
231 2540 : mq->mq_sender = proc;
232 2540 : receiver = mq->mq_receiver;
3372 rhaas 233 2540 : SpinLockRelease(&mq->mq_mutex);
234 :
235 2540 : if (receiver != NULL)
236 2523 : SetLatch(&receiver->procLatch);
237 2540 : }
238 :
239 : /*
240 : * Get the configured receiver.
241 : */
242 : PGPROC *
243 1 : shm_mq_get_receiver(shm_mq *mq)
244 : {
245 : PGPROC *receiver;
246 :
247 1 : SpinLockAcquire(&mq->mq_mutex);
1865 andres 248 1 : receiver = mq->mq_receiver;
3372 rhaas 249 1 : SpinLockRelease(&mq->mq_mutex);
250 :
251 1 : return receiver;
252 : }
253 :
254 : /*
255 : * Get the configured sender.
256 : */
257 : PGPROC *
258 6687588 : shm_mq_get_sender(shm_mq *mq)
259 : {
260 : PGPROC *sender;
261 :
262 6687588 : SpinLockAcquire(&mq->mq_mutex);
1865 andres 263 6687588 : sender = mq->mq_sender;
3372 rhaas 264 6687588 : SpinLockRelease(&mq->mq_mutex);
265 :
266 6687588 : return sender;
267 : }
268 :
269 : /*
270 : * Attach to a shared message queue so we can send or receive messages.
271 : *
272 : * The memory context in effect at the time this function is called should
273 : * be one which will last for at least as long as the message queue itself.
274 : * We'll allocate the handle in that context, and future allocations that
275 : * are needed to buffer incoming data will happen in that context as well.
276 : *
277 : * If seg != NULL, the queue will be automatically detached when that dynamic
278 : * shared memory segment is detached.
279 : *
280 : * If handle != NULL, the queue can be read or written even before the
281 : * other process has attached. We'll wait for it to do so if needed. The
282 : * handle must be for a background worker initialized with bgw_notify_pid
283 : * equal to our PID.
284 : *
285 : * shm_mq_detach() should be called when done. This will free the
286 : * shm_mq_handle and mark the queue itself as detached, so that our
287 : * counterpart won't get stuck waiting for us to fill or drain the queue
288 : * after we've already lost interest.
289 : */
290 : shm_mq_handle *
291 5158 : shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
292 : {
3260 bruce 293 5158 : shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
294 :
3372 rhaas 295 5158 : Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
296 5158 : mqh->mqh_queue = mq;
297 5158 : mqh->mqh_segment = seg;
298 5158 : mqh->mqh_handle = handle;
2047 tgl 299 5158 : mqh->mqh_buffer = NULL;
3372 rhaas 300 5158 : mqh->mqh_buflen = 0;
301 5158 : mqh->mqh_consume_pending = 0;
542 302 5158 : mqh->mqh_send_pending = 0;
3309 303 5158 : mqh->mqh_partial_bytes = 0;
2047 tgl 304 5158 : mqh->mqh_expected_bytes = 0;
3309 rhaas 305 5158 : mqh->mqh_length_word_complete = false;
3372 306 5158 : mqh->mqh_counterparty_attached = false;
2047 tgl 307 5158 : mqh->mqh_context = CurrentMemoryContext;
308 :
3372 rhaas 309 5158 : if (seg != NULL)
310 5158 : on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
311 :
312 5158 : return mqh;
313 : }
314 :
315 : /*
316 : * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
317 : * been passed to shm_mq_attach.
318 : */
319 : void
3105 320 2508 : shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
321 : {
322 2508 : Assert(mqh->mqh_handle == NULL);
323 2508 : mqh->mqh_handle = handle;
324 2508 : }
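/*
 * An illustrative counterpart to the sketch above, likewise guarded by the
 * hypothetical SHM_MQ_USAGE_EXAMPLE macro: a worker that has already located
 * the queue within an attached segment ("mq" and "seg" are assumed to be
 * supplied by the caller) claims the sender role and attaches.  The leader
 * side would do the same with shm_mq_set_receiver(), optionally passing the
 * worker's BackgroundWorkerHandle, or supplying it later via
 * shm_mq_set_handle(), so that a worker that never starts is detected.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static shm_mq_handle *
example_attach_as_sender(shm_mq *mq, dsm_segment *seg)
{
	shm_mq_set_sender(mq, MyProc);

	/* NULL handle: the worker has no other process to wait for. */
	return shm_mq_attach(mq, seg, NULL);
}
#endif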
325 :
326 : /*
327 : * Write a message into a shared message queue.
328 : */
329 : shm_mq_result
542 330 843092 : shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
331 : bool force_flush)
332 : {
333 : shm_mq_iovec iov;
334 :
3105 335 843092 : iov.data = data;
336 843092 : iov.len = nbytes;
337 :
542 338 843092 : return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
339 : }
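/*
 * A minimal sending sketch under the hypothetical SHM_MQ_USAGE_EXAMPLE
 * guard: a blocking send of a NUL-terminated string over an already-attached
 * queue, handling the one error a blocking sender can see.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static void
example_send_string(shm_mq_handle *mqh, const char *msg)
{
	shm_mq_result res;

	/* nowait = false: wait for space; force_flush = true: notify at once. */
	res = shm_mq_send(mqh, strlen(msg) + 1, msg, false, true);
	if (res == SHM_MQ_DETACHED)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not send message: receiver has detached")));
}
#endif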
340 :
341 : /*
342 : * Write a message into a shared message queue, gathered from multiple
343 : * addresses.
344 : *
345 : * When nowait = false, we'll wait on our process latch when the ring buffer
346 : * fills up, and then continue writing once the receiver has drained some data.
347 : * The process latch is reset after each wait.
348 : *
349 : * When nowait = true, we do not manipulate the state of the process latch;
350 : * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
351 : * this case, the caller should call this function again, with the same
352 : * arguments, each time the process latch is set. (Once begun, the sending
353 : * of a message cannot be aborted except by detaching from the queue; changing
354 : * the length or payload will corrupt the queue.)
355 : *
356 : * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
357 : * and notify the receiver (if it is already attached). Otherwise, we don't
358 : * update it until we have written an amount of data greater than 1/4th of the
359 : * ring size.
360 : */
361 : shm_mq_result
362 845698 : shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
363 : bool force_flush)
364 : {
365 : shm_mq_result res;
3260 bruce 366 845698 : shm_mq *mq = mqh->mqh_queue;
367 : PGPROC *receiver;
3105 rhaas 368 845698 : Size nbytes = 0;
369 : Size bytes_written;
370 : int i;
371 845698 : int which_iov = 0;
372 : Size offset;
373 :
3372 374 845698 : Assert(mq->mq_sender == MyProc);
375 :
376 : /* Compute total size of write. */
3105 377 1694002 : for (i = 0; i < iovcnt; ++i)
378 848304 : nbytes += iov[i].len;
379 :
380 : /* Prevent writing messages that would overwhelm the receiver. */
902 peter 381 845698 : if (nbytes > MaxAllocSize)
902 peter 382 UBC 0 : ereport(ERROR,
383 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
384 : errmsg("cannot send a message of size %zu via shared memory queue",
385 : nbytes)));
386 :
387 : /* Try to write, or finish writing, the length word into the buffer. */
3309 rhaas 388 CBC 1687019 : while (!mqh->mqh_length_word_complete)
389 : {
390 841324 : Assert(mqh->mqh_partial_bytes < sizeof(Size));
391 841324 : res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
2118 tgl 392 841324 : ((char *) &nbytes) + mqh->mqh_partial_bytes,
393 : nowait, &bytes_written);
394 :
2498 rhaas 395 841324 : if (res == SHM_MQ_DETACHED)
396 : {
397 : /* Reset state in case caller tries to send another message. */
398 3 : mqh->mqh_partial_bytes = 0;
399 3 : mqh->mqh_length_word_complete = false;
3372 400 3 : return res;
401 : }
2498 402 841321 : mqh->mqh_partial_bytes += bytes_written;
403 :
3309 404 841321 : if (mqh->mqh_partial_bytes >= sizeof(Size))
405 : {
406 841321 : Assert(mqh->mqh_partial_bytes == sizeof(Size));
407 :
408 841321 : mqh->mqh_partial_bytes = 0;
409 841321 : mqh->mqh_length_word_complete = true;
410 : }
411 :
2498 412 841321 : if (res != SHM_MQ_SUCCESS)
2498 rhaas 413 UBC 0 : return res;
414 :
415 : /* Length word can't be split unless bigger than required alignment. */
3309 rhaas 416 CBC 841321 : Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
417 : }
418 :
419 : /* Write the actual data bytes into the buffer. */
420 845695 : Assert(mqh->mqh_partial_bytes <= nbytes);
3105 421 845695 : offset = mqh->mqh_partial_bytes;
422 : do
423 : {
424 : Size chunksize;
425 :
426 : /* Figure out which bytes need to be sent next. */
427 847003 : if (offset >= iov[which_iov].len)
428 : {
429 4005 : offset -= iov[which_iov].len;
430 4005 : ++which_iov;
431 4005 : if (which_iov >= iovcnt)
432 4000 : break;
433 5 : continue;
434 : }
435 :
436 : /*
437 : * We want to avoid copying the data if at all possible, but every
438 : * chunk of bytes we write into the queue has to be MAXALIGN'd, except
439 : * the last. Thus, if a chunk other than the last one ends on a
440 : * non-MAXALIGN'd boundary, we have to combine the tail end of its
441 : * data with data from one or more following chunks until we either
442 : * reach the last chunk or accumulate a number of bytes which is
443 : * MAXALIGN'd.
444 : */
445 842998 : if (which_iov + 1 < iovcnt &&
446 2598 : offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
447 2598 : {
448 : char tmpbuf[MAXIMUM_ALIGNOF];
2878 bruce 449 2598 : int j = 0;
450 :
451 : for (;;)
452 : {
3105 rhaas 453 15612 : if (offset < iov[which_iov].len)
454 : {
455 11719 : tmpbuf[j] = iov[which_iov].data[offset];
456 11719 : j++;
457 11719 : offset++;
458 11719 : if (j == MAXIMUM_ALIGNOF)
459 1303 : break;
460 : }
461 : else
462 : {
463 3893 : offset -= iov[which_iov].len;
464 3893 : which_iov++;
465 3893 : if (which_iov >= iovcnt)
466 1295 : break;
467 : }
468 : }
469 :
470 2598 : res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
471 :
2498 472 2598 : if (res == SHM_MQ_DETACHED)
473 : {
474 : /* Reset state in case caller tries to send another message. */
2498 rhaas 475 UBC 0 : mqh->mqh_partial_bytes = 0;
476 0 : mqh->mqh_length_word_complete = false;
477 0 : return res;
478 : }
479 :
3105 rhaas 480 CBC 2598 : mqh->mqh_partial_bytes += bytes_written;
481 2598 : if (res != SHM_MQ_SUCCESS)
3105 rhaas 482 UBC 0 : return res;
3105 rhaas 483 CBC 2598 : continue;
484 : }
485 :
486 : /*
487 : * If this is the last chunk, we can write all the data, even if it
488 : * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
489 : * MAXALIGN_DOWN the write size.
490 : */
491 840400 : chunksize = iov[which_iov].len - offset;
492 840400 : if (which_iov + 1 < iovcnt)
3105 rhaas 493 UBC 0 : chunksize = MAXALIGN_DOWN(chunksize);
3105 rhaas 494 CBC 840400 : res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
495 : nowait, &bytes_written);
496 :
2498 497 840400 : if (res == SHM_MQ_DETACHED)
498 : {
499 : /* Reset state in case caller tries to send another message. */
2498 rhaas 500 UBC 0 : mqh->mqh_length_word_complete = false;
501 0 : mqh->mqh_partial_bytes = 0;
502 0 : return res;
503 : }
504 :
3105 rhaas 505 CBC 840400 : mqh->mqh_partial_bytes += bytes_written;
506 840400 : offset += bytes_written;
507 840400 : if (res != SHM_MQ_SUCCESS)
508 4374 : return res;
509 838629 : } while (mqh->mqh_partial_bytes < nbytes);
510 :
511 : /* Reset for next message. */
512 841321 : mqh->mqh_partial_bytes = 0;
513 841321 : mqh->mqh_length_word_complete = false;
514 :
515 : /* If queue has been detached, let caller know. */
1864 516 841321 : if (mq->mq_detached)
1864 rhaas 517 UBC 0 : return SHM_MQ_DETACHED;
518 :
519 : /*
520 : * If the counterparty is known to have attached, we can read mq_receiver
521 : * without acquiring the spinlock. Otherwise, more caution is needed.
522 : */
1864 rhaas 523 CBC 841321 : if (mqh->mqh_counterparty_attached)
524 839353 : receiver = mq->mq_receiver;
525 : else
526 : {
527 1968 : SpinLockAcquire(&mq->mq_mutex);
528 1968 : receiver = mq->mq_receiver;
529 1968 : SpinLockRelease(&mq->mq_mutex);
320 530 1968 : if (receiver != NULL)
531 1968 : mqh->mqh_counterparty_attached = true;
532 : }
533 :
534 : /*
535 : * If the caller has requested force flush or we have written more than
536 : * 1/4 of the ring size, mark it as written in shared memory and notify
537 : * the receiver.
538 : */
542 539 841321 : if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
540 : {
541 121586 : shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
320 542 121586 : if (receiver != NULL)
543 121586 : SetLatch(&receiver->procLatch);
542 544 121586 : mqh->mqh_send_pending = 0;
545 : }
546 :
1864 547 841321 : return SHM_MQ_SUCCESS;
548 : }
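/*
 * A gather-send sketch under the hypothetical SHM_MQ_USAGE_EXAMPLE guard:
 * a fixed header and a payload are combined into a single message with
 * shm_mq_sendv() in non-blocking mode.  On SHM_MQ_WOULD_BLOCK the caller is
 * expected to wait on its latch and retry with exactly the same arguments,
 * as the comments above require; that wait-and-retry loop is left to the
 * caller here.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static shm_mq_result
example_send_with_header(shm_mq_handle *mqh,
						 const char *hdr, Size hdrlen,
						 const char *payload, Size payloadlen)
{
	shm_mq_iovec iov[2];

	iov[0].data = hdr;
	iov[0].len = hdrlen;
	iov[1].data = payload;
	iov[1].len = payloadlen;

	/* nowait = true, force_flush = true */
	return shm_mq_sendv(mqh, iov, 2, true, true);
}
#endif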
549 :
550 : /*
551 : * Receive a message from a shared message queue.
552 : *
553 : * We set *nbytes to the message length and *data to point to the message
554 : * payload. If the entire message exists in the queue as a single,
555 : * contiguous chunk, *data will point directly into shared memory; otherwise,
556 : * it will point to a temporary buffer. This mostly avoids data copying in
557 : * the hoped-for case where messages are short compared to the buffer size,
558 : * while still allowing longer messages. In either case, the return value
559 : * remains valid until the next receive operation is performed on the queue.
560 : *
561 : * When nowait = false, we'll wait on our process latch when the ring buffer
562 : * is empty and we have not yet received a full message. The sender will
563 : * set our process latch after more data has been written, and we'll resume
564 : * processing. Each call will therefore return a complete message
565 : * (unless the sender detaches the queue).
566 : *
567 : * When nowait = true, we do not manipulate the state of the process latch;
568 : * instead, whenever the buffer is empty and we need to read from it, we
569 : * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
570 : * function again after the process latch has been set.
571 : */
572 : shm_mq_result
3309 573 3463833 : shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
574 : {
3260 bruce 575 3463833 : shm_mq *mq = mqh->mqh_queue;
576 : shm_mq_result res;
577 3463833 : Size rb = 0;
578 : Size nbytes;
579 : void *rawdata;
580 :
3372 rhaas 581 3463833 : Assert(mq->mq_receiver == MyProc);
582 :
583 : /* We can't receive data until the sender has attached. */
584 3463833 : if (!mqh->mqh_counterparty_attached)
585 : {
586 2493927 : if (nowait)
587 : {
588 : int counterparty_gone;
589 :
590 : /*
591 : * We shouldn't return at this point at all unless the sender
592 : * hasn't attached yet. However, the correct return value depends
593 : * on whether the sender is still attached. If we first test
594 : * whether the sender has ever attached and then test whether the
595 : * sender has detached, there's a race condition: a sender that
596 : * attaches and detaches very quickly might fool us into thinking
597 : * the sender never attached at all. So, test whether our
598 : * counterparty is definitively gone first, and only afterwards
599 : * check whether the sender ever attached in the first place.
600 : */
2714 601 2493762 : counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
3372 602 2493762 : if (shm_mq_get_sender(mq) == NULL)
603 : {
2714 604 2491396 : if (counterparty_gone)
2726 rhaas 605 UBC 0 : return SHM_MQ_DETACHED;
606 : else
2714 rhaas 607 CBC 2491396 : return SHM_MQ_WOULD_BLOCK;
608 : }
609 : }
3266 610 165 : else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
611 84 : && shm_mq_get_sender(mq) == NULL)
612 : {
3372 rhaas 613 UBC 0 : mq->mq_detached = true;
614 0 : return SHM_MQ_DETACHED;
615 : }
3372 rhaas 616 CBC 2531 : mqh->mqh_counterparty_attached = true;
617 : }
618 :
619 : /*
620 : * If we've consumed an amount of data greater than 1/4th of the ring
621 : * size, mark it consumed in shared memory. We try to avoid doing this
622 : * unnecessarily when only a small amount of data has been consumed,
623 : * because SetLatch() is fairly expensive and we don't want to do it too
624 : * often.
625 : */
1864 626 972437 : if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
627 : {
3372 628 18922 : shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
629 18922 : mqh->mqh_consume_pending = 0;
630 : }
631 :
632 : /* Try to read, or finish reading, the length word from the buffer. */
3309 633 994119 : while (!mqh->mqh_length_word_complete)
634 : {
635 : /* Try to receive the message length word. */
636 967963 : Assert(mqh->mqh_partial_bytes < sizeof(Size));
1864 637 967963 : res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
638 : nowait, &rb, &rawdata);
3372 639 967963 : if (res != SHM_MQ_SUCCESS)
640 131664 : return res;
641 :
642 : /*
643 : * Hopefully, we'll receive the entire message length word at once.
644 : * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
645 : * multiple reads.
646 : */
3309 647 836299 : if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
3372 648 21682 : {
649 : Size needed;
650 :
3260 bruce 651 836299 : nbytes = *(Size *) rawdata;
652 :
653 : /* If we've already got the whole message, we're done. */
3309 rhaas 654 836299 : needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
655 836299 : if (rb >= needed)
656 : {
1864 657 814617 : mqh->mqh_consume_pending += needed;
3309 658 814617 : *nbytesp = nbytes;
659 814617 : *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
660 814617 : return SHM_MQ_SUCCESS;
661 : }
662 :
663 : /*
664 : * We don't have the whole message, but we at least have the whole
665 : * length word.
666 : */
667 21682 : mqh->mqh_expected_bytes = nbytes;
668 21682 : mqh->mqh_length_word_complete = true;
1864 669 21682 : mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
3309 670 21682 : rb -= MAXALIGN(sizeof(Size));
671 : }
672 : else
673 : {
674 : Size lengthbytes;
675 :
676 : /* Can't be split unless bigger than required alignment. */
3309 rhaas 677 UBC 0 : Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
678 :
679 : /* Message word is split; need buffer to reassemble. */
680 : if (mqh->mqh_buffer == NULL)
681 : {
682 : mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
683 : MQH_INITIAL_BUFSIZE);
684 : mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
685 : }
686 : Assert(mqh->mqh_buflen >= sizeof(Size));
687 :
688 : /* Copy partial length word; remember to consume it. */
689 : if (mqh->mqh_partial_bytes + rb > sizeof(Size))
690 : lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
691 : else
692 : lengthbytes = rb;
693 : memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
694 : lengthbytes);
695 : mqh->mqh_partial_bytes += lengthbytes;
696 : mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
697 : rb -= lengthbytes;
698 :
699 : /* If we now have the whole word, we're ready to read payload. */
700 : if (mqh->mqh_partial_bytes >= sizeof(Size))
701 : {
702 : Assert(mqh->mqh_partial_bytes == sizeof(Size));
703 : mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
704 : mqh->mqh_length_word_complete = true;
705 : mqh->mqh_partial_bytes = 0;
706 : }
707 : }
708 : }
3309 rhaas 709 CBC 26156 : nbytes = mqh->mqh_expected_bytes;
710 :
711 : /*
712 : * Should be disallowed on the sending side already, but better check and
713 : * error out on the receiver side as well rather than trying to read a
714 : * prohibitively large message.
715 : */
902 peter 716 26156 : if (nbytes > MaxAllocSize)
902 peter 717 UBC 0 : ereport(ERROR,
718 : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
719 : errmsg("invalid message size %zu in shared memory queue",
720 : nbytes)));
721 :
3309 rhaas 722 CBC 26156 : if (mqh->mqh_partial_bytes == 0)
723 : {
724 : /*
725 : * Try to obtain the whole message in a single chunk. If this works,
726 : * we need not copy the data and can return a pointer directly into
727 : * shared memory.
728 : */
1864 729 21804 : res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
3372 730 21804 : if (res != SHM_MQ_SUCCESS)
731 122 : return res;
732 21682 : if (rb >= nbytes)
733 : {
3309 734 163 : mqh->mqh_length_word_complete = false;
1864 735 163 : mqh->mqh_consume_pending += MAXALIGN(nbytes);
3372 736 163 : *nbytesp = nbytes;
737 163 : *datap = rawdata;
738 163 : return SHM_MQ_SUCCESS;
739 : }
740 :
741 : /*
742 : * The message has wrapped the buffer. We'll need to copy it in order
743 : * to return it to the client in one chunk. First, make sure we have
744 : * a large enough buffer available.
745 : */
746 21519 : if (mqh->mqh_buflen < nbytes)
747 : {
748 : Size newbuflen;
749 :
750 : /*
751 : * Increase size to the next power of 2 that's >= nbytes, but
752 : * limit to MaxAllocSize.
753 : */
623 tgl 754 126 : newbuflen = pg_nextpower2_size_t(nbytes);
902 peter 755 126 : newbuflen = Min(newbuflen, MaxAllocSize);
756 :
3372 rhaas 757 126 : if (mqh->mqh_buffer != NULL)
758 : {
3372 rhaas 759 UBC 0 : pfree(mqh->mqh_buffer);
760 0 : mqh->mqh_buffer = NULL;
761 0 : mqh->mqh_buflen = 0;
762 : }
3372 rhaas 763 CBC 126 : mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
764 126 : mqh->mqh_buflen = newbuflen;
765 : }
766 : }
767 :
768 : /* Loop until we've copied the entire message. */
769 : for (;;)
770 109947 : {
771 : Size still_needed;
772 :
773 : /* Copy as much as we can. */
3309 774 135818 : Assert(mqh->mqh_partial_bytes + rb <= nbytes);
402 tgl 775 135818 : if (rb > 0)
776 : {
777 131466 : memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
778 131466 : mqh->mqh_partial_bytes += rb;
779 : }
780 :
781 : /*
782 : * Update count of bytes that can be consumed, accounting for
783 : * alignment padding. Note that this will never actually insert any
784 : * padding except at the end of a message, because the buffer size is
785 : * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
786 : */
3309 rhaas 787 135818 : Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
1864 788 135818 : mqh->mqh_consume_pending += MAXALIGN(rb);
789 :
790 : /* If we got all the data, exit the loop. */
3309 791 135818 : if (mqh->mqh_partial_bytes >= nbytes)
3372 792 21519 : break;
793 :
794 : /* Wait for some more data. */
3309 795 114299 : still_needed = nbytes - mqh->mqh_partial_bytes;
1864 796 114299 : res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
3372 797 114299 : if (res != SHM_MQ_SUCCESS)
798 4352 : return res;
799 109947 : if (rb > still_needed)
800 20906 : rb = still_needed;
801 : }
802 :
803 : /* Return the complete message, and reset for next message. */
804 21519 : *nbytesp = nbytes;
805 21519 : *datap = mqh->mqh_buffer;
3309 806 21519 : mqh->mqh_length_word_complete = false;
807 21519 : mqh->mqh_partial_bytes = 0;
3372 808 21519 : return SHM_MQ_SUCCESS;
809 : }
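/*
 * A receiving sketch under the hypothetical SHM_MQ_USAGE_EXAMPLE guard:
 * a blocking loop in which each successful call yields one whole message
 * whose pointer is only valid until the next receive, and which ends when
 * the sender detaches.  "process_message" is a hypothetical callback.
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static void
example_receive_loop(shm_mq_handle *mqh,
					 void (*process_message) (const void *data, Size len))
{
	for (;;)
	{
		shm_mq_result res;
		Size		nbytes;
		void	   *data;

		res = shm_mq_receive(mqh, &nbytes, &data, false);
		if (res == SHM_MQ_DETACHED)
			break;				/* sender is gone and the queue is drained */
		Assert(res == SHM_MQ_SUCCESS);

		process_message(data, nbytes);
	}
}
#endif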
810 :
811 : /*
812 : * Wait for the other process that's supposed to use this queue to attach
813 : * to it.
814 : *
815 : * The return value is SHM_MQ_DETACHED if the worker has already detached or
816 : * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
817 : * Note that we will only be able to detect that the worker has died before
818 : * attaching if a background worker handle was passed to shm_mq_attach().
819 : */
820 : shm_mq_result
3372 rhaas 821 UBC 0 : shm_mq_wait_for_attach(shm_mq_handle *mqh)
822 : {
823 0 : shm_mq *mq = mqh->mqh_queue;
824 : PGPROC **victim;
825 :
826 0 : if (shm_mq_get_receiver(mq) == MyProc)
827 0 : victim = &mq->mq_sender;
828 : else
829 : {
830 0 : Assert(shm_mq_get_sender(mq) == MyProc);
831 0 : victim = &mq->mq_receiver;
832 : }
833 :
834 0 : if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
835 0 : return SHM_MQ_SUCCESS;
836 : else
837 0 : return SHM_MQ_DETACHED;
838 : }
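/*
 * A small sketch under the hypothetical SHM_MQ_USAGE_EXAMPLE guard: a caller
 * that must know its counterparty is alive before doing anything else can
 * block here.  SHM_MQ_DETACHED means the other process died or detached;
 * death before attaching is only detectable if a BackgroundWorkerHandle was
 * supplied via shm_mq_attach() or shm_mq_set_handle().
 */
#ifdef SHM_MQ_USAGE_EXAMPLE
static bool
example_wait_for_counterparty(shm_mq_handle *mqh)
{
	return shm_mq_wait_for_attach(mqh) == SHM_MQ_SUCCESS;
}
#endif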
839 :
840 : /*
841 : * Detach from a shared message queue, and destroy the shm_mq_handle.
842 : */
843 : void
2047 tgl 844 CBC 3765 : shm_mq_detach(shm_mq_handle *mqh)
845 : {
846 : /* Before detaching, notify the receiver about any already-written data. */
542 rhaas 847 3765 : if (mqh->mqh_send_pending > 0)
848 : {
849 651 : shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
850 651 : mqh->mqh_send_pending = 0;
851 : }
852 :
853 : /* Notify counterparty that we're outta here. */
2047 tgl 854 3765 : shm_mq_detach_internal(mqh->mqh_queue);
855 :
856 : /* Cancel on_dsm_detach callback, if any. */
857 3765 : if (mqh->mqh_segment)
858 3765 : cancel_on_dsm_detach(mqh->mqh_segment,
859 : shm_mq_detach_callback,
860 3765 : PointerGetDatum(mqh->mqh_queue));
861 :
862 : /* Release local memory associated with handle. */
863 3765 : if (mqh->mqh_buffer != NULL)
864 118 : pfree(mqh->mqh_buffer);
865 3765 : pfree(mqh);
866 3765 : }
867 :
868 : /*
869 : * Notify counterparty that we're detaching from shared message queue.
870 : *
871 : * The purpose of this function is to make sure that the process
872 : * with which we're communicating doesn't block forever waiting for us to
873 : * fill or drain the queue once we've lost interest. When the sender
874 : * detaches, the receiver can read any messages remaining in the queue;
875 : * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
876 : * further attempts to send messages will likewise return SHM_MQ_DETACHED.
877 : *
878 : * This is separated out from shm_mq_detach() because if the on_dsm_detach
879 : * callback fires, we only want to do this much. We do not try to touch
880 : * the local shm_mq_handle, as it may have been pfree'd already.
881 : */
882 : static void
883 5158 : shm_mq_detach_internal(shm_mq *mq)
884 : {
885 : PGPROC *victim;
886 :
3372 rhaas 887 5158 : SpinLockAcquire(&mq->mq_mutex);
1865 andres 888 5158 : if (mq->mq_sender == MyProc)
889 2540 : victim = mq->mq_receiver;
890 : else
891 : {
892 2618 : Assert(mq->mq_receiver == MyProc);
893 2618 : victim = mq->mq_sender;
894 : }
895 5158 : mq->mq_detached = true;
3372 rhaas 896 5158 : SpinLockRelease(&mq->mq_mutex);
897 :
898 5158 : if (victim != NULL)
899 5080 : SetLatch(&victim->procLatch);
900 5158 : }
901 :
902 : /*
903 : * Get the shm_mq from handle.
904 : */
905 : shm_mq *
2760 906 4193742 : shm_mq_get_queue(shm_mq_handle *mqh)
907 : {
908 4193742 : return mqh->mqh_queue;
909 : }
910 :
911 : /*
912 : * Write bytes into a shared message queue.
913 : */
914 : static shm_mq_result
3105 915 1684322 : shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
916 : bool nowait, Size *bytes_written)
917 : {
3372 918 1684322 : shm_mq *mq = mqh->mqh_queue;
3309 919 1684322 : Size sent = 0;
920 : uint64 used;
921 1684322 : Size ringsize = mq->mq_ring_size;
922 : Size available;
923 :
3372 924 3601949 : while (sent < nbytes)
925 : {
926 : uint64 rb;
927 : uint64 wb;
928 :
929 : /* Compute number of ring buffer bytes used and available. */
1864 930 1922004 : rb = pg_atomic_read_u64(&mq->mq_bytes_read);
542 931 1922004 : wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
1864 932 1922004 : Assert(wb >= rb);
933 1922004 : used = wb - rb;
3372 934 1922004 : Assert(used <= ringsize);
935 1922004 : available = Min(ringsize - used, nbytes - sent);
936 :
937 : /*
938 : * Bail out if the queue has been detached. Note that we would be in
939 : * trouble if the compiler decided to cache the value of
940 : * mq->mq_detached in a register or on the stack across loop
941 : * iterations. It probably shouldn't do that anyway since we'll
942 : * always return, call an external function that performs a system
943 : * call, or reach a memory barrier at some point later in the loop,
944 : * but just to be sure, insert a compiler barrier here.
945 : */
1864 946 1922004 : pg_compiler_barrier();
947 1922004 : if (mq->mq_detached)
948 : {
3181 949 3 : *bytes_written = sent;
3372 950 3 : return SHM_MQ_DETACHED;
951 : }
952 :
2802 953 1922001 : if (available == 0 && !mqh->mqh_counterparty_attached)
954 : {
955 : /*
956 : * The queue is full, so if the receiver isn't yet known to be
957 : * attached, we must wait for that to happen.
958 : */
959 6 : if (nowait)
960 : {
2726 961 1 : if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
962 : {
2726 rhaas 963 UBC 0 : *bytes_written = sent;
964 0 : return SHM_MQ_DETACHED;
965 : }
2802 rhaas 966 CBC 1 : if (shm_mq_get_receiver(mq) == NULL)
967 : {
3181 rhaas 968 UBC 0 : *bytes_written = sent;
2802 969 0 : return SHM_MQ_WOULD_BLOCK;
970 : }
971 : }
2802 rhaas 972 CBC 5 : else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
973 : mqh->mqh_handle))
974 : {
2802 rhaas 975 UBC 0 : mq->mq_detached = true;
976 0 : *bytes_written = sent;
977 0 : return SHM_MQ_DETACHED;
978 : }
2802 rhaas 979 CBC 6 : mqh->mqh_counterparty_attached = true;
980 :
981 : /*
982 : * The receiver may have read some data after attaching, so we
983 : * must not wait without rechecking the queue state.
984 : */
985 : }
986 1921995 : else if (available == 0)
987 : {
988 : /* Update the pending send bytes in the shared memory. */
542 989 116353 : shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
990 :
991 : /*
992 : * Since mqh->mqh_counterparty_attached is known to be true at this
993 : * point, mq_receiver has been set, and it can't change once set.
994 : * Therefore, we can read it without acquiring the spinlock.
995 : */
1864 996 116353 : Assert(mqh->mqh_counterparty_attached);
997 116353 : SetLatch(&mq->mq_receiver->procLatch);
998 :
999 : /*
1000 : * We have just updated the mqh_send_pending bytes in the shared
1001 : * memory so reset it.
1002 : */
542 1003 116353 : mqh->mqh_send_pending = 0;
1004 :
1005 : /* Skip manipulation of our latch if nowait = true. */
3372 1006 116353 : if (nowait)
1007 : {
1008 4374 : *bytes_written = sent;
1009 4374 : return SHM_MQ_WOULD_BLOCK;
1010 : }
1011 :
1012 : /*
1013 : * Wait for our latch to be set. It might already be set for some
1014 : * unrelated reason, but that'll just result in one extra trip
1015 : * through the loop. It's worth it to avoid resetting the latch
1016 : * at top of loop, because setting an already-set latch is much
1017 : * cheaper than setting one that has been reset.
1018 : */
1598 tmunro 1019 111979 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1020 : WAIT_EVENT_MQ_SEND);
1021 :
1022 : /* Reset the latch so we don't spin. */
3007 andres 1023 111979 : ResetLatch(MyLatch);
1024 :
1025 : /* An interrupt may have occurred while we were waiting. */
2442 tgl 1026 111979 : CHECK_FOR_INTERRUPTS();
1027 : }
1028 : else
1029 : {
1030 : Size offset;
1031 : Size sendnow;
1032 :
1864 rhaas 1033 1805642 : offset = wb % (uint64) ringsize;
1034 1805642 : sendnow = Min(available, ringsize - offset);
1035 :
1036 : /*
1037 : * Write as much data as we can via a single memcpy(). Make sure
1038 : * these writes happen after the read of mq_bytes_read, above.
1039 : * This barrier pairs with the one in shm_mq_inc_bytes_read.
1040 : * (Since we're separating the read of mq_bytes_read from a
1041 : * subsequent write to mq_ring, we need a full barrier here.)
1042 : */
1043 1805642 : pg_memory_barrier();
3372 1044 1805642 : memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
1045 : (char *) data + sent, sendnow);
1046 1805642 : sent += sendnow;
1047 :
1048 : /*
1049 : * Update count of bytes written, with alignment padding. Note
1050 : * that this will never actually insert any padding except at the
1051 : * end of a run of bytes, because the buffer size is a multiple of
1052 : * MAXIMUM_ALIGNOF, and each read and write is as well.
1053 : */
3309 1054 1805642 : Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
1055 :
1056 : /*
1057 : * For efficiency, we don't update the bytes written in the shared
1058 : * memory and also don't set the reader's latch here. Refer to
1059 : * the comments atop the shm_mq_handle structure for more
1060 : * information.
1061 : */
542 1062 1805642 : mqh->mqh_send_pending += MAXALIGN(sendnow);
1063 : }
1064 : }
1065 :
3372 1066 1679945 : *bytes_written = sent;
1067 1679945 : return SHM_MQ_SUCCESS;
1068 : }
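/*
 * A worked example of the wrap-around arithmetic used above.  Suppose
 * mq_ring_size = 1024, mq_bytes_read = 3000, and mq_bytes_written plus
 * mqh_send_pending = 3800; then for this loop iteration:
 *
 *     used    = 3800 - 3000          = 800
 *     avail   = Min(1024 - 800, ...) = 224  (at most; also capped by nbytes - sent)
 *     offset  = 3800 % 1024          = 728
 *     sendnow = Min(224, 1024 - 728) = 224
 *
 * so the copy lands entirely before the physical end of mq_ring.  Had offset
 * been closer to 1024, sendnow would have been truncated there and the
 * remainder written starting at offset 0 on the next iteration.
 */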
1069 :
1070 : /*
1071 : * Wait until at least *nbytesp bytes are available to be read from the
1072 : * shared message queue, or until the buffer wraps around. If the queue is
1073 : * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
1074 : * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
1075 : * to the location at which data bytes can be read, *nbytesp is set to the
1076 : * number of bytes which can be read at that address, and the return value
1077 : * is SHM_MQ_SUCCESS.
1078 : */
1079 : static shm_mq_result
1864 1080 1104066 : shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
1081 : Size *nbytesp, void **datap)
1082 : {
1083 1104066 : shm_mq *mq = mqh->mqh_queue;
3309 1084 1104066 : Size ringsize = mq->mq_ring_size;
1085 : uint64 used;
1086 : uint64 written;
1087 :
1088 : for (;;)
3372 1089 148163 : {
1090 : Size offset;
1091 : uint64 read;
1092 :
1093 : /* Get bytes written, so we can compute what's available to read. */
1864 1094 1252229 : written = pg_atomic_read_u64(&mq->mq_bytes_written);
1095 :
1096 : /*
1097 : * Get bytes read. Include bytes we could consume but have not yet
1098 : * consumed.
1099 : */
1100 1252229 : read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1101 1252229 : mqh->mqh_consume_pending;
1102 1252229 : used = written - read;
3372 1103 1252229 : Assert(used <= ringsize);
1864 1104 1252229 : offset = read % (uint64) ringsize;
1105 :
1106 : /* If we have enough data or buffer has wrapped, we're done. */
3372 1107 1252229 : if (used >= bytes_needed || offset + used >= ringsize)
1108 : {
1109 967928 : *nbytesp = Min(used, ringsize - offset);
1110 967928 : *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1111 :
1112 : /*
1113 : * Separate the read of mq_bytes_written, above, from caller's
1114 : * attempt to read the data itself. Pairs with the barrier in
1115 : * shm_mq_inc_bytes_written.
1116 : */
1864 1117 967928 : pg_read_barrier();
3372 1118 967928 : return SHM_MQ_SUCCESS;
1119 : }
1120 :
1121 : /*
1122 : * Fall out before waiting if the queue has been detached.
1123 : *
1124 : * Note that we don't check for this until *after* considering whether
1125 : * the data already available is enough, since the receiver can finish
1126 : * receiving a message stored in the buffer even after the sender has
1127 : * detached.
1128 : */
1864 1129 284301 : if (mq->mq_detached)
1130 : {
1131 : /*
1132 : * If the writer advanced mq_bytes_written and then set
1133 : * mq_detached, we might not have read the final value of
1134 : * mq_bytes_written above. Insert a read barrier and then check
1135 : * again if mq_bytes_written has advanced.
1136 : */
1861 1137 1214 : pg_read_barrier();
1138 1214 : if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1861 rhaas 1139 UBC 0 : continue;
1140 :
3372 rhaas 1141 CBC 1214 : return SHM_MQ_DETACHED;
1142 : }
1143 :
1144 : /*
1145 : * We didn't get enough data to satisfy the request, so mark any data
1146 : * previously-consumed as read to make more buffer space.
1147 : */
1864 1148 283087 : if (mqh->mqh_consume_pending > 0)
1149 : {
1150 136094 : shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1151 136094 : mqh->mqh_consume_pending = 0;
1152 : }
1153 :
1154 : /* Skip manipulation of our latch if nowait = true. */
3372 1155 283087 : if (nowait)
1156 134924 : return SHM_MQ_WOULD_BLOCK;
1157 :
1158 : /*
1159 : * Wait for our latch to be set. It might already be set for some
1160 : * unrelated reason, but that'll just result in one extra trip through
1161 : * the loop. It's worth it to avoid resetting the latch at top of
1162 : * loop, because setting an already-set latch is much cheaper than
1163 : * setting one that has been reset.
1164 : */
1598 tmunro 1165 148163 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1166 : WAIT_EVENT_MQ_RECEIVE);
1167 :
1168 : /* Reset the latch so we don't spin. */
3007 andres 1169 148163 : ResetLatch(MyLatch);
1170 :
1171 : /* An interrupt may have occurred while we were waiting. */
2442 tgl 1172 148163 : CHECK_FOR_INTERRUPTS();
1173 : }
1174 : }
1175 :
1176 : /*
1177 : * Test whether a counterparty who may not even be alive yet is definitely gone.
1178 : */
1179 : static bool
1865 andres 1180 2493763 : shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
1181 : {
1182 : pid_t pid;
1183 :
1184 : /* If the queue has been detached, counterparty is definitely gone. */
1864 rhaas 1185 2493763 : if (mq->mq_detached)
2726 1186 141 : return true;
1187 :
1188 : /* If there's a handle, check worker status. */
1189 2493622 : if (handle != NULL)
1190 : {
1191 : BgwHandleStatus status;
1192 :
1193 : /* Check for unexpected worker death. */
1194 2493611 : status = GetBackgroundWorkerPid(handle, &pid);
1195 2493611 : if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1196 : {
1197 : /* Mark it detached, just to make it official. */
2726 rhaas 1198 UBC 0 : mq->mq_detached = true;
1199 0 : return true;
1200 : }
1201 : }
1202 :
1203 : /* Counterparty is not definitively gone. */
2726 rhaas 1204 CBC 2493622 : return false;
1205 : }
1206 :
1207 : /*
1208 : * This is used when a process is waiting for its counterpart to attach to the
1209 : * queue. We exit when the other process attaches as expected, or, if
1210 : * handle != NULL, when the referenced background process or the postmaster
1211 : * dies. Note that if handle == NULL, and the process fails to attach, we'll
1212 : * potentially get stuck here forever waiting for a process that may never
1213 : * start. We do check for interrupts, though.
1214 : *
1215 : * ptr is a pointer to the memory address that we're expecting to become
1216 : * non-NULL when our counterpart attaches to the queue.
1217 : */
1218 : static bool
1865 andres 1219 170 : shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
1220 : {
3260 bruce 1221 170 : bool result = false;
1222 :
1223 : for (;;)
3372 rhaas 1224 403 : {
1225 : BgwHandleStatus status;
1226 : pid_t pid;
1227 :
1228 : /* Acquire the lock just long enough to check the pointer. */
2739 1229 573 : SpinLockAcquire(&mq->mq_mutex);
1230 573 : result = (*ptr != NULL);
1231 573 : SpinLockRelease(&mq->mq_mutex);
1232 :
1233 : /* Fail if detached; else succeed if initialized. */
1864 1234 573 : if (mq->mq_detached)
1235 : {
2739 1236 84 : result = false;
1237 84 : break;
1238 : }
1239 489 : if (result)
1240 86 : break;
1241 :
1242 403 : if (handle != NULL)
1243 : {
1244 : /* Check for unexpected worker death. */
1245 403 : status = GetBackgroundWorkerPid(handle, &pid);
1246 403 : if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1247 : {
3372 rhaas 1248 UBC 0 : result = false;
1249 0 : break;
1250 : }
1251 : }
1252 :
1253 : /* Wait to be signaled. */
1598 tmunro 1254 CBC 403 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1255 : WAIT_EVENT_MQ_INTERNAL);
1256 :
1257 : /* Reset the latch so we don't spin. */
2739 rhaas 1258 403 : ResetLatch(MyLatch);
1259 :
1260 : /* An interrupt may have occurred while we were waiting. */
2442 tgl 1261 403 : CHECK_FOR_INTERRUPTS();
1262 : }
1263 :
3372 rhaas 1264 170 : return result;
1265 : }
1266 :
1267 : /*
1268 : * Increment the number of bytes read.
1269 : */
1270 : static void
1865 andres 1271 155016 : shm_mq_inc_bytes_read(shm_mq *mq, Size n)
1272 : {
1273 : PGPROC *sender;
1274 :
1275 : /*
1276 : * Separate prior reads of mq_ring from the increment of mq_bytes_read
1277 : * which follows. This pairs with the full barrier in
1278 : * shm_mq_send_bytes(). We only need a read barrier here because the
1279 : * increment of mq_bytes_read is actually a read followed by a dependent
1280 : * write.
1281 : */
1864 rhaas 1282 155016 : pg_read_barrier();
1283 :
1284 : /*
1285 : * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1286 : * else can be changing this value. This method should be cheaper.
1287 : */
1288 155016 : pg_atomic_write_u64(&mq->mq_bytes_read,
1289 155016 : pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1290 :
1291 : /*
1292 : * We shouldn't have any bytes to read without a sender, so we can read
1293 : * mq_sender here without a lock. Once it's initialized, it can't change.
1294 : */
3372 1295 155016 : sender = mq->mq_sender;
1296 155016 : Assert(sender != NULL);
1297 155016 : SetLatch(&sender->procLatch);
1298 155016 : }
1299 :
1300 : /*
1301 : * Increment the number of bytes written.
1302 : */
1303 : static void
1865 andres 1304 238590 : shm_mq_inc_bytes_written(shm_mq *mq, Size n)
1305 : {
1306 : /*
1307 : * Separate prior reads of mq_ring from the write of mq_bytes_written
1308 : * which we're about to do. Pairs with the read barrier found in
1309 : * shm_mq_receive_bytes.
1310 : */
1864 rhaas 1311 238590 : pg_write_barrier();
1312 :
1313 : /*
1314 : * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1315 : * else can be changing this value. This method avoids taking the bus
1316 : * lock unnecessarily.
1317 : */
1318 238590 : pg_atomic_write_u64(&mq->mq_bytes_written,
1319 238590 : pg_atomic_read_u64(&mq->mq_bytes_written) + n);
3372 1320 238590 : }
1321 :
1322 : /* Shim for on_dsm_detach callback. */
1323 : static void
1324 1393 : shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1325 : {
1326 1393 : shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1327 :
2047 tgl 1328 1393 : shm_mq_detach_internal(mq);
3372 rhaas 1329 1393 : }