LCOV - differential code coverage report
Current view:  top level - src/backend/replication/logical - applyparallelworker.c (source / functions)
Current:       Differential Code Coverage HEAD vs 15
Current Date:  2023-04-08 15:15:32
Baseline:      15
Baseline Date: 2023-04-08 15:09:40

            Coverage    Total    Hit    UNC    GNC
Lines:        88.1 %      429    378     51    378
Functions:   100.0 %       36     36            36

Legend: Lines: hit | not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  * applyparallelworker.c
       3                 :  *     Support routines for applying xact by parallel apply worker
       4                 :  *
       5                 :  * Copyright (c) 2023, PostgreSQL Global Development Group
       6                 :  *
       7                 :  * IDENTIFICATION
       8                 :  *    src/backend/replication/logical/applyparallelworker.c
       9                 :  *
      10                 :  * This file contains the code to launch, set up, and tear down a parallel
      11                 :  * apply worker that receives changes from the leader apply worker and invokes
      12                 :  * routines to apply those changes on the subscriber database. It also
      13                 :  * contains routines to support setting up, using, and tearing down a
      14                 :  * ParallelApplyWorkerInfo, which is required so that the leader apply worker
      15                 :  * and parallel apply workers can communicate with each other.
      16                 :  *
      17                 :  * A parallel apply worker is assigned (if available) as soon as a xact's
      18                 :  * first stream is received for subscriptions that have set their 'streaming'
      19                 :  * option to parallel. The leader apply worker will send changes to this new
      20                 :  * worker via shared memory. We keep this worker assigned until the transaction
      21                 :  * commit is received and also wait for the worker to finish at commit. This
      22                 :  * preserves commit ordering and avoids file I/O in most cases, although we
      23                 :  * still need to spill to a file if there is no worker available. See comments
      24                 :  * atop logical/worker.c to know more about streamed xacts whose changes are
      25                 :  * spilled to disk. It is important to maintain commit order to avoid failures
      26                 :  * due to: (a) transaction dependencies - say if we insert a row in the first
      27                 :  * transaction and update it in the second transaction on the publisher, then
      28                 :  * allowing the subscriber to apply both in parallel can lead to failure in the
      29                 :  * update; (b) deadlocks - allowing transactions that update the same set of
      30                 :  * rows/tables in the opposite order to be applied in parallel can lead to
      31                 :  * deadlocks.
      32                 :  *
      33                 :  * A worker pool is used to avoid restarting workers for each streaming
      34                 :  * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
      35                 :  * in the ParallelApplyWorkerPool, adding a new worker's information to the
      36                 :  * pool after successfully launching it. Once the worker finishes applying
      37                 :  * the transaction, it is marked as available for reuse. Then, before starting
      38                 :  * a new worker to apply a streaming transaction, we check the pool for any
      39                 :  * available worker. Note that we retain at most half of
      40                 :  * max_parallel_apply_workers_per_subscription workers in the pool; beyond
      41                 :  * that, we simply stop the worker after it has applied its transaction.
      42                 :  *
      43                 :  * XXX This worker pool threshold is arbitrary and we can provide a GUC
      44                 :  * variable for this in the future if required.
      45                 :  *
      46                 :  * The leader apply worker will create a separate dynamic shared memory segment
      47                 :  * when each parallel apply worker starts. The reason for this design is that
      48                 :  * we cannot predict how many workers will be needed. It may be possible to
      49                 :  * allocate enough shared memory in one segment based on the maximum number of
      50                 :  * parallel apply workers (max_parallel_apply_workers_per_subscription), but
      51                 :  * this would waste memory if no process is actually started.
      52                 :  *
      53                 :  * The dynamic shared memory segment contains: (a) a shm_mq that is used to
      54                 :  * send changes in the transaction from leader apply worker to parallel apply
      55                 :  * worker; (b) another shm_mq that is used to send errors (and other messages
      56                 :  * reported via elog/ereport) from the parallel apply worker to leader apply
      57                 :  * worker; (c) necessary information to be shared among parallel apply workers
      58                 :  * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
      59                 :  *
      60                 :  * Locking Considerations
      61                 :  * ----------------------
      62                 :  * We have a risk of deadlock due to concurrently applying the transactions in
      63                 :  * parallel mode that were independent on the publisher side but became
      64                 :  * dependent on the subscriber side due to the different database structures
      65                 :  * (like schema of subscription tables, constraints, etc.) on each side. This
      66                 :  * can happen even without parallel mode when there are concurrent operations
      67                 :  * on the subscriber. In order to detect the deadlocks among leader (LA) and
      68                 :  * on the subscriber. In order to detect deadlocks among the leader (LA) and
      69                 :  * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
      70                 :  * An alternative approach could be to not allow parallelism when the schema of
      71                 :  * tables is different between the publisher and subscriber but that would be
      72                 :  * too restrictive and would require the publisher to send much more
      73                 :  * information than it is currently sending.
      74                 :  *
      75                 :  * Consider a case where the subscribed table does not have a unique key on the
      76                 :  * publisher and has a unique key on the subscriber. The deadlock can happen in
      77                 :  * the following ways:
      78                 :  *
      79                 :  * 1) Deadlock between the leader apply worker and a parallel apply worker
      80                 :  *
      81                 :  * Consider that the parallel apply worker (PA) is executing TX-1 and the
      82                 :  * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
      83                 :  * Now, LA is waiting for PA because of the unique key constraint of the
      84                 :  * subscribed table while PA is waiting for LA to send the next stream of
      85                 :  * changes or a transaction finish command message.
      86                 :  *
      87                 :  * In order for lmgr to detect this, we have LA acquire a session lock on the
      88                 :  * remote transaction (by pa_lock_stream()) and have PA wait on that lock
      89                 :  * before trying to receive the next stream of changes. Specifically, LA will
      90                 :  * acquire the lock in AccessExclusive mode before sending STREAM_STOP and will
      91                 :  * release it (if already acquired) after sending STREAM_START, STREAM_ABORT
      92                 :  * (for a toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. PA will
      93                 :  * acquire the lock in AccessShare mode after processing STREAM_STOP and
      94                 :  * STREAM_ABORT (for a subtransaction) and then release the lock immediately
      95                 :  * after acquiring it.
      96                 :  *
      97                 :  * The lock graph for the above example will look as follows:
      98                 :  * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
      99                 :  * acquire the stream lock) -> LA
     100                 :  *
     101                 :  * This way, when PA is waiting for LA for the next stream of changes, we can
     102                 :  * have a wait-edge from PA to LA in lmgr, which will make us detect the
     103                 :  * deadlock between LA and PA.
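                         :  *
                         :  * To make this protocol concrete, here is a hedged sketch of one stream's
                         :  * handshake (illustrative only; see pa_lock_stream()/pa_unlock_stream() in
                         :  * this file for the real call sites):
                         :  *
                         :  *   LA: pa_lock_stream(xid, AccessExclusiveLock);  -- before sending STREAM_STOP
                         :  *   LA: send STREAM_STOP
                         :  *   PA: process STREAM_STOP, then
                         :  *   PA: pa_lock_stream(xid, AccessShareLock);      -- blocks while LA holds it
                         :  *   LA: send STREAM_START, then
                         :  *   LA: pa_unlock_stream(xid, AccessExclusiveLock);-- PA's request is granted
                         :  *   PA: pa_unlock_stream(xid, AccessShareLock);    -- released immediately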
     104                 :  *
     105                 :  * 2) Deadlock between the leader apply worker and parallel apply workers
     106                 :  *
     107                 :  * This scenario is similar to the first case but TX-1 and TX-2 are executed by
     108                 :  * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
     109                 :  * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
     110                 :  * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
     111                 :  * transaction in order to preserve the commit order. There is a deadlock among
     112                 :  * the three processes.
     113                 :  *
     114                 :  * In order for lmgr to detect this, we have PA acquire a session lock (this is
     115                 :  * a different lock than the one referred to in the previous case, see
     116                 :  * pa_lock_transaction()) on the transaction being applied and have LA wait on
     117                 :  * that lock before processing the transaction finish commands. Specifically,
     118                 :  * PA will acquire this lock in AccessExclusive mode before executing the first
     119                 :  * message of the transaction and release it at the end of the xact. LA will
     120                 :  * acquire this lock in AccessShare mode at transaction finish commands and
     121                 :  * release it immediately.
     122                 :  *
     123                 :  * The lock graph for the above example will look as follows:
     124                 :  * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
     125                 :  * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
     126                 :  * lock) -> LA
     127                 :  *
     128                 :  * This way, when LA is waiting to complete the transaction finish command to
     129                 :  * preserve the commit order, we will be able to detect a deadlock, if any.
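                         :  *
                         :  * Again as a hedged sketch (illustrative only; see pa_lock_transaction()/
                         :  * pa_unlock_transaction() in this file for the real call sites):
                         :  *
                         :  *   PA: pa_lock_transaction(xid, AccessExclusiveLock); -- before first message
                         :  *   PA: apply the streamed changes ...
                         :  *   LA: pa_lock_transaction(xid, AccessShareLock);     -- at the finish command,
                         :  *                                                          blocks until PA ends
                         :  *   PA: pa_unlock_transaction(xid, AccessExclusiveLock); -- at the xact end
                         :  *   LA: pa_unlock_transaction(xid, AccessShareLock);     -- released immediately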
     130                 :  *
     131                 :  * One might think we could use XactLockTableWait(), but XactLockTableWait()
     132                 :  * considers a PREPARED TRANSACTION as still in progress, which means the lock
     133                 :  * won't be released even after the parallel apply worker has prepared the
     134                 :  * transaction.
     135                 :  *
     136                 :  * 3) Deadlock when the shm_mq buffer is full
     137                 :  *
     138                 :  * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions
     139                 :  * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
     140                 :  * wait to send messages, and this wait doesn't appear in lmgr.
     141                 :  *
     142                 :  * To avoid this wait, we use a non-blocking write and wait with a timeout. If
     143                 :  * the timeout is exceeded, LA will serialize all the pending messages to
     144                 :  * a file and indicate to PA-2 that it needs to read that file for the
     145                 :  * remaining messages. Then LA will start waiting for the commit as in the
     146                 :  * previous case, which will detect a deadlock, if any. See pa_send_data() and
     147                 :  * enum TransApplyAction.
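                         :  *
                         :  * A minimal sketch of that logic (illustrative only; the real implementation
                         :  * is pa_send_data(), and the timeout and retry-interval constants named here
                         :  * are assumptions about its definitions):
                         :  *
                         :  *   for (;;)
                         :  *   {
                         :  *       if (shm_mq_send(winfo->mq_handle, nbytes, data, true, true) ==
                         :  *           SHM_MQ_SUCCESS)
                         :  *           return true;        -- message fully queued
                         :  *       if (SHM_SEND_TIMEOUT_MS have elapsed since the first failed attempt)
                         :  *           return false;       -- caller switches to serializing to a file
                         :  *       WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                         :  *                 SHM_SEND_RETRY_INTERVAL_MS, 0);
                         :  *       ResetLatch(MyLatch);
                         :  *   }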
     148                 :  *
     149                 :  * Lock types
     150                 :  * ----------
     151                 :  * Both the stream lock and the transaction lock mentioned above are
     152                 :  * session-level locks because both locks could be acquired outside a
     153                 :  * transaction, and the stream lock in the leader needs to persist across
     154                 :  * transaction boundaries, i.e., until the end of the streaming transaction.
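                         :  *
                         :  * As a hedged illustration, acquiring one of these session-level locks via
                         :  * lmgr (assuming the LockApplyTransactionForSession() helper; the lock types
                         :  * are the PARALLEL_APPLY_LOCK_* constants defined below) looks like:
                         :  *
                         :  *   LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                         :  *                                  PARALLEL_APPLY_LOCK_STREAM,
                         :  *                                  AccessExclusiveLock);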
     155                 :  *-------------------------------------------------------------------------
     156                 :  */
     157                 : 
     158                 : #include "postgres.h"
     159                 : 
     160                 : #include "libpq/pqformat.h"
     161                 : #include "libpq/pqmq.h"
     162                 : #include "pgstat.h"
     163                 : #include "postmaster/interrupt.h"
     164                 : #include "replication/logicallauncher.h"
     165                 : #include "replication/logicalworker.h"
     166                 : #include "replication/origin.h"
     167                 : #include "replication/worker_internal.h"
     168                 : #include "storage/ipc.h"
     169                 : #include "storage/lmgr.h"
     170                 : #include "tcop/tcopprot.h"
     171                 : #include "utils/inval.h"
     172                 : #include "utils/memutils.h"
     173                 : #include "utils/syscache.h"
     174                 : 
     175                 : #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
     176                 : 
     177                 : /*
     178                 :  * DSM keys for the parallel apply worker. Unlike other parallel execution
     179                 :  * code, we don't need to worry about DSM keys conflicting with plan_node_id,
     180                 :  * so we can use small integers.
     181                 :  */
     182                 : #define PARALLEL_APPLY_KEY_SHARED       1
     183                 : #define PARALLEL_APPLY_KEY_MQ           2
     184                 : #define PARALLEL_APPLY_KEY_ERROR_QUEUE  3
     185                 : 
     186                 : /* Queue size of DSM, 16 MB for now. */
     187                 : #define DSM_QUEUE_SIZE  (16 * 1024 * 1024)
     188                 : 
     189                 : /*
     190                 :  * Error queue size of DSM. It is desirable to make it large enough that a
     191                 :  * typical ErrorResponse can be sent without blocking. That way, a worker that
     192                 :  * errors out can write the whole message into the queue and terminate without
     193                 :  * waiting for the leader apply worker.
     194                 :  */
     195                 : #define DSM_ERROR_QUEUE_SIZE            (16 * 1024)
     196                 : 
     197                 : /*
     198                 :  * There are three fields in each message received by the parallel apply
     199                 :  * worker: start_lsn, end_lsn and send_time. Because the leader apply worker
     200                 :  * has already updated these statistics, we can ignore these fields in the
     201                 :  * parallel apply worker (see function LogicalRepApplyLoop).
     202                 :  */
     203                 : #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
     204                 : 
     205                 : /*
     206                 :  * The type of session-level lock on a transaction being applied on a logical
     207                 :  * replication subscriber.
     208                 :  */
     209                 : #define PARALLEL_APPLY_LOCK_STREAM  0
     210                 : #define PARALLEL_APPLY_LOCK_XACT    1
     211                 : 
     212                 : /*
     213                 :  * Hash table entry to map xid to the parallel apply worker state.
     214                 :  */
     215                 : typedef struct ParallelApplyWorkerEntry
     216                 : {
     217                 :     TransactionId xid;          /* Hash key -- must be first */
     218                 :     ParallelApplyWorkerInfo *winfo;
     219                 : } ParallelApplyWorkerEntry;
     220                 : 
     221                 : /*
     222                 :  * A hash table used to cache the state of streaming transactions being applied
     223                 :  * by the parallel apply workers.
     224                 :  */
     225                 : static HTAB *ParallelApplyTxnHash = NULL;
     226                 : 
     227                 : /*
     228                 :  * A list (pool) of active parallel apply workers. The information for
     229                 :  * the new worker is added to the list after successfully launching it. The
     230                 :  * list entry is removed if there are already enough workers in the worker
     231                 :  * pool at the end of the transaction. For more information about the worker
     232                 :  * pool, see comments atop this file.
     233                 :  */
     234                 : static List *ParallelApplyWorkerPool = NIL;
     235                 : 
     236                 : /*
     237                 :  * Information shared between leader apply worker and parallel apply worker.
     238                 :  */
     239                 : ParallelApplyWorkerShared *MyParallelShared = NULL;
     240                 : 
     241                 : /*
     242                 :  * Is there a message sent by a parallel apply worker that the leader apply
     243                 :  * worker needs to receive?
     244                 :  */
     245                 : volatile sig_atomic_t ParallelApplyMessagePending = false;
     246                 : 
     247                 : /*
     248                 :  * Cache the parallel apply worker information required for applying the
     249                 :  * current streaming transaction. It is used to save the cost of searching the
     250                 :  * hash table when applying the changes between STREAM_START and STREAM_STOP.
     251                 :  */
     252                 : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
     253                 : 
     254                 : /* A list to maintain subtransactions, if any. */
     255                 : static List *subxactlist = NIL;
     256                 : 
     257                 : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
     258                 : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
     259                 : static PartialFileSetState pa_get_fileset_state(void);
     260                 : 
     261                 : /*
     262                 :  * Returns true if it is OK to start a parallel apply worker, false otherwise.
     263                 :  */
     264                 : static bool
     265 GNC          82 : pa_can_start(void)
     266                 : {
     267                 :     /* Only leader apply workers can start parallel apply workers. */
     268              82 :     if (!am_leader_apply_worker())
     269              27 :         return false;
     270                 : 
     271                 :     /*
     272                 :      * It is good to check for any change in the subscription parameter to
     273                 :      * It is good to check for any change in the subscription parameters to
     274                 :      * avoid the case where a change doesn't get reflected for a very long
     275                 :      * time. This can happen when there is a constant flow of streaming
     276                 :      * transactions that are handled by parallel apply workers.
     277                 :      *
     278                 :      * It is better to do this before the checks below so that the latest
     279                 :      * subscription values can be used for those checks.
     280              55 :     maybe_reread_subscription();
     281                 : 
     282                 :     /*
     283                 :      * Don't start a new parallel apply worker if the subscription is not
     284                 :      * using parallel streaming mode, or if the publisher does not support
     285                 :      * parallel apply.
     286                 :      */
     287              55 :     if (!MyLogicalRepWorker->parallel_apply)
     288              28 :         return false;
     289                 : 
     290                 :     /*
     291                 :      * Don't start a new parallel worker if the user has set skiplsn, as it's
     292                 :      * possible that they want to skip the streaming transaction. For
     293                 :      * streaming transactions, we need to serialize the transaction to a file
     294                 :      * so that we can get the last LSN of the transaction to judge whether to
     295                 :      * skip before starting to apply the change.
     296                 :      *
     297                 :      * One might think that we could allow parallelism if the first lsn of the
     298                 :      * transaction is greater than skiplsn, but we don't send it with the
     299                 :      * STREAM START message, and it doesn't seem worth sending the extra eight
     300                 :      * bytes with the STREAM START to enable parallelism for this case.
     301                 :      */
     302              27 :     if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
     303 UNC           0 :         return false;
     304                 : 
     305                 :     /*
     306                 :      * For streaming transactions that are being applied using a parallel
     307                 :      * apply worker, we cannot decide whether to apply the change for a
     308                 :      * relation that is not in the READY state (see
     309                 :      * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
     310                 :      * time. So, we don't start the new parallel apply worker in this case.
     311                 :      */
     312 GNC          27 :     if (!AllTablesyncsReady())
     313 UNC           0 :         return false;
     314                 : 
     315 GNC          27 :     return true;
     316                 : }
     317                 : 
     318                 : /*
     319                 :  * Set up a dynamic shared memory segment.
     320                 :  *
     321                 :  * We set up a control region that contains a fixed-size worker info
     322                 :  * (ParallelApplyWorkerShared), a message queue, and an error queue.
     323                 :  *
     324                 :  * Returns true on success, false on failure.
     325                 :  */
     326                 : static bool
     327              10 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
     328                 : {
     329                 :     shm_toc_estimator e;
     330                 :     Size        segsize;
     331                 :     dsm_segment *seg;
     332                 :     shm_toc    *toc;
     333                 :     ParallelApplyWorkerShared *shared;
     334                 :     shm_mq     *mq;
     335              10 :     Size        queue_size = DSM_QUEUE_SIZE;
     336              10 :     Size        error_queue_size = DSM_ERROR_QUEUE_SIZE;
     337                 : 
     338                 :     /*
     339                 :      * Estimate how much shared memory we need.
     340                 :      *
     341                 :      * Because the TOC machinery may choose to insert padding of oddly-sized
     342                 :      * requests, we must estimate each chunk separately.
     343                 :      *
     344                 :      * We need one key to register the location of the header, and two other
     345                 :      * keys to track the locations of the message queue and the error message
     346                 :      * queue.
     347                 :      */
     348              10 :     shm_toc_initialize_estimator(&e);
     349              10 :     shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
     350              10 :     shm_toc_estimate_chunk(&e, queue_size);
     351              10 :     shm_toc_estimate_chunk(&e, error_queue_size);
     352                 : 
     353              10 :     shm_toc_estimate_keys(&e, 3);
     354              10 :     segsize = shm_toc_estimate(&e);
     355                 : 
     356                 :     /* Create the shared memory segment and establish a table of contents. */
     357              10 :     seg = dsm_create(shm_toc_estimate(&e), 0);
     358              10 :     if (!seg)
     359 UNC           0 :         return false;
     360                 : 
     361 GNC          10 :     toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
     362                 :                          segsize);
     363                 : 
     364                 :     /* Set up the header region. */
     365              10 :     shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
     366              10 :     SpinLockInit(&shared->mutex);
     367                 : 
     368              10 :     shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     369              10 :     pg_atomic_init_u32(&(shared->pending_stream_count), 0);
     370              10 :     shared->last_commit_end = InvalidXLogRecPtr;
     371              10 :     shared->fileset_state = FS_EMPTY;
     372                 : 
     373              10 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
     374                 : 
     375                 :     /* Set up message queue for the worker. */
     376              10 :     mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
     377              10 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
     378              10 :     shm_mq_set_sender(mq, MyProc);
     379                 : 
     380                 :     /* Attach the queue. */
     381              10 :     winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
     382                 : 
     383                 :     /* Set up error queue for the worker. */
     384              10 :     mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
     385                 :                        error_queue_size);
     386              10 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
     387              10 :     shm_mq_set_receiver(mq, MyProc);
     388                 : 
     389                 :     /* Attach the queue. */
     390              10 :     winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
     391                 : 
     392                 :     /* Return results to caller. */
     393              10 :     winfo->dsm_seg = seg;
     394              10 :     winfo->shared = shared;
     395                 : 
     396              10 :     return true;
     397                 : }
     398                 : 
     399                 : /*
     400                 :  * Try to get a parallel apply worker from the pool. If none is available then
     401                 :  * start a new one.
     402                 :  */
     403                 : static ParallelApplyWorkerInfo *
     404              27 : pa_launch_parallel_worker(void)
     405                 : {
     406                 :     MemoryContext oldcontext;
     407                 :     bool        launched;
     408                 :     ParallelApplyWorkerInfo *winfo;
     409                 :     ListCell   *lc;
     410                 : 
     411                 :     /* Try to get an available parallel apply worker from the worker pool. */
     412              29 :     foreach(lc, ParallelApplyWorkerPool)
     413                 :     {
     414              19 :         winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     415                 : 
     416              19 :         if (!winfo->in_use)
     417              17 :             return winfo;
     418                 :     }
     419                 : 
     420                 :     /*
     421                 :      * Start a new parallel apply worker.
     422                 :      *
     423                 :      * The worker info can be used for the lifetime of the worker process, so
     424                 :      * create it in a permanent context.
     425                 :      */
     426              10 :     oldcontext = MemoryContextSwitchTo(ApplyContext);
     427                 : 
     428              10 :     winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
     429                 : 
     430                 :     /* Setup shared memory. */
     431              10 :     if (!pa_setup_dsm(winfo))
     432                 :     {
     433 UNC           0 :         MemoryContextSwitchTo(oldcontext);
     434               0 :         pfree(winfo);
     435               0 :         return NULL;
     436                 :     }
     437                 : 
     438 GNC          10 :     launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
     439              10 :                                         MySubscription->oid,
     440              10 :                                         MySubscription->name,
     441              10 :                                         MyLogicalRepWorker->userid,
     442                 :                                         InvalidOid,
     443                 :                                         dsm_segment_handle(winfo->dsm_seg));
     444                 : 
     445              10 :     if (launched)
     446                 :     {
     447              10 :         ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
     448                 :     }
     449                 :     else
     450                 :     {
     451 UNC           0 :         pa_free_worker_info(winfo);
     452               0 :         winfo = NULL;
     453                 :     }
     454                 : 
     455 GNC          10 :     MemoryContextSwitchTo(oldcontext);
     456                 : 
     457              10 :     return winfo;
     458                 : }
     459                 : 
     460                 : /*
     461                 :  * Allocate a parallel apply worker that will be used for the specified xid.
     462                 :  *
     463                 :  * We first try to get an available worker from the pool, if any, and only
     464                 :  * then try to launch a new worker. On successful allocation, remember the
     465                 :  * worker information in the hash table so that we can get it later for
     466                 :  * processing the streaming changes.
     467                 :  */
     468                 : void
     469              82 : pa_allocate_worker(TransactionId xid)
     470                 : {
     471                 :     bool        found;
     472              82 :     ParallelApplyWorkerInfo *winfo = NULL;
     473                 :     ParallelApplyWorkerEntry *entry;
     474                 : 
     475              82 :     if (!pa_can_start())
     476              55 :         return;
     477                 : 
     478              27 :     winfo = pa_launch_parallel_worker();
     479              27 :     if (!winfo)
     480 UNC           0 :         return;
     481                 : 
     482                 :     /* First time through, initialize parallel apply worker state hashtable. */
     483 GNC          27 :     if (!ParallelApplyTxnHash)
     484                 :     {
     485                 :         HASHCTL     ctl;
     486                 : 
     487              91 :         MemSet(&ctl, 0, sizeof(ctl));
     488               7 :         ctl.keysize = sizeof(TransactionId);
     489               7 :         ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
     490               7 :         ctl.hcxt = ApplyContext;
     491                 : 
     492               7 :         ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
     493                 :                                            16, &ctl,
     494                 :                                            HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
     495                 :     }
     496                 : 
     497                 :     /* Create an entry for the requested transaction. */
     498              27 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
     499              27 :     if (found)
     500 UNC           0 :         elog(ERROR, "hash table corrupted");
     501                 : 
     502                 :     /* Update the transaction information in shared memory. */
     503 GNC          27 :     SpinLockAcquire(&winfo->shared->mutex);
     504              27 :     winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
     505              27 :     winfo->shared->xid = xid;
     506              27 :     SpinLockRelease(&winfo->shared->mutex);
     507                 : 
     508              27 :     winfo->in_use = true;
     509              27 :     winfo->serialize_changes = false;
     510              27 :     entry->winfo = winfo;
     511              27 :     entry->xid = xid;
     512                 : }
     513                 : 
     514                 : /*
     515                 :  * Find the assigned worker for the given transaction, if any.
     516                 :  */
     517                 : ParallelApplyWorkerInfo *
     518          256993 : pa_find_worker(TransactionId xid)
     519                 : {
     520                 :     bool        found;
     521                 :     ParallelApplyWorkerEntry *entry;
     522                 : 
     523          256993 :     if (!TransactionIdIsValid(xid))
     524           79826 :         return NULL;
     525                 : 
     526          177167 :     if (!ParallelApplyTxnHash)
     527          103236 :         return NULL;
     528                 : 
     529                 :     /* Return the cached parallel apply worker if valid. */
     530           73931 :     if (stream_apply_worker)
     531           73644 :         return stream_apply_worker;
     532                 : 
     533                 :     /* Find an entry for the requested transaction. */
     534             287 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
     535             287 :     if (found)
     536                 :     {
     537                 :         /* The worker must not have exited.  */
     538             287 :         Assert(entry->winfo->in_use);
     539             287 :         return entry->winfo;
     540                 :     }
     541                 : 
     542 UNC           0 :     return NULL;
     543                 : }
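                         : 
                         : /*
                         :  * Illustrative sketch only (not part of this file): how a leader-side caller
                         :  * could combine pa_allocate_worker() and pa_find_worker() when the first
                         :  * stream of a transaction arrives. The function and guard names here are
                         :  * hypothetical.
                         :  */
                         : #ifdef PA_USAGE_EXAMPLE
                         : static void
                         : example_handle_stream_start(TransactionId xid)
                         : {
                         :     ParallelApplyWorkerInfo *winfo;
                         : 
                         :     /* Assign a parallel apply worker for this xact, if one can be started. */
                         :     pa_allocate_worker(xid);
                         : 
                         :     /* Route subsequent changes to the assigned worker, if any. */
                         :     winfo = pa_find_worker(xid);
                         :     if (winfo == NULL)
                         :     {
                         :         /* No worker was assigned: changes are serialized to a file instead. */
                         :     }
                         : }
                         : #endif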
     544                 : 
     545                 : /*
     546                 :  * Makes the worker available for reuse.
     547                 :  *
     548                 :  * This removes the parallel apply worker entry from the hash table so that
     549                 :  * the worker can no longer be looked up for this transaction. If there are
     550                 :  * enough workers in the pool, it stops the worker and frees the corresponding
     551                 :  * info. Otherwise it just marks the worker as available for reuse.
     552                 :  *
     553                 :  * For more information about the worker pool, see comments atop this file.
     554                 :  */
     555                 : static void
     556 GNC          24 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
     557                 : {
     558              24 :     Assert(!am_parallel_apply_worker());
     559              24 :     Assert(winfo->in_use);
     560              24 :     Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
     561                 : 
     562              24 :     if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
     563 UNC           0 :         elog(ERROR, "hash table corrupted");
     564                 : 
     565                 :     /*
     566                 :      * Stop the worker if there are enough workers in the pool.
     567                 :      *
     568                 :      * XXX Additionally, we also stop the worker if the leader apply worker
     569                 :      * serialized part of the transaction data due to a send timeout. This is
     570                 :      * because the message could be partially written to the queue and there
     571                 :      * is no way to clean the queue other than resending the message until it
     572                 :      * succeeds. Instead of trying to send the data which anyway would have
     573                 :      * been serialized and then letting the parallel apply worker deal with
     574                 :      * the spurious message, we stop the worker.
     575                 :      */
     576 GNC          24 :     if (winfo->serialize_changes ||
     577              20 :         list_length(ParallelApplyWorkerPool) >
     578              20 :         (max_parallel_apply_workers_per_subscription / 2))
     579                 :     {
     580                 :         int         slot_no;
     581                 :         uint16      generation;
     582                 : 
     583               5 :         SpinLockAcquire(&winfo->shared->mutex);
     584               5 :         generation = winfo->shared->logicalrep_worker_generation;
     585               5 :         slot_no = winfo->shared->logicalrep_worker_slot_no;
     586               5 :         SpinLockRelease(&winfo->shared->mutex);
     587                 : 
     588               5 :         logicalrep_pa_worker_stop(slot_no, generation);
     589                 : 
     590               5 :         pa_free_worker_info(winfo);
     591                 : 
     592               5 :         return;
     593                 :     }
     594                 : 
     595              19 :     winfo->in_use = false;
     596              19 :     winfo->serialize_changes = false;
     597                 : }
     598                 : 
     599                 : /*
     600                 :  * Free the parallel apply worker information and unlink the files with
     601                 :  * serialized changes, if any.
     602                 :  */
     603                 : static void
     604               5 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
     605                 : {
     606               5 :     Assert(winfo);
     607                 : 
     608               5 :     if (winfo->mq_handle)
     609               5 :         shm_mq_detach(winfo->mq_handle);
     610                 : 
     611               5 :     if (winfo->error_mq_handle)
     612               5 :         shm_mq_detach(winfo->error_mq_handle);
     613                 : 
     614                 :     /* Unlink the files with serialized changes. */
     615               5 :     if (winfo->serialize_changes)
     616               4 :         stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
     617                 : 
     618               5 :     if (winfo->dsm_seg)
     619               5 :         dsm_detach(winfo->dsm_seg);
     620                 : 
     621                 :     /* Remove from the worker pool. */
     622               5 :     ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
     623                 : 
     624               5 :     pfree(winfo);
     625               5 : }
     626                 : 
     627                 : /*
     628                 :  * Detach the error queue for all parallel apply workers.
     629                 :  */
     630                 : void
     631             148 : pa_detach_all_error_mq(void)
     632                 : {
     633                 :     ListCell   *lc;
     634                 : 
     635             153 :     foreach(lc, ParallelApplyWorkerPool)
     636                 :     {
     637               5 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
     638                 : 
     639               5 :         shm_mq_detach(winfo->error_mq_handle);
     640               5 :         winfo->error_mq_handle = NULL;
     641                 :     }
     642             148 : }
     643                 : 
     644                 : /*
     645                 :  * Check if there are any pending spooled messages.
     646                 :  */
     647                 : static bool
     648              16 : pa_has_spooled_message_pending(void)
     649                 : {
     650                 :     PartialFileSetState fileset_state;
     651                 : 
     652              16 :     fileset_state = pa_get_fileset_state();
     653                 : 
     654              16 :     return (fileset_state != FS_EMPTY);
     655                 : }
     656                 : 
     657                 : /*
     658                 :  * Replay the spooled messages once the leader apply worker has finished
     659                 :  * serializing changes to the file.
     660                 :  *
     661                 :  * Returns false if there aren't any pending spooled messages, true otherwise.
     662                 :  */
     663                 : static bool
     664              52 : pa_process_spooled_messages_if_required(void)
     665                 : {
     666                 :     PartialFileSetState fileset_state;
     667                 : 
     668              52 :     fileset_state = pa_get_fileset_state();
     669                 : 
     670              52 :     if (fileset_state == FS_EMPTY)
     671              44 :         return false;
     672                 : 
     673                 :     /*
     674                 :      * If the leader apply worker is busy serializing the partial changes then
     675                 :      * acquire the stream lock now and wait for the leader worker to finish
     676                 :      * serializing the changes. Otherwise, the parallel apply worker won't get
     677                 :      * a chance to receive a STREAM_STOP (and acquire the stream lock) until
     678                 :      * the leader has serialized all changes, which can lead to an undetected
     679                 :      * deadlock.
     680                 :      *
     681                 :      * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
     682                 :      * worker has finished serializing the changes.
     683                 :      */
     684               8 :     if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
     685                 :     {
     686 UNC           0 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
     687               0 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
     688                 : 
     689               0 :         fileset_state = pa_get_fileset_state();
     690                 :     }
     691                 : 
     692                 :     /*
     693                 :      * We cannot read the file immediately after the leader has serialized all
     694                 :      * changes to the file because there may still be messages in the memory
     695                 :      * queue. We will apply all spooled messages the next time we call this
     696                 :      * function and that will ensure there are no messages left in the memory
     697                 :      * queue.
     698                 :      */
     699 GNC           8 :     if (fileset_state == FS_SERIALIZE_DONE)
     700                 :     {
     701               4 :         pa_set_fileset_state(MyParallelShared, FS_READY);
     702                 :     }
     703               4 :     else if (fileset_state == FS_READY)
     704                 :     {
     705               4 :         apply_spooled_messages(&MyParallelShared->fileset,
     706               4 :                                MyParallelShared->xid,
     707                 :                                InvalidXLogRecPtr);
     708               4 :         pa_set_fileset_state(MyParallelShared, FS_EMPTY);
     709                 :     }
     710                 : 
     711               8 :     return true;
     712                 : }
     713                 : 
     714                 : /*
     715                 :  * Interrupt handler for main loop of parallel apply worker.
     716                 :  */
     717                 : static void
     718           63928 : ProcessParallelApplyInterrupts(void)
     719                 : {
     720           63928 :     CHECK_FOR_INTERRUPTS();
     721                 : 
     722           63926 :     if (ShutdownRequestPending)
     723                 :     {
     724               5 :         ereport(LOG,
     725                 :                 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
     726                 :                         MySubscription->name)));
     727                 : 
     728               5 :         proc_exit(0);
     729                 :     }
     730                 : 
     731           63921 :     if (ConfigReloadPending)
     732                 :     {
     733               4 :         ConfigReloadPending = false;
     734               4 :         ProcessConfigFile(PGC_SIGHUP);
     735                 :     }
     736           63921 : }
     737                 : 
     738                 : /* Parallel apply worker main loop. */
     739                 : static void
     740              10 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
     741                 : {
     742                 :     shm_mq_result shmq_res;
     743                 :     ErrorContextCallback errcallback;
     744              10 :     MemoryContext oldcxt = CurrentMemoryContext;
     745                 : 
     746                 :     /*
     747                 :      * Init the ApplyMessageContext which we clean up after each replication
     748                 :      * protocol message.
     749                 :      */
     750              10 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
     751                 :                                                 "ApplyMessageContext",
     752                 :                                                 ALLOCSET_DEFAULT_SIZES);
     753                 : 
     754                 :     /*
     755                 :      * Push apply error context callback. Fields will be filled while applying
     756                 :      * a change.
     757                 :      */
     758              10 :     errcallback.callback = apply_error_callback;
     759              10 :     errcallback.previous = error_context_stack;
     760              10 :     error_context_stack = &errcallback;
     761                 : 
     762                 :     for (;;)
     763           63918 :     {
     764                 :         void       *data;
     765                 :         Size        len;
     766                 : 
     767           63928 :         ProcessParallelApplyInterrupts();
     768                 : 
     769                 :         /* Ensure we are reading the data into our memory context. */
     770           63921 :         MemoryContextSwitchTo(ApplyMessageContext);
     771                 : 
     772           63921 :         shmq_res = shm_mq_receive(mqh, &len, &data, true);
     773                 : 
     774           63921 :         if (shmq_res == SHM_MQ_SUCCESS)
     775                 :         {
     776                 :             StringInfoData s;
     777                 :             int         c;
     778                 : 
     779           63869 :             if (len == 0)
     780 UNC           0 :                 elog(ERROR, "invalid message length");
     781                 : 
     782 GNC       63869 :             s.cursor = 0;
     783           63869 :             s.maxlen = -1;
     784           63869 :             s.data = (char *) data;
     785           63869 :             s.len = len;
     786                 : 
     787                 :             /*
     788                 :              * The first byte of messages sent from the leader apply worker to
     789                 :              * parallel apply workers can only be 'w'.
     790                 :              */
     791           63869 :             c = pq_getmsgbyte(&s);
     792           63869 :             if (c != 'w')
     793 UNC           0 :                 elog(ERROR, "unexpected message \"%c\"", c);
     794                 : 
     795                 :             /*
     796                 :              * Ignore statistics fields that have been updated by the leader
     797                 :              * apply worker.
     798                 :              *
     799                 :              * XXX We can avoid sending the statistics fields from the leader
     800                 :              * apply worker but for that, it needs to rebuild the entire
     801                 :              * message by removing these fields which could be more work than
     802                 :              * simply ignoring these fields in the parallel apply worker.
     803                 :              */
     804 GNC       63869 :             s.cursor += SIZE_STATS_MESSAGE;
     805                 : 
     806           63869 :             apply_dispatch(&s);
     807                 :         }
     808              52 :         else if (shmq_res == SHM_MQ_WOULD_BLOCK)
     809                 :         {
     810                 :             /* Replay the changes from the file, if any. */
     811              52 :             if (!pa_process_spooled_messages_if_required())
     812                 :             {
     813                 :                 int         rc;
     814                 : 
     815                 :                 /* Wait for more work. */
     816              44 :                 rc = WaitLatch(MyLatch,
     817                 :                                WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     818                 :                                1000L,
     819                 :                                WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
     820                 : 
     821              44 :                 if (rc & WL_LATCH_SET)
     822              41 :                     ResetLatch(MyLatch);
     823                 :             }
     824                 :         }
     825                 :         else
     826                 :         {
     827 UNC           0 :             Assert(shmq_res == SHM_MQ_DETACHED);
     828                 : 
     829               0 :             ereport(ERROR,
     830                 :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     831                 :                      errmsg("lost connection to the logical replication apply worker")));
     832                 :         }
     833                 : 
     834 GNC       63918 :         MemoryContextReset(ApplyMessageContext);
     835           63918 :         MemoryContextSwitchTo(oldcxt);
     836                 :     }
     837                 : 
     838                 :     /* Pop the error context stack. */
     839                 :     error_context_stack = errcallback.previous;
     840                 : 
     841                 :     MemoryContextSwitchTo(oldcxt);
     842                 : }
     843                 : 
     844                 : /*
     845                 :  * Make sure the leader apply worker tries to read from our error queue one more
     846                 :  * time. This guards against the case where we exit uncleanly without sending
     847                 :  * an ErrorResponse, for example because some code calls proc_exit directly.
     848                 :  */
     849                 : static void
     850              10 : pa_shutdown(int code, Datum arg)
     851                 : {
     852              10 :     SendProcSignal(MyLogicalRepWorker->leader_pid,
     853                 :                    PROCSIG_PARALLEL_APPLY_MESSAGE,
     854                 :                    InvalidBackendId);
     855                 : 
     856              10 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
     857              10 : }
     858                 : 
     859                 : /*
     860                 :  * Parallel apply worker entry point.
     861                 :  */
     862                 : void
     863              10 : ParallelApplyWorkerMain(Datum main_arg)
     864                 : {
     865                 :     ParallelApplyWorkerShared *shared;
     866                 :     dsm_handle  handle;
     867                 :     dsm_segment *seg;
     868                 :     shm_toc    *toc;
     869                 :     shm_mq     *mq;
     870                 :     shm_mq_handle *mqh;
     871                 :     shm_mq_handle *error_mqh;
     872                 :     RepOriginId originid;
     873              10 :     int         worker_slot = DatumGetInt32(main_arg);
     874                 :     char        originname[NAMEDATALEN];
     875                 : 
     876                 :     /* Setup signal handling. */
     877              10 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     878              10 :     pqsignal(SIGINT, SignalHandlerForShutdownRequest);
     879              10 :     pqsignal(SIGTERM, die);
     880              10 :     BackgroundWorkerUnblockSignals();
     881                 : 
     882                 :     /*
     883                 :      * Attach to the dynamic shared memory segment for the parallel apply, and
     884                 :      * find its table of contents.
     885                 :      *
     886                 :      * Like parallel query, we don't need a resource owner by this time. See
     887                 :      * ParallelWorkerMain.
     888                 :      */
     889              10 :     memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
     890              10 :     seg = dsm_attach(handle);
     891              10 :     if (!seg)
     892 UNC           0 :         ereport(ERROR,
     893                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     894                 :                  errmsg("unable to map dynamic shared memory segment")));
     895                 : 
     896 GNC          10 :     toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
     897              10 :     if (!toc)
     898 UNC           0 :         ereport(ERROR,
     899                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     900                 :                  errmsg("bad magic number in dynamic shared memory segment")));
     901                 : 
     902 GNC          10 :     before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
     903                 : 
     904                 :     /* Look up the shared information. */
     905              10 :     shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
     906              10 :     MyParallelShared = shared;
     907                 : 
     908                 :     /*
     909                 :      * Attach to the message queue.
     910                 :      */
     911              10 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
     912              10 :     shm_mq_set_receiver(mq, MyProc);
     913              10 :     mqh = shm_mq_attach(mq, seg, NULL);
     914                 : 
     915                 :     /*
      916                 :      * Primary initialization is complete, so we can now attach to our slot.
      917                 :      * Attaching only at this point ensures that the leader apply worker
      918                 :      * does not write data to an uninitialized memory queue.
     919                 :      */
     920              10 :     logicalrep_worker_attach(worker_slot);
     921                 : 
     922              10 :     SpinLockAcquire(&MyParallelShared->mutex);
     923              10 :     MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
     924              10 :     MyParallelShared->logicalrep_worker_slot_no = worker_slot;
     925              10 :     SpinLockRelease(&MyParallelShared->mutex);
     926                 : 
     927                 :     /*
     928                 :      * Attach to the error queue.
     929                 :      */
     930              10 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
     931              10 :     shm_mq_set_sender(mq, MyProc);
     932              10 :     error_mqh = shm_mq_attach(mq, seg, NULL);
     933                 : 
     934              10 :     pq_redirect_to_shm_mq(seg, error_mqh);
     935              10 :     pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
     936                 :                            InvalidBackendId);
     937                 : 
     938              10 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
     939              10 :         MyLogicalRepWorker->reply_time = 0;
     940                 : 
     941              10 :     InitializeApplyWorker();
     942                 : 
      943                 :     /* Set up replication origin tracking. */
     944              10 :     StartTransactionCommand();
     945              10 :     ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
     946                 :                                        originname, sizeof(originname));
     947              10 :     originid = replorigin_by_name(originname, false);
     948                 : 
     949                 :     /*
     950                 :      * The parallel apply worker doesn't need to monopolize this replication
     951                 :      * origin which was already acquired by its leader process.
     952                 :      */
     953              10 :     replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
     954              10 :     replorigin_session_origin = originid;
     955              10 :     CommitTransactionCommand();
     956                 : 
     957                 :     /*
      958                 :      * Set up a syscache callback so that we know when something changes
      959                 :      * in the subscription relation state.
     960                 :      */
     961              10 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
     962                 :                                   invalidate_syncing_table_states,
     963                 :                                   (Datum) 0);
     964                 : 
     965              10 :     set_apply_error_context_origin(originname);
     966                 : 
     967              10 :     LogicalParallelApplyLoop(mqh);
     968                 : 
     969                 :     /*
      970                 :      * The parallel apply worker must not get here: it only stops when it
      971                 :      * receives a SIGTERM or SIGINT from the leader, or when an error
      972                 :      * occurs. None of these cases allows control to return to this
      973                 :      * point.
     974                 :      */
     975 UNC           0 :     Assert(false);
     976                 : }
     977                 : 
     978                 : /*
     979                 :  * Handle receipt of an interrupt indicating a parallel apply worker message.
     980                 :  *
     981                 :  * Note: this is called within a signal handler! All we can do is set a flag
     982                 :  * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
     983                 :  * HandleParallelApplyMessages().
     984                 :  */
     985                 : void
     986 GNC           5 : HandleParallelApplyMessageInterrupt(void)
     987                 : {
     988               5 :     InterruptPending = true;
     989               5 :     ParallelApplyMessagePending = true;
     990               5 :     SetLatch(MyLatch);
     991               5 : }
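                          : 
                          : /*
                          :  * Illustrative sketch (the consuming side lives in ProcessInterrupts(),
                          :  * paraphrased here): the flags set above cause the next
                          :  * CHECK_FOR_INTERRUPTS() to drain any queued messages.
                          :  *
                          :  *     if (ParallelApplyMessagePending)
                          :  *         HandleParallelApplyMessages();
                          :  */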
     992                 : 
     993                 : /*
     994                 :  * Handle a single protocol message received from a single parallel apply
     995                 :  * worker.
     996                 :  */
     997                 : static void
     998               1 : HandleParallelApplyMessage(StringInfo msg)
     999                 : {
    1000                 :     char        msgtype;
    1001                 : 
    1002               1 :     msgtype = pq_getmsgbyte(msg);
    1003                 : 
    1004               1 :     switch (msgtype)
    1005                 :     {
    1006               1 :         case 'E':               /* ErrorResponse */
    1007                 :             {
    1008                 :                 ErrorData   edata;
    1009                 : 
    1010                 :                 /* Parse ErrorResponse. */
    1011               1 :                 pq_parse_errornotice(msg, &edata);
    1012                 : 
    1013                 :                 /*
     1014                 :                  * Add a context line to show that this is a message
     1015                 :                  * propagated from a parallel apply worker. Otherwise, it
     1016                 :                  * can sometimes be confusing to understand what actually
     1017                 :                  * happened.
    1018                 :                  */
    1019               1 :                 if (edata.context)
    1020               1 :                     edata.context = psprintf("%s\n%s", edata.context,
    1021                 :                                              _("logical replication parallel apply worker"));
    1022                 :                 else
    1023 UNC           0 :                     edata.context = pstrdup(_("logical replication parallel apply worker"));
    1024                 : 
    1025                 :                 /*
    1026                 :                  * Context beyond that should use the error context callbacks
    1027                 :                  * that were in effect in LogicalRepApplyLoop().
    1028                 :                  */
    1029 GNC           1 :                 error_context_stack = apply_error_context_stack;
    1030                 : 
    1031                 :                 /*
    1032                 :                  * The actual error must have been reported by the parallel
    1033                 :                  * apply worker.
    1034                 :                  */
    1035               1 :                 ereport(ERROR,
    1036                 :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1037                 :                          errmsg("logical replication parallel apply worker exited due to error"),
    1038                 :                          errcontext("%s", edata.context)));
    1039                 :             }
    1040                 : 
    1041                 :             /*
     1042                 :              * Nothing needs to be done for NoticeResponse and
     1043                 :              * NotifyResponse, as the logical replication worker has no
     1044                 :              * client to forward such messages to.
    1045                 :              */
    1046 UNC           0 :         case 'N':
    1047                 :         case 'A':
    1048               0 :             break;
    1049                 : 
    1050               0 :         default:
    1051               0 :             elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
    1052                 :                  msgtype, msg->len);
    1053                 :     }
    1054               0 : }
    1055                 : 
    1056                 : /*
    1057                 :  * Handle any queued protocol messages received from parallel apply workers.
    1058                 :  */
    1059                 : void
    1060 GNC           1 : HandleParallelApplyMessages(void)
    1061                 : {
    1062                 :     ListCell   *lc;
    1063                 :     MemoryContext oldcontext;
    1064                 : 
    1065                 :     static MemoryContext hpam_context = NULL;
    1066                 : 
    1067                 :     /*
    1068                 :      * This is invoked from ProcessInterrupts(), and since some of the
    1069                 :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
    1070                 :      * for recursive calls if more signals are received while this runs. It's
    1071                 :      * unclear that recursive entry would be safe, and it doesn't seem useful
    1072                 :      * even if it is safe, so let's block interrupts until done.
    1073                 :      */
    1074               1 :     HOLD_INTERRUPTS();
    1075                 : 
    1076                 :     /*
    1077                 :      * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
    1078                 :      * don't want to risk leaking data into long-lived contexts, so let's do
    1079                 :      * our work here in a private context that we can reset on each use.
    1080                 :      */
    1081               1 :     if (!hpam_context)          /* first time through? */
    1082               1 :         hpam_context = AllocSetContextCreate(TopMemoryContext,
    1083                 :                                              "HandleParallelApplyMessages",
    1084                 :                                              ALLOCSET_DEFAULT_SIZES);
    1085                 :     else
    1086 UNC           0 :         MemoryContextReset(hpam_context);
    1087                 : 
    1088 GNC           1 :     oldcontext = MemoryContextSwitchTo(hpam_context);
    1089                 : 
    1090               1 :     ParallelApplyMessagePending = false;
    1091                 : 
    1092               1 :     foreach(lc, ParallelApplyWorkerPool)
    1093                 :     {
    1094                 :         shm_mq_result res;
    1095                 :         Size        nbytes;
    1096                 :         void       *data;
    1097               1 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
    1098                 : 
    1099                 :         /*
    1100                 :          * The leader will detach from the error queue and set it to NULL
    1101                 :          * before preparing to stop all parallel apply workers, so we don't
    1102                 :          * need to handle error messages anymore. See
    1103                 :          * logicalrep_worker_detach.
    1104                 :          */
    1105               1 :         if (!winfo->error_mq_handle)
    1106 UNC           0 :             continue;
    1107                 : 
    1108 GNC           1 :         res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
    1109                 : 
    1110               1 :         if (res == SHM_MQ_WOULD_BLOCK)
    1111 UNC           0 :             continue;
    1112 GNC           1 :         else if (res == SHM_MQ_SUCCESS)
    1113                 :         {
    1114                 :             StringInfoData msg;
    1115                 : 
    1116               1 :             initStringInfo(&msg);
    1117               1 :             appendBinaryStringInfo(&msg, data, nbytes);
    1118               1 :             HandleParallelApplyMessage(&msg);
    1119 UNC           0 :             pfree(msg.data);
    1120                 :         }
    1121                 :         else
    1122               0 :             ereport(ERROR,
    1123                 :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1124                 :                      errmsg("lost connection to the logical replication parallel apply worker")));
    1125                 :     }
    1126                 : 
    1127               0 :     MemoryContextSwitchTo(oldcontext);
    1128                 : 
    1129                 :     /* Might as well clear the context on our way out */
    1130               0 :     MemoryContextReset(hpam_context);
    1131                 : 
    1132               0 :     RESUME_INTERRUPTS();
    1133               0 : }
    1134                 : 
    1135                 : /*
     1136                 :  * Send the data to the specified parallel apply worker via its shared-memory
     1137                 :  * queue.
    1138                 :  *
    1139                 :  * Returns false if the attempt to send data via shared memory times out, true
    1140                 :  * otherwise.
    1141                 :  */
    1142                 : bool
    1143 GNC       68894 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
    1144                 : {
    1145                 :     int         rc;
    1146                 :     shm_mq_result result;
    1147           68894 :     TimestampTz startTime = 0;
    1148                 : 
    1149           68894 :     Assert(!IsTransactionState());
    1150           68894 :     Assert(!winfo->serialize_changes);
    1151                 : 
    1152                 :     /*
     1153                 :      * We don't try to send data to the parallel worker in 'immediate'
     1154                 :      * mode; that mode is used primarily for testing purposes.
    1155                 :      */
    1156           68894 :     if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE))
    1157               4 :         return false;
    1158                 : 
    1159                 : /*
     1160                 :  * This timeout is somewhat arbitrary, but testing showed it is sufficient
     1161                 :  * for sending the message unless the parallel apply worker is waiting on
     1162                 :  * some lock or there is a serious resource crunch. See the comments atop
     1163                 :  * this file for why we send the message in a non-blocking way.
    1164                 :  */
    1165                 : #define SHM_SEND_RETRY_INTERVAL_MS 1000
    1166                 : #define SHM_SEND_TIMEOUT_MS     (10000 - SHM_SEND_RETRY_INTERVAL_MS)
    1167                 : 
    1168                 :     for (;;)
    1169                 :     {
    1170           68890 :         result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
    1171                 : 
    1172           68890 :         if (result == SHM_MQ_SUCCESS)
    1173           68890 :             return true;
    1174 UNC           0 :         else if (result == SHM_MQ_DETACHED)
    1175               0 :             ereport(ERROR,
    1176                 :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1177                 :                      errmsg("could not send data to shared-memory queue")));
    1178                 : 
    1179               0 :         Assert(result == SHM_MQ_WOULD_BLOCK);
    1180                 : 
    1181                 :         /* Wait before retrying. */
    1182               0 :         rc = WaitLatch(MyLatch,
    1183                 :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1184                 :                        SHM_SEND_RETRY_INTERVAL_MS,
    1185                 :                        WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
    1186                 : 
    1187               0 :         if (rc & WL_LATCH_SET)
    1188                 :         {
    1189               0 :             ResetLatch(MyLatch);
    1190               0 :             CHECK_FOR_INTERRUPTS();
    1191                 :         }
    1192                 : 
    1193               0 :         if (startTime == 0)
    1194               0 :             startTime = GetCurrentTimestamp();
    1195               0 :         else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
    1196                 :                                             SHM_SEND_TIMEOUT_MS))
    1197               0 :             return false;
    1198                 :     }
    1199                 : }
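                          : 
                          : /*
                          :  * A hedged usage sketch (the real callers live in worker.c and pass
                          :  * call-site-specific arguments): on a send timeout the leader gives up
                          :  * on the shared-memory queue and serializes the remaining changes.
                          :  *
                          :  *     if (!pa_send_data(winfo, s->len, s->data))
                          :  *         pa_switch_to_partial_serialize(winfo, stream_locked);
                          :  */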
    1200                 : 
    1201                 : /*
    1202                 :  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
    1203                 :  * that the current data and any subsequent data for this transaction will be
    1204                 :  * serialized to a file. This is done to prevent possible deadlocks with
    1205                 :  * another parallel apply worker (refer to the comments atop this file).
    1206                 :  */
    1207                 : void
    1208 GNC           4 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
    1209                 :                                bool stream_locked)
    1210                 : {
    1211               4 :     ereport(LOG,
    1212                 :             (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
    1213                 :                     winfo->shared->xid)));
    1214                 : 
    1215                 :     /*
     1216                 :      * The parallel apply worker could be stuck for some reason (say, waiting
     1217                 :      * on a lock held by another backend), so stop trying to send data
     1218                 :      * directly to it and start serializing the data to a file instead.
    1219                 :      */
    1220               4 :     winfo->serialize_changes = true;
    1221                 : 
    1222                 :     /* Initialize the stream fileset. */
    1223               4 :     stream_start_internal(winfo->shared->xid, true);
    1224                 : 
    1225                 :     /*
     1226                 :      * Acquire the stream lock if it is not already held, so that the
     1227                 :      * parallel apply worker keeps waiting for the leader to release the
     1228                 :      * stream lock until the end of the transaction.
    1229                 :      */
    1230               4 :     if (!stream_locked)
    1231               4 :         pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
    1232                 : 
    1233               4 :     pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
    1234               4 : }
    1235                 : 
    1236                 : /*
    1237                 :  * Wait until the parallel apply worker's transaction state has reached or
    1238                 :  * exceeded the given xact_state.
    1239                 :  */
    1240                 : static void
    1241              25 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
    1242                 :                        ParallelTransState xact_state)
    1243                 : {
    1244                 :     for (;;)
    1245                 :     {
    1246                 :         /*
    1247                 :          * Stop if the transaction state has reached or exceeded the given
    1248                 :          * xact_state.
    1249                 :          */
    1250             291 :         if (pa_get_xact_state(winfo->shared) >= xact_state)
    1251              25 :             break;
    1252                 : 
    1253                 :         /* Wait to be signalled. */
    1254             266 :         (void) WaitLatch(MyLatch,
    1255                 :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1256                 :                          10L,
    1257                 :                          WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
    1258                 : 
    1259                 :         /* Reset the latch so we don't spin. */
    1260             266 :         ResetLatch(MyLatch);
    1261                 : 
    1262                 :         /* An interrupt may have occurred while we were waiting. */
    1263             266 :         CHECK_FOR_INTERRUPTS();
    1264                 :     }
    1265              25 : }
    1266                 : 
    1267                 : /*
    1268                 :  * Wait until the parallel apply worker's transaction finishes.
    1269                 :  */
    1270                 : static void
    1271              25 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
    1272                 : {
    1273                 :     /*
     1274                 :      * Wait until the parallel apply worker has set the state to
     1275                 :      * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
     1276                 :      * lock. This prevents the leader apply worker from acquiring the
     1277                 :      * transaction lock before the parallel apply worker does.
    1278                 :      */
    1279              25 :     pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
    1280                 : 
    1281                 :     /*
    1282                 :      * Wait for the transaction lock to be released. This is required to
    1283                 :      * detect deadlock among leader and parallel apply workers. Refer to the
    1284                 :      * comments atop this file.
    1285                 :      */
    1286              25 :     pa_lock_transaction(winfo->shared->xid, AccessShareLock);
    1287              24 :     pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
    1288                 : 
    1289                 :     /*
     1290                 :      * Check that the state has become PARALLEL_TRANS_FINISHED; the parallel
     1291                 :      * apply worker may have failed while applying changes, which also
     1292                 :      * causes the lock to be released.
    1293                 :      */
    1294              24 :     if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
    1295 UNC           0 :         ereport(ERROR,
    1296                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1297                 :                  errmsg("lost connection to the logical replication parallel apply worker")));
    1298 GNC          24 : }
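                          : 
                          : /*
                          :  * Note on the pairing above (a summary, not a new mechanism): the
                          :  * parallel apply worker holds this transaction lock in
                          :  * AccessExclusiveLock mode for the duration of the transaction, so the
                          :  * AccessShareLock acquired here blocks until commit/abort, and any
                          :  * cross-worker cycle is visible to the deadlock detector.
                          :  */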
    1299                 : 
    1300                 : /*
    1301                 :  * Set the transaction state for a given parallel apply worker.
    1302                 :  */
    1303                 : void
    1304              51 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
    1305                 :                   ParallelTransState xact_state)
    1306                 : {
    1307              51 :     SpinLockAcquire(&wshared->mutex);
    1308              51 :     wshared->xact_state = xact_state;
    1309              51 :     SpinLockRelease(&wshared->mutex);
    1310              51 : }
    1311                 : 
    1312                 : /*
    1313                 :  * Get the transaction state for a given parallel apply worker.
    1314                 :  */
    1315                 : static ParallelTransState
    1316             339 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
    1317                 : {
    1318                 :     ParallelTransState xact_state;
    1319                 : 
    1320             339 :     SpinLockAcquire(&wshared->mutex);
    1321             339 :     xact_state = wshared->xact_state;
    1322             339 :     SpinLockRelease(&wshared->mutex);
    1323                 : 
    1324             339 :     return xact_state;
    1325                 : }
    1326                 : 
    1327                 : /*
    1328                 :  * Cache the parallel apply worker information.
    1329                 :  */
    1330                 : void
    1331             504 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
    1332                 : {
    1333             504 :     stream_apply_worker = winfo;
    1334             504 : }
    1335                 : 
    1336                 : /*
    1337                 :  * Form a unique savepoint name for the streaming transaction.
    1338                 :  *
     1339                 :  * Note that different subscriptions for publications on different nodes can
     1340                 :  * receive the same remote xid, so we need to use the subscription id as well.
    1341                 :  *
    1342                 :  * Returns the name in the supplied buffer.
    1343                 :  */
    1344                 : static void
    1345              27 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
    1346                 : {
    1347              27 :     snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
    1348              27 : }
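                          : 
                          : /*
                          :  * For example (values illustrative only): suboid 16394 and remote xid
                          :  * 739 produce the savepoint name "pg_sp_16394_739".
                          :  */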
    1349                 : 
    1350                 : /*
    1351                 :  * Define a savepoint for a subxact in parallel apply worker if needed.
    1352                 :  *
     1353                 :  * The parallel apply worker can detect that a new subtransaction was
     1354                 :  * started by checking whether a new change arrived with a different xid.
     1355                 :  * In that case it defines a named savepoint, so that we are able to roll
     1356                 :  * back to it if required.
    1357                 :  */
    1358                 : void
    1359           68393 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
    1360                 : {
    1361           68393 :     if (current_xid != top_xid &&
    1362              52 :         !list_member_xid(subxactlist, current_xid))
    1363                 :     {
    1364                 :         MemoryContext oldctx;
    1365                 :         char        spname[NAMEDATALEN];
    1366                 : 
    1367              17 :         pa_savepoint_name(MySubscription->oid, current_xid,
    1368                 :                           spname, sizeof(spname));
    1369                 : 
    1370              17 :         elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
    1371                 : 
    1372                 :         /* We must be in transaction block to define the SAVEPOINT. */
    1373              17 :         if (!IsTransactionBlock())
    1374                 :         {
    1375               5 :             if (!IsTransactionState())
    1376 UNC           0 :                 StartTransactionCommand();
    1377                 : 
    1378 GNC           5 :             BeginTransactionBlock();
    1379               5 :             CommitTransactionCommand();
    1380                 :         }
    1381                 : 
    1382              17 :         DefineSavepoint(spname);
    1383                 : 
    1384                 :         /*
    1385                 :          * CommitTransactionCommand is needed to start a subtransaction after
    1386                 :          * issuing a SAVEPOINT inside a transaction block (see
    1387                 :          * StartSubTransaction()).
    1388                 :          */
    1389              17 :         CommitTransactionCommand();
    1390                 : 
    1391              17 :         oldctx = MemoryContextSwitchTo(TopTransactionContext);
    1392              17 :         subxactlist = lappend_xid(subxactlist, current_xid);
    1393              17 :         MemoryContextSwitchTo(oldctx);
    1394                 :     }
    1395           68393 : }
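                          : 
                          : /*
                          :  * Illustrative lifecycle (xids are made up): if changes arrive for top
                          :  * xid 700 and then for subxacts 701 and 702, savepoints
                          :  * "pg_sp_<suboid>_701" and "pg_sp_<suboid>_702" get defined; a later
                          :  * STREAM ABORT of 701 rolls back to that savepoint and truncates
                          :  * subxactlist accordingly (see pa_stream_abort() below).
                          :  */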
    1396                 : 
    1397                 : /* Reset the list that maintains subtransactions. */
    1398                 : void
    1399              24 : pa_reset_subtrans(void)
    1400                 : {
    1401                 :     /*
    1402                 :      * We don't need to free this explicitly as the allocated memory will be
    1403                 :      * freed at the transaction end.
    1404                 :      */
    1405              24 :     subxactlist = NIL;
    1406              24 : }
    1407                 : 
    1408                 : /*
    1409                 :  * Handle STREAM ABORT message when the transaction was applied in a parallel
    1410                 :  * apply worker.
    1411                 :  */
    1412                 : void
    1413              12 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
    1414                 : {
    1415              12 :     TransactionId xid = abort_data->xid;
    1416              12 :     TransactionId subxid = abort_data->subxid;
    1417                 : 
    1418                 :     /*
     1419                 :      * Update the origin state so we can restart streaming from the correct
     1420                 :      * position in case of a crash.
    1421                 :      */
    1422              12 :     replorigin_session_origin_lsn = abort_data->abort_lsn;
    1423              12 :     replorigin_session_origin_timestamp = abort_data->abort_time;
    1424                 : 
    1425                 :     /*
     1426                 :      * If the two XIDs are the same, it is in fact an abort of the toplevel
     1427                 :      * xact, so just free the subxactlist.
    1428                 :      */
    1429              12 :     if (subxid == xid)
    1430                 :     {
    1431               2 :         pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
    1432                 : 
    1433                 :         /*
    1434                 :          * Release the lock as we might be processing an empty streaming
    1435                 :          * transaction in which case the lock won't be released during
    1436                 :          * transaction rollback.
    1437                 :          *
    1438                 :          * Note that it's ok to release the transaction lock before aborting
     1439                 :          * the transaction because even if the parallel apply worker dies due
     1440                 :          * to a crash or some other reason, such a transaction would still be
    1441                 :          * considered aborted.
    1442                 :          */
    1443               2 :         pa_unlock_transaction(xid, AccessExclusiveLock);
    1444                 : 
    1445               2 :         AbortCurrentTransaction();
    1446                 : 
    1447               2 :         if (IsTransactionBlock())
    1448                 :         {
    1449               1 :             EndTransactionBlock(false);
    1450               1 :             CommitTransactionCommand();
    1451                 :         }
    1452                 : 
    1453               2 :         pa_reset_subtrans();
    1454                 : 
    1455               2 :         pgstat_report_activity(STATE_IDLE, NULL);
    1456                 :     }
    1457                 :     else
    1458                 :     {
    1459                 :         /* OK, so it's a subxact. Rollback to the savepoint. */
    1460                 :         int         i;
    1461                 :         char        spname[NAMEDATALEN];
    1462                 : 
    1463              10 :         pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
    1464                 : 
    1465              10 :         elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
    1466                 : 
    1467                 :         /*
    1468                 :          * Search the subxactlist, determine the offset tracked for the
    1469                 :          * subxact, and truncate the list.
    1470                 :          *
    1471                 :          * Note that for an empty sub-transaction we won't find the subxid
    1472                 :          * here.
    1473                 :          */
    1474              12 :         for (i = list_length(subxactlist) - 1; i >= 0; i--)
    1475                 :         {
    1476              11 :             TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
    1477                 : 
    1478              11 :             if (xid_tmp == subxid)
    1479                 :             {
    1480               9 :                 RollbackToSavepoint(spname);
    1481               9 :                 CommitTransactionCommand();
    1482               9 :                 subxactlist = list_truncate(subxactlist, i);
    1483               9 :                 break;
    1484                 :             }
    1485                 :         }
    1486                 :     }
    1487              12 : }
    1488                 : 
    1489                 : /*
     1490                 :  * Set the fileset state for a particular parallel apply worker. The fileset
     1491                 :  * itself is set once the leader worker has serialized all changes to the
     1492                 :  * file, so that it can be used by the parallel apply worker.
    1493                 :  */
    1494                 : void
    1495              16 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
    1496                 :                      PartialFileSetState fileset_state)
    1497                 : {
    1498              16 :     SpinLockAcquire(&wshared->mutex);
    1499              16 :     wshared->fileset_state = fileset_state;
    1500                 : 
    1501              16 :     if (fileset_state == FS_SERIALIZE_DONE)
    1502                 :     {
    1503               4 :         Assert(am_leader_apply_worker());
    1504               4 :         Assert(MyLogicalRepWorker->stream_fileset);
    1505               4 :         wshared->fileset = *MyLogicalRepWorker->stream_fileset;
    1506                 :     }
    1507                 : 
    1508              16 :     SpinLockRelease(&wshared->mutex);
    1509              16 : }
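                          : 
                          : /*
                          :  * A sketch of the expected state progression (FS_EMPTY and FS_READY are
                          :  * the remaining PartialFileSetState values, assumed from the enum's use
                          :  * elsewhere): FS_EMPTY -> FS_SERIALIZE_IN_PROGRESS (leader is spooling
                          :  * changes) -> FS_SERIALIZE_DONE (fileset copied above) -> FS_READY
                          :  * (parallel apply worker has taken over the fileset).
                          :  */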
    1510                 : 
    1511                 : /*
    1512                 :  * Get the fileset state for the current parallel apply worker.
    1513                 :  */
    1514                 : static PartialFileSetState
    1515              68 : pa_get_fileset_state(void)
    1516                 : {
    1517                 :     PartialFileSetState fileset_state;
    1518                 : 
    1519              68 :     Assert(am_parallel_apply_worker());
    1520                 : 
    1521              68 :     SpinLockAcquire(&MyParallelShared->mutex);
    1522              68 :     fileset_state = MyParallelShared->fileset_state;
    1523              68 :     SpinLockRelease(&MyParallelShared->mutex);
    1524                 : 
    1525              68 :     return fileset_state;
    1526                 : }
    1527                 : 
    1528                 : /*
    1529                 :  * Helper functions to acquire and release a lock for each stream block.
    1530                 :  *
    1531                 :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
    1532                 :  * stream lock.
    1533                 :  *
    1534                 :  * Refer to the comments atop this file to see how the stream lock is used.
    1535                 :  */
    1536                 : void
    1537             275 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
    1538                 : {
    1539             275 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1540                 :                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1541             273 : }
    1542                 : 
    1543                 : void
    1544             271 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
    1545                 : {
    1546             271 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1547                 :                                      PARALLEL_APPLY_LOCK_STREAM, lockmode);
    1548             271 : }
    1549                 : 
    1550                 : /*
    1551                 :  * Helper functions to acquire and release a lock for each local transaction
    1552                 :  * apply.
    1553                 :  *
    1554                 :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
    1555                 :  * transaction lock.
    1556                 :  *
     1557                 :  * Note that all callers must pass a remote transaction ID, not a local
     1558                 :  * transaction ID, as xid. This is because the local transaction ID is
     1559                 :  * assigned only while the parallel apply worker applies its first change,
     1560                 :  * but that first change may itself be blocked by a transaction executing
     1561                 :  * concurrently in another parallel apply worker. Since the local ID can
     1562                 :  * only be communicated to the leader after the first change has been
     1563                 :  * applied, the leader could otherwise not use this lock to wait after
     1564                 :  * sending the xact finish command.
    1565                 :  *
    1566                 :  * Refer to the comments atop this file to see how the transaction lock is
    1567                 :  * used.
    1568                 :  */
    1569                 : void
    1570              52 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
    1571                 : {
    1572              52 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1573                 :                                    PARALLEL_APPLY_LOCK_XACT, lockmode);
    1574              51 : }
    1575                 : 
    1576                 : void
    1577              48 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
    1578                 : {
    1579              48 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
    1580                 :                                      PARALLEL_APPLY_LOCK_XACT, lockmode);
    1581              48 : }
    1582                 : 
    1583                 : /*
    1584                 :  * Decrement the number of pending streaming blocks and wait on the stream lock
    1585                 :  * if there is no pending block available.
    1586                 :  */
    1587                 : void
    1588             251 : pa_decr_and_wait_stream_block(void)
    1589                 : {
    1590             251 :     Assert(am_parallel_apply_worker());
    1591                 : 
    1592                 :     /*
     1593                 :      * The only situation in which there can be no pending stream chunks is
     1594                 :      * when we are applying spooled messages.
    1595                 :      */
    1596             251 :     if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
    1597                 :     {
    1598              16 :         if (pa_has_spooled_message_pending())
    1599              16 :             return;
    1600                 : 
    1601 UNC           0 :         elog(ERROR, "invalid pending streaming chunk 0");
    1602                 :     }
    1603                 : 
    1604 GNC         235 :     if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
    1605                 :     {
    1606              25 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
    1607              23 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
    1608                 :     }
    1609                 : }
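                          : 
                          : /*
                          :  * A hedged sketch of the handshake this implements, with the leader-side
                          :  * calls paraphrased:
                          :  *
                          :  *     leader:  pa_lock_stream(xid, AccessExclusiveLock);
                          :  *     worker:  pa_lock_stream(xid, AccessShareLock);      -- blocks here
                          :  *     leader:  pa_unlock_stream(xid, AccessExclusiveLock);
                          :  *     worker:  pa_unlock_stream(xid, AccessShareLock);     -- proceeds
                          :  *
                          :  * The worker only attempts the share-mode acquisition once the pending
                          :  * stream count drops to zero, i.e. when it has consumed every chunk the
                          :  * leader has announced so far.
                          :  */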
    1610                 : 
    1611                 : /*
    1612                 :  * Finish processing the streaming transaction in the leader apply worker.
    1613                 :  */
    1614                 : void
    1615              25 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
    1616                 : {
    1617              25 :     Assert(am_leader_apply_worker());
    1618                 : 
    1619                 :     /*
     1620                 :      * Unlock the shared object lock so that the parallel apply worker can
    1621                 :      * continue to receive and apply changes.
    1622                 :      */
    1623              25 :     pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
    1624                 : 
    1625                 :     /*
    1626                 :      * Wait for that worker to finish. This is necessary to maintain commit
     1627                 :      * order, which avoids failures due to transaction dependencies and
    1628                 :      * deadlocks.
    1629                 :      */
    1630              25 :     pa_wait_for_xact_finish(winfo);
    1631                 : 
    1632              24 :     if (!XLogRecPtrIsInvalid(remote_lsn))
    1633              22 :         store_flush_position(remote_lsn, winfo->shared->last_commit_end);
    1634                 : 
    1635              24 :     pa_free_worker(winfo);
    1636              24 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a