LCOV - differential code coverage report
Current view: top level - src/backend/storage/ipc - shm_mq.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 89.7 % 390 350 40 350
Current Date: 2023-04-08 15:15:32 Functions: 95.2 % 21 20 1 20
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * shm_mq.c
       4                 :  *    single-reader, single-writer shared memory message queue
       5                 :  *
       6                 :  * Both the sender and the receiver must have a PGPROC; their respective
       7                 :  * process latches are used for synchronization.  Only the sender may send,
       8                 :  * and only the receiver may receive.  This is intended to allow a user
       9                 :  * backend to communicate with worker backends that it has registered.
      10                 :  *
      11                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      12                 :  * Portions Copyright (c) 1994, Regents of the University of California
      13                 :  *
      14                 :  * src/backend/storage/ipc/shm_mq.c
      15                 :  *
      16                 :  *-------------------------------------------------------------------------
      17                 :  */
      18                 : 
      19                 : #include "postgres.h"
      20                 : 
      21                 : #include "miscadmin.h"
      22                 : #include "pgstat.h"
      23                 : #include "port/pg_bitutils.h"
      24                 : #include "postmaster/bgworker.h"
      25                 : #include "storage/procsignal.h"
      26                 : #include "storage/shm_mq.h"
      27                 : #include "storage/spin.h"
      28                 : #include "utils/memutils.h"
      29                 : 
      30                 : /*
      31                 :  * This structure represents the actual queue, stored in shared memory.
      32                 :  *
      33                 :  * Some notes on synchronization:
      34                 :  *
      35                 :  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
      36                 :  * mq_sender and mq_bytes_written can only be changed by the sender.
      37                 :  * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
      38                 :  * they cannot change once set, and thus may be read without a lock once this
      39                 :  * is known to be the case.
      40                 :  *
      41                 :  * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
      42                 :  * they are written atomically using 8-byte loads and stores.  Memory barriers
      43                 :  * must be carefully used to synchronize reads and writes of these values with
      44                 :  * reads and writes of the actual data in mq_ring.
      45                 :  *
      46                 :  * mq_detached needs no locking.  It can be set by either the sender or the
      47                 :  * receiver, but only ever from false to true, so redundant writes don't
      48                 :  * matter.  It is important that if we set mq_detached and then set the
      49                 :  * counterparty's latch, the counterparty must be certain to see the change
      50                 :  * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
      51                 :  * ends with one, this should be OK.
      52                 :  *
      53                 :  * mq_ring_size and mq_ring_offset never change after initialization, and
      54                 :  * can therefore be read without the lock.
      55                 :  *
      56                 :  * Importantly, mq_ring can be safely read and written without a lock.
      57                 :  * At any given time, the difference between mq_bytes_read and
      58                 :  * mq_bytes_written defines the number of bytes within mq_ring that contain
      59                 :  * unread data, and mq_bytes_read defines the position where those bytes
      60                 :  * begin.  The sender can increase the number of unread bytes at any time,
      61                 :  * but only the receiver can give license to overwrite those bytes, by
      62                 :  * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
      63                 :  * the unread bytes it knows to be present without the lock.  Conversely,
      64                 :  * the sender can write to the unused portion of the ring buffer without
      65                 :  * the lock, because nobody else can be reading or writing those bytes.  The
      66                 :  * receiver could be making more bytes unused by incrementing mq_bytes_read,
      67                 :  * but that's OK.  Note that it would be unsafe for the receiver to read any
      68                 :  * data it's already marked as read, or to write any data; and it would be
      69                 :  * unsafe for the sender to reread any data after incrementing
      70                 :  * mq_bytes_written, but fortunately there's no need for any of that.
      71                 :  */
      72                 : struct shm_mq
      73                 : {
      74                 :     slock_t     mq_mutex;
      75                 :     PGPROC     *mq_receiver;
      76                 :     PGPROC     *mq_sender;
      77                 :     pg_atomic_uint64 mq_bytes_read;
      78                 :     pg_atomic_uint64 mq_bytes_written;
      79                 :     Size        mq_ring_size;
      80                 :     bool        mq_detached;
      81                 :     uint8       mq_ring_offset;
      82                 :     char        mq_ring[FLEXIBLE_ARRAY_MEMBER];
      83                 : };
      84                 : 
      85                 : /*
      86                 :  * This structure is a backend-private handle for access to a queue.
      87                 :  *
      88                 :  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
      89                 :  * an optional pointer to the dynamic shared memory segment that contains it.
      90                 :  * (If mqh_segment is provided, we register an on_dsm_detach callback to
      91                 :  * make sure we detach from the queue before detaching from DSM.)
      92                 :  *
      93                 :  * If this queue is intended to connect the current process with a background
      94                 :  * worker that started it, the user can pass a pointer to the worker handle
      95                 :  * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
      96                 :  * is to allow us to begin sending to or receiving from that queue before the
      97                 :  * process we'll be communicating with has even been started.  If it fails
      98                 :  * to start, the handle will allow us to notice that and fail cleanly, rather
      99                 :  * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
     100                 :  * simple cases - e.g. where there are just 2 processes communicating; in
     101                 :  * more complex scenarios, every process may not have a BackgroundWorkerHandle
     102                 :  * available, or may need to watch for the failure of more than one other
     103                 :  * process at a time.
     104                 :  *
     105                 :  * When a message exists as a contiguous chunk of bytes in the queue - that is,
     106                 :  * it is smaller than the size of the ring buffer and does not wrap around
     107                 :  * the end - we return the message to the caller as a pointer into the buffer.
     108                 :  * For messages that are larger or happen to wrap, we reassemble the message
     109                 :  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
     110                 :  * the buffer, and mqh_buflen is the number of bytes allocated for it.
     111                 :  *
     112                 :  * mqh_send_pending is the number of bytes that have been written to the
     113                 :  * queue but not yet added to mq_bytes_written in shared memory.  We defer
     114                 :  * that update until the pending data reaches 1/4th of the ring size or the
     115                 :  * tuple queue is full.  This prevents frequent CPU cache misses, and it
     116                 :  * also avoids frequent SetLatch() calls, which are quite expensive.
     117                 :  *
     118                 :  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
     119                 :  * are used to track the state of non-blocking operations.  When the caller
     120                 :  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
     121                 :  * are expected to retry the call at a later time with the same argument;
     122                 :  * we need to retain enough state to pick up where we left off.
     123                 :  * mqh_length_word_complete tracks whether we are done sending or receiving
     124                 :  * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
     125                 :  * the number of bytes read or written for either the length word or the
     126                 :  * message itself, and mqh_expected_bytes - which is used only for reads -
     127                 :  * tracks the expected total size of the payload.
     128                 :  *
     129                 :  * mqh_counterparty_attached tracks whether we know the counterparty to have
     130                 :  * attached to the queue at some previous point.  This lets us avoid some
     131                 :  * mutex acquisitions.
     132                 :  *
     133                 :  * mqh_context is the memory context in effect at the time we attached to
     134                 :  * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
     135                 :  * we make sure any other allocations we do happen in this context as well,
     136                 :  * to avoid nasty surprises.
     137                 :  */
     138                 : struct shm_mq_handle
     139                 : {
     140                 :     shm_mq     *mqh_queue;
     141                 :     dsm_segment *mqh_segment;
     142                 :     BackgroundWorkerHandle *mqh_handle;
     143                 :     char       *mqh_buffer;
     144                 :     Size        mqh_buflen;
     145                 :     Size        mqh_consume_pending;
     146                 :     Size        mqh_send_pending;
     147                 :     Size        mqh_partial_bytes;
     148                 :     Size        mqh_expected_bytes;
     149                 :     bool        mqh_length_word_complete;
     150                 :     bool        mqh_counterparty_attached;
     151                 :     MemoryContext mqh_context;
     152                 : };
     153                 : 
     154                 : static void shm_mq_detach_internal(shm_mq *mq);
     155                 : static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
     156                 :                                        const void *data, bool nowait, Size *bytes_written);
     157                 : static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
     158                 :                                           Size bytes_needed, bool nowait, Size *nbytesp,
     159                 :                                           void **datap);
     160                 : static bool shm_mq_counterparty_gone(shm_mq *mq,
     161                 :                                      BackgroundWorkerHandle *handle);
     162                 : static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
     163                 :                                  BackgroundWorkerHandle *handle);
     164                 : static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
     165                 : static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
     166                 : static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
     167                 : 
     168                 : /* Minimum queue size is enough for header and at least one chunk of data. */
     169                 : const Size  shm_mq_minimum_size =
     170                 : MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
     171                 : 
     172                 : #define MQH_INITIAL_BUFSIZE             8192
     173                 : 
     174                 : /*
     175                 :  * Initialize a new shared message queue.
     176                 :  */
     177                 : shm_mq *
     178 CBC        2618 : shm_mq_create(void *address, Size size)
     179                 : {
     180            2618 :     shm_mq     *mq = address;
     181            2618 :     Size        data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
     182                 : 
     183                 :     /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
     184            2618 :     size = MAXALIGN_DOWN(size);
     185                 : 
     186                 :     /* Queue size must be large enough to hold some data. */
     187            2618 :     Assert(size > data_offset);
     188                 : 
     189                 :     /* Initialize queue header. */
     190            2618 :     SpinLockInit(&mq->mq_mutex);
     191            2618 :     mq->mq_receiver = NULL;
     192            2618 :     mq->mq_sender = NULL;
     193            2618 :     pg_atomic_init_u64(&mq->mq_bytes_read, 0);
     194            2618 :     pg_atomic_init_u64(&mq->mq_bytes_written, 0);
     195            2618 :     mq->mq_ring_size = size - data_offset;
     196            2618 :     mq->mq_detached = false;
     197            2618 :     mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
     198                 : 
     199            2618 :     return mq;
     200                 : }
     201                 : 
     202                 : /*
     203                 :  * Set the identity of the process that will receive from a shared message
     204                 :  * queue.
     205                 :  */
     206                 : void
     207            2618 : shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
     208                 : {
     209                 :     PGPROC     *sender;
     210                 : 
     211            2618 :     SpinLockAcquire(&mq->mq_mutex);
     212            2618 :     Assert(mq->mq_receiver == NULL);
     213            2618 :     mq->mq_receiver = proc;
     214            2618 :     sender = mq->mq_sender;
     215            2618 :     SpinLockRelease(&mq->mq_mutex);
     216                 : 
     217            2618 :     if (sender != NULL)
     218              17 :         SetLatch(&sender->procLatch);
     219            2618 : }
     220                 : 
     221                 : /*
     222                 :  * Set the identity of the process that will send to a shared message queue.
     223                 :  */
     224                 : void
     225            2540 : shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
     226                 : {
     227                 :     PGPROC     *receiver;
     228                 : 
     229            2540 :     SpinLockAcquire(&mq->mq_mutex);
     230            2540 :     Assert(mq->mq_sender == NULL);
     231            2540 :     mq->mq_sender = proc;
     232            2540 :     receiver = mq->mq_receiver;
     233            2540 :     SpinLockRelease(&mq->mq_mutex);
     234                 : 
     235            2540 :     if (receiver != NULL)
     236            2523 :         SetLatch(&receiver->procLatch);
     237            2540 : }
     238                 : 
     239                 : /*
     240                 :  * Get the configured receiver.
     241                 :  */
     242                 : PGPROC *
     243               1 : shm_mq_get_receiver(shm_mq *mq)
     244                 : {
     245                 :     PGPROC     *receiver;
     246                 : 
     247               1 :     SpinLockAcquire(&mq->mq_mutex);
     248               1 :     receiver = mq->mq_receiver;
     249               1 :     SpinLockRelease(&mq->mq_mutex);
     250                 : 
     251               1 :     return receiver;
     252                 : }
     253                 : 
     254                 : /*
     255                 :  * Get the configured sender.
     256                 :  */
     257                 : PGPROC *
     258         6687588 : shm_mq_get_sender(shm_mq *mq)
     259                 : {
     260                 :     PGPROC     *sender;
     261                 : 
     262         6687588 :     SpinLockAcquire(&mq->mq_mutex);
     263         6687588 :     sender = mq->mq_sender;
     264         6687588 :     SpinLockRelease(&mq->mq_mutex);
     265                 : 
     266         6687588 :     return sender;
     267                 : }
     268                 : 
     269                 : /*
     270                 :  * Attach to a shared message queue so we can send or receive messages.
     271                 :  *
     272                 :  * The memory context in effect at the time this function is called should
     273                 :  * be one which will last for at least as long as the message queue itself.
     274                 :  * We'll allocate the handle in that context, and future allocations that
     275                 :  * are needed to buffer incoming data will happen in that context as well.
     276                 :  *
     277                 :  * If seg != NULL, the queue will be automatically detached when that dynamic
     278                 :  * shared memory segment is detached.
     279                 :  *
     280                 :  * If handle != NULL, the queue can be read or written even before the
     281                 :  * other process has attached.  We'll wait for it to do so if needed.  The
     282                 :  * handle must be for a background worker initialized with bgw_notify_pid
     283                 :  * equal to our PID.
     284                 :  *
     285                 :  * shm_mq_detach() should be called when done.  This will free the
     286                 :  * shm_mq_handle and mark the queue itself as detached, so that our
     287                 :  * counterparty won't get stuck waiting for us to fill or drain the queue
     288                 :  * after we've already lost interest.
     289                 :  */
     290                 : shm_mq_handle *
     291            5158 : shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
     292                 : {
     293            5158 :     shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
     294                 : 
     295            5158 :     Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
     296            5158 :     mqh->mqh_queue = mq;
     297            5158 :     mqh->mqh_segment = seg;
     298            5158 :     mqh->mqh_handle = handle;
     299            5158 :     mqh->mqh_buffer = NULL;
     300            5158 :     mqh->mqh_buflen = 0;
     301            5158 :     mqh->mqh_consume_pending = 0;
     302            5158 :     mqh->mqh_send_pending = 0;
     303            5158 :     mqh->mqh_partial_bytes = 0;
     304            5158 :     mqh->mqh_expected_bytes = 0;
     305            5158 :     mqh->mqh_length_word_complete = false;
     306            5158 :     mqh->mqh_counterparty_attached = false;
     307            5158 :     mqh->mqh_context = CurrentMemoryContext;
     308                 : 
     309            5158 :     if (seg != NULL)
     310            5158 :         on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
     311                 : 
     312            5158 :     return mqh;
     313                 : }
     314                 : 
     315                 : /*
     316                 :  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
     317                 :  * been passed to shm_mq_attach.
     318                 :  */
     319                 : void
     320            2508 : shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
     321                 : {
     322            2508 :     Assert(mqh->mqh_handle == NULL);
     323            2508 :     mqh->mqh_handle = handle;
     324            2508 : }
     325                 : 
     326                 : /*
     327                 :  * Write a message into a shared message queue.
     328                 :  */
     329                 : shm_mq_result
     330          843092 : shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
     331                 :             bool force_flush)
     332                 : {
     333                 :     shm_mq_iovec iov;
     334                 : 
     335          843092 :     iov.data = data;
     336          843092 :     iov.len = nbytes;
     337                 : 
     338          843092 :     return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
     339                 : }
     340                 : 
     341                 : /*
     342                 :  * Write a message into a shared message queue, gathered from multiple
     343                 :  * addresses.
     344                 :  *
     345                 :  * When nowait = false, we'll wait on our process latch when the ring buffer
     346                 :  * fills up, and then continue writing once the receiver has drained some data.
     347                 :  * The process latch is reset after each wait.
     348                 :  *
     349                 :  * When nowait = true, we do not manipulate the state of the process latch;
     350                 :  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
     351                 :  * this case, the caller should call this function again, with the same
     352                 :  * arguments, each time the process latch is set.  (Once begun, the sending
     353                 :  * of a message cannot be aborted except by detaching from the queue; changing
     354                 :  * the length or payload will corrupt the queue.)
     355                 :  *
     356                 :  * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
     357                 :  * and notify the receiver (if it is already attached).  Otherwise, we don't
     358                 :  * update it until we have written an amount of data greater than 1/4th of the
     359                 :  * ring size.
     360                 :  */
     361                 : shm_mq_result
     362          845698 : shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
     363                 :              bool force_flush)
     364                 : {
     365                 :     shm_mq_result res;
     366          845698 :     shm_mq     *mq = mqh->mqh_queue;
     367                 :     PGPROC     *receiver;
     368          845698 :     Size        nbytes = 0;
     369                 :     Size        bytes_written;
     370                 :     int         i;
     371          845698 :     int         which_iov = 0;
     372                 :     Size        offset;
     373                 : 
     374          845698 :     Assert(mq->mq_sender == MyProc);
     375                 : 
     376                 :     /* Compute total size of write. */
     377         1694002 :     for (i = 0; i < iovcnt; ++i)
     378          848304 :         nbytes += iov[i].len;
     379                 : 
     380                 :     /* Prevent writing messages that would overwhelm the receiver. */
     381          845698 :     if (nbytes > MaxAllocSize)
     382 UBC           0 :         ereport(ERROR,
     383                 :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     384                 :                  errmsg("cannot send a message of size %zu via shared memory queue",
     385                 :                         nbytes)));
     386                 : 
     387                 :     /* Try to write, or finish writing, the length word into the buffer. */
     388 CBC     1687019 :     while (!mqh->mqh_length_word_complete)
     389                 :     {
     390          841324 :         Assert(mqh->mqh_partial_bytes < sizeof(Size));
     391          841324 :         res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
     392          841324 :                                 ((char *) &nbytes) + mqh->mqh_partial_bytes,
     393                 :                                 nowait, &bytes_written);
     394                 : 
     395          841324 :         if (res == SHM_MQ_DETACHED)
     396                 :         {
     397                 :             /* Reset state in case caller tries to send another message. */
     398               3 :             mqh->mqh_partial_bytes = 0;
     399               3 :             mqh->mqh_length_word_complete = false;
     400               3 :             return res;
     401                 :         }
     402          841321 :         mqh->mqh_partial_bytes += bytes_written;
     403                 : 
     404          841321 :         if (mqh->mqh_partial_bytes >= sizeof(Size))
     405                 :         {
     406          841321 :             Assert(mqh->mqh_partial_bytes == sizeof(Size));
     407                 : 
     408          841321 :             mqh->mqh_partial_bytes = 0;
     409          841321 :             mqh->mqh_length_word_complete = true;
     410                 :         }
     411                 : 
     412          841321 :         if (res != SHM_MQ_SUCCESS)
     413 UBC           0 :             return res;
     414                 : 
     415                 :         /* Length word can't be split unless bigger than required alignment. */
     416 CBC      841321 :         Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
     417                 :     }
     418                 : 
     419                 :     /* Write the actual data bytes into the buffer. */
     420          845695 :     Assert(mqh->mqh_partial_bytes <= nbytes);
     421          845695 :     offset = mqh->mqh_partial_bytes;
     422                 :     do
     423                 :     {
     424                 :         Size        chunksize;
     425                 : 
     426                 :         /* Figure out which bytes need to be sent next. */
     427          847003 :         if (offset >= iov[which_iov].len)
     428                 :         {
     429            4005 :             offset -= iov[which_iov].len;
     430            4005 :             ++which_iov;
     431            4005 :             if (which_iov >= iovcnt)
     432            4000 :                 break;
     433               5 :             continue;
     434                 :         }
     435                 : 
     436                 :         /*
     437                 :          * We want to avoid copying the data if at all possible, but every
     438                 :          * chunk of bytes we write into the queue has to be MAXALIGN'd, except
     439                 :          * the last.  Thus, if a chunk other than the last one ends on a
     440                 :          * non-MAXALIGN'd boundary, we have to combine the tail end of its
     441                 :          * data with data from one or more following chunks until we either
     442                 :          * reach the last chunk or accumulate a number of bytes which is
     443                 :          * MAXALIGN'd.
     444                 :          */
     445          842998 :         if (which_iov + 1 < iovcnt &&
     446            2598 :             offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
     447            2598 :         {
     448                 :             char        tmpbuf[MAXIMUM_ALIGNOF];
     449            2598 :             int         j = 0;
     450                 : 
     451                 :             for (;;)
     452                 :             {
     453           15612 :                 if (offset < iov[which_iov].len)
     454                 :                 {
     455           11719 :                     tmpbuf[j] = iov[which_iov].data[offset];
     456           11719 :                     j++;
     457           11719 :                     offset++;
     458           11719 :                     if (j == MAXIMUM_ALIGNOF)
     459            1303 :                         break;
     460                 :                 }
     461                 :                 else
     462                 :                 {
     463            3893 :                     offset -= iov[which_iov].len;
     464            3893 :                     which_iov++;
     465            3893 :                     if (which_iov >= iovcnt)
     466            1295 :                         break;
     467                 :                 }
     468                 :             }
     469                 : 
     470            2598 :             res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
     471                 : 
     472            2598 :             if (res == SHM_MQ_DETACHED)
     473                 :             {
     474                 :                 /* Reset state in case caller tries to send another message. */
     475 UBC           0 :                 mqh->mqh_partial_bytes = 0;
     476               0 :                 mqh->mqh_length_word_complete = false;
     477               0 :                 return res;
     478                 :             }
     479                 : 
     480 CBC        2598 :             mqh->mqh_partial_bytes += bytes_written;
     481            2598 :             if (res != SHM_MQ_SUCCESS)
     482 UBC           0 :                 return res;
     483 CBC        2598 :             continue;
     484                 :         }
     485                 : 
     486                 :         /*
     487                 :          * If this is the last chunk, we can write all the data, even if it
     488                 :          * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
     489                 :          * MAXALIGN_DOWN the write size.
     490                 :          */
     491          840400 :         chunksize = iov[which_iov].len - offset;
     492          840400 :         if (which_iov + 1 < iovcnt)
     493 UBC           0 :             chunksize = MAXALIGN_DOWN(chunksize);
     494 CBC      840400 :         res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
     495                 :                                 nowait, &bytes_written);
     496                 : 
     497          840400 :         if (res == SHM_MQ_DETACHED)
     498                 :         {
     499                 :             /* Reset state in case caller tries to send another message. */
     500 UBC           0 :             mqh->mqh_length_word_complete = false;
     501               0 :             mqh->mqh_partial_bytes = 0;
     502               0 :             return res;
     503                 :         }
     504                 : 
     505 CBC      840400 :         mqh->mqh_partial_bytes += bytes_written;
     506          840400 :         offset += bytes_written;
     507          840400 :         if (res != SHM_MQ_SUCCESS)
     508            4374 :             return res;
     509          838629 :     } while (mqh->mqh_partial_bytes < nbytes);
     510                 : 
     511                 :     /* Reset for next message. */
     512          841321 :     mqh->mqh_partial_bytes = 0;
     513          841321 :     mqh->mqh_length_word_complete = false;
     514                 : 
     515                 :     /* If queue has been detached, let caller know. */
     516          841321 :     if (mq->mq_detached)
     517 UBC           0 :         return SHM_MQ_DETACHED;
     518                 : 
     519                 :     /*
     520                 :      * If the counterparty is known to have attached, we can read mq_receiver
     521                 :      * without acquiring the spinlock.  Otherwise, more caution is needed.
     522                 :      */
     523 CBC      841321 :     if (mqh->mqh_counterparty_attached)
     524          839353 :         receiver = mq->mq_receiver;
     525                 :     else
     526                 :     {
     527            1968 :         SpinLockAcquire(&mq->mq_mutex);
     528            1968 :         receiver = mq->mq_receiver;
     529            1968 :         SpinLockRelease(&mq->mq_mutex);
     530            1968 :         if (receiver != NULL)
     531            1968 :             mqh->mqh_counterparty_attached = true;
     532                 :     }
     533                 : 
     534                 :     /*
     535                 :      * If the caller has requested force flush or we have written more than
     536                 :      * 1/4 of the ring size, mark it as written in shared memory and notify
     537                 :      * the receiver.
     538                 :      */
     539          841321 :     if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
     540                 :     {
     541          121586 :         shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
     542          121586 :         if (receiver != NULL)
     543          121586 :             SetLatch(&receiver->procLatch);
     544          121586 :         mqh->mqh_send_pending = 0;
     545                 :     }
     546                 : 
     547          841321 :     return SHM_MQ_SUCCESS;
     548                 : }
     549                 : 
     550                 : /*
     551                 :  * Receive a message from a shared message queue.
     552                 :  *
     553                 :  * We set *nbytes to the message length and *data to point to the message
     554                 :  * payload.  If the entire message exists in the queue as a single,
     555                 :  * contiguous chunk, *data will point directly into shared memory; otherwise,
     556                 :  * it will point to a temporary buffer.  This mostly avoids data copying in
     557                 :  * the hoped-for case where messages are short compared to the buffer size,
     558                 :  * while still allowing longer messages.  In either case, the return value
     559                 :  * remains valid until the next receive operation is performed on the queue.
     560                 :  *
     561                 :  * When nowait = false, we'll wait on our process latch when the ring buffer
     562                 :  * is empty and we have not yet received a full message.  The sender will
     563                 :  * set our process latch after more data has been written, and we'll resume
     564                 :  * processing.  Each call will therefore return a complete message
     565                 :  * (unless the sender detaches the queue).
     566                 :  *
     567                 :  * When nowait = true, we do not manipulate the state of the process latch;
     568                 :  * instead, whenever the buffer is empty and we need to read from it, we
     569                 :  * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
     570                 :  * function again after the process latch has been set.
     571                 :  */
     572                 : shm_mq_result
     573         3463833 : shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
     574                 : {
     575         3463833 :     shm_mq     *mq = mqh->mqh_queue;
     576                 :     shm_mq_result res;
     577         3463833 :     Size        rb = 0;
     578                 :     Size        nbytes;
     579                 :     void       *rawdata;
     580                 : 
     581         3463833 :     Assert(mq->mq_receiver == MyProc);
     582                 : 
     583                 :     /* We can't receive data until the sender has attached. */
     584         3463833 :     if (!mqh->mqh_counterparty_attached)
     585                 :     {
     586         2493927 :         if (nowait)
     587                 :         {
     588                 :             int         counterparty_gone;
     589                 : 
     590                 :             /*
     591                 :              * We shouldn't return at this point at all unless the sender
     592                 :              * hasn't attached yet.  However, the correct return value depends
     593                 :              * on whether the sender is still attached.  If we first test
     594                 :              * whether the sender has ever attached and then test whether the
     595                 :              * sender has detached, there's a race condition: a sender that
     596                 :              * attaches and detaches very quickly might fool us into thinking
     597                 :              * the sender never attached at all.  So, test whether our
     598                 :              * counterparty is definitively gone first, and only afterwards
     599                 :              * check whether the sender ever attached in the first place.
     600                 :              */
     601         2493762 :             counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
     602         2493762 :             if (shm_mq_get_sender(mq) == NULL)
     603                 :             {
     604         2491396 :                 if (counterparty_gone)
     605 UBC           0 :                     return SHM_MQ_DETACHED;
     606                 :                 else
     607 CBC     2491396 :                     return SHM_MQ_WOULD_BLOCK;
     608                 :             }
     609                 :         }
     610             165 :         else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
     611              84 :                  && shm_mq_get_sender(mq) == NULL)
     612                 :         {
     613 UBC           0 :             mq->mq_detached = true;
     614               0 :             return SHM_MQ_DETACHED;
     615                 :         }
     616 CBC        2531 :         mqh->mqh_counterparty_attached = true;
     617                 :     }
     618                 : 
     619                 :     /*
     620                 :      * If we've consumed an amount of data greater than 1/4th of the ring
     621                 :      * size, mark it consumed in shared memory.  We try to avoid doing this
     622                 :      * unnecessarily when only a small amount of data has been consumed,
     623                 :      * because SetLatch() is fairly expensive and we don't want to do it too
     624                 :      * often.
     625                 :      */
     626          972437 :     if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
     627                 :     {
     628           18922 :         shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
     629           18922 :         mqh->mqh_consume_pending = 0;
     630                 :     }
     631                 : 
     632                 :     /* Try to read, or finish reading, the length word from the buffer. */
     633          994119 :     while (!mqh->mqh_length_word_complete)
     634                 :     {
     635                 :         /* Try to receive the message length word. */
     636          967963 :         Assert(mqh->mqh_partial_bytes < sizeof(Size));
     637          967963 :         res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
     638                 :                                    nowait, &rb, &rawdata);
     639          967963 :         if (res != SHM_MQ_SUCCESS)
     640          131664 :             return res;
     641                 : 
     642                 :         /*
     643                 :          * Hopefully, we'll receive the entire message length word at once.
     644                 :          * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
     645                 :          * multiple reads.
     646                 :          */
     647          836299 :         if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
     648           21682 :         {
     649                 :             Size        needed;
     650                 : 
     651          836299 :             nbytes = *(Size *) rawdata;
     652                 : 
     653                 :             /* If we've already got the whole message, we're done. */
     654          836299 :             needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
     655          836299 :             if (rb >= needed)
     656                 :             {
     657          814617 :                 mqh->mqh_consume_pending += needed;
     658          814617 :                 *nbytesp = nbytes;
     659          814617 :                 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
     660          814617 :                 return SHM_MQ_SUCCESS;
     661                 :             }
     662                 : 
     663                 :             /*
     664                 :              * We don't have the whole message, but we at least have the whole
     665                 :              * length word.
     666                 :              */
     667           21682 :             mqh->mqh_expected_bytes = nbytes;
     668           21682 :             mqh->mqh_length_word_complete = true;
     669           21682 :             mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
     670           21682 :             rb -= MAXALIGN(sizeof(Size));
     671                 :         }
     672                 :         else
     673                 :         {
     674                 :             Size        lengthbytes;
     675                 : 
     676                 :             /* Can't be split unless bigger than required alignment. */
     677 UBC           0 :             Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
     678                 : 
     679                 :             /* Message word is split; need buffer to reassemble. */
     680                 :             if (mqh->mqh_buffer == NULL)
     681                 :             {
     682                 :                 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
     683                 :                                                      MQH_INITIAL_BUFSIZE);
     684                 :                 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
     685                 :             }
     686                 :             Assert(mqh->mqh_buflen >= sizeof(Size));
     687                 : 
     688                 :             /* Copy partial length word; remember to consume it. */
     689                 :             if (mqh->mqh_partial_bytes + rb > sizeof(Size))
     690                 :                 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
     691                 :             else
     692                 :                 lengthbytes = rb;
     693                 :             memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
     694                 :                    lengthbytes);
     695                 :             mqh->mqh_partial_bytes += lengthbytes;
     696                 :             mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
     697                 :             rb -= lengthbytes;
     698                 : 
     699                 :             /* If we now have the whole word, we're ready to read payload. */
     700                 :             if (mqh->mqh_partial_bytes >= sizeof(Size))
     701                 :             {
     702                 :                 Assert(mqh->mqh_partial_bytes == sizeof(Size));
     703                 :                 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
     704                 :                 mqh->mqh_length_word_complete = true;
     705                 :                 mqh->mqh_partial_bytes = 0;
     706                 :             }
     707                 :         }
     708                 :     }
     709 CBC       26156 :     nbytes = mqh->mqh_expected_bytes;
     710                 : 
     711                 :     /*
     712                 :      * Should be disallowed on the sending side already, but better check and
     713                 :      * error out on the receiver side as well rather than trying to read a
     714                 :      * prohibitively large message.
     715                 :      */
     716           26156 :     if (nbytes > MaxAllocSize)
     717 UBC           0 :         ereport(ERROR,
     718                 :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     719                 :                  errmsg("invalid message size %zu in shared memory queue",
     720                 :                         nbytes)));
     721                 : 
     722 CBC       26156 :     if (mqh->mqh_partial_bytes == 0)
     723                 :     {
     724                 :         /*
     725                 :          * Try to obtain the whole message in a single chunk.  If this works,
     726                 :          * we need not copy the data and can return a pointer directly into
     727                 :          * shared memory.
     728                 :          */
     729           21804 :         res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
     730           21804 :         if (res != SHM_MQ_SUCCESS)
     731             122 :             return res;
     732           21682 :         if (rb >= nbytes)
     733                 :         {
     734             163 :             mqh->mqh_length_word_complete = false;
     735             163 :             mqh->mqh_consume_pending += MAXALIGN(nbytes);
     736             163 :             *nbytesp = nbytes;
     737             163 :             *datap = rawdata;
     738             163 :             return SHM_MQ_SUCCESS;
     739                 :         }
     740                 : 
     741                 :         /*
     742                 :          * The message has wrapped the buffer.  We'll need to copy it in order
     743                 :          * to return it to the client in one chunk.  First, make sure we have
     744                 :          * a large enough buffer available.
     745                 :          */
     746           21519 :         if (mqh->mqh_buflen < nbytes)
     747                 :         {
     748                 :             Size        newbuflen;
     749                 : 
     750                 :             /*
     751                 :              * Increase size to the next power of 2 that's >= nbytes, but
     752                 :              * limit to MaxAllocSize.
     753                 :              */
     754             126 :             newbuflen = pg_nextpower2_size_t(nbytes);
     755             126 :             newbuflen = Min(newbuflen, MaxAllocSize);
     756                 : 
     757             126 :             if (mqh->mqh_buffer != NULL)
     758                 :             {
     759 UBC           0 :                 pfree(mqh->mqh_buffer);
     760               0 :                 mqh->mqh_buffer = NULL;
     761               0 :                 mqh->mqh_buflen = 0;
     762                 :             }
     763 CBC         126 :             mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
     764             126 :             mqh->mqh_buflen = newbuflen;
     765                 :         }
     766                 :     }
     767                 : 
     768                 :     /* Loop until we've copied the entire message. */
     769                 :     for (;;)
     770          109947 :     {
     771                 :         Size        still_needed;
     772                 : 
     773                 :         /* Copy as much as we can. */
     774          135818 :         Assert(mqh->mqh_partial_bytes + rb <= nbytes);
     775          135818 :         if (rb > 0)
     776                 :         {
     777          131466 :             memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
     778          131466 :             mqh->mqh_partial_bytes += rb;
     779                 :         }
     780                 : 
     781                 :         /*
     782                 :          * Update count of bytes that can be consumed, accounting for
     783                 :          * alignment padding.  Note that this will never actually insert any
     784                 :          * padding except at the end of a message, because the buffer size is
     785                 :          * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
     786                 :          */
     787          135818 :         Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
     788          135818 :         mqh->mqh_consume_pending += MAXALIGN(rb);
     789                 : 
     790                 :         /* If we got all the data, exit the loop. */
     791          135818 :         if (mqh->mqh_partial_bytes >= nbytes)
     792           21519 :             break;
     793                 : 
     794                 :         /* Wait for some more data. */
     795          114299 :         still_needed = nbytes - mqh->mqh_partial_bytes;
     796          114299 :         res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
     797          114299 :         if (res != SHM_MQ_SUCCESS)
     798            4352 :             return res;
     799          109947 :         if (rb > still_needed)
     800           20906 :             rb = still_needed;
     801                 :     }
     802                 : 
     803                 :     /* Return the complete message, and reset for next message. */
     804           21519 :     *nbytesp = nbytes;
     805           21519 :     *datap = mqh->mqh_buffer;
     806           21519 :     mqh->mqh_length_word_complete = false;
     807           21519 :     mqh->mqh_partial_bytes = 0;
     808           21519 :     return SHM_MQ_SUCCESS;
     809                 : }
     810                 : 
     811                 : /*
     812                 :  * Wait for the other process that's supposed to use this queue to attach
     813                 :  * to it.
     814                 :  *
     815                 :  * The return value is SHM_MQ_DETACHED if the worker has already detached or
     816                 :  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
     817                 :  * Note that we will only be able to detect that the worker has died before
     818                 :  * attaching if a background worker handle was passed to shm_mq_attach().
     819                 :  */
     820                 : shm_mq_result
     821 UBC           0 : shm_mq_wait_for_attach(shm_mq_handle *mqh)
     822                 : {
     823               0 :     shm_mq     *mq = mqh->mqh_queue;
     824                 :     PGPROC    **victim;
     825                 : 
     826               0 :     if (shm_mq_get_receiver(mq) == MyProc)
     827               0 :         victim = &mq->mq_sender;
     828                 :     else
     829                 :     {
     830               0 :         Assert(shm_mq_get_sender(mq) == MyProc);
     831               0 :         victim = &mq->mq_receiver;
     832                 :     }
     833                 : 
     834               0 :     if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
     835               0 :         return SHM_MQ_SUCCESS;
     836                 :     else
     837               0 :         return SHM_MQ_DETACHED;
     838                 : }
     839                 : 
     840                 : /*
     841                 :  * Detach from a shared message queue, and destroy the shm_mq_handle.
     842                 :  */
     843                 : void
     844 CBC        3765 : shm_mq_detach(shm_mq_handle *mqh)
     845                 : {
     846                 :     /* Before detaching, notify the receiver about any already-written data. */
     847            3765 :     if (mqh->mqh_send_pending > 0)
     848                 :     {
     849             651 :         shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
     850             651 :         mqh->mqh_send_pending = 0;
     851                 :     }
     852                 : 
     853                 :     /* Notify counterparty that we're outta here. */
     854            3765 :     shm_mq_detach_internal(mqh->mqh_queue);
     855                 : 
     856                 :     /* Cancel on_dsm_detach callback, if any. */
     857            3765 :     if (mqh->mqh_segment)
     858            3765 :         cancel_on_dsm_detach(mqh->mqh_segment,
     859                 :                              shm_mq_detach_callback,
     860            3765 :                              PointerGetDatum(mqh->mqh_queue));
     861                 : 
     862                 :     /* Release local memory associated with handle. */
     863            3765 :     if (mqh->mqh_buffer != NULL)
     864             118 :         pfree(mqh->mqh_buffer);
     865            3765 :     pfree(mqh);
     866            3765 : }
     867                 : 
     868                 : /*
     869                 :  * Notify counterparty that we're detaching from shared message queue.
     870                 :  *
     871                 :  * The purpose of this function is to make sure that the process
     872                 :  * with which we're communicating doesn't block forever waiting for us to
     873                 :  * fill or drain the queue once we've lost interest.  When the sender
     874                 :  * detaches, the receiver can read any messages remaining in the queue;
     875                 :  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
     876                 :  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
     877                 :  *
     878                 :  * This is separated out from shm_mq_detach() because if the on_dsm_detach
     879                 :  * callback fires, we only want to do this much.  We do not try to touch
     880                 :  * the local shm_mq_handle, as it may have been pfree'd already.
     881                 :  */
     882                 : static void
     883            5158 : shm_mq_detach_internal(shm_mq *mq)
     884                 : {
     885                 :     PGPROC     *victim;
     886                 : 
     887            5158 :     SpinLockAcquire(&mq->mq_mutex);
     888            5158 :     if (mq->mq_sender == MyProc)
     889            2540 :         victim = mq->mq_receiver;
     890                 :     else
     891                 :     {
     892            2618 :         Assert(mq->mq_receiver == MyProc);
     893            2618 :         victim = mq->mq_sender;
     894                 :     }
     895            5158 :     mq->mq_detached = true;
     896            5158 :     SpinLockRelease(&mq->mq_mutex);
     897                 : 
     898            5158 :     if (victim != NULL)
     899            5080 :         SetLatch(&victim->procLatch);
     900            5158 : }
     901                 : 
     902                 : /*
     903                 :  * Get the shm_mq from handle.
     904                 :  */
     905                 : shm_mq *
     906         4193742 : shm_mq_get_queue(shm_mq_handle *mqh)
     907                 : {
     908         4193742 :     return mqh->mqh_queue;
     909                 : }
     910                 : 
     911                 : /*
     912                 :  * Write bytes into a shared message queue.
     913                 :  */
     914                 : static shm_mq_result
     915         1684322 : shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
     916                 :                   bool nowait, Size *bytes_written)
     917                 : {
     918         1684322 :     shm_mq     *mq = mqh->mqh_queue;
     919         1684322 :     Size        sent = 0;
     920                 :     uint64      used;
     921         1684322 :     Size        ringsize = mq->mq_ring_size;
     922                 :     Size        available;
     923                 : 
     924         3601949 :     while (sent < nbytes)
     925                 :     {
     926                 :         uint64      rb;
     927                 :         uint64      wb;
     928                 : 
     929                 :         /* Compute number of ring buffer bytes used and available. */
     930         1922004 :         rb = pg_atomic_read_u64(&mq->mq_bytes_read);
     931         1922004 :         wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
     932         1922004 :         Assert(wb >= rb);
     933         1922004 :         used = wb - rb;
     934         1922004 :         Assert(used <= ringsize);
     935         1922004 :         available = Min(ringsize - used, nbytes - sent);
     936                 : 
     937                 :         /*
     938                 :          * Bail out if the queue has been detached.  Note that we would be in
     939                 :          * trouble if the compiler decided to cache the value of
     940                 :          * mq->mq_detached in a register or on the stack across loop
     941                 :          * iterations.  It probably shouldn't do that anyway since we'll
     942                 :          * always return, call an external function that performs a system
     943                 :          * call, or reach a memory barrier at some point later in the loop,
     944                 :          * but just to be sure, insert a compiler barrier here.
     945                 :          */
     946         1922004 :         pg_compiler_barrier();
     947         1922004 :         if (mq->mq_detached)
     948                 :         {
     949               3 :             *bytes_written = sent;
     950               3 :             return SHM_MQ_DETACHED;
     951                 :         }
     952                 : 
     953         1922001 :         if (available == 0 && !mqh->mqh_counterparty_attached)
     954                 :         {
     955                 :             /*
     956                 :              * The queue is full, so if the receiver isn't yet known to be
     957                 :              * attached, we must wait for that to happen.
     958                 :              */
     959               6 :             if (nowait)
     960                 :             {
     961               1 :                 if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
     962                 :                 {
     963 UBC           0 :                     *bytes_written = sent;
     964               0 :                     return SHM_MQ_DETACHED;
     965                 :                 }
     966 CBC           1 :                 if (shm_mq_get_receiver(mq) == NULL)
     967                 :                 {
     968 UBC           0 :                     *bytes_written = sent;
     969               0 :                     return SHM_MQ_WOULD_BLOCK;
     970                 :                 }
     971                 :             }
     972 CBC           5 :             else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
     973                 :                                            mqh->mqh_handle))
     974                 :             {
     975 UBC           0 :                 mq->mq_detached = true;
     976               0 :                 *bytes_written = sent;
     977               0 :                 return SHM_MQ_DETACHED;
     978                 :             }
     979 CBC           6 :             mqh->mqh_counterparty_attached = true;
     980                 : 
     981                 :             /*
     982                 :              * The receiver may have read some data after attaching, so we
     983                 :              * must not wait without rechecking the queue state.
     984                 :              */
     985                 :         }
     986         1921995 :         else if (available == 0)
     987                 :         {
     988                 :             /* Update the pending send bytes in the shared memory. */
     989          116353 :             shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
     990                 : 
     991                 :             /*
     992                 :              * Since mq->mqh_counterparty_attached is known to be true at this
     993                 :              * point, mq_receiver has been set, and it can't change once set.
     994                 :              * Therefore, we can read it without acquiring the spinlock.
     995                 :              */
     996          116353 :             Assert(mqh->mqh_counterparty_attached);
     997          116353 :             SetLatch(&mq->mq_receiver->procLatch);
     998                 : 
     999                 :             /*
    1000                 :              * We have just updated the mqh_send_pending bytes in the shared
    1001                 :              * memory so reset it.
    1002                 :              */
    1003          116353 :             mqh->mqh_send_pending = 0;
    1004                 : 
    1005                 :             /* Skip manipulation of our latch if nowait = true. */
    1006          116353 :             if (nowait)
    1007                 :             {
    1008            4374 :                 *bytes_written = sent;
    1009            4374 :                 return SHM_MQ_WOULD_BLOCK;
    1010                 :             }
    1011                 : 
    1012                 :             /*
    1013                 :              * Wait for our latch to be set.  It might already be set for some
    1014                 :              * unrelated reason, but that'll just result in one extra trip
    1015                 :              * through the loop.  It's worth it to avoid resetting the latch
    1016                 :              * at top of loop, because setting an already-set latch is much
    1017                 :              * cheaper than setting one that has been reset.
    1018                 :              */
    1019          111979 :             (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
    1020                 :                              WAIT_EVENT_MQ_SEND);
    1021                 : 
    1022                 :             /* Reset the latch so we don't spin. */
    1023          111979 :             ResetLatch(MyLatch);
    1024                 : 
    1025                 :             /* An interrupt may have occurred while we were waiting. */
    1026          111979 :             CHECK_FOR_INTERRUPTS();
    1027                 :         }
    1028                 :         else
    1029                 :         {
    1030                 :             Size        offset;
    1031                 :             Size        sendnow;
    1032                 : 
    1033         1805642 :             offset = wb % (uint64) ringsize;
    1034         1805642 :             sendnow = Min(available, ringsize - offset);
    1035                 : 
    1036                 :             /*
    1037                 :              * Write as much data as we can via a single memcpy(). Make sure
    1038                 :              * these writes happen after the read of mq_bytes_read, above.
    1039                 :              * This barrier pairs with the one in shm_mq_inc_bytes_read.
    1040                 :              * (Since we're separating the read of mq_bytes_read from a
    1041                 :              * subsequent write to mq_ring, we need a full barrier here.)
    1042                 :              */
    1043         1805642 :             pg_memory_barrier();
    1044         1805642 :             memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
    1045                 :                    (char *) data + sent, sendnow);
    1046         1805642 :             sent += sendnow;
    1047                 : 
    1048                 :             /*
    1049                 :              * Update count of bytes written, with alignment padding.  Note
    1050                 :              * that this will never actually insert any padding except at the
    1051                 :              * end of a run of bytes, because the buffer size is a multiple of
    1052                 :              * MAXIMUM_ALIGNOF, and each read is as well.
    1053                 :              */
    1054         1805642 :             Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
    1055                 : 
    1056                 :             /*
    1057                 :              * For efficiency, we don't update the bytes written in the shared
    1058                 :              * memory and also don't set the reader's latch here.  Refer to
    1059                 :              * the comments atop the shm_mq_handle structure for more
    1060                 :              * information.
    1061                 :              */
    1062         1805642 :             mqh->mqh_send_pending += MAXALIGN(sendnow);
    1063                 :         }
    1064                 :     }
    1065                 : 
    1066         1679945 :     *bytes_written = sent;
    1067         1679945 :     return SHM_MQ_SUCCESS;
    1068                 : }
    1069                 : 
    1070                 : /*
    1071                 :  * Wait until at least bytes_needed bytes are available to be read from the
    1072                 :  * shared message queue, or until the buffer wraps around.  If the queue is
    1073                 :  * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
    1074                 :  * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
    1075                 :  * to the location at which data bytes can be read, *nbytesp is set to the
    1076                 :  * number of bytes which can be read at that address, and the return value
    1077                 :  * is SHM_MQ_SUCCESS.
    1078                 :  */
    1079                 : static shm_mq_result
    1080         1104066 : shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
    1081                 :                      Size *nbytesp, void **datap)
    1082                 : {
    1083         1104066 :     shm_mq     *mq = mqh->mqh_queue;
    1084         1104066 :     Size        ringsize = mq->mq_ring_size;
    1085                 :     uint64      used;
    1086                 :     uint64      written;
    1087                 : 
    1088                 :     for (;;)
    1089          148163 :     {
    1090                 :         Size        offset;
    1091                 :         uint64      read;
    1092                 : 
    1093                 :         /* Get bytes written, so we can compute what's available to read. */
    1094         1252229 :         written = pg_atomic_read_u64(&mq->mq_bytes_written);
    1095                 : 
    1096                 :         /*
    1097                 :          * Get bytes read.  Include bytes we could consume but have not yet
    1098                 :          * consumed.
    1099                 :          */
    1100         1252229 :         read = pg_atomic_read_u64(&mq->mq_bytes_read) +
    1101         1252229 :             mqh->mqh_consume_pending;
    1102         1252229 :         used = written - read;
    1103         1252229 :         Assert(used <= ringsize);
    1104         1252229 :         offset = read % (uint64) ringsize;
    1105                 : 
    1106                 :         /* If we have enough data or buffer has wrapped, we're done. */
    1107         1252229 :         if (used >= bytes_needed || offset + used >= ringsize)
    1108                 :         {
    1109          967928 :             *nbytesp = Min(used, ringsize - offset);
    1110          967928 :             *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
    1111                 : 
    1112                 :             /*
    1113                 :              * Separate the read of mq_bytes_written, above, from caller's
    1114                 :              * attempt to read the data itself.  Pairs with the barrier in
    1115                 :              * shm_mq_inc_bytes_written.
    1116                 :              */
    1117          967928 :             pg_read_barrier();
    1118          967928 :             return SHM_MQ_SUCCESS;
    1119                 :         }
    1120                 : 
    1121                 :         /*
    1122                 :          * Fall out before waiting if the queue has been detached.
    1123                 :          *
    1124                 :          * Note that we don't check for this until *after* considering whether
    1125                 :          * the data already available is enough, since the receiver can finish
    1126                 :          * receiving a message stored in the buffer even after the sender has
    1127                 :          * detached.
    1128                 :          */
    1129          284301 :         if (mq->mq_detached)
    1130                 :         {
    1131                 :             /*
    1132                 :              * If the writer advanced mq_bytes_written and then set
    1133                 :              * mq_detached, we might not have read the final value of
    1134                 :              * mq_bytes_written above.  Insert a read barrier and then check
    1135                 :              * again if mq_bytes_written has advanced.
    1136                 :              */
    1137            1214 :             pg_read_barrier();
    1138            1214 :             if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
    1139 UBC           0 :                 continue;
    1140                 : 
    1141 CBC        1214 :             return SHM_MQ_DETACHED;
    1142                 :         }
    1143                 : 
    1144                 :         /*
    1145                 :          * We didn't get enough data to satisfy the request, so mark any data
    1146                 :          * previously-consumed as read to make more buffer space.
    1147                 :          */
    1148          283087 :         if (mqh->mqh_consume_pending > 0)
    1149                 :         {
    1150          136094 :             shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
    1151          136094 :             mqh->mqh_consume_pending = 0;
    1152                 :         }
    1153                 : 
    1154                 :         /* Skip manipulation of our latch if nowait = true. */
    1155          283087 :         if (nowait)
    1156          134924 :             return SHM_MQ_WOULD_BLOCK;
    1157                 : 
    1158                 :         /*
    1159                 :          * Wait for our latch to be set.  It might already be set for some
    1160                 :          * unrelated reason, but that'll just result in one extra trip through
    1161                 :          * the loop.  It's worth it to avoid resetting the latch at top of
    1162                 :          * loop, because setting an already-set latch is much cheaper than
    1163                 :          * setting one that has been reset.
    1164                 :          */
    1165          148163 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
    1166                 :                          WAIT_EVENT_MQ_RECEIVE);
    1167                 : 
    1168                 :         /* Reset the latch so we don't spin. */
    1169          148163 :         ResetLatch(MyLatch);
    1170                 : 
    1171                 :         /* An interrupt may have occurred while we were waiting. */
    1172          148163 :         CHECK_FOR_INTERRUPTS();
    1173                 :     }
    1174                 : }
    1175                 : 
    1176                 : /*
    1177                 :  * Test whether a counterparty who may not even be alive yet is definitely gone.
    1178                 :  */
    1179                 : static bool
    1180         2493763 : shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
    1181                 : {
    1182                 :     pid_t       pid;
    1183                 : 
    1184                 :     /* If the queue has been detached, counterparty is definitely gone. */
    1185         2493763 :     if (mq->mq_detached)
    1186             141 :         return true;
    1187                 : 
    1188                 :     /* If there's a handle, check worker status. */
    1189         2493622 :     if (handle != NULL)
    1190                 :     {
    1191                 :         BgwHandleStatus status;
    1192                 : 
    1193                 :         /* Check for unexpected worker death. */
    1194         2493611 :         status = GetBackgroundWorkerPid(handle, &pid);
    1195         2493611 :         if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
    1196                 :         {
    1197                 :             /* Mark it detached, just to make it official. */
    1198 UBC           0 :             mq->mq_detached = true;
    1199               0 :             return true;
    1200                 :         }
    1201                 :     }
    1202                 : 
    1203                 :     /* Counterparty is not definitively gone. */
    1204 CBC     2493622 :     return false;
    1205                 : }
    1206                 : 
    1207                 : /*
    1208                 :  * This is used when a process is waiting for its counterpart to attach to the
    1209                 :  * queue.  We exit when the other process attaches as expected, or, if
    1210                 :  * handle != NULL, when the referenced background process or the postmaster
    1211                 :  * dies.  Note that if handle == NULL, and the process fails to attach, we'll
    1212                 :  * potentially get stuck here forever waiting for a process that may never
    1213                 :  * start.  We do check for interrupts, though.
    1214                 :  *
    1215                 :  * ptr is a pointer to the memory address that we're expecting to become
    1216                 :  * non-NULL when our counterpart attaches to the queue.
    1217                 :  */
    1218                 : static bool
    1219             170 : shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
    1220                 : {
    1221             170 :     bool        result = false;
    1222                 : 
    1223                 :     for (;;)
    1224             403 :     {
    1225                 :         BgwHandleStatus status;
    1226                 :         pid_t       pid;
    1227                 : 
    1228                 :         /* Acquire the lock just long enough to check the pointer. */
    1229             573 :         SpinLockAcquire(&mq->mq_mutex);
    1230             573 :         result = (*ptr != NULL);
    1231             573 :         SpinLockRelease(&mq->mq_mutex);
    1232                 : 
    1233                 :         /* Fail if detached; else succeed if initialized. */
    1234             573 :         if (mq->mq_detached)
    1235                 :         {
    1236              84 :             result = false;
    1237              84 :             break;
    1238                 :         }
    1239             489 :         if (result)
    1240              86 :             break;
    1241                 : 
    1242             403 :         if (handle != NULL)
    1243                 :         {
    1244                 :             /* Check for unexpected worker death. */
    1245             403 :             status = GetBackgroundWorkerPid(handle, &pid);
    1246             403 :             if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
    1247                 :             {
    1248 UBC           0 :                 result = false;
    1249               0 :                 break;
    1250                 :             }
    1251                 :         }
    1252                 : 
    1253                 :         /* Wait to be signaled. */
    1254 CBC         403 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
    1255                 :                          WAIT_EVENT_MQ_INTERNAL);
    1256                 : 
    1257                 :         /* Reset the latch so we don't spin. */
    1258             403 :         ResetLatch(MyLatch);
    1259                 : 
    1260                 :         /* An interrupt may have occurred while we were waiting. */
    1261             403 :         CHECK_FOR_INTERRUPTS();
    1262                 :     }
    1263                 : 
    1264             170 :     return result;
    1265                 : }
    1266                 : 
    1267                 : /*
    1268                 :  * Increment the number of bytes read.
    1269                 :  */
    1270                 : static void
    1271          155016 : shm_mq_inc_bytes_read(shm_mq *mq, Size n)
    1272                 : {
    1273                 :     PGPROC     *sender;
    1274                 : 
    1275                 :     /*
    1276                 :      * Separate prior reads of mq_ring from the increment of mq_bytes_read
    1277                 :      * which follows.  This pairs with the full barrier in
    1278                 :      * shm_mq_send_bytes(). We only need a read barrier here because the
    1279                 :      * increment of mq_bytes_read is actually a read followed by a dependent
    1280                 :      * write.
    1281                 :      */
    1282          155016 :     pg_read_barrier();
    1283                 : 
    1284                 :     /*
    1285                 :      * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
    1286                 :      * else can be changing this value.  This method should be cheaper.
    1287                 :      */
    1288          155016 :     pg_atomic_write_u64(&mq->mq_bytes_read,
    1289          155016 :                         pg_atomic_read_u64(&mq->mq_bytes_read) + n);
    1290                 : 
    1291                 :     /*
    1292                 :      * We shouldn't have any bytes to read without a sender, so we can read
    1293                 :      * mq_sender here without a lock.  Once it's initialized, it can't change.
    1294                 :      */
    1295          155016 :     sender = mq->mq_sender;
    1296          155016 :     Assert(sender != NULL);
    1297          155016 :     SetLatch(&sender->procLatch);
    1298          155016 : }
    1299                 : 
    1300                 : /*
    1301                 :  * Increment the number of bytes written.
    1302                 :  */
    1303                 : static void
    1304          238590 : shm_mq_inc_bytes_written(shm_mq *mq, Size n)
    1305                 : {
    1306                 :     /*
    1307                 :      * Separate prior writes to mq_ring from the write of mq_bytes_written
    1308                 :      * which we're about to do.  Pairs with the read barrier found in
    1309                 :      * shm_mq_receive_bytes.
    1310                 :      */
    1311          238590 :     pg_write_barrier();
    1312                 : 
    1313                 :     /*
    1314                 :      * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
    1315                 :      * else can be changing this value.  This method avoids taking the bus
    1316                 :      * lock unnecessarily.
    1317                 :      */
    1318          238590 :     pg_atomic_write_u64(&mq->mq_bytes_written,
    1319          238590 :                         pg_atomic_read_u64(&mq->mq_bytes_written) + n);
    1320          238590 : }
    1321                 : 
    1322                 : /* Shim for on_dsm_detach callback. */
    1323                 : static void
    1324            1393 : shm_mq_detach_callback(dsm_segment *seg, Datum arg)
    1325                 : {
    1326            1393 :     shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);
    1327                 : 
    1328            1393 :     shm_mq_detach_internal(mq);
    1329            1393 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a