LCOV - differential code coverage report
Current view:   top level - src/backend/commands - async.c (source / functions)
Current:        Differential Code Coverage HEAD vs 15
Current Date:   2023-04-08 15:15:32
Baseline:       15
Baseline Date:  2023-04-08 15:09:40
Legend:         Lines: hit / not hit

                 Coverage    Total    Hit    UBC    CBC
  Lines:          87.0 %       584    508     76    508
  Functions:      97.7 %        44     43      1     43

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * async.c
       4                 :  *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  * IDENTIFICATION
      10                 :  *    src/backend/commands/async.c
      11                 :  *
      12                 :  *-------------------------------------------------------------------------
      13                 :  */
      14                 : 
      15                 : /*-------------------------------------------------------------------------
      16                 :  * Async Notification Model as of 9.0:
      17                 :  *
      18                 :  * 1. Multiple backends on same machine. Multiple backends listening on
      19                 :  *    several channels. (Channels are also called "conditions" in other
      20                 :  *    parts of the code.)
      21                 :  *
      22                 :  * 2. There is one central queue in disk-based storage (directory pg_notify/),
      23                 :  *    with actively-used pages mapped into shared memory by the slru.c module.
      24                 :  *    All notification messages are placed in the queue and later read out
      25                 :  *    by listening backends.
      26                 :  *
      27                 :  *    There is no central knowledge of which backend listens on which channel;
      28                 :  *    every backend has its own list of interesting channels.
      29                 :  *
      30                 :  *    Although there is only one queue, notifications are treated as being
      31                 :  *    database-local; this is done by including the sender's database OID
      32                 :  *    in each notification message.  Listening backends ignore messages
      33                 :  *    that don't match their database OID.  This is important because it
      34                 :  *    ensures senders and receivers have the same database encoding and won't
      35                 :  *    misinterpret non-ASCII text in the channel name or payload string.
      36                 :  *
      37                 :  *    Since notifications are not expected to survive database crashes,
      38                 :  *    we can simply clean out the pg_notify data at any reboot, and there
      39                 :  *    is no need for WAL support or fsync'ing.
      40                 :  *
      41                 :  * 3. Every backend that is listening on at least one channel registers by
      42                 :  *    entering its PID into the array in AsyncQueueControl. It then scans all
      43                 :  *    incoming notifications in the central queue and first compares the
      44                 :  *    database OID of the notification with its own database OID and then
      45                 :  *    compares the notified channel with the list of channels that it listens
      46                 :  *    to. In case there is a match it delivers the notification event to its
      47                 :  *    frontend.  Non-matching events are simply skipped.
      48                 :  *
      49                 :  * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
      50                 :  *    a backend-local list which will not be processed until transaction end.
      51                 :  *
      52                 :  *    Duplicate notifications from the same transaction are sent out as one
      53                 :  *    notification only. This is done to save work when for example a trigger
      54                 :  *    on a 2 million row table fires a notification for each row that has been
      55                 :  *    changed. If the application needs to receive every single notification
      56                 :  *    that has been sent, it can easily add some unique string into the extra
      57                 :  *    payload parameter.
      58                 :  *
      59                 :  *    When the transaction is ready to commit, PreCommit_Notify() adds the
      60                 :  *    pending notifications to the head of the queue. The head pointer of the
      61                 :  *    queue always points to the next free position and a position is just a
      62                 :  *    page number and the offset in that page. This is done before marking the
      63                 :  *    transaction as committed in clog. If we run into problems writing the
      64                 :  *    notifications, we can still call elog(ERROR, ...) and the transaction
      65                 :  *    will roll back.
      66                 :  *
      67                 :  *    Once we have put all of the notifications into the queue, we return to
      68                 :  *    CommitTransaction() which will then do the actual transaction commit.
      69                 :  *
      70                 :  *    After commit we are called another time (AtCommit_Notify()). Here we
      71                 :  *    make any actual updates to the effective listen state (listenChannels).
      72                 :  *    Then we signal any backends that may be interested in our messages
      73                 :  *    (including our own backend, if listening).  This is done by
      74                 :  *    SignalBackends(), which scans the list of listening backends and sends a
      75                 :  *    PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
      76                 :  *    know which backend is listening on which channel so we must signal them
      77                 :  *    all).  We can exclude backends that are already up to date, though, and
      78                 :  *    we can also exclude backends that are in other databases (unless they
      79                 :  *    are way behind and should be kicked to make them advance their
      80                 :  *    pointers).
      81                 :  *
      82                 :  *    Finally, after we are out of the transaction altogether and about to go
      83                 :  *    idle, we scan the queue for messages that need to be sent to our
      84                 :  *    frontend (which might be notifies from other backends, or self-notifies
      85                 :  *    from our own).  This step is not part of the CommitTransaction sequence
      86                 :  *    for two important reasons.  First, we could get errors while sending
      87                 :  *    data to our frontend, and it's really bad for errors to happen in
      88                 :  *    post-commit cleanup.  Second, in cases where a procedure issues commits
      89                 :  *    within a single frontend command, we don't want to send notifies to our
      90                 :  *    frontend until the command is done; but notifies to other backends
      91                 :  *    should go out immediately after each commit.
      92                 :  *
      93                 :  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
      94                 :  *    sets the process's latch, which triggers the event to be processed
      95                 :  *    immediately if this backend is idle (i.e., it is waiting for a frontend
      96                 :  *    command and is not within a transaction block. C.f.
      97                 :  *    ProcessClientReadInterrupt()).  Otherwise the handler may only set a
      98                 :  *    flag, which will cause the processing to occur just before we next go
      99                 :  *    idle.
     100                 :  *
     101                 :  *    Inbound-notify processing consists of reading all of the notifications
     102                 :  *    that have arrived since scanning last time. We read every notification
     103                 :  *    until we reach either a notification from an uncommitted transaction or
     104                 :  *    the head pointer's position.
     105                 :  *
     106                 :  * 6. To avoid SLRU wraparound and limit disk space consumption, the tail
     107                 :  *    pointer needs to be advanced so that old pages can be truncated.
     108                 :  *    This is relatively expensive (notably, it requires an exclusive lock),
     109                 :  *    so we don't want to do it often.  We make sending backends do this work
     110                 :  *    if they advanced the queue head into a new page, but only once every
     111                 :  *    QUEUE_CLEANUP_DELAY pages.
     112                 :  *
     113                 :  * An application that listens on the same channel it notifies will get
     114                 :  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
     115                 :  * by comparing be_pid in the NOTIFY message to the application's own backend's
     116                 :  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
     117                 :  * frontend during startup.)  The above design guarantees that notifies from
     118                 :  * other backends will never be missed by ignoring self-notifies.
     119                 :  *
     120                 :  * The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
     121                 :  * can be varied without affecting anything but performance.  The maximum
     122                 :  * amount of notification data that can be queued at one time is determined
     123                 :  * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
     124                 :  *-------------------------------------------------------------------------
     125                 :  */
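For reference, the model described above is what a libpq frontend observes. The following
minimal sketch (illustrative only, not part of async.c; the connection string and the channel
name "demo" are placeholders) listens on one channel and drains notifications as they arrive,
using be_pid to recognize self-notifies as mentioned in the comment:

    /* listen_demo.c -- minimal libpq listener sketch (illustrative only) */
    #include <stdio.h>
    #include <sys/select.h>
    #include <libpq-fe.h>

    int
    main(void)
    {
        PGconn     *conn = PQconnectdb("dbname=postgres");     /* placeholder conninfo */
        PGresult   *res;

        if (PQstatus(conn) != CONNECTION_OK)
        {
            fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
            return 1;
        }

        /* Takes effect when this command's transaction commits (see points 4 and 5 above) */
        res = PQexec(conn, "LISTEN demo");
        PQclear(res);

        for (;;)
        {
            fd_set      mask;
            int         sock = PQsocket(conn);
            PGnotify   *n;

            FD_ZERO(&mask);
            FD_SET(sock, &mask);
            if (select(sock + 1, &mask, NULL, NULL, NULL) < 0)
                break;

            /* Absorb any input, then drain the queued notifications */
            PQconsumeInput(conn);
            while ((n = PQnotifies(conn)) != NULL)
            {
                printf("channel \"%s\", payload \"%s\", from pid %d%s\n",
                       n->relname, n->extra, n->be_pid,
                       n->be_pid == PQbackendPID(conn) ? " (self-notify)" : "");
                PQfreemem(n);
            }
        }

        PQfinish(conn);
        return 0;
    }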
     126                 : 
     127                 : #include "postgres.h"
     128                 : 
     129                 : #include <limits.h>
     130                 : #include <unistd.h>
     131                 : #include <signal.h>
     132                 : 
     133                 : #include "access/parallel.h"
     134                 : #include "access/slru.h"
     135                 : #include "access/transam.h"
     136                 : #include "access/xact.h"
     137                 : #include "catalog/pg_database.h"
     138                 : #include "commands/async.h"
     139                 : #include "common/hashfn.h"
     140                 : #include "funcapi.h"
     141                 : #include "libpq/libpq.h"
     142                 : #include "libpq/pqformat.h"
     143                 : #include "miscadmin.h"
     144                 : #include "storage/ipc.h"
     145                 : #include "storage/lmgr.h"
     146                 : #include "storage/proc.h"
     147                 : #include "storage/procarray.h"
     148                 : #include "storage/procsignal.h"
     149                 : #include "storage/sinval.h"
     150                 : #include "tcop/tcopprot.h"
     151                 : #include "utils/builtins.h"
     152                 : #include "utils/memutils.h"
     153                 : #include "utils/ps_status.h"
     154                 : #include "utils/snapmgr.h"
     155                 : #include "utils/timestamp.h"
     156                 : 
     157                 : 
     158                 : /*
     159                 :  * Maximum size of a NOTIFY payload, including terminating NULL.  This
     160                 :  * must be kept small enough so that a notification message fits on one
     161                 :  * SLRU page.  The magic fudge factor here is noncritical as long as it's
     162                 :  * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
     163                 :  * than that, so changes in that data structure won't affect user-visible
     164                 :  * restrictions.
     165                 :  */
     166                 : #define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
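For a sense of scale: with the stock build settings (BLCKSZ = 8192, NAMEDATALEN = 64) this
evaluates to 8192 - 64 - 128 = 8000, i.e. payloads of up to 7999 characters plus the
terminating NUL.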
     167                 : 
     168                 : /*
     169                 :  * Struct representing an entry in the global notify queue
     170                 :  *
     171                 :  * This struct declaration has the maximal length, but in a real queue entry
     172                 :  * the data area is only big enough for the actual channel and payload strings
     173                 :  * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
     174                 :  * entry size, if both channel and payload strings are empty (but note it
     175                 :  * doesn't include alignment padding).
     176                 :  *
     177                 :  * The "length" field should always be rounded up to the next QUEUEALIGN
     178                 :  * multiple so that all fields are properly aligned.
     179                 :  */
     180                 : typedef struct AsyncQueueEntry
     181                 : {
     182                 :     int         length;         /* total allocated length of entry */
     183                 :     Oid         dboid;          /* sender's database OID */
     184                 :     TransactionId xid;          /* sender's XID */
     185                 :     int32       srcPid;         /* sender's PID */
     186                 :     char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
     187                 : } AsyncQueueEntry;
     188                 : 
     189                 : /* Currently, no field of AsyncQueueEntry requires more than int alignment */
     190                 : #define QUEUEALIGN(len)     INTALIGN(len)
     191                 : 
     192                 : #define AsyncQueueEntryEmptySize    (offsetof(AsyncQueueEntry, data) + 2)
     193                 : 
     194                 : /*
     195                 :  * Struct describing a queue position, and assorted macros for working with it
     196                 :  */
     197                 : typedef struct QueuePosition
     198                 : {
     199                 :     int         page;           /* SLRU page number */
     200                 :     int         offset;         /* byte offset within page */
     201                 : } QueuePosition;
     202                 : 
     203                 : #define QUEUE_POS_PAGE(x)       ((x).page)
     204                 : #define QUEUE_POS_OFFSET(x)     ((x).offset)
     205                 : 
     206                 : #define SET_QUEUE_POS(x,y,z) \
     207                 :     do { \
     208                 :         (x).page = (y); \
     209                 :         (x).offset = (z); \
     210                 :     } while (0)
     211                 : 
     212                 : #define QUEUE_POS_EQUAL(x,y) \
     213                 :     ((x).page == (y).page && (x).offset == (y).offset)
     214                 : 
     215                 : #define QUEUE_POS_IS_ZERO(x) \
     216                 :     ((x).page == 0 && (x).offset == 0)
     217                 : 
     218                 : /* choose logically smaller QueuePosition */
     219                 : #define QUEUE_POS_MIN(x,y) \
     220                 :     (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
     221                 :      (x).page != (y).page ? (y) : \
     222                 :      (x).offset < (y).offset ? (x) : (y))
     223                 : 
     224                 : /* choose logically larger QueuePosition */
     225                 : #define QUEUE_POS_MAX(x,y) \
     226                 :     (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
     227                 :      (x).page != (y).page ? (x) : \
     228                 :      (x).offset > (y).offset ? (x) : (y))
     229                 : 
     230                 : /*
     231                 :  * Parameter determining how often we try to advance the tail pointer:
     232                 :  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
     233                 :  * also the distance by which a backend in another database needs to be
     234                 :  * behind before we'll decide we need to wake it up to advance its pointer.
     235                 :  *
     236                 :  * Resist the temptation to make this really large.  While that would save
     237                 :  * work in some places, it would add cost in others.  In particular, this
     238                 :  * should likely be less than NUM_NOTIFY_BUFFERS, to ensure that backends
     239                 :  * catch up before the pages they'll need to read fall out of SLRU cache.
     240                 :  */
     241                 : #define QUEUE_CLEANUP_DELAY 4
     242                 : 
     243                 : /*
     244                 :  * Struct describing a listening backend's status
     245                 :  */
     246                 : typedef struct QueueBackendStatus
     247                 : {
     248                 :     int32       pid;            /* either a PID or InvalidPid */
     249                 :     Oid         dboid;          /* backend's database OID, or InvalidOid */
     250                 :     BackendId   nextListener;   /* id of next listener, or InvalidBackendId */
     251                 :     QueuePosition pos;          /* backend has read queue up to here */
     252                 : } QueueBackendStatus;
     253                 : 
     254                 : /*
     255                 :  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
     256                 :  *
     257                 :  * The AsyncQueueControl structure is protected by the NotifyQueueLock and
     258                 :  * NotifyQueueTailLock.
     259                 :  *
     260                 :  * When holding NotifyQueueLock in SHARED mode, backends may only inspect
     261                 :  * their own entries as well as the head and tail pointers. Consequently we
     262                 :  * can allow a backend to update its own record while holding only SHARED lock
     263                 :  * (since no other backend will inspect it).
     264                 :  *
     265                 :  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
     266                 :  * entries of other backends and also change the head pointer. When holding
     267                 :  * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
     268                 :  * can change the tail pointers.
     269                 :  *
     270                 :  * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
     271                 :  * In order to avoid deadlocks, whenever we need multiple locks, we first get
     272                 :  * NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock.
     273                 :  *
     274                 :  * Each backend uses the backend[] array entry with index equal to its
     275                 :  * BackendId (which can range from 1 to MaxBackends).  We rely on this to make
     276                 :  * SendProcSignal fast.
     277                 :  *
     278                 :  * The backend[] array entries for actively-listening backends are threaded
     279                 :  * together using firstListener and the nextListener links, so that we can
     280                 :  * scan them without having to iterate over inactive entries.  We keep this
     281                 :  * list in order by BackendId so that the scan is cache-friendly when there
     282                 :  * are many active entries.
     283                 :  */
     284                 : typedef struct AsyncQueueControl
     285                 : {
     286                 :     QueuePosition head;         /* head points to the next free location */
     287                 :     QueuePosition tail;         /* tail must be <= the queue position of every
     288                 :                                  * listening backend */
     289                 :     int         stopPage;       /* oldest unrecycled page; must be <=
     290                 :                                  * tail.page */
     291                 :     BackendId   firstListener;  /* id of first listener, or InvalidBackendId */
     292                 :     TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
     293                 :     QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
     294                 :     /* backend[0] is not used; used entries are from [1] to [MaxBackends] */
     295                 : } AsyncQueueControl;
     296                 : 
     297                 : static AsyncQueueControl *asyncQueueControl;
     298                 : 
     299                 : #define QUEUE_HEAD                  (asyncQueueControl->head)
     300                 : #define QUEUE_TAIL                  (asyncQueueControl->tail)
     301                 : #define QUEUE_STOP_PAGE             (asyncQueueControl->stopPage)
     302                 : #define QUEUE_FIRST_LISTENER        (asyncQueueControl->firstListener)
     303                 : #define QUEUE_BACKEND_PID(i)        (asyncQueueControl->backend[i].pid)
     304                 : #define QUEUE_BACKEND_DBOID(i)      (asyncQueueControl->backend[i].dboid)
     305                 : #define QUEUE_NEXT_LISTENER(i)      (asyncQueueControl->backend[i].nextListener)
     306                 : #define QUEUE_BACKEND_POS(i)        (asyncQueueControl->backend[i].pos)
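To make the lock-ordering rule above concrete, a tail-advancing code path would take the locks
in roughly this sequence (a skeleton sketch only, not the actual asyncQueueAdvanceTail; the tail
computation and truncation call are elided):

    /* 1. Outermost: serializes tail-advance/truncation work across backends */
    LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);

    /* 2. Then the queue lock, exclusively, so every backend's position
     *    may be inspected and the tail pointer moved */
    LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    /* ... recompute QUEUE_TAIL from the per-backend positions ... */
    LWLockRelease(NotifyQueueLock);

    /* 3. NotifySLRULock comes last in the ordering; the SLRU truncation
     *    routine acquires it internally while removing old pg_notify pages */
    /* ... SimpleLruTruncate(NotifyCtl, ...) ... */

    LWLockRelease(NotifyQueueTailLock);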
     307                 : 
     308                 : /*
     309                 :  * The SLRU buffer area through which we access the notification queue
     310                 :  */
     311                 : static SlruCtlData NotifyCtlData;
     312                 : 
     313                 : #define NotifyCtl                   (&NotifyCtlData)
     314                 : #define QUEUE_PAGESIZE              BLCKSZ
     315                 : #define QUEUE_FULL_WARN_INTERVAL    5000    /* warn at most once every 5s */
     316                 : 
     317                 : /*
     318                 :  * Use segments 0000 through FFFF.  Each contains SLRU_PAGES_PER_SEGMENT pages
     319                 :  * which gives us the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1.
     320                 :  * We could use as many segments as SlruScanDirectory() allows, but this gives
     321                 :  * us so much space already that it doesn't seem worth the trouble.
     322                 :  *
     323                 :  * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2
     324                 :  * pages, because more than that would confuse slru.c into thinking there
     325                 :  * was a wraparound condition.  With the default BLCKSZ this means there
     326                 :  * can be up to 8GB of queued-and-not-read data.
     327                 :  *
     328                 :  * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of
     329                 :  * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour.
     330                 :  */
     331                 : #define QUEUE_MAX_PAGE          (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
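Worked out with the stock SLRU_PAGES_PER_SEGMENT of 32, QUEUE_MAX_PAGE is
32 * 0x10000 - 1 = 2097151; half of that is roughly one million pages, which at the default
8 kB BLCKSZ is the 8GB of queued-and-not-read data mentioned above.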
     332                 : 
     333                 : /*
     334                 :  * listenChannels identifies the channels we are actually listening to
     335                 :  * (ie, have committed a LISTEN on).  It is a simple list of channel names,
     336                 :  * allocated in TopMemoryContext.
     337                 :  */
     338                 : static List *listenChannels = NIL;  /* list of C strings */
     339                 : 
     340                 : /*
     341                 :  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
     342                 :  * all actions requested in the current transaction.  As explained above,
     343                 :  * we don't actually change listenChannels until we reach transaction commit.
     344                 :  *
     345                 :  * The list is kept in CurTransactionContext.  In subtransactions, each
     346                 :  * subtransaction has its own list in its own CurTransactionContext, but
     347                 :  * successful subtransactions attach their lists to their parent's list.
     348                 :  * Failed subtransactions simply discard their lists.
     349                 :  */
     350                 : typedef enum
     351                 : {
     352                 :     LISTEN_LISTEN,
     353                 :     LISTEN_UNLISTEN,
     354                 :     LISTEN_UNLISTEN_ALL
     355                 : } ListenActionKind;
     356                 : 
     357                 : typedef struct
     358                 : {
     359                 :     ListenActionKind action;
     360                 :     char        channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
     361                 : } ListenAction;
     362                 : 
     363                 : typedef struct ActionList
     364                 : {
     365                 :     int         nestingLevel;   /* current transaction nesting depth */
     366                 :     List       *actions;        /* list of ListenAction structs */
     367                 :     struct ActionList *upper;   /* details for upper transaction levels */
     368                 : } ActionList;
     369                 : 
     370                 : static ActionList *pendingActions = NULL;
     371                 : 
     372                 : /*
     373                 :  * State for outbound notifies consists of a list of all channels+payloads
     374                 :  * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
     375                 :  * until and unless the transaction commits.  pendingNotifies is NULL if no
     376                 :  * NOTIFYs have been done in the current (sub) transaction.
     377                 :  *
     378                 :  * We discard duplicate notify events issued in the same transaction.
     379                 :  * Hence, in addition to the list proper (which we need to track the order
     380                 :  * of the events, since we guarantee to deliver them in order), we build a
     381                 :  * hash table which we can probe to detect duplicates.  Since building the
     382                 :  * hash table is somewhat expensive, we do so only once we have at least
     383                 :  * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
     384                 :  * before that we just scan the events linearly.
     385                 :  *
     386                 :  * The list is kept in CurTransactionContext.  In subtransactions, each
     387                 :  * subtransaction has its own list in its own CurTransactionContext, but
     388                 :  * successful subtransactions add their entries to their parent's list.
     389                 :  * Failed subtransactions simply discard their lists.  Since these lists
     390                 :  * are independent, there may be notify events in a subtransaction's list
     391                 :  * that duplicate events in some ancestor (sub) transaction; we get rid of
     392                 :  * the dups when merging the subtransaction's list into its parent's.
     393                 :  *
     394                 :  * Note: the action and notify lists do not interact within a transaction.
     395                 :  * In particular, if a transaction does NOTIFY and then LISTEN on the same
     396                 :  * condition name, it will get a self-notify at commit.  This is a bit odd
     397                 :  * but is consistent with our historical behavior.
     398                 :  */
     399                 : typedef struct Notification
     400                 : {
     401                 :     uint16      channel_len;    /* length of channel-name string */
     402                 :     uint16      payload_len;    /* length of payload string */
     403                 :     /* null-terminated channel name, then null-terminated payload follow */
     404                 :     char        data[FLEXIBLE_ARRAY_MEMBER];
     405                 : } Notification;
     406                 : 
     407                 : typedef struct NotificationList
     408                 : {
     409                 :     int         nestingLevel;   /* current transaction nesting depth */
     410                 :     List       *events;         /* list of Notification structs */
     411                 :     HTAB       *hashtab;        /* hash of NotificationHash structs, or NULL */
     412                 :     struct NotificationList *upper; /* details for upper transaction levels */
     413                 : } NotificationList;
     414                 : 
     415                 : #define MIN_HASHABLE_NOTIFIES 16    /* threshold to build hashtab */
     416                 : 
     417                 : typedef struct NotificationHash
     418                 : {
     419                 :     Notification *event;        /* => the actual Notification struct */
     420                 : } NotificationHash;
     421                 : 
     422                 : static NotificationList *pendingNotifies = NULL;
     423                 : 
     424                 : /*
     425                 :  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
     426                 :  * called from inside a signal handler. That just sets the
     427                 :  * notifyInterruptPending flag and sets the process
     428                 :  * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
     429                 :  * actually deal with the interrupt.
     430                 :  */
     431                 : volatile sig_atomic_t notifyInterruptPending = false;
     432                 : 
     433                 : /* True if we've registered an on_shmem_exit cleanup */
     434                 : static bool unlistenExitRegistered = false;
     435                 : 
     436                 : /* True if we're currently registered as a listener in asyncQueueControl */
     437                 : static bool amRegisteredListener = false;
     438                 : 
     439                 : /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
     440                 : static bool tryAdvanceTail = false;
     441                 : 
     442                 : /* GUC parameter */
     443                 : bool        Trace_notify = false;
     444                 : 
     445                 : /* local function prototypes */
     446                 : static int  asyncQueuePageDiff(int p, int q);
     447                 : static bool asyncQueuePagePrecedes(int p, int q);
     448                 : static void queue_listen(ListenActionKind action, const char *channel);
     449                 : static void Async_UnlistenOnExit(int code, Datum arg);
     450                 : static void Exec_ListenPreCommit(void);
     451                 : static void Exec_ListenCommit(const char *channel);
     452                 : static void Exec_UnlistenCommit(const char *channel);
     453                 : static void Exec_UnlistenAllCommit(void);
     454                 : static bool IsListeningOn(const char *channel);
     455                 : static void asyncQueueUnregister(void);
     456                 : static bool asyncQueueIsFull(void);
     457                 : static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
     458                 : static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
     459                 : static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
     460                 : static double asyncQueueUsage(void);
     461                 : static void asyncQueueFillWarning(void);
     462                 : static void SignalBackends(void);
     463                 : static void asyncQueueReadAllNotifications(void);
     464                 : static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
     465                 :                                          QueuePosition stop,
     466                 :                                          char *page_buffer,
     467                 :                                          Snapshot snapshot);
     468                 : static void asyncQueueAdvanceTail(void);
     469                 : static void ProcessIncomingNotify(bool flush);
     470                 : static bool AsyncExistsPendingNotify(Notification *n);
     471                 : static void AddEventToPendingNotifies(Notification *n);
     472                 : static uint32 notification_hash(const void *key, Size keysize);
     473                 : static int  notification_match(const void *key1, const void *key2, Size keysize);
     474                 : static void ClearPendingActionsAndNotifies(void);
     475                 : 
     476                 : /*
     477                 :  * Compute the difference between two queue page numbers (i.e., p - q),
     478                 :  * accounting for wraparound.
     479                 :  */
     480                 : static int
     481 CBC         108 : asyncQueuePageDiff(int p, int q)
     482                 : {
     483                 :     int         diff;
     484                 : 
     485                 :     /*
     486                 :      * We have to compare modulo (QUEUE_MAX_PAGE+1)/2.  Both inputs should be
     487                 :      * in the range 0..QUEUE_MAX_PAGE.
     488                 :      */
     489             108 :     Assert(p >= 0 && p <= QUEUE_MAX_PAGE);
     490             108 :     Assert(q >= 0 && q <= QUEUE_MAX_PAGE);
     491                 : 
     492             108 :     diff = p - q;
     493             108 :     if (diff >= ((QUEUE_MAX_PAGE + 1) / 2))
     494 UBC           0 :         diff -= QUEUE_MAX_PAGE + 1;
     495 CBC         108 :     else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
     496 UBC           0 :         diff += QUEUE_MAX_PAGE + 1;
     497 CBC         108 :     return diff;
     498                 : }
     499                 : 
     500                 : /*
     501                 :  * Is p < q, accounting for wraparound?
     502                 :  *
     503                 :  * Since asyncQueueIsFull() blocks creation of a page that could precede any
     504                 :  * extant page, we need not assess entries within a page.
     505                 :  */
     506                 : static bool
     507             108 : asyncQueuePagePrecedes(int p, int q)
     508                 : {
     509             108 :     return asyncQueuePageDiff(p, q) < 0;
     510                 : }
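To see the wraparound rule in action, here is a self-contained sketch of the same arithmetic
over a deliberately tiny modulus (the demo_* names and DEMO_MAX_PAGE are invented for the
example and do not exist in async.c):

    #include <assert.h>

    #define DEMO_MAX_PAGE 7         /* pretend the page counter wraps after 7 */

    /* Same rule as asyncQueuePageDiff, but over a tiny modulus for clarity */
    static int
    demo_page_diff(int p, int q)
    {
        int         diff = p - q;

        if (diff >= ((DEMO_MAX_PAGE + 1) / 2))
            diff -= DEMO_MAX_PAGE + 1;
        else if (diff < -((DEMO_MAX_PAGE + 1) / 2))
            diff += DEMO_MAX_PAGE + 1;
        return diff;
    }

    int
    main(void)
    {
        /* 6 "precedes" 1 here: 6 - 1 = 5 >= 4, so the diff wraps to 5 - 8 = -3,
         * treating page 1 as a newer, wrapped-around page */
        assert(demo_page_diff(6, 1) == -3);
        /* Conversely 1 - 6 = -5 < -4 wraps to -5 + 8 = 3, so 1 does not precede 6 */
        assert(demo_page_diff(1, 6) == 3);
        return 0;
    }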
     511                 : 
     512                 : /*
     513                 :  * Report space needed for our shared memory area
     514                 :  */
     515                 : Size
     516            2738 : AsyncShmemSize(void)
     517                 : {
     518                 :     Size        size;
     519                 : 
     520                 :     /* This had better match AsyncShmemInit */
     521            2738 :     size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
     522            2738 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
     523                 : 
     524            2738 :     size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
     525                 : 
     526            2738 :     return size;
     527                 : }
     528                 : 
     529                 : /*
     530                 :  * Initialize our shared memory area
     531                 :  */
     532                 : void
     533            1826 : AsyncShmemInit(void)
     534                 : {
     535                 :     bool        found;
     536                 :     Size        size;
     537                 : 
     538                 :     /*
     539                 :      * Create or attach to the AsyncQueueControl structure.
     540                 :      *
     541                 :      * The used entries in the backend[] array run from 1 to MaxBackends; the
     542                 :      * zero'th entry is unused but must be allocated.
     543                 :      */
     544            1826 :     size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
     545            1826 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
     546                 : 
     547            1826 :     asyncQueueControl = (AsyncQueueControl *)
     548            1826 :         ShmemInitStruct("Async Queue Control", size, &found);
     549                 : 
     550            1826 :     if (!found)
     551                 :     {
     552                 :         /* First time through, so initialize it */
     553            1826 :         SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
     554            1826 :         SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
     555            1826 :         QUEUE_STOP_PAGE = 0;
     556            1826 :         QUEUE_FIRST_LISTENER = InvalidBackendId;
     557            1826 :         asyncQueueControl->lastQueueFillWarn = 0;
     558                 :         /* zero'th entry won't be used, but let's initialize it anyway */
     559          195029 :         for (int i = 0; i <= MaxBackends; i++)
     560                 :         {
     561          193203 :             QUEUE_BACKEND_PID(i) = InvalidPid;
     562          193203 :             QUEUE_BACKEND_DBOID(i) = InvalidOid;
     563          193203 :             QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
     564          193203 :             SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
     565                 :         }
     566                 :     }
     567                 : 
     568                 :     /*
     569                 :      * Set up SLRU management of the pg_notify data.
     570                 :      */
     571            1826 :     NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
     572            1826 :     SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
     573            1826 :                   NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
     574                 :                   SYNC_HANDLER_NONE);
     575                 : 
     576            1826 :     if (!found)
     577                 :     {
     578                 :         /*
     579                 :          * During start or reboot, clean out the pg_notify directory.
     580                 :          */
     581            1826 :         (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
     582                 :     }
     583            1826 : }
     584                 : 
     585                 : 
     586                 : /*
     587                 :  * pg_notify -
     588                 :  *    SQL function to send a notification event
     589                 :  */
     590                 : Datum
     591            1054 : pg_notify(PG_FUNCTION_ARGS)
     592                 : {
     593                 :     const char *channel;
     594                 :     const char *payload;
     595                 : 
     596            1054 :     if (PG_ARGISNULL(0))
     597               3 :         channel = "";
     598                 :     else
     599            1051 :         channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
     600                 : 
     601            1054 :     if (PG_ARGISNULL(1))
     602               6 :         payload = "";
     603                 :     else
     604            1048 :         payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
     605                 : 
     606                 :     /* For NOTIFY as a statement, this is checked in ProcessUtility */
     607            1054 :     PreventCommandDuringRecovery("NOTIFY");
     608                 : 
     609            1054 :     Async_Notify(channel, payload);
     610                 : 
     611            1045 :     PG_RETURN_VOID();
     612                 : }
     613                 : 
     614                 : 
     615                 : /*
     616                 :  * Async_Notify
     617                 :  *
     618                 :  *      This is executed by the SQL notify command.
     619                 :  *
     620                 :  *      Adds the message to the list of pending notifies.
     621                 :  *      Actual notification happens during transaction commit.
     622                 :  *      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     623                 :  */
     624                 : void
     625            1103 : Async_Notify(const char *channel, const char *payload)
     626                 : {
     627            1103 :     int         my_level = GetCurrentTransactionNestLevel();
     628                 :     size_t      channel_len;
     629                 :     size_t      payload_len;
     630                 :     Notification *n;
     631                 :     MemoryContext oldcontext;
     632                 : 
     633            1103 :     if (IsParallelWorker())
     634 UBC           0 :         elog(ERROR, "cannot send notifications from a parallel worker");
     635                 : 
     636 CBC        1103 :     if (Trace_notify)
     637 UBC           0 :         elog(DEBUG1, "Async_Notify(%s)", channel);
     638                 : 
     639 CBC        1103 :     channel_len = channel ? strlen(channel) : 0;
     640            1103 :     payload_len = payload ? strlen(payload) : 0;
     641                 : 
     642                 :     /* a channel name must be specified */
     643            1103 :     if (channel_len == 0)
     644               6 :         ereport(ERROR,
     645                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     646                 :                  errmsg("channel name cannot be empty")));
     647                 : 
     648                 :     /* enforce length limits */
     649            1097 :     if (channel_len >= NAMEDATALEN)
     650               3 :         ereport(ERROR,
     651                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     652                 :                  errmsg("channel name too long")));
     653                 : 
     654            1094 :     if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
     655 UBC           0 :         ereport(ERROR,
     656                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     657                 :                  errmsg("payload string too long")));
     658                 : 
     659                 :     /*
     660                 :      * We must construct the Notification entry, even if we end up not using
     661                 :      * it, in order to compare it cheaply to existing list entries.
     662                 :      *
     663                 :      * The notification list needs to live until end of transaction, so store
     664                 :      * it in the transaction context.
     665                 :      */
     666 CBC        1094 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     667                 : 
     668            1094 :     n = (Notification *) palloc(offsetof(Notification, data) +
     669            1094 :                                 channel_len + payload_len + 2);
     670            1094 :     n->channel_len = channel_len;
     671            1094 :     n->payload_len = payload_len;
     672            1094 :     strcpy(n->data, channel);
     673            1094 :     if (payload)
     674            1081 :         strcpy(n->data + channel_len + 1, payload);
     675                 :     else
     676              13 :         n->data[channel_len + 1] = '\0';
     677                 : 
     678            1094 :     if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
     679              50 :     {
     680                 :         NotificationList *notifies;
     681                 : 
     682                 :         /*
     683                 :          * First notify event in current (sub)xact. Note that we allocate the
     684                 :          * NotificationList in TopTransactionContext; the nestingLevel might
     685                 :          * get changed later by AtSubCommit_Notify.
     686                 :          */
     687                 :         notifies = (NotificationList *)
     688              50 :             MemoryContextAlloc(TopTransactionContext,
     689                 :                                sizeof(NotificationList));
     690              50 :         notifies->nestingLevel = my_level;
     691              50 :         notifies->events = list_make1(n);
     692                 :         /* We certainly don't need a hashtable yet */
     693              50 :         notifies->hashtab = NULL;
     694              50 :         notifies->upper = pendingNotifies;
     695              50 :         pendingNotifies = notifies;
     696                 :     }
     697                 :     else
     698                 :     {
     699                 :         /* Now check for duplicates */
     700            1044 :         if (AsyncExistsPendingNotify(n))
     701                 :         {
     702                 :             /* It's a dup, so forget it */
     703              12 :             pfree(n);
     704              12 :             MemoryContextSwitchTo(oldcontext);
     705              12 :             return;
     706                 :         }
     707                 : 
     708                 :         /* Append more events to existing list */
     709            1032 :         AddEventToPendingNotifies(n);
     710                 :     }
     711                 : 
     712            1082 :     MemoryContextSwitchTo(oldcontext);
     713                 : }
     714                 : 
     715                 : /*
     716                 :  * queue_listen
     717                 :  *      Common code for listen, unlisten, unlisten all commands.
     718                 :  *
     719                 :  *      Adds the request to the list of pending actions.
     720                 :  *      Actual update of the listenChannels list happens during transaction
     721                 :  *      commit.
     722                 :  */
     723                 : static void
     724              57 : queue_listen(ListenActionKind action, const char *channel)
     725                 : {
     726                 :     MemoryContext oldcontext;
     727                 :     ListenAction *actrec;
     728              57 :     int         my_level = GetCurrentTransactionNestLevel();
     729                 : 
     730                 :     /*
     731                 :      * Unlike Async_Notify, we don't try to collapse out duplicates. It would
     732                 :      * be too complicated to ensure we get the right interactions of
     733                 :      * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
     734                 :      * would be any performance benefit anyway in sane applications.
     735                 :      */
     736              57 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     737                 : 
     738                 :     /* space for terminating null is included in sizeof(ListenAction) */
     739              57 :     actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
     740              57 :                                      strlen(channel) + 1);
     741              57 :     actrec->action = action;
     742              57 :     strcpy(actrec->channel, channel);
     743                 : 
     744              57 :     if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
     745              50 :     {
     746                 :         ActionList *actions;
     747                 : 
     748                 :         /*
     749                 :          * First action in current sub(xact). Note that we allocate the
     750                 :          * ActionList in TopTransactionContext; the nestingLevel might get
     751                 :          * changed later by AtSubCommit_Notify.
     752                 :          */
     753                 :         actions = (ActionList *)
     754              50 :             MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
     755              50 :         actions->nestingLevel = my_level;
     756              50 :         actions->actions = list_make1(actrec);
     757              50 :         actions->upper = pendingActions;
     758              50 :         pendingActions = actions;
     759                 :     }
     760                 :     else
     761               7 :         pendingActions->actions = lappend(pendingActions->actions, actrec);
     762                 : 
     763              57 :     MemoryContextSwitchTo(oldcontext);
     764              57 : }
     765                 : 
     766                 : /*
     767                 :  * Async_Listen
     768                 :  *
     769                 :  *      This is executed by the SQL listen command.
     770                 :  */
     771                 : void
     772              37 : Async_Listen(const char *channel)
     773                 : {
     774              37 :     if (Trace_notify)
     775 UBC           0 :         elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
     776                 : 
     777 CBC          37 :     queue_listen(LISTEN_LISTEN, channel);
     778              37 : }
     779                 : 
     780                 : /*
     781                 :  * Async_Unlisten
     782                 :  *
     783                 :  *      This is executed by the SQL unlisten command.
     784                 :  */
     785                 : void
     786               3 : Async_Unlisten(const char *channel)
     787                 : {
     788               3 :     if (Trace_notify)
     789 UBC           0 :         elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
     790                 : 
     791                 :     /* If we couldn't possibly be listening, no need to queue anything */
     792 CBC           3 :     if (pendingActions == NULL && !unlistenExitRegistered)
     793 UBC           0 :         return;
     794                 : 
     795 CBC           3 :     queue_listen(LISTEN_UNLISTEN, channel);
     796                 : }
     797                 : 
     798                 : /*
     799                 :  * Async_UnlistenAll
     800                 :  *
     801                 :  *      This is invoked by UNLISTEN * command, and also at backend exit.
     802                 :  */
     803                 : void
     804              19 : Async_UnlistenAll(void)
     805                 : {
     806              19 :     if (Trace_notify)
     807 UBC           0 :         elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
     808                 : 
     809                 :     /* If we couldn't possibly be listening, no need to queue anything */
     810 CBC          19 :     if (pendingActions == NULL && !unlistenExitRegistered)
     811               2 :         return;
     812                 : 
     813              17 :     queue_listen(LISTEN_UNLISTEN_ALL, "");
     814                 : }
     815                 : 
     816                 : /*
     817                 :  * SQL function: return a set of the channel names this backend is actively
     818                 :  * listening to.
     819                 :  *
     820                 :  * Note: this coding relies on the fact that the listenChannels list cannot
     821                 :  * change within a transaction.
     822                 :  */
     823                 : Datum
     824               9 : pg_listening_channels(PG_FUNCTION_ARGS)
     825                 : {
     826                 :     FuncCallContext *funcctx;
     827                 : 
     828                 :     /* stuff done only on the first call of the function */
     829               9 :     if (SRF_IS_FIRSTCALL())
     830                 :     {
     831                 :         /* create a function context for cross-call persistence */
     832               6 :         funcctx = SRF_FIRSTCALL_INIT();
     833                 :     }
     834                 : 
     835                 :     /* stuff done on every call of the function */
     836               9 :     funcctx = SRF_PERCALL_SETUP();
     837                 : 
     838               9 :     if (funcctx->call_cntr < list_length(listenChannels))
     839                 :     {
     840               3 :         char       *channel = (char *) list_nth(listenChannels,
     841               3 :                                                 funcctx->call_cntr);
     842                 : 
     843               3 :         SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
     844                 :     }
     845                 : 
     846               6 :     SRF_RETURN_DONE(funcctx);
     847                 : }
     848                 : 
     849                 : /*
     850                 :  * Async_UnlistenOnExit
     851                 :  *
     852                 :  * This is executed at backend exit if we have done any LISTENs in this
     853                 :  * backend.  It might not be necessary anymore, if the user UNLISTENed
     854                 :  * everything, but we don't try to detect that case.
     855                 :  */
     856                 : static void
     857              14 : Async_UnlistenOnExit(int code, Datum arg)
     858                 : {
     859              14 :     Exec_UnlistenAllCommit();
     860              14 :     asyncQueueUnregister();
     861              14 : }
     862                 : 
     863                 : /*
     864                 :  * AtPrepare_Notify
     865                 :  *
     866                 :  *      This is called at the prepare phase of a two-phase
     867                 :  *      transaction.  Save the state for possible commit later.
     868                 :  */
     869                 : void
     870             358 : AtPrepare_Notify(void)
     871                 : {
     872                 :     /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
     873             358 :     if (pendingActions || pendingNotifies)
     874 UBC           0 :         ereport(ERROR,
     875                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     876                 :                  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
     877 CBC         358 : }
     878                 : 
     879                 : /*
     880                 :  * PreCommit_Notify
     881                 :  *
     882                 :  *      This is called at transaction commit, before actually committing to
     883                 :  *      clog.
     884                 :  *
     885                 :  *      If there are pending LISTEN actions, make sure we are listed in the
     886                 :  *      shared-memory listener array.  This must happen before commit to
     887                 :  *      ensure we don't miss any notifies from transactions that commit
     888                 :  *      just after ours.
     889                 :  *
     890                 :  *      If there are outbound notify requests in the pendingNotifies list,
     891                 :  *      add them to the global queue.  We do that before commit so that
     892                 :  *      we can still throw error if we run out of queue space.
     893                 :  */
     894                 : void
     895          465495 : PreCommit_Notify(void)
     896                 : {
     897                 :     ListCell   *p;
     898                 : 
     899          465495 :     if (!pendingActions && !pendingNotifies)
     900          465398 :         return;                 /* no relevant statements in this xact */
     901                 : 
     902              97 :     if (Trace_notify)
     903 UBC           0 :         elog(DEBUG1, "PreCommit_Notify");
     904                 : 
     905                 :     /* Preflight for any pending listen/unlisten actions */
     906 CBC          97 :     if (pendingActions != NULL)
     907                 :     {
     908             105 :         foreach(p, pendingActions->actions)
     909                 :         {
     910              56 :             ListenAction *actrec = (ListenAction *) lfirst(p);
     911                 : 
     912              56 :             switch (actrec->action)
     913                 :             {
     914              37 :                 case LISTEN_LISTEN:
     915              37 :                     Exec_ListenPreCommit();
     916              37 :                     break;
     917               3 :                 case LISTEN_UNLISTEN:
     918                 :                     /* there is no Exec_UnlistenPreCommit() */
     919               3 :                     break;
     920              16 :                 case LISTEN_UNLISTEN_ALL:
     921                 :                     /* there is no Exec_UnlistenAllPreCommit() */
     922              16 :                     break;
     923                 :             }
     924                 :         }
     925                 :     }
     926                 : 
     927                 :     /* Queue any pending notifies (must happen after the above) */
     928              97 :     if (pendingNotifies)
     929                 :     {
     930                 :         ListCell   *nextNotify;
     931                 : 
     932                 :         /*
     933                 :          * Make sure that we have an XID assigned to the current transaction.
     934                 :          * GetCurrentTransactionId is cheap if we already have an XID, but not
     935                 :          * so cheap if we don't, and we'd prefer not to do that work while
     936                 :          * holding NotifyQueueLock.
     937                 :          */
     938              48 :         (void) GetCurrentTransactionId();
     939                 : 
     940                 :         /*
     941                 :          * Serialize writers by acquiring a special lock that we hold till
     942                 :          * after commit.  This ensures that queue entries appear in commit
     943                 :          * order, and in particular that there are never uncommitted queue
     944                 :          * entries ahead of committed ones, so an uncommitted transaction
     945                 :          * can't block delivery of deliverable notifications.
     946                 :          *
     947                 :          * We use a heavyweight lock so that it'll automatically be released
     948                 :          * after either commit or abort.  This also allows deadlocks to be
     949                 :          * detected, though really a deadlock shouldn't be possible here.
     950                 :          *
     951                 :          * The lock is on "database 0", which is pretty ugly but it doesn't
     952                 :          * seem worth inventing a special locktag category just for this.
     953                 :          * (Historical note: before PG 9.0, a similar lock on "database 0" was
     954                 :          * used by the flatfiles mechanism.)
     955                 :          */
     956              48 :         LockSharedObject(DatabaseRelationId, InvalidOid, 0,
     957                 :                          AccessExclusiveLock);
     958                 : 
     959                 :         /* Now push the notifications into the queue */
     960              48 :         nextNotify = list_head(pendingNotifies->events);
     961             131 :         while (nextNotify != NULL)
     962                 :         {
     963                 :             /*
     964                 :              * Add the pending notifications to the queue.  We acquire and
     965                 :              * release NotifyQueueLock once per page, which might be overkill
     966                 :              * but it does allow readers to get in while we're doing this.
     967                 :              *
     968                 :              * A full queue is very uncommon and should really not happen,
     969                 :              * given that we have so much space available in the SLRU pages.
     970                 :              * Nevertheless we need to deal with this possibility. Note that
     971                 :              * when we get here we are in the process of committing our
     972                 :              * transaction, but we have not yet committed to clog, so at this
     973                 :              * point in time we can still roll the transaction back.
     974                 :              */
     975              83 :             LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
     976              83 :             asyncQueueFillWarning();
     977              83 :             if (asyncQueueIsFull())
     978 UBC           0 :                 ereport(ERROR,
     979                 :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     980                 :                          errmsg("too many notifications in the NOTIFY queue")));
     981 CBC          83 :             nextNotify = asyncQueueAddEntries(nextNotify);
     982              83 :             LWLockRelease(NotifyQueueLock);
     983                 :         }
     984                 : 
     985                 :         /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
     986                 :     }
     987                 : }
     988                 : 
     989                 : /*
     990                 :  * AtCommit_Notify
     991                 :  *
     992                 :  *      This is called at transaction commit, after committing to clog.
     993                 :  *
     994                 :  *      Update listenChannels and clear transaction-local state.
     995                 :  *
     996                 :  *      If we issued any notifications in the transaction, send signals to
     997                 :  *      listening backends (possibly including ourselves) to process them.
     998                 :  *      Also, if we filled enough queue pages with new notifies, try to
     999                 :  *      advance the queue tail pointer.
    1000                 :  */
    1001                 : void
    1002          465340 : AtCommit_Notify(void)
    1003                 : {
    1004                 :     ListCell   *p;
    1005                 : 
    1006                 :     /*
    1007                 :      * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
    1008                 :      * return as soon as possible
    1009                 :      */
    1010          465340 :     if (!pendingActions && !pendingNotifies)
    1011          465243 :         return;
    1012                 : 
    1013              97 :     if (Trace_notify)
    1014 UBC           0 :         elog(DEBUG1, "AtCommit_Notify");
    1015                 : 
    1016                 :     /* Perform any pending listen/unlisten actions */
    1017 CBC          97 :     if (pendingActions != NULL)
    1018                 :     {
    1019             105 :         foreach(p, pendingActions->actions)
    1020                 :         {
    1021              56 :             ListenAction *actrec = (ListenAction *) lfirst(p);
    1022                 : 
    1023              56 :             switch (actrec->action)
    1024                 :             {
    1025              37 :                 case LISTEN_LISTEN:
    1026              37 :                     Exec_ListenCommit(actrec->channel);
    1027              37 :                     break;
    1028               3 :                 case LISTEN_UNLISTEN:
    1029               3 :                     Exec_UnlistenCommit(actrec->channel);
    1030               3 :                     break;
    1031              16 :                 case LISTEN_UNLISTEN_ALL:
    1032              16 :                     Exec_UnlistenAllCommit();
    1033              16 :                     break;
    1034                 :             }
    1035                 :         }
    1036                 :     }
    1037                 : 
    1038                 :     /* If no longer listening to anything, get out of listener array */
    1039              97 :     if (amRegisteredListener && listenChannels == NIL)
    1040              13 :         asyncQueueUnregister();
    1041                 : 
    1042                 :     /*
    1043                 :      * Send signals to listening backends.  We need do this only if there are
    1044                 :      * pending notifies, which were previously added to the shared queue by
    1045                 :      * PreCommit_Notify().
    1046                 :      */
    1047              97 :     if (pendingNotifies != NULL)
    1048              48 :         SignalBackends();
    1049                 : 
    1050                 :     /*
    1051                 :      * If it's time to try to advance the global tail pointer, do that.
    1052                 :      *
    1053                 :      * (It might seem odd to do this in the sender, when more than likely the
    1054                 :      * listeners won't yet have read the messages we just sent.  However,
    1055                 :      * there's less contention if only the sender does it, and there is little
    1056                 :      * need for urgency in advancing the global tail.  So this typically will
    1057                 :      * be clearing out messages that were sent some time ago.)
    1058                 :      */
    1059              97 :     if (tryAdvanceTail)
    1060                 :     {
    1061               8 :         tryAdvanceTail = false;
    1062               8 :         asyncQueueAdvanceTail();
    1063                 :     }
    1064                 : 
    1065                 :     /* And clean up */
    1066              97 :     ClearPendingActionsAndNotifies();
    1067                 : }
    1068                 : 
    1069                 : /*
    1070                 :  * Exec_ListenPreCommit --- subroutine for PreCommit_Notify
    1071                 :  *
    1072                 :  * This function must make sure we are ready to catch any incoming messages.
    1073                 :  */
    1074                 : static void
    1075              37 : Exec_ListenPreCommit(void)
    1076                 : {
    1077                 :     QueuePosition head;
    1078                 :     QueuePosition max;
    1079                 :     BackendId   prevListener;
    1080                 : 
    1081                 :     /*
    1082                 :      * Nothing to do if we are already listening to something, nor if we
    1083                 :      * already ran this routine in this transaction.
    1084                 :      */
    1085              37 :     if (amRegisteredListener)
    1086              18 :         return;
    1087                 : 
    1088              19 :     if (Trace_notify)
    1089 UBC           0 :         elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
    1090                 : 
    1091                 :     /*
    1092                 :      * Before registering, make sure we will unlisten before dying. (Note:
    1093                 :      * this action does not get undone if we abort later.)
    1094                 :      */
    1095 CBC          19 :     if (!unlistenExitRegistered)
    1096                 :     {
    1097              14 :         before_shmem_exit(Async_UnlistenOnExit, 0);
    1098              14 :         unlistenExitRegistered = true;
    1099                 :     }
    1100                 : 
    1101                 :     /*
    1102                 :      * This is our first LISTEN, so establish our pointer.
    1103                 :      *
    1104                 :      * We set our pointer to the global tail pointer and then move it forward
    1105                 :      * over already-committed notifications.  This ensures we cannot miss any
    1106                 :      * not-yet-committed notifications.  We might get a few more but that
    1107                 :      * doesn't hurt.
    1108                 :      *
    1109                 :      * In some scenarios there might be a lot of committed notifications that
    1110                 :      * have not yet been pruned away (because some backend is being lazy about
    1111                 :      * reading them).  To reduce our startup time, we can look at other
    1112                 :      * backends and adopt the maximum "pos" pointer of any backend that's in
    1113                 :      * our database; any notifications it's already advanced over are surely
    1114                 :      * committed and need not be re-examined by us.  (We must consider only
    1115                 :      * backends connected to our DB, because others will not have bothered to
    1116                 :      * check committed-ness of notifications in our DB.)
    1117                 :      *
    1118                 :      * We need exclusive lock here so we can look at other backends' entries
    1119                 :      * and manipulate the list links.
    1120                 :      */
    1121              19 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1122              19 :     head = QUEUE_HEAD;
    1123              19 :     max = QUEUE_TAIL;
    1124              19 :     prevListener = InvalidBackendId;
    1125              21 :     for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    1126                 :     {
    1127               2 :         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
    1128               2 :             max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
    1129                 :         /* Also find last listening backend before this one */
    1130               2 :         if (i < MyBackendId)
    1131               1 :             prevListener = i;
    1132                 :     }
    1133              19 :     QUEUE_BACKEND_POS(MyBackendId) = max;
    1134              19 :     QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
    1135              19 :     QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
    1136                 :     /* Insert backend into list of listeners at correct position */
    1137              19 :     if (prevListener > 0)
    1138                 :     {
    1139               1 :         QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener);
    1140               1 :         QUEUE_NEXT_LISTENER(prevListener) = MyBackendId;
    1141                 :     }
    1142                 :     else
    1143                 :     {
    1144              18 :         QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
    1145              18 :         QUEUE_FIRST_LISTENER = MyBackendId;
    1146                 :     }
    1147              19 :     LWLockRelease(NotifyQueueLock);
    1148                 : 
    1149                 :     /* Now we are listed in the global array, so remember we're listening */
    1150              19 :     amRegisteredListener = true;
    1151                 : 
    1152                 :     /*
    1153                 :      * Try to move our pointer forward as far as possible.  This will skip
    1154                 :      * over already-committed notifications, which we want to do because they
    1155                 :      * might be quite stale.  Note that we are not yet listening on anything,
    1156                 :      * so we won't deliver such notifications to our frontend.  Also, although
    1157                 :      * our transaction might have executed NOTIFY, those message(s) aren't
    1158                 :      * queued yet so we won't skip them here.
    1159                 :      */
    1160              19 :     if (!QUEUE_POS_EQUAL(max, head))
    1161              11 :         asyncQueueReadAllNotifications();
    1162                 : }
    1163                 : 
    1164                 : /*
    1165                 :  * Exec_ListenCommit --- subroutine for AtCommit_Notify
    1166                 :  *
    1167                 :  * Add the channel to the list of channels we are listening on.
    1168                 :  */
    1169                 : static void
    1170              37 : Exec_ListenCommit(const char *channel)
    1171                 : {
    1172                 :     MemoryContext oldcontext;
    1173                 : 
    1174                 :     /* Do nothing if we are already listening on this channel */
    1175              37 :     if (IsListeningOn(channel))
    1176              10 :         return;
    1177                 : 
    1178                 :     /*
    1179                 :      * Add the new channel name to listenChannels.
    1180                 :      *
    1181                 :      * XXX It is theoretically possible to get an out-of-memory failure here,
    1182                 :      * which would be bad because we already committed.  For the moment it
    1183                 :      * doesn't seem worth trying to guard against that, but maybe improve this
    1184                 :      * later.
    1185                 :      */
    1186              27 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
    1187              27 :     listenChannels = lappend(listenChannels, pstrdup(channel));
    1188              27 :     MemoryContextSwitchTo(oldcontext);
    1189                 : }
    1190                 : 
    1191                 : /*
    1192                 :  * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
    1193                 :  *
    1194                 :  * Remove the specified channel name from listenChannels.
    1195                 :  */
    1196                 : static void
    1197               3 : Exec_UnlistenCommit(const char *channel)
    1198                 : {
    1199                 :     ListCell   *q;
    1200                 : 
    1201               3 :     if (Trace_notify)
    1202 UBC           0 :         elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
    1203                 : 
    1204 CBC           3 :     foreach(q, listenChannels)
    1205                 :     {
    1206               3 :         char       *lchan = (char *) lfirst(q);
    1207                 : 
    1208               3 :         if (strcmp(lchan, channel) == 0)
    1209                 :         {
    1210               3 :             listenChannels = foreach_delete_current(listenChannels, q);
    1211               3 :             pfree(lchan);
    1212               3 :             break;
    1213                 :         }
    1214                 :     }
    1215                 : 
    1216                 :     /*
     1217                 :      * We do not complain about unlistening a channel that we are not
     1218                 :      * listening on; should we?
    1219                 :      */
    1220               3 : }
    1221                 : 
    1222                 : /*
    1223                 :  * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
    1224                 :  *
    1225                 :  *      Unlisten on all channels for this backend.
    1226                 :  */
    1227                 : static void
    1228              30 : Exec_UnlistenAllCommit(void)
    1229                 : {
    1230              30 :     if (Trace_notify)
    1231 UBC           0 :         elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
    1232                 : 
    1233 CBC          30 :     list_free_deep(listenChannels);
    1234              30 :     listenChannels = NIL;
    1235              30 : }
    1236                 : 
    1237                 : /*
    1238                 :  * Test whether we are actively listening on the given channel name.
    1239                 :  *
    1240                 :  * Note: this function is executed for every notification found in the queue.
    1241                 :  * Perhaps it is worth further optimization, eg convert the list to a sorted
    1242                 :  * array so we can binary-search it.  In practice the list is likely to be
    1243                 :  * fairly short, though.
    1244                 :  */
    1245                 : static bool
    1246            1196 : IsListeningOn(const char *channel)
    1247                 : {
    1248                 :     ListCell   *p;
    1249                 : 
    1250            1257 :     foreach(p, listenChannels)
    1251                 :     {
    1252             102 :         char       *lchan = (char *) lfirst(p);
    1253                 : 
    1254             102 :         if (strcmp(lchan, channel) == 0)
    1255              41 :             return true;
    1256                 :     }
    1257            1155 :     return false;
    1258                 : }
    1259                 : 
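The comment above suggests that the linear scan could be replaced with a sorted array and a binary search. A minimal standalone sketch of that idea, using plain qsort()/bsearch() over an array of channel-name pointers (these helper names are illustrative and not part of async.c):

    #include <stdbool.h>
    #include <stdlib.h>
    #include <string.h>

    /* compare two (char *) array elements by channel name */
    static int
    channel_cmp(const void *a, const void *b)
    {
        return strcmp(*(const char *const *) a, *(const char *const *) b);
    }

    /* Sort once whenever the channel set changes (it only changes at commit). */
    static void
    sort_channels(char **channels, size_t nchannels)
    {
        qsort(channels, nchannels, sizeof(char *), channel_cmp);
    }

    /* O(log n) membership test, replacing the linear scan above. */
    static bool
    is_listening_on_sorted(char *const *channels, size_t nchannels, const char *channel)
    {
        return bsearch(&channel, channels, nchannels,
                       sizeof(char *), channel_cmp) != NULL;
    }

As the comment notes, listenChannels is usually short, so the simple linear scan above is normally adequate.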
    1260                 : /*
    1261                 :  * Remove our entry from the listeners array when we are no longer listening
    1262                 :  * on any channel.  NB: must not fail if we're already not listening.
    1263                 :  */
    1264                 : static void
    1265              27 : asyncQueueUnregister(void)
    1266                 : {
    1267              27 :     Assert(listenChannels == NIL);  /* else caller error */
    1268                 : 
    1269              27 :     if (!amRegisteredListener)  /* nothing to do */
    1270               8 :         return;
    1271                 : 
    1272                 :     /*
    1273                 :      * Need exclusive lock here to manipulate list links.
    1274                 :      */
    1275              19 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1276                 :     /* Mark our entry as invalid */
    1277              19 :     QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
    1278              19 :     QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
    1279                 :     /* and remove it from the list */
    1280              19 :     if (QUEUE_FIRST_LISTENER == MyBackendId)
    1281              18 :         QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId);
    1282                 :     else
    1283                 :     {
    1284               1 :         for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    1285                 :         {
    1286               1 :             if (QUEUE_NEXT_LISTENER(i) == MyBackendId)
    1287                 :             {
    1288               1 :                 QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId);
    1289               1 :                 break;
    1290                 :             }
    1291                 :         }
    1292                 :     }
    1293              19 :     QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId;
    1294              19 :     LWLockRelease(NotifyQueueLock);
    1295                 : 
    1296                 :     /* mark ourselves as no longer listed in the global array */
    1297              19 :     amRegisteredListener = false;
    1298                 : }
    1299                 : 
    1300                 : /*
    1301                 :  * Test whether there is room to insert more notification messages.
    1302                 :  *
    1303                 :  * Caller must hold at least shared NotifyQueueLock.
    1304                 :  */
    1305                 : static bool
    1306              83 : asyncQueueIsFull(void)
    1307                 : {
    1308                 :     int         nexthead;
    1309                 :     int         boundary;
    1310                 : 
    1311                 :     /*
    1312                 :      * The queue is full if creating a new head page would create a page that
    1313                 :      * logically precedes the current global tail pointer, ie, the head
    1314                 :      * pointer would wrap around compared to the tail.  We cannot create such
    1315                 :      * a head page for fear of confusing slru.c.  For safety we round the tail
    1316                 :      * pointer back to a segment boundary (truncation logic in
    1317                 :      * asyncQueueAdvanceTail does not do this, so doing it here is optional).
    1318                 :      *
    1319                 :      * Note that this test is *not* dependent on how much space there is on
    1320                 :      * the current head page.  This is necessary because asyncQueueAddEntries
    1321                 :      * might try to create the next head page in any case.
    1322                 :      */
    1323              83 :     nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
    1324              83 :     if (nexthead > QUEUE_MAX_PAGE)
    1325 UBC           0 :         nexthead = 0;           /* wrap around */
    1326 CBC          83 :     boundary = QUEUE_STOP_PAGE;
    1327              83 :     boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
    1328              83 :     return asyncQueuePagePrecedes(nexthead, boundary);
    1329                 : }
    1330                 : 
    1331                 : /*
    1332                 :  * Advance the QueuePosition to the next entry, assuming that the current
    1333                 :  * entry is of length entryLength.  If we jump to a new page the function
    1334                 :  * returns true, else false.
    1335                 :  */
    1336                 : static bool
    1337            2301 : asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
    1338                 : {
    1339            2301 :     int         pageno = QUEUE_POS_PAGE(*position);
    1340            2301 :     int         offset = QUEUE_POS_OFFSET(*position);
    1341            2301 :     bool        pageJump = false;
    1342                 : 
    1343                 :     /*
    1344                 :      * Move to the next writing position: First jump over what we have just
    1345                 :      * written or read.
    1346                 :      */
    1347            2301 :     offset += entryLength;
    1348            2301 :     Assert(offset <= QUEUE_PAGESIZE);
    1349                 : 
    1350                 :     /*
    1351                 :      * In a second step check if another entry can possibly be written to the
    1352                 :      * page. If so, stay here, we have reached the next position. If not, then
    1353                 :      * we need to move on to the next page.
    1354                 :      */
    1355            2301 :     if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
    1356                 :     {
    1357              70 :         pageno++;
    1358              70 :         if (pageno > QUEUE_MAX_PAGE)
    1359 UBC           0 :             pageno = 0;         /* wrap around */
    1360 CBC          70 :         offset = 0;
    1361              70 :         pageJump = true;
    1362                 :     }
    1363                 : 
    1364            2301 :     SET_QUEUE_POS(*position, pageno, offset);
    1365            2301 :     return pageJump;
    1366                 : }
    1367                 : 
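To make the page-jump rule concrete, here is a tiny standalone sketch with made-up constants standing in for QUEUE_PAGESIZE and QUEUEALIGN(AsyncQueueEntryEmptySize); the real values depend on the build configuration.

    #include <stdio.h>

    #define PAGE_SIZE       8192    /* hypothetical stand-in for QUEUE_PAGESIZE */
    #define MIN_ENTRY_SIZE    24    /* hypothetical QUEUEALIGN(AsyncQueueEntryEmptySize) */

    int
    main(void)
    {
        int         offset = 8150;  /* write position on the current page */
        int         entry_len = 40; /* aligned length of the entry just written */

        /* Step over the entry we just wrote (8150 + 40 = 8190, still on the page). */
        offset += entry_len;

        /*
         * Not even a minimal entry fits after it (8190 + 24 > 8192), so the
         * position jumps to offset 0 of the next page, just as in
         * asyncQueueAdvance() above.
         */
        if (offset + MIN_ENTRY_SIZE > PAGE_SIZE)
            printf("page jump: next entry starts at offset 0 of the next page\n");
        else
            printf("stay on this page at offset %d\n", offset);
        return 0;
    }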
    1368                 : /*
    1369                 :  * Fill the AsyncQueueEntry at *qe with an outbound notification message.
    1370                 :  */
    1371                 : static void
    1372            1109 : asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
    1373                 : {
    1374            1109 :     size_t      channellen = n->channel_len;
    1375            1109 :     size_t      payloadlen = n->payload_len;
    1376                 :     int         entryLength;
    1377                 : 
    1378            1109 :     Assert(channellen < NAMEDATALEN);
    1379            1109 :     Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
    1380                 : 
    1381                 :     /* The terminators are already included in AsyncQueueEntryEmptySize */
    1382            1109 :     entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
    1383            1109 :     entryLength = QUEUEALIGN(entryLength);
    1384            1109 :     qe->length = entryLength;
    1385            1109 :     qe->dboid = MyDatabaseId;
    1386            1109 :     qe->xid = GetCurrentTransactionId();
    1387            1109 :     qe->srcPid = MyProcPid;
    1388            1109 :     memcpy(qe->data, n->data, channellen + payloadlen + 2);
    1389            1109 : }
    1390                 : 
    1391                 : /*
    1392                 :  * Add pending notifications to the queue.
    1393                 :  *
    1394                 :  * We go page by page here, i.e. we stop once we have to go to a new page but
    1395                 :  * we will be called again and then fill that next page. If an entry does not
    1396                 :  * fit into the current page, we write a dummy entry with an InvalidOid as the
    1397                 :  * database OID in order to fill the page. So every page is always used up to
    1398                 :  * the last byte which simplifies reading the page later.
     1399                 :  * the last byte, which simplifies reading the page later.
    1400                 :  * We are passed the list cell (in pendingNotifies->events) containing the next
    1401                 :  * notification to write and return the first still-unwritten cell back.
    1402                 :  * Eventually we will return NULL indicating all is done.
    1403                 :  *
    1404                 :  * We are holding NotifyQueueLock already from the caller and grab
    1405                 :  * NotifySLRULock locally in this function.
    1406                 :  */
    1407                 : static ListCell *
    1408              83 : asyncQueueAddEntries(ListCell *nextNotify)
    1409                 : {
    1410                 :     AsyncQueueEntry qe;
    1411                 :     QueuePosition queue_head;
    1412                 :     int         pageno;
    1413                 :     int         offset;
    1414                 :     int         slotno;
    1415                 : 
    1416                 :     /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
    1417              83 :     LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
    1418                 : 
    1419                 :     /*
    1420                 :      * We work with a local copy of QUEUE_HEAD, which we write back to shared
    1421                 :      * memory upon exiting.  The reason for this is that if we have to advance
    1422                 :      * to a new page, SimpleLruZeroPage might fail (out of disk space, for
    1423                 :      * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
    1424                 :      * subsequent insertions would try to put entries into a page that slru.c
    1425                 :      * thinks doesn't exist yet.)  So, use a local position variable.  Note
    1426                 :      * that if we do fail, any already-inserted queue entries are forgotten;
    1427                 :      * this is okay, since they'd be useless anyway after our transaction
    1428                 :      * rolls back.
    1429                 :      */
    1430              83 :     queue_head = QUEUE_HEAD;
    1431                 : 
    1432                 :     /*
    1433                 :      * If this is the first write since the postmaster started, we need to
    1434                 :      * initialize the first page of the async SLRU.  Otherwise, the current
    1435                 :      * page should be initialized already, so just fetch it.
    1436                 :      *
    1437                 :      * (We could also take the first path when the SLRU position has just
    1438                 :      * wrapped around, but re-zeroing the page is harmless in that case.)
    1439                 :      */
    1440              83 :     pageno = QUEUE_POS_PAGE(queue_head);
    1441              83 :     if (QUEUE_POS_IS_ZERO(queue_head))
    1442               7 :         slotno = SimpleLruZeroPage(NotifyCtl, pageno);
    1443                 :     else
    1444              76 :         slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
    1445                 :                                    InvalidTransactionId);
    1446                 : 
    1447                 :     /* Note we mark the page dirty before writing in it */
    1448              83 :     NotifyCtl->shared->page_dirty[slotno] = true;
    1449                 : 
    1450            1157 :     while (nextNotify != NULL)
    1451                 :     {
    1452            1109 :         Notification *n = (Notification *) lfirst(nextNotify);
    1453                 : 
    1454                 :         /* Construct a valid queue entry in local variable qe */
    1455            1109 :         asyncQueueNotificationToEntry(n, &qe);
    1456                 : 
    1457            1109 :         offset = QUEUE_POS_OFFSET(queue_head);
    1458                 : 
    1459                 :         /* Check whether the entry really fits on the current page */
    1460            1109 :         if (offset + qe.length <= QUEUE_PAGESIZE)
    1461                 :         {
    1462                 :             /* OK, so advance nextNotify past this item */
    1463            1076 :             nextNotify = lnext(pendingNotifies->events, nextNotify);
    1464                 :         }
    1465                 :         else
    1466                 :         {
    1467                 :             /*
     1468                 :              * Write a dummy entry to fill up the page.  Readers only check
     1469                 :              * dboid, and since it won't match any reader's database OID, they
     1470                 :              * will ignore this entry and move on.
    1471                 :              */
    1472              33 :             qe.length = QUEUE_PAGESIZE - offset;
    1473              33 :             qe.dboid = InvalidOid;
    1474              33 :             qe.data[0] = '\0';  /* empty channel */
    1475              33 :             qe.data[1] = '\0';  /* empty payload */
    1476                 :         }
    1477                 : 
    1478                 :         /* Now copy qe into the shared buffer page */
    1479            1109 :         memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
    1480                 :                &qe,
    1481            1109 :                qe.length);
    1482                 : 
    1483                 :         /* Advance queue_head appropriately, and detect if page is full */
    1484            1109 :         if (asyncQueueAdvance(&(queue_head), qe.length))
    1485                 :         {
    1486                 :             /*
    1487                 :              * Page is full, so we're done here, but first fill the next page
    1488                 :              * with zeroes.  The reason to do this is to ensure that slru.c's
    1489                 :              * idea of the head page is always the same as ours, which avoids
    1490                 :              * boundary problems in SimpleLruTruncate.  The test in
    1491                 :              * asyncQueueIsFull() ensured that there is room to create this
    1492                 :              * page without overrunning the queue.
    1493                 :              */
    1494              35 :             slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
    1495                 : 
    1496                 :             /*
    1497                 :              * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
    1498                 :              * set flag to remember that we should try to advance the tail
    1499                 :              * pointer (we don't want to actually do that right here).
    1500                 :              */
    1501              35 :             if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
    1502               8 :                 tryAdvanceTail = true;
    1503                 : 
    1504                 :             /* And exit the loop */
    1505              35 :             break;
    1506                 :         }
    1507                 :     }
    1508                 : 
    1509                 :     /* Success, so update the global QUEUE_HEAD */
    1510              83 :     QUEUE_HEAD = queue_head;
    1511                 : 
    1512              83 :     LWLockRelease(NotifySLRULock);
    1513                 : 
    1514              83 :     return nextNotify;
    1515                 : }
    1516                 : 
    1517                 : /*
    1518                 :  * SQL function to return the fraction of the notification queue currently
    1519                 :  * occupied.
    1520                 :  */
    1521                 : Datum
    1522               5 : pg_notification_queue_usage(PG_FUNCTION_ARGS)
    1523                 : {
    1524                 :     double      usage;
    1525                 : 
    1526                 :     /* Advance the queue tail so we don't report a too-large result */
    1527               5 :     asyncQueueAdvanceTail();
    1528                 : 
    1529               5 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    1530               5 :     usage = asyncQueueUsage();
    1531               5 :     LWLockRelease(NotifyQueueLock);
    1532                 : 
    1533               5 :     PG_RETURN_FLOAT8(usage);
    1534                 : }
    1535                 : 
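Clients can poll this function like any other SQL function. A minimal libpq sketch (the connection string is a placeholder):

    #include <stdio.h>
    #include <libpq-fe.h>

    int
    main(void)
    {
        PGconn     *conn = PQconnectdb("dbname=postgres");   /* placeholder conninfo */
        PGresult   *res = PQexec(conn, "SELECT pg_notification_queue_usage()");

        /* prints a fraction between 0 and 1 */
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
            printf("pg_notification_queue_usage() = %s\n", PQgetvalue(res, 0, 0));

        PQclear(res);
        PQfinish(conn);
        return 0;
    }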
    1536                 : /*
    1537                 :  * Return the fraction of the queue that is currently occupied.
    1538                 :  *
    1539                 :  * The caller must hold NotifyQueueLock in (at least) shared mode.
    1540                 :  *
    1541                 :  * Note: we measure the distance to the logical tail page, not the physical
    1542                 :  * tail page.  In some sense that's wrong, but the relative position of the
    1543                 :  * physical tail is affected by details such as SLRU segment boundaries,
    1544                 :  * so that a result based on that is unpleasantly unstable.
    1545                 :  */
    1546                 : static double
    1547              88 : asyncQueueUsage(void)
    1548                 : {
    1549              88 :     int         headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
    1550              88 :     int         tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
    1551                 :     int         occupied;
    1552                 : 
    1553              88 :     occupied = headPage - tailPage;
    1554                 : 
    1555              88 :     if (occupied == 0)
    1556              41 :         return (double) 0;      /* fast exit for common case */
    1557                 : 
    1558              47 :     if (occupied < 0)
    1559                 :     {
    1560                 :         /* head has wrapped around, tail not yet */
    1561 UBC           0 :         occupied += QUEUE_MAX_PAGE + 1;
    1562                 :     }
    1563                 : 
    1564 CBC          47 :     return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
    1565                 : }
    1566                 : 
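A worked example of the same calculation, as a standalone sketch with a made-up page count standing in for QUEUE_MAX_PAGE; it mirrors the wraparound handling and the scaling by half the page range used above.

    #include <stdio.h>

    #define MAX_PAGE 99999          /* hypothetical stand-in for QUEUE_MAX_PAGE */

    /*
     * Same shape as asyncQueueUsage(): pages between tail and head, with
     * wraparound, scaled so that 1.0 corresponds to half the page range
     * (roughly the point at which asyncQueueIsFull() starts rejecting new
     * notifications).
     */
    static double
    queue_usage(int head_page, int tail_page)
    {
        int         occupied = head_page - tail_page;

        if (occupied < 0)
            occupied += MAX_PAGE + 1;   /* head has wrapped around, tail not yet */

        return (double) occupied / (double) ((MAX_PAGE + 1) / 2);
    }

    int
    main(void)
    {
        printf("%.4f\n", queue_usage(1200, 200));   /* 1000 pages in use -> 0.0200 */
        printf("%.4f\n", queue_usage(40, 99840));   /* wrapped: 200 pages -> 0.0040 */
        return 0;
    }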
    1567                 : /*
    1568                 :  * Check whether the queue is at least half full, and emit a warning if so.
    1569                 :  *
    1570                 :  * This is unlikely given the size of the queue, but possible.
    1571                 :  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
    1572                 :  *
    1573                 :  * Caller must hold exclusive NotifyQueueLock.
    1574                 :  */
    1575                 : static void
    1576              83 : asyncQueueFillWarning(void)
    1577                 : {
    1578                 :     double      fillDegree;
    1579                 :     TimestampTz t;
    1580                 : 
    1581              83 :     fillDegree = asyncQueueUsage();
    1582              83 :     if (fillDegree < 0.5)
    1583              83 :         return;
    1584                 : 
    1585 UBC           0 :     t = GetCurrentTimestamp();
    1586                 : 
    1587               0 :     if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
    1588                 :                                    t, QUEUE_FULL_WARN_INTERVAL))
    1589                 :     {
    1590               0 :         QueuePosition min = QUEUE_HEAD;
    1591               0 :         int32       minPid = InvalidPid;
    1592                 : 
    1593               0 :         for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    1594                 :         {
    1595               0 :             Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    1596               0 :             min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    1597               0 :             if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
    1598               0 :                 minPid = QUEUE_BACKEND_PID(i);
    1599                 :         }
    1600                 : 
    1601               0 :         ereport(WARNING,
    1602                 :                 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
    1603                 :                  (minPid != InvalidPid ?
    1604                 :                   errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
    1605                 :                   : 0),
    1606                 :                  (minPid != InvalidPid ?
    1607                 :                   errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
    1608                 :                   : 0)));
    1609                 : 
    1610               0 :         asyncQueueControl->lastQueueFillWarn = t;
    1611                 :     }
    1612                 : }
    1613                 : 
    1614                 : /*
    1615                 :  * Send signals to listening backends.
    1616                 :  *
    1617                 :  * Normally we signal only backends in our own database, since only those
    1618                 :  * backends could be interested in notifies we send.  However, if there's
    1619                 :  * notify traffic in our database but no traffic in another database that
    1620                 :  * does have listener(s), those listeners will fall further and further
    1621                 :  * behind.  Waken them anyway if they're far enough behind, so that they'll
    1622                 :  * advance their queue position pointers, allowing the global tail to advance.
    1623                 :  *
     1624                 :  * Since we know the BackendId and the Pid, the signaling is quite cheap.
    1625                 :  *
    1626                 :  * This is called during CommitTransaction(), so it's important for it
    1627                 :  * to have very low probability of failure.
    1628                 :  */
    1629                 : static void
    1630 CBC          48 : SignalBackends(void)
    1631                 : {
    1632                 :     int32      *pids;
    1633                 :     BackendId  *ids;
    1634                 :     int         count;
    1635                 : 
    1636                 :     /*
    1637                 :      * Identify backends that we need to signal.  We don't want to send
    1638                 :      * signals while holding the NotifyQueueLock, so this loop just builds a
    1639                 :      * list of target PIDs.
    1640                 :      *
    1641                 :      * XXX in principle these pallocs could fail, which would be bad. Maybe
    1642                 :      * preallocate the arrays?  They're not that large, though.
    1643                 :      */
    1644              48 :     pids = (int32 *) palloc(MaxBackends * sizeof(int32));
    1645              48 :     ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
    1646              48 :     count = 0;
    1647                 : 
    1648              48 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    1649              88 :     for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    1650                 :     {
    1651              40 :         int32       pid = QUEUE_BACKEND_PID(i);
    1652                 :         QueuePosition pos;
    1653                 : 
    1654              40 :         Assert(pid != InvalidPid);
    1655              40 :         pos = QUEUE_BACKEND_POS(i);
    1656              40 :         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
    1657                 :         {
    1658                 :             /*
    1659                 :              * Always signal listeners in our own database, unless they're
    1660                 :              * already caught up (unlikely, but possible).
    1661                 :              */
    1662              40 :             if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
    1663 UBC           0 :                 continue;
    1664                 :         }
    1665                 :         else
    1666                 :         {
    1667                 :             /*
    1668                 :              * Listeners in other databases should be signaled only if they
    1669                 :              * are far behind.
    1670                 :              */
    1671               0 :             if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
    1672                 :                                    QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
    1673               0 :                 continue;
    1674                 :         }
    1675                 :         /* OK, need to signal this one */
    1676 CBC          40 :         pids[count] = pid;
    1677              40 :         ids[count] = i;
    1678              40 :         count++;
    1679                 :     }
    1680              48 :     LWLockRelease(NotifyQueueLock);
    1681                 : 
    1682                 :     /* Now send signals */
    1683              88 :     for (int i = 0; i < count; i++)
    1684                 :     {
    1685              40 :         int32       pid = pids[i];
    1686                 : 
    1687                 :         /*
    1688                 :          * If we are signaling our own process, no need to involve the kernel;
    1689                 :          * just set the flag directly.
    1690                 :          */
    1691              40 :         if (pid == MyProcPid)
    1692                 :         {
    1693              20 :             notifyInterruptPending = true;
    1694              20 :             continue;
    1695                 :         }
    1696                 : 
    1697                 :         /*
    1698                 :          * Note: assuming things aren't broken, a signal failure here could
    1699                 :          * only occur if the target backend exited since we released
    1700                 :          * NotifyQueueLock; which is unlikely but certainly possible. So we
    1701                 :          * just log a low-level debug message if it happens.
    1702                 :          */
    1703              20 :         if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
    1704 UBC           0 :             elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
    1705                 :     }
    1706                 : 
    1707 CBC          48 :     pfree(pids);
    1708              48 :     pfree(ids);
    1709              48 : }
    1710                 : 
    1711                 : /*
    1712                 :  * AtAbort_Notify
    1713                 :  *
    1714                 :  *  This is called at transaction abort.
    1715                 :  *
    1716                 :  *  Gets rid of pending actions and outbound notifies that we would have
    1717                 :  *  executed if the transaction got committed.
    1718                 :  */
    1719                 : void
    1720           20099 : AtAbort_Notify(void)
    1721                 : {
    1722                 :     /*
    1723                 :      * If we LISTEN but then roll back the transaction after PreCommit_Notify,
    1724                 :      * we have registered as a listener but have not made any entry in
    1725                 :      * listenChannels.  In that case, deregister again.
    1726                 :      */
    1727           20099 :     if (amRegisteredListener && listenChannels == NIL)
    1728 UBC           0 :         asyncQueueUnregister();
    1729                 : 
    1730                 :     /* And clean up */
    1731 CBC       20099 :     ClearPendingActionsAndNotifies();
    1732           20099 : }
    1733                 : 
    1734                 : /*
    1735                 :  * AtSubCommit_Notify() --- Take care of subtransaction commit.
    1736                 :  *
    1737                 :  * Reassign all items in the pending lists to the parent transaction.
    1738                 :  */
    1739                 : void
    1740            4317 : AtSubCommit_Notify(void)
    1741                 : {
    1742            4317 :     int         my_level = GetCurrentTransactionNestLevel();
    1743                 : 
    1744                 :     /* If there are actions at our nesting level, we must reparent them. */
    1745            4317 :     if (pendingActions != NULL &&
    1746 UBC           0 :         pendingActions->nestingLevel >= my_level)
    1747                 :     {
    1748               0 :         if (pendingActions->upper == NULL ||
    1749               0 :             pendingActions->upper->nestingLevel < my_level - 1)
    1750                 :         {
    1751                 :             /* nothing to merge; give the whole thing to the parent */
    1752               0 :             --pendingActions->nestingLevel;
    1753                 :         }
    1754                 :         else
    1755                 :         {
    1756               0 :             ActionList *childPendingActions = pendingActions;
    1757                 : 
    1758               0 :             pendingActions = pendingActions->upper;
    1759                 : 
    1760                 :             /*
    1761                 :              * Mustn't try to eliminate duplicates here --- see queue_listen()
    1762                 :              */
    1763               0 :             pendingActions->actions =
    1764               0 :                 list_concat(pendingActions->actions,
    1765               0 :                             childPendingActions->actions);
    1766               0 :             pfree(childPendingActions);
    1767                 :         }
    1768                 :     }
    1769                 : 
    1770                 :     /* If there are notifies at our nesting level, we must reparent them. */
    1771 CBC        4317 :     if (pendingNotifies != NULL &&
    1772               2 :         pendingNotifies->nestingLevel >= my_level)
    1773                 :     {
    1774               1 :         Assert(pendingNotifies->nestingLevel == my_level);
    1775                 : 
    1776               1 :         if (pendingNotifies->upper == NULL ||
    1777               1 :             pendingNotifies->upper->nestingLevel < my_level - 1)
    1778                 :         {
    1779                 :             /* nothing to merge; give the whole thing to the parent */
    1780 UBC           0 :             --pendingNotifies->nestingLevel;
    1781                 :         }
    1782                 :         else
    1783                 :         {
    1784                 :             /*
    1785                 :              * Formerly, we didn't bother to eliminate duplicates here, but
    1786                 :              * now we must, else we fall foul of "Assert(!found)", either here
    1787                 :              * or during a later attempt to build the parent-level hashtable.
    1788                 :              */
    1789 CBC           1 :             NotificationList *childPendingNotifies = pendingNotifies;
    1790                 :             ListCell   *l;
    1791                 : 
    1792               1 :             pendingNotifies = pendingNotifies->upper;
    1793                 :             /* Insert all the subxact's events into parent, except for dups */
    1794               5 :             foreach(l, childPendingNotifies->events)
    1795                 :             {
    1796               4 :                 Notification *childn = (Notification *) lfirst(l);
    1797                 : 
    1798               4 :                 if (!AsyncExistsPendingNotify(childn))
    1799               2 :                     AddEventToPendingNotifies(childn);
    1800                 :             }
    1801               1 :             pfree(childPendingNotifies);
    1802                 :         }
    1803                 :     }
    1804            4317 : }
    1805                 : 
    1806                 : /*
    1807                 :  * AtSubAbort_Notify() --- Take care of subtransaction abort.
    1808                 :  */
    1809                 : void
    1810            4468 : AtSubAbort_Notify(void)
    1811                 : {
    1812            4468 :     int         my_level = GetCurrentTransactionNestLevel();
    1813                 : 
    1814                 :     /*
    1815                 :      * All we have to do is pop the stack --- the actions/notifies made in
    1816                 :      * this subxact are no longer interesting, and the space will be freed
    1817                 :      * when CurTransactionContext is recycled. We still have to free the
    1818                 :      * ActionList and NotificationList objects themselves, though, because
    1819                 :      * those are allocated in TopTransactionContext.
    1820                 :      *
    1821                 :      * Note that there might be no entries at all, or no entries for the
    1822                 :      * current subtransaction level, either because none were ever created, or
    1823                 :      * because we reentered this routine due to trouble during subxact abort.
    1824                 :      */
    1825            4468 :     while (pendingActions != NULL &&
    1826 UBC           0 :            pendingActions->nestingLevel >= my_level)
    1827                 :     {
    1828               0 :         ActionList *childPendingActions = pendingActions;
    1829                 : 
    1830               0 :         pendingActions = pendingActions->upper;
    1831               0 :         pfree(childPendingActions);
    1832                 :     }
    1833                 : 
    1834 CBC        4469 :     while (pendingNotifies != NULL &&
    1835               2 :            pendingNotifies->nestingLevel >= my_level)
    1836                 :     {
    1837               1 :         NotificationList *childPendingNotifies = pendingNotifies;
    1838                 : 
    1839               1 :         pendingNotifies = pendingNotifies->upper;
    1840               1 :         pfree(childPendingNotifies);
    1841                 :     }
    1842            4468 : }
    1843                 : 
    1844                 : /*
    1845                 :  * HandleNotifyInterrupt
    1846                 :  *
    1847                 :  *      Signal handler portion of interrupt handling. Let the backend know
    1848                 :  *      that there's a pending notify interrupt. If we're currently reading
    1849                 :  *      from the client, this will interrupt the read and
    1850                 :  *      ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
    1851                 :  */
    1852                 : void
    1853              20 : HandleNotifyInterrupt(void)
    1854                 : {
    1855                 :     /*
     1856                 :      * Note: this is called by a SIGNAL HANDLER. You must be very wary of
     1857                 :      * what you do here.
    1858                 :      */
    1859                 : 
    1860                 :     /* signal that work needs to be done */
    1861              20 :     notifyInterruptPending = true;
    1862                 : 
    1863                 :     /* make sure the event is processed in due course */
    1864              20 :     SetLatch(MyLatch);
    1865              20 : }
    1866                 : 
    1867                 : /*
    1868                 :  * ProcessNotifyInterrupt
    1869                 :  *
    1870                 :  *      This is called if we see notifyInterruptPending set, just before
    1871                 :  *      transmitting ReadyForQuery at the end of a frontend command, and
    1872                 :  *      also if a notify signal occurs while reading from the frontend.
    1873                 :  *      HandleNotifyInterrupt() will cause the read to be interrupted
    1874                 :  *      via the process's latch, and this routine will get called.
    1875                 :  *      If we are truly idle (ie, *not* inside a transaction block),
    1876                 :  *      process the incoming notifies.
    1877                 :  *
    1878                 :  *      If "flush" is true, force any frontend messages out immediately.
    1879                 :  *      This can be false when being called at the end of a frontend command,
    1880                 :  *      since we'll flush after sending ReadyForQuery.
    1881                 :  */
    1882                 : void
    1883              95 : ProcessNotifyInterrupt(bool flush)
    1884                 : {
    1885              95 :     if (IsTransactionOrTransactionBlock())
    1886              56 :         return;                 /* not really idle */
    1887                 : 
    1888                 :     /* Loop in case another signal arrives while sending messages */
    1889              78 :     while (notifyInterruptPending)
    1890              39 :         ProcessIncomingNotify(flush);
    1891                 : }
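To make the "flush" contract concrete, here is a minimal sketch of the two usage patterns the header comment describes. It is not the real call site (those live in the command-processing loop and in the client-read interrupt path); the wrapper and its at_command_end flag are hypothetical.

    /* Sketch only: how the "flush" argument is meant to be used. */
    static void
    maybe_process_notify(bool at_command_end)
    {
        if (!notifyInterruptPending)
            return;

        if (at_command_end)
            ProcessNotifyInterrupt(false);  /* ReadyForQuery will flush for us */
        else
            ProcessNotifyInterrupt(true);   /* interrupted a client read: flush now */
    }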
    1892                 : 
    1893                 : 
    1894                 : /*
    1895                 :  * Read all pending notifications from the queue, and deliver appropriate
    1896                 :  * ones to my frontend.  Stop when we reach queue head or an uncommitted
    1897                 :  * notification.
    1898                 :  */
    1899                 : static void
    1900              50 : asyncQueueReadAllNotifications(void)
    1901                 : {
    1902                 :     volatile QueuePosition pos;
    1903                 :     QueuePosition head;
    1904                 :     Snapshot    snapshot;
    1905                 : 
    1906                 :     /* page_buffer must be adequately aligned, so use a union */
    1907                 :     union
    1908                 :     {
    1909                 :         char        buf[QUEUE_PAGESIZE];
    1910                 :         AsyncQueueEntry align;
    1911                 :     }           page_buffer;
    1912                 : 
    1913                 :     /* Fetch current state */
    1914              50 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
    1915                 :     /* Assert checks that we have a valid state entry */
    1916              50 :     Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
    1917              50 :     pos = QUEUE_BACKEND_POS(MyBackendId);
    1918              50 :     head = QUEUE_HEAD;
    1919              50 :     LWLockRelease(NotifyQueueLock);
    1920                 : 
    1921              50 :     if (QUEUE_POS_EQUAL(pos, head))
    1922                 :     {
    1923                 :         /* Nothing to do, we have read all notifications already. */
    1924 UBC           0 :         return;
    1925                 :     }
    1926                 : 
    1927                 :     /*----------
    1928                 :      * Get snapshot we'll use to decide which xacts are still in progress.
    1929                 :      * This is trickier than it might seem, because of race conditions.
    1930                 :      * Consider the following example:
    1931                 :      *
    1932                 :      * Backend 1:                    Backend 2:
    1933                 :      *
    1934                 :      * transaction starts
    1935                 :      * UPDATE foo SET ...;
    1936                 :      * NOTIFY foo;
    1937                 :      * commit starts
    1938                 :      * queue the notify message
    1939                 :      *                               transaction starts
    1940                 :      *                               LISTEN foo;  -- first LISTEN in session
    1941                 :      *                               SELECT * FROM foo WHERE ...;
    1942                 :      * commit to clog
    1943                 :      *                               commit starts
    1944                 :      *                               add backend 2 to array of listeners
    1945                 :      *                               advance to queue head (this code)
    1946                 :      *                               commit to clog
    1947                 :      *
    1948                 :      * Transaction 2's SELECT has not seen the UPDATE's effects, since that
    1949                 :      * wasn't committed yet.  Ideally we'd ensure that client 2 would
    1950                 :      * eventually get transaction 1's notify message, but there's no way
    1951                 :      * to do that; until we're in the listener array, there's no guarantee
    1952                 :      * that the notify message doesn't get removed from the queue.
    1953                 :      *
    1954                 :      * Therefore the coding technique transaction 2 is using is unsafe:
    1955                 :      * applications must commit a LISTEN before inspecting database state,
    1956                 :      * if they want to ensure they will see notifications about subsequent
    1957                 :      * changes to that state.
    1958                 :      *
    1959                 :      * What we do guarantee is that we'll see all notifications from
    1960                 :      * transactions committing after the snapshot we take here.
    1961                 :      * Exec_ListenPreCommit has already added us to the listener array,
    1962                 :      * so no not-yet-committed messages can be removed from the queue
    1963                 :      * before we see them.
    1964                 :      *----------
    1965                 :      */
    1966 CBC          50 :     snapshot = RegisterSnapshot(GetLatestSnapshot());
    1967                 : 
    1968                 :     /*
    1969                 :      * It is possible that we fail while trying to send a message to our
    1970                 :      * frontend (for example, because of encoding conversion failure).  If
    1971                 :      * that happens it is critical that we not try to send the same message
    1972                 :      * over and over again.  Therefore, we place a PG_TRY block here that will
    1973                 :      * forcibly advance our queue position before we lose control to an error.
    1974                 :      * (We could alternatively retake NotifyQueueLock and move the position
    1975                 :      * before handling each individual message, but that seems like too much
    1976                 :      * lock traffic.)
    1977                 :      */
    1978              50 :     PG_TRY();
    1979                 :     {
    1980                 :         bool        reachedStop;
    1981                 : 
    1982                 :         do
    1983                 :         {
    1984              85 :             int         curpage = QUEUE_POS_PAGE(pos);
    1985              85 :             int         curoffset = QUEUE_POS_OFFSET(pos);
    1986                 :             int         slotno;
    1987                 :             int         copysize;
    1988                 : 
    1989                 :             /*
    1990                 :              * We copy the data from SLRU into a local buffer, so as to avoid
    1991                 :              * holding the NotifySLRULock while we are examining the entries
    1992                 :              * and possibly transmitting them to our frontend.  Copy only the
    1993                 :              * part of the page we will actually inspect.
    1994                 :              */
    1995              85 :             slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
    1996                 :                                                 InvalidTransactionId);
    1997              85 :             if (curpage == QUEUE_POS_PAGE(head))
    1998                 :             {
    1999                 :                 /* we only want to read as far as head */
    2000              50 :                 copysize = QUEUE_POS_OFFSET(head) - curoffset;
    2001              50 :                 if (copysize < 0)
    2002 UBC           0 :                     copysize = 0;   /* just for safety */
    2003                 :             }
    2004                 :             else
    2005                 :             {
    2006                 :                 /* fetch all the rest of the page */
    2007 CBC          35 :                 copysize = QUEUE_PAGESIZE - curoffset;
    2008                 :             }
    2009              85 :             memcpy(page_buffer.buf + curoffset,
    2010              85 :                    NotifyCtl->shared->page_buffer[slotno] + curoffset,
    2011                 :                    copysize);
    2012                 :             /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
    2013              85 :             LWLockRelease(NotifySLRULock);
    2014                 : 
    2015                 :             /*
    2016                 :              * Process messages up to the stop position, end of page, or an
    2017                 :              * uncommitted message.
    2018                 :              *
    2019                 :              * Our stop position is what we found to be the head's position
    2020                 :              * when we entered this function. It might have changed already.
    2021                 :              * But if it has, we will receive (or have already received and
    2022                 :              * queued) another signal and come here again.
    2023                 :              *
    2024                 :              * We are not holding NotifyQueueLock here! The queue can only
    2025                 :              * extend beyond the head pointer (see above) and we leave our
    2026                 :              * backend's pointer where it is so nobody will truncate or
    2027                 :              * rewrite pages under us. Especially we don't want to hold a lock
     2028                 :              * rewrite pages under us. In particular, we don't want to hold
     2029                 :              * a lock while sending the notifications to the frontend.
    2030              85 :             reachedStop = asyncQueueProcessPageEntries(&pos, head,
    2031                 :                                                        page_buffer.buf,
    2032                 :                                                        snapshot);
    2033              85 :         } while (!reachedStop);
    2034                 :     }
    2035 UBC           0 :     PG_FINALLY();
    2036                 :     {
    2037                 :         /* Update shared state */
    2038 CBC          50 :         LWLockAcquire(NotifyQueueLock, LW_SHARED);
    2039              50 :         QUEUE_BACKEND_POS(MyBackendId) = pos;
    2040              50 :         LWLockRelease(NotifyQueueLock);
    2041                 :     }
    2042              50 :     PG_END_TRY();
    2043                 : 
    2044                 :     /* Done with snapshot */
    2045              50 :     UnregisterSnapshot(snapshot);
    2046                 : }
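The race-condition discussion above reduces to one application-level rule: commit the LISTEN before inspecting the state you care about. Below is a minimal client-side sketch of that rule using libpq; the channel name "foo", the query, and the surrounding connection handling are placeholders.

    #include <stdio.h>
    #include <libpq-fe.h>

    /* Sketch: LISTEN is issued (and, being outside any transaction block,
     * committed) before the state inspection, so any transaction that
     * commits afterwards and changes that state will also have its NOTIFY
     * delivered to us. */
    static void
    listen_then_inspect(PGconn *conn)
    {
        PQclear(PQexec(conn, "LISTEN foo"));        /* commits immediately */

        PQclear(PQexec(conn, "SELECT * FROM foo")); /* now inspect state */

        /* Later, poll for notifications about subsequent changes. */
        (void) PQconsumeInput(conn);
        for (PGnotify *n = PQnotifies(conn); n != NULL; n = PQnotifies(conn))
        {
            printf("notify on \"%s\", payload \"%s\", from PID %d\n",
                   n->relname, n->extra, n->be_pid);
            PQfreemem(n);
        }
    }

Had the LISTEN still been pending inside an open transaction when the SELECT ran, a notification from a transaction committing in between could be missed; that is the unsafe interleaving sketched in the comment above.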
    2047                 : 
    2048                 : /*
    2049                 :  * Fetch notifications from the shared queue, beginning at position current,
    2050                 :  * and deliver relevant ones to my frontend.
    2051                 :  *
    2052                 :  * The current page must have been fetched into page_buffer from shared
    2053                 :  * memory.  (We could access the page right in shared memory, but that
    2054                 :  * would imply holding the NotifySLRULock throughout this routine.)
    2055                 :  *
    2056                 :  * We stop if we reach the "stop" position, or reach a notification from an
    2057                 :  * uncommitted transaction, or reach the end of the page.
    2058                 :  *
    2059                 :  * The function returns true once we have reached the stop position or an
    2060                 :  * uncommitted notification, and false if we have finished with the page.
    2061                 :  * In other words: once it returns true there is no need to look further.
    2062                 :  * The QueuePosition *current is advanced past all processed messages.
    2063                 :  */
    2064                 : static bool
    2065              85 : asyncQueueProcessPageEntries(volatile QueuePosition *current,
    2066                 :                              QueuePosition stop,
    2067                 :                              char *page_buffer,
    2068                 :                              Snapshot snapshot)
    2069                 : {
    2070              85 :     bool        reachedStop = false;
    2071                 :     bool        reachedEndOfPage;
    2072                 :     AsyncQueueEntry *qe;
    2073                 : 
    2074                 :     do
    2075                 :     {
    2076            1242 :         QueuePosition thisentry = *current;
    2077                 : 
    2078            1242 :         if (QUEUE_POS_EQUAL(thisentry, stop))
    2079              50 :             break;
    2080                 : 
    2081            1192 :         qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
    2082                 : 
    2083                 :         /*
    2084                 :          * Advance *current over this message, possibly to the next page. As
    2085                 :          * noted in the comments for asyncQueueReadAllNotifications, we must
    2086                 :          * do this before possibly failing while processing the message.
    2087                 :          */
    2088            1192 :         reachedEndOfPage = asyncQueueAdvance(current, qe->length);
    2089                 : 
    2090                 :         /* Ignore messages destined for other databases */
    2091            1192 :         if (qe->dboid == MyDatabaseId)
    2092                 :         {
    2093            1159 :             if (XidInMVCCSnapshot(qe->xid, snapshot))
    2094                 :             {
    2095                 :                 /*
    2096                 :                  * The source transaction is still in progress, so we can't
    2097                 :                  * process this message yet.  Break out of the loop, but first
    2098                 :                  * back up *current so we will reprocess the message next
    2099                 :                  * time.  (Note: it is unlikely but not impossible for
    2100                 :                  * TransactionIdDidCommit to fail, so we can't really avoid
    2101                 :                  * this advance-then-back-up behavior when dealing with an
    2102                 :                  * uncommitted message.)
    2103                 :                  *
    2104                 :                  * Note that we must test XidInMVCCSnapshot before we test
    2105                 :                  * TransactionIdDidCommit, else we might return a message from
    2106                 :                  * a transaction that is not yet visible to snapshots; compare
    2107                 :                  * the comments at the head of heapam_visibility.c.
    2108                 :                  *
    2109                 :                  * Also, while our own xact won't be listed in the snapshot,
    2110                 :                  * we need not check for TransactionIdIsCurrentTransactionId
    2111                 :                  * because our transaction cannot (yet) have queued any
    2112                 :                  * messages.
    2113                 :                  */
    2114 UBC           0 :                 *current = thisentry;
    2115               0 :                 reachedStop = true;
    2116               0 :                 break;
    2117                 :             }
    2118 CBC        1159 :             else if (TransactionIdDidCommit(qe->xid))
    2119                 :             {
    2120                 :                 /* qe->data is the null-terminated channel name */
    2121            1159 :                 char       *channel = qe->data;
    2122                 : 
    2123            1159 :                 if (IsListeningOn(channel))
    2124                 :                 {
    2125                 :                     /* payload follows channel name */
    2126              31 :                     char       *payload = qe->data + strlen(channel) + 1;
    2127                 : 
    2128              31 :                     NotifyMyFrontEnd(channel, payload, qe->srcPid);
    2129                 :                 }
    2130                 :             }
    2131                 :             else
    2132                 :             {
    2133                 :                 /*
    2134                 :                  * The source transaction aborted or crashed, so we just
    2135                 :                  * ignore its notifications.
    2136                 :                  */
    2137                 :             }
    2138                 :         }
    2139                 : 
    2140                 :         /* Loop back if we're not at end of page */
    2141            1192 :     } while (!reachedEndOfPage);
    2142                 : 
    2143              85 :     if (QUEUE_POS_EQUAL(*current, stop))
    2144              50 :         reachedStop = true;
    2145                 : 
    2146              85 :     return reachedStop;
    2147                 : }
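For reading convenience, the per-entry decision implemented above can be summarized as a small table (a recap of the code, not new behavior):

    XidInMVCCSnapshot(xid)   TransactionIdDidCommit(xid)   action taken
    ----------------------   ---------------------------   -----------------------------------
    true (in progress)       not checked                   back *current up, stop the scan
    false                    true                          deliver if IsListeningOn(channel)
    false                    false (aborted or crashed)    ignore the entry, keep scanning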
    2148                 : 
    2149                 : /*
    2150                 :  * Advance the shared queue tail variable to the minimum of all the
    2151                 :  * per-backend tail pointers.  Truncate pg_notify space if possible.
    2152                 :  *
    2153                 :  * This is (usually) called during CommitTransaction(), so it's important for
    2154                 :  * it to have very low probability of failure.
    2155                 :  */
    2156                 : static void
    2157              13 : asyncQueueAdvanceTail(void)
    2158                 : {
    2159                 :     QueuePosition min;
    2160                 :     int         oldtailpage;
    2161                 :     int         newtailpage;
    2162                 :     int         boundary;
    2163                 : 
    2164                 :     /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
    2165              13 :     LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
    2166                 : 
    2167                 :     /*
    2168                 :      * Compute the new tail.  Pre-v13, it's essential that QUEUE_TAIL be exact
    2169                 :      * (ie, exactly match at least one backend's queue position), so it must
    2170                 :      * be updated atomically with the actual computation.  Since v13, we could
    2171                 :      * get away with not doing it like that, but it seems prudent to keep it
    2172                 :      * so.
    2173                 :      *
    2174                 :      * Also, because incoming backends will scan forward from QUEUE_TAIL, that
    2175                 :      * must be advanced before we can truncate any data.  Thus, QUEUE_TAIL is
    2176                 :      * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
    2177                 :      * un-truncated page.  When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
    2178                 :      * there are pages we can truncate but haven't yet finished doing so.
    2179                 :      *
    2180                 :      * For concurrency's sake, we don't want to hold NotifyQueueLock while
    2181                 :      * performing SimpleLruTruncate.  This is OK because no backend will try
    2182                 :      * to access the pages we are in the midst of truncating.
    2183                 :      */
    2184              13 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2185              13 :     min = QUEUE_HEAD;
    2186              23 :     for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
    2187                 :     {
    2188              10 :         Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
    2189              10 :         min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
    2190                 :     }
    2191              13 :     QUEUE_TAIL = min;
    2192              13 :     oldtailpage = QUEUE_STOP_PAGE;
    2193              13 :     LWLockRelease(NotifyQueueLock);
    2194                 : 
    2195                 :     /*
    2196                 :      * We can truncate something if the global tail advanced across an SLRU
    2197                 :      * segment boundary.
    2198                 :      *
    2199                 :      * XXX it might be better to truncate only once every several segments, to
    2200                 :      * reduce the number of directory scans.
    2201                 :      */
    2202              13 :     newtailpage = QUEUE_POS_PAGE(min);
    2203              13 :     boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
    2204              13 :     if (asyncQueuePagePrecedes(oldtailpage, boundary))
    2205                 :     {
    2206                 :         /*
    2207                 :          * SimpleLruTruncate() will ask for NotifySLRULock but will also
    2208                 :          * release the lock again.
    2209                 :          */
    2210 UBC           0 :         SimpleLruTruncate(NotifyCtl, newtailpage);
    2211                 : 
    2212                 :         /*
    2213                 :          * Update QUEUE_STOP_PAGE.  This changes asyncQueueIsFull()'s verdict
    2214                 :          * for the segment immediately prior to the old tail, allowing fresh
    2215                 :          * data into that segment.
    2216                 :          */
    2217               0 :         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
    2218               0 :         QUEUE_STOP_PAGE = newtailpage;
    2219               0 :         LWLockRelease(NotifyQueueLock);
    2220                 :     }
    2221                 : 
    2222 CBC          13 :     LWLockRelease(NotifyQueueTailLock);
    2223              13 : }
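The segment-boundary arithmetic above is clearer with numbers. A worked example, assuming the usual SLRU segment size of 32 pages (the exact value of SLRU_PAGES_PER_SEGMENT is taken on trust here):

    newtailpage = 70   ->  boundary = 70 - (70 % 32) = 64
    oldtailpage = 40   ->  40 precedes 64, so SimpleLruTruncate() may remove the
                           segments lying wholly before page 64
    oldtailpage = 65   ->  65 does not precede 64; the logical tail advanced, but
                           nothing can be truncated yet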
    2224                 : 
    2225                 : /*
    2226                 :  * ProcessIncomingNotify
    2227                 :  *
    2228                 :  *      Scan the queue for arriving notifications and report them to the front
    2229                 :  *      end.  The notifications might be from other sessions, or our own;
    2230                 :  *      there's no need to distinguish here.
    2231                 :  *
    2232                 :  *      If "flush" is true, force any frontend messages out immediately.
    2233                 :  *
    2234                 :  *      NOTE: since we are outside any transaction, we must create our own.
    2235                 :  */
    2236                 : static void
    2237              39 : ProcessIncomingNotify(bool flush)
    2238                 : {
    2239                 :     /* We *must* reset the flag */
    2240              39 :     notifyInterruptPending = false;
    2241                 : 
    2242                 :     /* Do nothing else if we aren't actively listening */
    2243              39 :     if (listenChannels == NIL)
    2244 UBC           0 :         return;
    2245                 : 
    2246 CBC          39 :     if (Trace_notify)
    2247 UBC           0 :         elog(DEBUG1, "ProcessIncomingNotify");
    2248                 : 
    2249 CBC          39 :     set_ps_display("notify interrupt");
    2250                 : 
    2251                 :     /*
    2252                 :      * We must run asyncQueueReadAllNotifications inside a transaction, else
    2253                 :      * bad things happen if it gets an error.
    2254                 :      */
    2255              39 :     StartTransactionCommand();
    2256                 : 
    2257              39 :     asyncQueueReadAllNotifications();
    2258                 : 
    2259              39 :     CommitTransactionCommand();
    2260                 : 
    2261                 :     /*
    2262                 :      * If this isn't an end-of-command case, we must flush the notify messages
    2263                 :      * to ensure frontend gets them promptly.
    2264                 :      */
    2265              39 :     if (flush)
    2266              10 :         pq_flush();
    2267                 : 
    2268              39 :     set_ps_display("idle");
    2269                 : 
    2270              39 :     if (Trace_notify)
    2271 UBC           0 :         elog(DEBUG1, "ProcessIncomingNotify: done");
    2272                 : }
    2273                 : 
    2274                 : /*
    2275                 :  * Send NOTIFY message to my front end.
    2276                 :  */
    2277                 : void
    2278 CBC          31 : NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
    2279                 : {
    2280              31 :     if (whereToSendOutput == DestRemote)
    2281                 :     {
    2282                 :         StringInfoData buf;
    2283                 : 
    2284              31 :         pq_beginmessage(&buf, 'A');
    2285              31 :         pq_sendint32(&buf, srcPid);
    2286              31 :         pq_sendstring(&buf, channel);
    2287              31 :         pq_sendstring(&buf, payload);
    2288              31 :         pq_endmessage(&buf);
    2289                 : 
    2290                 :         /*
    2291                 :          * NOTE: we do not do pq_flush() here.  Some level of caller will
    2292                 :          * handle it later, allowing this message to be combined into a packet
    2293                 :          * with other ones.
    2294                 :          */
    2295                 :     }
    2296                 :     else
    2297 UBC           0 :         elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
    2298 CBC          31 : }
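For reference, the 'A' (NotificationResponse) message assembled above carries three fields in this order; libpq surfaces them to applications as the be_pid, relname, and extra members of PGnotify. The struct below only illustrates the body layout as a client would decode it and is not a type used by the server.

    /* Illustrative layout of the NotificationResponse ('A') body, mirroring
     * the pq_send* calls above (not an actual server or libpq declaration). */
    typedef struct NotificationResponseBody
    {
        int32       srcPid;     /* pq_sendint32: PID of the notifying backend */
        const char *channel;    /* pq_sendstring: NUL-terminated channel name */
        const char *payload;    /* pq_sendstring: NUL-terminated payload,
                                 * empty string when NOTIFY had no payload */
    } NotificationResponseBody;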
    2299                 : 
    2300                 : /* Does pendingNotifies include a match for the given event? */
    2301                 : static bool
    2302            1048 : AsyncExistsPendingNotify(Notification *n)
    2303                 : {
    2304            1048 :     if (pendingNotifies == NULL)
    2305 UBC           0 :         return false;
    2306                 : 
    2307 CBC        1048 :     if (pendingNotifies->hashtab != NULL)
    2308                 :     {
    2309                 :         /* Use the hash table to probe for a match */
    2310             983 :         if (hash_search(pendingNotifies->hashtab,
    2311                 :                         &n,
    2312                 :                         HASH_FIND,
    2313                 :                         NULL))
    2314 UBC           0 :             return true;
    2315                 :     }
    2316                 :     else
    2317                 :     {
    2318                 :         /* Must scan the event list */
    2319                 :         ListCell   *l;
    2320                 : 
    2321 CBC         271 :         foreach(l, pendingNotifies->events)
    2322                 :         {
    2323             220 :             Notification *oldn = (Notification *) lfirst(l);
    2324                 : 
    2325             220 :             if (n->channel_len == oldn->channel_len &&
    2326             220 :                 n->payload_len == oldn->payload_len &&
    2327             125 :                 memcmp(n->data, oldn->data,
    2328             125 :                        n->channel_len + n->payload_len + 2) == 0)
    2329              14 :                 return true;
    2330                 :         }
    2331                 :     }
    2332                 : 
    2333            1034 :     return false;
    2334                 : }
    2335                 : 
    2336                 : /*
    2337                 :  * Add a notification event to a pre-existing pendingNotifies list.
    2338                 :  *
    2339                 :  * Because pendingNotifies->events is already nonempty, this works
    2340                 :  * correctly no matter what CurrentMemoryContext is.
    2341                 :  */
    2342                 : static void
    2343            1034 : AddEventToPendingNotifies(Notification *n)
    2344                 : {
    2345            1034 :     Assert(pendingNotifies->events != NIL);
    2346                 : 
    2347                 :     /* Create the hash table if it's time to */
    2348            1034 :     if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
    2349             984 :         pendingNotifies->hashtab == NULL)
    2350                 :     {
    2351                 :         HASHCTL     hash_ctl;
    2352                 :         ListCell   *l;
    2353                 : 
    2354                 :         /* Create the hash table */
    2355               1 :         hash_ctl.keysize = sizeof(Notification *);
    2356               1 :         hash_ctl.entrysize = sizeof(NotificationHash);
    2357               1 :         hash_ctl.hash = notification_hash;
    2358               1 :         hash_ctl.match = notification_match;
    2359               1 :         hash_ctl.hcxt = CurTransactionContext;
    2360               2 :         pendingNotifies->hashtab =
    2361               1 :             hash_create("Pending Notifies",
    2362                 :                         256L,
    2363                 :                         &hash_ctl,
    2364                 :                         HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
    2365                 : 
    2366                 :         /* Insert all the already-existing events */
    2367              17 :         foreach(l, pendingNotifies->events)
    2368                 :         {
    2369              16 :             Notification *oldn = (Notification *) lfirst(l);
    2370                 :             NotificationHash *hentry;
    2371                 :             bool        found;
    2372                 : 
    2373              16 :             hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
    2374                 :                                                       &oldn,
    2375                 :                                                       HASH_ENTER,
    2376                 :                                                       &found);
    2377              16 :             Assert(!found);
    2378              16 :             hentry->event = oldn;
    2379                 :         }
    2380                 :     }
    2381                 : 
    2382                 :     /* Add new event to the list, in order */
    2383            1034 :     pendingNotifies->events = lappend(pendingNotifies->events, n);
    2384                 : 
    2385                 :     /* Add event to the hash table if needed */
    2386            1034 :     if (pendingNotifies->hashtab != NULL)
    2387                 :     {
    2388                 :         NotificationHash *hentry;
    2389                 :         bool        found;
    2390                 : 
    2391             984 :         hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
    2392                 :                                                   &n,
    2393                 :                                                   HASH_ENTER,
    2394                 :                                                   &found);
    2395             984 :         Assert(!found);
    2396             984 :         hentry->event = n;
    2397                 :     }
    2398            1034 : }
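The hash entries created above store nothing but a pointer back to the Notification that already lives in pendingNotifies->events; the pointer is both the key and the value, which is why the hash and match functions below dereference it. A sketch of the entry shape (the real definition appears earlier in async.c; treat the exact layout shown here as an assumption):

    /* Sketch of the hash-table entry used above (assumed layout).  Storing
     * only a pointer avoids copying the variable-length channel/payload
     * bytes; duplicate detection therefore relies on the dereferencing
     * notification_hash/notification_match functions below. */
    typedef struct NotificationHash
    {
        Notification *event;    /* points at the entry kept in the events list */
    } NotificationHash;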
    2399                 : 
    2400                 : /*
    2401                 :  * notification_hash: hash function for notification hash table
    2402                 :  *
    2403                 :  * The hash "keys" are pointers to Notification structs.
    2404                 :  */
    2405                 : static uint32
    2406            1983 : notification_hash(const void *key, Size keysize)
    2407                 : {
    2408            1983 :     const Notification *k = *(const Notification *const *) key;
    2409                 : 
    2410            1983 :     Assert(keysize == sizeof(Notification *));
    2411                 :     /* We don't bother to include the payload's trailing null in the hash */
    2412            1983 :     return DatumGetUInt32(hash_any((const unsigned char *) k->data,
    2413            1983 :                                    k->channel_len + k->payload_len + 1));
    2414                 : }
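The byte counts used by notification_hash and notification_match differ by one on purpose. A small worked example, purely illustrative:

    channel = "foo"  (channel_len = 3)
    payload = "bar"  (payload_len = 3)
    data    = 'f' 'o' 'o' '\0' 'b' 'a' 'r' '\0'

    notification_hash  covers channel_len + payload_len + 1 = 7 bytes ("foo\0bar"),
                       stopping just short of the payload's trailing NUL
    notification_match compares channel_len + payload_len + 2 = 8 bytes, i.e.
                       both strings including both NUL terminators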
    2415                 : 
    2416                 : /*
    2417                 :  * notification_match: match function to use with notification_hash
    2418                 :  */
    2419                 : static int
    2420 UBC           0 : notification_match(const void *key1, const void *key2, Size keysize)
    2421                 : {
    2422               0 :     const Notification *k1 = *(const Notification *const *) key1;
    2423               0 :     const Notification *k2 = *(const Notification *const *) key2;
    2424                 : 
    2425               0 :     Assert(keysize == sizeof(Notification *));
    2426               0 :     if (k1->channel_len == k2->channel_len &&
    2427               0 :         k1->payload_len == k2->payload_len &&
    2428               0 :         memcmp(k1->data, k2->data,
    2429               0 :                k1->channel_len + k1->payload_len + 2) == 0)
    2430               0 :         return 0;               /* equal */
    2431               0 :     return 1;                   /* not equal */
    2432                 : }
    2433                 : 
    2434                 : /* Clear the pendingActions and pendingNotifies lists. */
    2435                 : static void
    2436 CBC       20196 : ClearPendingActionsAndNotifies(void)
    2437                 : {
    2438                 :     /*
    2439                 :      * Everything's allocated in either TopTransactionContext or the context
    2440                 :      * for the subtransaction to which it corresponds.  So, there's nothing to
    2441                 :      * do here except reset the pointers; the space will be reclaimed when the
    2442                 :      * contexts are deleted.
    2443                 :      */
    2444           20196 :     pendingActions = NULL;
    2445           20196 :     pendingNotifies = NULL;
    2446           20196 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a