Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * applyparallelworker.c
3 : * Support routines for applying xact by parallel apply worker
4 : *
5 : * Copyright (c) 2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/applyparallelworker.c
9 : *
10 : * This file contains the code to launch, set up, and teardown a parallel apply
11 : * worker which receives the changes from the leader worker and invokes routines
12 : * to apply those on the subscriber database. Additionally, this file contains
13 : * routines that are intended to support setting up, using, and tearing down a
14 : * ParallelApplyWorkerInfo which is required so the leader worker and parallel
15 : * apply workers can communicate with each other.
16 : *
17 : * The parallel apply workers are assigned (if available) as soon as xact's
18 : * first stream is received for subscriptions that have set their 'streaming'
19 : * option as parallel. The leader apply worker will send changes to this new
20 : * worker via shared memory. We keep this worker assigned till the transaction
21 : * commit is received and also wait for the worker to finish at commit. This
22 : * preserves commit ordering and avoids file I/O in most cases, although we
23 : * still need to spill to a file if there is no worker available. See comments
24 : * atop logical/worker to know more about streamed xacts whose changes are
25 : * spilled to disk. It is important to maintain commit order to avoid failures
26 : * due to: (a) transaction dependencies - say if we insert a row in the first
27 : * transaction and update it in the second transaction on publisher then
28 : * allowing the subscriber to apply both in parallel can lead to failure in the
29 : * update; (b) deadlocks - allowing transactions that update the same set of
30 : * rows/tables in the opposite order to be applied in parallel can lead to
31 : * deadlocks.
32 : *
33 : * A worker pool is used to avoid restarting workers for each streaming
34 : * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
35 : * in the ParallelApplyWorkerPool. After successfully launching a new worker,
36 : * its information is added to the ParallelApplyWorkerPool. Once the worker
37 : * finishes applying the transaction, it is marked as available for re-use.
38 : * Now, before starting a new worker to apply the streaming transaction, we
39 : * check the list for any available worker. Note that we retain a maximum of
40 : * half the max_parallel_apply_workers_per_subscription workers in the pool and
41 : * after that, we simply exit the worker after applying the transaction.
42 : *
43 : * XXX This worker pool threshold is arbitrary and we can provide a GUC
44 : * variable for this in the future if required.
45 : *
46 : * The leader apply worker will create a separate dynamic shared memory segment
47 : * when each parallel apply worker starts. The reason for this design is that
48 : * we cannot predict how many workers will be needed. It may be possible to
49 : * allocate enough shared memory in one segment based on the maximum number of
50 : * parallel apply workers (max_parallel_apply_workers_per_subscription), but
51 : * this would waste memory if no process is actually started.
52 : *
53 : * The dynamic shared memory segment contains: (a) a shm_mq that is used to
54 : * send changes in the transaction from leader apply worker to parallel apply
55 : * worker; (b) another shm_mq that is used to send errors (and other messages
56 : * reported via elog/ereport) from the parallel apply worker to leader apply
57 : * worker; (c) necessary information to be shared among parallel apply workers
58 : * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
59 : *
60 : * Locking Considerations
61 : * ----------------------
62 : * We have a risk of deadlock due to concurrently applying the transactions in
63 : * parallel mode that were independent on the publisher side but became
64 : * dependent on the subscriber side due to the different database structures
65 : * (like schema of subscription tables, constraints, etc.) on each side. This
66 : * can happen even without parallel mode when there are concurrent operations
67 : * on the subscriber. In order to detect the deadlocks among leader (LA) and
68 : * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
69 : * next stream (set of changes) and LA waits for PA to finish the transaction.
70 : * An alternative approach could be to not allow parallelism when the schema of
71 : * tables is different between the publisher and subscriber but that would be
72 : * too restrictive and would require the publisher to send much more
73 : * information than it is currently sending.
74 : *
75 : * Consider a case where the subscribed table does not have a unique key on the
76 : * publisher and has a unique key on the subscriber. The deadlock can happen in
77 : * the following ways:
78 : *
79 : * 1) Deadlock between the leader apply worker and a parallel apply worker
80 : *
81 : * Consider that the parallel apply worker (PA) is executing TX-1 and the
82 : * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
83 : * Now, LA is waiting for PA because of the unique key constraint of the
84 : * subscribed table while PA is waiting for LA to send the next stream of
85 : * changes or transaction finish command message.
86 : *
87 : * In order for lmgr to detect this, we have LA acquire a session lock on the
88 : * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
89 : * trying to receive the next stream of changes. Specifically, LA will acquire
90 : * the lock in AccessExclusive mode before sending the STREAM_STOP and will
91 : * release it if already acquired after sending the STREAM_START, STREAM_ABORT
92 : * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
93 : * acquire the lock in AccessShare mode after processing STREAM_STOP and
94 : * STREAM_ABORT (for subtransaction) and then release the lock immediately
95 : * after acquiring it.
96 : *
97 : * The lock graph for the above example will look as follows:
98 : * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
99 : * acquire the stream lock) -> LA
100 : *
101 : * This way, when PA is waiting for LA for the next stream of changes, we can
102 : * have a wait-edge from PA to LA in lmgr, which will make us detect the
103 : * deadlock between LA and PA.
104 : *
105 : * 2) Deadlock between the leader apply worker and parallel apply workers
106 : *
107 : * This scenario is similar to the first case but TX-1 and TX-2 are executed by
108 : * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
109 : * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
110 : * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
111 : * transaction in order to preserve the commit order. There is a deadlock among
112 : * the three processes.
113 : *
114 : * In order for lmgr to detect this, we have PA acquire a session lock (this is
115 : * a different lock than referred in the previous case, see
116 : * pa_lock_transaction()) on the transaction being applied and have LA wait on
117 : * the lock before proceeding in the transaction finish commands. Specifically,
118 : * PA will acquire this lock in AccessExclusive mode before executing the first
119 : * message of the transaction and release it at the xact end. LA will acquire
120 : * this lock in AccessShare mode at transaction finish commands and release it
121 : * immediately.
122 : *
123 : * The lock graph for the above example will look as follows:
124 : * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
125 : * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
126 : * lock) -> LA
127 : *
128 : * This way when LA is waiting to finish the transaction end command to preserve
129 : * the commit order, we will be able to detect deadlock, if any.
130 : *
131 : * One might think we can use XactLockTableWait(), but XactLockTableWait()
132 : * considers PREPARED TRANSACTION as still in progress which means the lock
133 : * won't be released even after the parallel apply worker has prepared the
134 : * transaction.
135 : *
136 : * 3) Deadlock when the shm_mq buffer is full
137 : *
138 : * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions
139 : * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
140 : * wait to send messages, and this wait doesn't appear in lmgr.
141 : *
142 : * To avoid this wait, we use a non-blocking write and wait with a timeout. If
143 : * the timeout is exceeded, the LA will serialize all the pending messages to
144 : * a file and indicate to PA-2 that it needs to read that file for the remaining
145 : * messages. Then LA will start waiting for commit as in the previous case
146 : * which will detect deadlock if any. See pa_send_data() and
147 : * enum TransApplyAction.
148 : *
149 : * Lock types
150 : * ----------
151 : * Both the stream lock and the transaction lock mentioned above are
152 : * session-level locks because both locks could be acquired outside the
153 : * transaction, and the stream lock in the leader needs to persist across
154 : * transaction boundaries i.e. until the end of the streaming transaction.
155 : *-------------------------------------------------------------------------
156 : */
157 :
158 : #include "postgres.h"
159 :
160 : #include "libpq/pqformat.h"
161 : #include "libpq/pqmq.h"
162 : #include "pgstat.h"
163 : #include "postmaster/interrupt.h"
164 : #include "replication/logicallauncher.h"
165 : #include "replication/logicalworker.h"
166 : #include "replication/origin.h"
167 : #include "replication/worker_internal.h"
168 : #include "storage/ipc.h"
169 : #include "storage/lmgr.h"
170 : #include "tcop/tcopprot.h"
171 : #include "utils/inval.h"
172 : #include "utils/memutils.h"
173 : #include "utils/syscache.h"
174 :
/* Magic number handed to shm_toc_create() for the parallel apply segment. */
#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067

/*
 * Table-of-contents keys used inside the parallel apply DSM segment.
 *
 * Other parallel execution code has to keep its keys clear of plan_node_id
 * values; that concern does not exist here, so plain small integers are fine.
 */
#define PARALLEL_APPLY_KEY_SHARED 1
#define PARALLEL_APPLY_KEY_MQ 2
#define PARALLEL_APPLY_KEY_ERROR_QUEUE 3

/* Size of the change queue placed in the DSM segment (currently 16 MB). */
#define DSM_QUEUE_SIZE (16 * 1024 * 1024)

/*
 * Size of the error queue placed in the DSM segment.
 *
 * It should be big enough for a typical ErrorResponse to fit without
 * blocking, so that a failing worker can push the complete message and exit
 * without having to wait for the other side to drain the queue.
 */
#define DSM_ERROR_QUEUE_SIZE (16 * 1024)

/*
 * Number of bytes taken by the three statistics fields (start_lsn, end_lsn
 * and send_time) that lead every message received by a parallel apply worker.
 * The leader apply worker has already accounted for them (see
 * LogicalRepApplyLoop), so the parallel apply worker just skips this many
 * bytes.
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

/*
 * Kinds of session-level lock taken on a transaction that is being applied on
 * a logical replication subscriber.
 */
#define PARALLEL_APPLY_LOCK_STREAM 0
#define PARALLEL_APPLY_LOCK_XACT 1
211 :
212 : /*
213 : * Hash table entry to map xid to the parallel apply worker state.
214 : */
215 : typedef struct ParallelApplyWorkerEntry
216 : {
217 : TransactionId xid; /* Hash key -- must be first */
218 : ParallelApplyWorkerInfo *winfo;
219 : } ParallelApplyWorkerEntry;
220 :
221 : /*
222 : * A hash table used to cache the state of streaming transactions being applied
223 : * by the parallel apply workers.
224 : */
225 : static HTAB *ParallelApplyTxnHash = NULL;
226 :
227 : /*
228 : * A list (pool) of active parallel apply workers. The information for
229 : * the new worker is added to the list after successfully launching it. The
230 : * list entry is removed if there are already enough workers in the worker
231 : * pool at the end of the transaction. For more information about the worker
232 : * pool, see comments atop this file.
233 : */
234 : static List *ParallelApplyWorkerPool = NIL;
235 :
236 : /*
237 : * Information shared between leader apply worker and parallel apply worker.
238 : */
239 : ParallelApplyWorkerShared *MyParallelShared = NULL;
240 :
241 : /*
242 : * Is there a message sent by a parallel apply worker that the leader apply
243 : * worker needs to receive?
244 : */
245 : volatile sig_atomic_t ParallelApplyMessagePending = false;
246 :
247 : /*
248 : * Cache the parallel apply worker information required for applying the
249 : * current streaming transaction. It is used to save the cost of searching the
250 : * hash table when applying the changes between STREAM_START and STREAM_STOP.
251 : */
252 : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
253 :
254 : /* A list to maintain subtransactions, if any. */
255 : static List *subxactlist = NIL;
256 :
257 : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
258 : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
259 : static PartialFileSetState pa_get_fileset_state(void);
260 :
261 : /*
262 : * Returns true if it is OK to start a parallel apply worker, false otherwise.
263 : */
264 : static bool
90 akapila 265 GNC 82 : pa_can_start(void)
266 : {
267 : /* Only leader apply workers can start parallel apply workers. */
268 82 : if (!am_leader_apply_worker())
269 27 : return false;
270 :
271 : /*
272 : * It is good to check for any change in the subscription parameter to
273 : * avoid the case where for a very long time the change doesn't get
274 : * reflected. This can happen when there is a constant flow of streaming
275 : * transactions that are handled by parallel apply workers.
276 : *
277 : * It is better to do it before the below checks so that the latest values
278 : * of subscription can be used for the checks.
279 : */
280 55 : maybe_reread_subscription();
281 :
282 : /*
283 : * Don't start a new parallel apply worker if the subscription is not
284 : * using parallel streaming mode, or if the publisher does not support
285 : * parallel apply.
286 : */
287 55 : if (!MyLogicalRepWorker->parallel_apply)
288 28 : return false;
289 :
290 : /*
291 : * Don't start a new parallel worker if user has set skiplsn as it's
292 : * possible that they want to skip the streaming transaction. For
293 : * streaming transactions, we need to serialize the transaction to a file
294 : * so that we can get the last LSN of the transaction to judge whether to
295 : * skip before starting to apply the change.
296 : *
297 : * One might think that we could allow parallelism if the first lsn of the
298 : * transaction is greater than skiplsn, but we don't send it with the
299 : * STREAM START message, and it doesn't seem worth sending the extra eight
300 : * bytes with the STREAM START to enable parallelism for this case.
301 : */
302 27 : if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
90 akapila 303 UNC 0 : return false;
304 :
305 : /*
306 : * For streaming transactions that are being applied using a parallel
307 : * apply worker, we cannot decide whether to apply the change for a
308 : * relation that is not in the READY state (see
309 : * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
310 : * time. So, we don't start the new parallel apply worker in this case.
311 : */
90 akapila 312 GNC 27 : if (!AllTablesyncsReady())
90 akapila 313 UNC 0 : return false;
314 :
90 akapila 315 GNC 27 : return true;
316 : }
317 :
318 : /*
319 : * Set up a dynamic shared memory segment.
320 : *
321 : * We set up a control region that contains a fixed-size worker info
322 : * (ParallelApplyWorkerShared), a message queue, and an error queue.
323 : *
324 : * Returns true on success, false on failure.
325 : */
326 : static bool
327 10 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
328 : {
329 : shm_toc_estimator e;
330 : Size segsize;
331 : dsm_segment *seg;
332 : shm_toc *toc;
333 : ParallelApplyWorkerShared *shared;
334 : shm_mq *mq;
335 10 : Size queue_size = DSM_QUEUE_SIZE;
336 10 : Size error_queue_size = DSM_ERROR_QUEUE_SIZE;
337 :
338 : /*
339 : * Estimate how much shared memory we need.
340 : *
341 : * Because the TOC machinery may choose to insert padding of oddly-sized
342 : * requests, we must estimate each chunk separately.
343 : *
344 : * We need one key to register the location of the header, and two other
345 : * keys to track the locations of the message queue and the error message
346 : * queue.
347 : */
348 10 : shm_toc_initialize_estimator(&e);
349 10 : shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
350 10 : shm_toc_estimate_chunk(&e, queue_size);
351 10 : shm_toc_estimate_chunk(&e, error_queue_size);
352 :
353 10 : shm_toc_estimate_keys(&e, 3);
354 10 : segsize = shm_toc_estimate(&e);
355 :
356 : /* Create the shared memory segment and establish a table of contents. */
357 10 : seg = dsm_create(shm_toc_estimate(&e), 0);
358 10 : if (!seg)
90 akapila 359 UNC 0 : return false;
360 :
90 akapila 361 GNC 10 : toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
362 : segsize);
363 :
364 : /* Set up the header region. */
365 10 : shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
366 10 : SpinLockInit(&shared->mutex);
367 :
368 10 : shared->xact_state = PARALLEL_TRANS_UNKNOWN;
369 10 : pg_atomic_init_u32(&(shared->pending_stream_count), 0);
370 10 : shared->last_commit_end = InvalidXLogRecPtr;
371 10 : shared->fileset_state = FS_EMPTY;
372 :
373 10 : shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
374 :
375 : /* Set up message queue for the worker. */
376 10 : mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
377 10 : shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
378 10 : shm_mq_set_sender(mq, MyProc);
379 :
380 : /* Attach the queue. */
381 10 : winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
382 :
383 : /* Set up error queue for the worker. */
384 10 : mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
385 : error_queue_size);
386 10 : shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
387 10 : shm_mq_set_receiver(mq, MyProc);
388 :
389 : /* Attach the queue. */
390 10 : winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
391 :
392 : /* Return results to caller. */
393 10 : winfo->dsm_seg = seg;
394 10 : winfo->shared = shared;
395 :
396 10 : return true;
397 : }
398 :
399 : /*
400 : * Try to get a parallel apply worker from the pool. If none is available then
401 : * start a new one.
402 : */
403 : static ParallelApplyWorkerInfo *
404 27 : pa_launch_parallel_worker(void)
405 : {
406 : MemoryContext oldcontext;
407 : bool launched;
408 : ParallelApplyWorkerInfo *winfo;
409 : ListCell *lc;
410 :
411 : /* Try to get an available parallel apply worker from the worker pool. */
412 29 : foreach(lc, ParallelApplyWorkerPool)
413 : {
414 19 : winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
415 :
416 19 : if (!winfo->in_use)
417 17 : return winfo;
418 : }
419 :
420 : /*
421 : * Start a new parallel apply worker.
422 : *
423 : * The worker info can be used for the lifetime of the worker process, so
424 : * create it in a permanent context.
425 : */
426 10 : oldcontext = MemoryContextSwitchTo(ApplyContext);
427 :
428 10 : winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
429 :
430 : /* Setup shared memory. */
431 10 : if (!pa_setup_dsm(winfo))
432 : {
90 akapila 433 UNC 0 : MemoryContextSwitchTo(oldcontext);
434 0 : pfree(winfo);
435 0 : return NULL;
436 : }
437 :
90 akapila 438 GNC 10 : launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
439 10 : MySubscription->oid,
440 10 : MySubscription->name,
441 10 : MyLogicalRepWorker->userid,
442 : InvalidOid,
443 : dsm_segment_handle(winfo->dsm_seg));
444 :
445 10 : if (launched)
446 : {
447 10 : ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
448 : }
449 : else
450 : {
90 akapila 451 UNC 0 : pa_free_worker_info(winfo);
452 0 : winfo = NULL;
453 : }
454 :
90 akapila 455 GNC 10 : MemoryContextSwitchTo(oldcontext);
456 :
457 10 : return winfo;
458 : }
459 :
460 : /*
461 : * Allocate a parallel apply worker that will be used for the specified xid.
462 : *
463 : * We first try to get an available worker from the pool, if any and then try
464 : * to launch a new worker. On successful allocation, remember the worker
465 : * information in the hash table so that we can get it later for processing the
466 : * streaming changes.
467 : */
468 : void
469 82 : pa_allocate_worker(TransactionId xid)
470 : {
471 : bool found;
472 82 : ParallelApplyWorkerInfo *winfo = NULL;
473 : ParallelApplyWorkerEntry *entry;
474 :
475 82 : if (!pa_can_start())
476 55 : return;
477 :
86 478 27 : winfo = pa_launch_parallel_worker();
479 27 : if (!winfo)
86 akapila 480 UNC 0 : return;
481 :
482 : /* First time through, initialize parallel apply worker state hashtable. */
90 akapila 483 GNC 27 : if (!ParallelApplyTxnHash)
484 : {
485 : HASHCTL ctl;
486 :
487 91 : MemSet(&ctl, 0, sizeof(ctl));
488 7 : ctl.keysize = sizeof(TransactionId);
489 7 : ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
490 7 : ctl.hcxt = ApplyContext;
491 :
492 7 : ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
493 : 16, &ctl,
494 : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
495 : }
496 :
497 : /* Create an entry for the requested transaction. */
498 27 : entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
499 27 : if (found)
90 akapila 500 UNC 0 : elog(ERROR, "hash table corrupted");
501 :
502 : /* Update the transaction information in shared memory. */
90 akapila 503 GNC 27 : SpinLockAcquire(&winfo->shared->mutex);
504 27 : winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
505 27 : winfo->shared->xid = xid;
506 27 : SpinLockRelease(&winfo->shared->mutex);
507 :
508 27 : winfo->in_use = true;
509 27 : winfo->serialize_changes = false;
510 27 : entry->winfo = winfo;
511 27 : entry->xid = xid;
512 : }
513 :
514 : /*
515 : * Find the assigned worker for the given transaction, if any.
516 : */
517 : ParallelApplyWorkerInfo *
518 256993 : pa_find_worker(TransactionId xid)
519 : {
520 : bool found;
521 : ParallelApplyWorkerEntry *entry;
522 :
523 256993 : if (!TransactionIdIsValid(xid))
524 79826 : return NULL;
525 :
526 177167 : if (!ParallelApplyTxnHash)
527 103236 : return NULL;
528 :
529 : /* Return the cached parallel apply worker if valid. */
530 73931 : if (stream_apply_worker)
531 73644 : return stream_apply_worker;
532 :
533 : /* Find an entry for the requested transaction. */
534 287 : entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
535 287 : if (found)
536 : {
537 : /* The worker must not have exited. */
538 287 : Assert(entry->winfo->in_use);
539 287 : return entry->winfo;
540 : }
541 :
90 akapila 542 UNC 0 : return NULL;
543 : }
544 :
545 : /*
546 : * Makes the worker available for reuse.
547 : *
548 : * This removes the parallel apply worker entry from the hash table so that it
549 : * can't be used. If there are enough workers in the pool, it stops the worker
550 : * and frees the corresponding info. Otherwise it just marks the worker as
551 : * available for reuse.
552 : *
553 : * For more information about the worker pool, see comments atop this file.
554 : */
555 : static void
90 akapila 556 GNC 24 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
557 : {
558 24 : Assert(!am_parallel_apply_worker());
559 24 : Assert(winfo->in_use);
560 24 : Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
561 :
562 24 : if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
90 akapila 563 UNC 0 : elog(ERROR, "hash table corrupted");
564 :
565 : /*
566 : * Stop the worker if there are enough workers in the pool.
567 : *
568 : * XXX Additionally, we also stop the worker if the leader apply worker
569 : * serialize part of the transaction data due to a send timeout. This is
570 : * because the message could be partially written to the queue and there
571 : * is no way to clean the queue other than resending the message until it
572 : * succeeds. Instead of trying to send the data which anyway would have
573 : * been serialized and then letting the parallel apply worker deal with
574 : * the spurious message, we stop the worker.
575 : */
90 akapila 576 GNC 24 : if (winfo->serialize_changes ||
577 20 : list_length(ParallelApplyWorkerPool) >
578 20 : (max_parallel_apply_workers_per_subscription / 2))
579 : {
580 : int slot_no;
581 : uint16 generation;
582 :
583 5 : SpinLockAcquire(&winfo->shared->mutex);
584 5 : generation = winfo->shared->logicalrep_worker_generation;
585 5 : slot_no = winfo->shared->logicalrep_worker_slot_no;
586 5 : SpinLockRelease(&winfo->shared->mutex);
587 :
588 5 : logicalrep_pa_worker_stop(slot_no, generation);
589 :
590 5 : pa_free_worker_info(winfo);
591 :
592 5 : return;
593 : }
594 :
595 19 : winfo->in_use = false;
596 19 : winfo->serialize_changes = false;
597 : }
598 :
599 : /*
600 : * Free the parallel apply worker information and unlink the files with
601 : * serialized changes if any.
602 : */
603 : static void
604 5 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
605 : {
606 5 : Assert(winfo);
607 :
608 5 : if (winfo->mq_handle)
609 5 : shm_mq_detach(winfo->mq_handle);
610 :
611 5 : if (winfo->error_mq_handle)
612 5 : shm_mq_detach(winfo->error_mq_handle);
613 :
614 : /* Unlink the files with serialized changes. */
615 5 : if (winfo->serialize_changes)
616 4 : stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
617 :
618 5 : if (winfo->dsm_seg)
619 5 : dsm_detach(winfo->dsm_seg);
620 :
621 : /* Remove from the worker pool. */
622 5 : ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
623 :
624 5 : pfree(winfo);
625 5 : }
626 :
627 : /*
628 : * Detach the error queue for all parallel apply workers.
629 : */
630 : void
631 148 : pa_detach_all_error_mq(void)
632 : {
633 : ListCell *lc;
634 :
635 153 : foreach(lc, ParallelApplyWorkerPool)
636 : {
637 5 : ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
638 :
639 5 : shm_mq_detach(winfo->error_mq_handle);
640 5 : winfo->error_mq_handle = NULL;
641 : }
642 148 : }
643 :
644 : /*
645 : * Check if there are any pending spooled messages.
646 : */
647 : static bool
648 16 : pa_has_spooled_message_pending()
649 : {
650 : PartialFileSetState fileset_state;
651 :
652 16 : fileset_state = pa_get_fileset_state();
653 :
654 16 : return (fileset_state != FS_EMPTY);
655 : }
656 :
657 : /*
658 : * Replay the spooled messages once the leader apply worker has finished
659 : * serializing changes to the file.
660 : *
661 : * Returns false if there aren't any pending spooled messages, true otherwise.
662 : */
663 : static bool
664 52 : pa_process_spooled_messages_if_required(void)
665 : {
666 : PartialFileSetState fileset_state;
667 :
668 52 : fileset_state = pa_get_fileset_state();
669 :
670 52 : if (fileset_state == FS_EMPTY)
671 44 : return false;
672 :
673 : /*
674 : * If the leader apply worker is busy serializing the partial changes then
675 : * acquire the stream lock now and wait for the leader worker to finish
676 : * serializing the changes. Otherwise, the parallel apply worker won't get
677 : * a chance to receive a STREAM_STOP (and acquire the stream lock) until
678 : * the leader had serialized all changes which can lead to undetected
679 : * deadlock.
680 : *
681 : * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
682 : * worker has finished serializing the changes.
683 : */
684 8 : if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
685 : {
90 akapila 686 UNC 0 : pa_lock_stream(MyParallelShared->xid, AccessShareLock);
687 0 : pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
688 :
689 0 : fileset_state = pa_get_fileset_state();
690 : }
691 :
692 : /*
693 : * We cannot read the file immediately after the leader has serialized all
694 : * changes to the file because there may still be messages in the memory
695 : * queue. We will apply all spooled messages the next time we call this
696 : * function and that will ensure there are no messages left in the memory
697 : * queue.
698 : */
90 akapila 699 GNC 8 : if (fileset_state == FS_SERIALIZE_DONE)
700 : {
701 4 : pa_set_fileset_state(MyParallelShared, FS_READY);
702 : }
703 4 : else if (fileset_state == FS_READY)
704 : {
705 4 : apply_spooled_messages(&MyParallelShared->fileset,
706 4 : MyParallelShared->xid,
707 : InvalidXLogRecPtr);
708 4 : pa_set_fileset_state(MyParallelShared, FS_EMPTY);
709 : }
710 :
711 8 : return true;
712 : }
713 :
714 : /*
715 : * Interrupt handler for main loop of parallel apply worker.
716 : */
717 : static void
718 63928 : ProcessParallelApplyInterrupts(void)
719 : {
720 63928 : CHECK_FOR_INTERRUPTS();
721 :
722 63926 : if (ShutdownRequestPending)
723 : {
724 5 : ereport(LOG,
725 : (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
726 : MySubscription->name)));
727 :
728 5 : proc_exit(0);
729 : }
730 :
731 63921 : if (ConfigReloadPending)
732 : {
733 4 : ConfigReloadPending = false;
734 4 : ProcessConfigFile(PGC_SIGHUP);
735 : }
736 63921 : }
737 :
738 : /* Parallel apply worker main loop. */
739 : static void
740 10 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
741 : {
742 : shm_mq_result shmq_res;
743 : ErrorContextCallback errcallback;
744 10 : MemoryContext oldcxt = CurrentMemoryContext;
745 :
746 : /*
747 : * Init the ApplyMessageContext which we clean up after each replication
748 : * protocol message.
749 : */
750 10 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
751 : "ApplyMessageContext",
752 : ALLOCSET_DEFAULT_SIZES);
753 :
754 : /*
755 : * Push apply error context callback. Fields will be filled while applying
756 : * a change.
757 : */
758 10 : errcallback.callback = apply_error_callback;
759 10 : errcallback.previous = error_context_stack;
760 10 : error_context_stack = &errcallback;
761 :
762 : for (;;)
763 63918 : {
764 : void *data;
765 : Size len;
766 :
767 63928 : ProcessParallelApplyInterrupts();
768 :
769 : /* Ensure we are reading the data into our memory context. */
770 63921 : MemoryContextSwitchTo(ApplyMessageContext);
771 :
772 63921 : shmq_res = shm_mq_receive(mqh, &len, &data, true);
773 :
774 63921 : if (shmq_res == SHM_MQ_SUCCESS)
775 : {
776 : StringInfoData s;
777 : int c;
778 :
779 63869 : if (len == 0)
90 akapila 780 UNC 0 : elog(ERROR, "invalid message length");
781 :
90 akapila 782 GNC 63869 : s.cursor = 0;
783 63869 : s.maxlen = -1;
784 63869 : s.data = (char *) data;
785 63869 : s.len = len;
786 :
787 : /*
788 : * The first byte of messages sent from leader apply worker to
789 : * parallel apply workers can only be 'w'.
790 : */
791 63869 : c = pq_getmsgbyte(&s);
792 63869 : if (c != 'w')
90 akapila 793 UNC 0 : elog(ERROR, "unexpected message \"%c\"", c);
794 :
795 : /*
796 : * Ignore statistics fields that have been updated by the leader
797 : * apply worker.
798 : *
799 : * XXX We can avoid sending the statistics fields from the leader
800 : * apply worker but for that, it needs to rebuild the entire
801 : * message by removing these fields which could be more work than
802 : * simply ignoring these fields in the parallel apply worker.
803 : */
90 akapila 804 GNC 63869 : s.cursor += SIZE_STATS_MESSAGE;
805 :
806 63869 : apply_dispatch(&s);
807 : }
808 52 : else if (shmq_res == SHM_MQ_WOULD_BLOCK)
809 : {
810 : /* Replay the changes from the file, if any. */
811 52 : if (!pa_process_spooled_messages_if_required())
812 : {
813 : int rc;
814 :
815 : /* Wait for more work. */
816 44 : rc = WaitLatch(MyLatch,
817 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
818 : 1000L,
819 : WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
820 :
821 44 : if (rc & WL_LATCH_SET)
822 41 : ResetLatch(MyLatch);
823 : }
824 : }
825 : else
826 : {
90 akapila 827 UNC 0 : Assert(shmq_res == SHM_MQ_DETACHED);
828 :
829 0 : ereport(ERROR,
830 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
831 : errmsg("lost connection to the logical replication apply worker")));
832 : }
833 :
90 akapila 834 GNC 63918 : MemoryContextReset(ApplyMessageContext);
835 63918 : MemoryContextSwitchTo(oldcxt);
836 : }
837 :
838 : /* Pop the error context stack. */
839 : error_context_stack = errcallback.previous;
840 :
841 : MemoryContextSwitchTo(oldcxt);
842 : }
843 :
844 : /*
845 : * Make sure the leader apply worker tries to read from our error queue one more
846 : * time. This guards against the case where we exit uncleanly without sending
847 : * an ErrorResponse, for example because some code calls proc_exit directly.
848 : */
849 : static void
850 10 : pa_shutdown(int code, Datum arg)
851 : {
81 852 10 : SendProcSignal(MyLogicalRepWorker->leader_pid,
853 : PROCSIG_PARALLEL_APPLY_MESSAGE,
854 : InvalidBackendId);
855 :
90 856 10 : dsm_detach((dsm_segment *) DatumGetPointer(arg));
857 10 : }
858 :
859 : /*
860 : * Parallel apply worker entry point.
861 : */
862 : void
863 10 : ParallelApplyWorkerMain(Datum main_arg)
864 : {
865 : ParallelApplyWorkerShared *shared;
866 : dsm_handle handle;
867 : dsm_segment *seg;
868 : shm_toc *toc;
869 : shm_mq *mq;
870 : shm_mq_handle *mqh;
871 : shm_mq_handle *error_mqh;
872 : RepOriginId originid;
873 10 : int worker_slot = DatumGetInt32(main_arg);
874 : char originname[NAMEDATALEN];
875 :
876 : /* Setup signal handling. */
877 10 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
878 10 : pqsignal(SIGINT, SignalHandlerForShutdownRequest);
879 10 : pqsignal(SIGTERM, die);
880 10 : BackgroundWorkerUnblockSignals();
881 :
882 : /*
883 : * Attach to the dynamic shared memory segment for the parallel apply, and
884 : * find its table of contents.
885 : *
886 : * Like parallel query, we don't need resource owner by this time. See
887 : * ParallelWorkerMain.
888 : */
889 10 : memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
890 10 : seg = dsm_attach(handle);
891 10 : if (!seg)
90 akapila 892 UNC 0 : ereport(ERROR,
893 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
894 : errmsg("unable to map dynamic shared memory segment")));
895 :
90 akapila 896 GNC 10 : toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
897 10 : if (!toc)
90 akapila 898 UNC 0 : ereport(ERROR,
899 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
900 : errmsg("bad magic number in dynamic shared memory segment")));
901 :
90 akapila 902 GNC 10 : before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
903 :
904 : /* Look up the shared information. */
905 10 : shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
906 10 : MyParallelShared = shared;
907 :
908 : /*
909 : * Attach to the message queue.
910 : */
911 10 : mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
912 10 : shm_mq_set_receiver(mq, MyProc);
913 10 : mqh = shm_mq_attach(mq, seg, NULL);
914 :
915 : /*
916 : * Primary initialization is complete. Now, we can attach to our slot.
917 : * This is to ensure that the leader apply worker does not write data to
918 : * the uninitialized memory queue.
919 : */
920 10 : logicalrep_worker_attach(worker_slot);
921 :
922 10 : SpinLockAcquire(&MyParallelShared->mutex);
923 10 : MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
924 10 : MyParallelShared->logicalrep_worker_slot_no = worker_slot;
925 10 : SpinLockRelease(&MyParallelShared->mutex);
926 :
927 : /*
928 : * Attach to the error queue.
929 : */
930 10 : mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
931 10 : shm_mq_set_sender(mq, MyProc);
932 10 : error_mqh = shm_mq_attach(mq, seg, NULL);
933 :
934 10 : pq_redirect_to_shm_mq(seg, error_mqh);
81 935 10 : pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
936 : InvalidBackendId);
937 :
90 938 10 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
939 10 : MyLogicalRepWorker->reply_time = 0;
940 :
941 10 : InitializeApplyWorker();
942 :
943 : /* Setup replication origin tracking. */
944 10 : StartTransactionCommand();
945 10 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
946 : originname, sizeof(originname));
947 10 : originid = replorigin_by_name(originname, false);
948 :
949 : /*
950 : * The parallel apply worker doesn't need to monopolize this replication
951 : * origin which was already acquired by its leader process.
952 : */
81 953 10 : replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
90 954 10 : replorigin_session_origin = originid;
955 10 : CommitTransactionCommand();
956 :
957 : /*
958 : * Setup callback for syscache so that we know when something changes in
959 : * the subscription relation state.
960 : */
961 10 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
962 : invalidate_syncing_table_states,
963 : (Datum) 0);
964 :
965 10 : set_apply_error_context_origin(originname);
966 :
967 10 : LogicalParallelApplyLoop(mqh);
968 :
969 : /*
970 : * The parallel apply worker must not get here because the parallel apply
971 : * worker will only stop when it receives a SIGTERM or SIGINT from the
972 : * leader, or when there is an error. None of these cases will allow the
973 : * code to reach here.
974 : */
90 akapila 975 UNC 0 : Assert(false);
976 : }
977 :
978 : /*
979 : * Handle receipt of an interrupt indicating a parallel apply worker message.
980 : *
981 : * Note: this is called within a signal handler! All we can do is set a flag
982 : * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
983 : * HandleParallelApplyMessages().
984 : */
985 : void
90 akapila 986 GNC 5 : HandleParallelApplyMessageInterrupt(void)
987 : {
988 5 : InterruptPending = true;
989 5 : ParallelApplyMessagePending = true;
990 5 : SetLatch(MyLatch);
991 5 : }
992 :
993 : /*
994 : * Handle a single protocol message received from a single parallel apply
995 : * worker.
996 : */
997 : static void
998 1 : HandleParallelApplyMessage(StringInfo msg)
999 : {
1000 : char msgtype;
1001 :
1002 1 : msgtype = pq_getmsgbyte(msg);
1003 :
1004 1 : switch (msgtype)
1005 : {
1006 1 : case 'E': /* ErrorResponse */
1007 : {
1008 : ErrorData edata;
1009 :
1010 : /* Parse ErrorResponse. */
1011 1 : pq_parse_errornotice(msg, &edata);
1012 :
1013 : /*
1014 : * If desired, add a context line to show that this is a
1015 : * message propagated from a parallel apply worker. Otherwise,
1016 : * it can sometimes be confusing to understand what actually
1017 : * happened.
1018 : */
1019 1 : if (edata.context)
1020 1 : edata.context = psprintf("%s\n%s", edata.context,
1021 : _("logical replication parallel apply worker"));
1022 : else
90 akapila 1023 UNC 0 : edata.context = pstrdup(_("logical replication parallel apply worker"));
1024 :
1025 : /*
1026 : * Context beyond that should use the error context callbacks
1027 : * that were in effect in LogicalRepApplyLoop().
1028 : */
90 akapila 1029 GNC 1 : error_context_stack = apply_error_context_stack;
1030 :
1031 : /*
1032 : * The actual error must have been reported by the parallel
1033 : * apply worker.
1034 : */
1035 1 : ereport(ERROR,
1036 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1037 : errmsg("logical replication parallel apply worker exited due to error"),
1038 : errcontext("%s", edata.context)));
1039 : }
1040 :
1041 : /*
1042 : * Don't need to do anything about NoticeResponse and
1043 : * NotifyResponse as the logical replication worker doesn't need
1044 : * to send messages to the client.
1045 : */
90 akapila 1046 UNC 0 : case 'N':
1047 : case 'A':
1048 0 : break;
1049 :
1050 0 : default:
1051 0 : elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1052 : msgtype, msg->len);
1053 : }
1054 0 : }
1055 :
1056 : /*
1057 : * Handle any queued protocol messages received from parallel apply workers.
1058 : */
1059 : void
90 akapila 1060 GNC 1 : HandleParallelApplyMessages(void)
1061 : {
1062 : ListCell *lc;
1063 : MemoryContext oldcontext;
1064 :
1065 : static MemoryContext hpam_context = NULL;
1066 :
1067 : /*
1068 : * This is invoked from ProcessInterrupts(), and since some of the
1069 : * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1070 : * for recursive calls if more signals are received while this runs. It's
1071 : * unclear that recursive entry would be safe, and it doesn't seem useful
1072 : * even if it is safe, so let's block interrupts until done.
1073 : */
1074 1 : HOLD_INTERRUPTS();
1075 :
1076 : /*
1077 : * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1078 : * don't want to risk leaking data into long-lived contexts, so let's do
1079 : * our work here in a private context that we can reset on each use.
1080 : */
1081 1 : if (!hpam_context) /* first time through? */
1082 1 : hpam_context = AllocSetContextCreate(TopMemoryContext,
1083 : "HandleParallelApplyMessages",
1084 : ALLOCSET_DEFAULT_SIZES);
1085 : else
90 akapila 1086 UNC 0 : MemoryContextReset(hpam_context);
1087 :
90 akapila 1088 GNC 1 : oldcontext = MemoryContextSwitchTo(hpam_context);
1089 :
1090 1 : ParallelApplyMessagePending = false;
1091 :
1092 1 : foreach(lc, ParallelApplyWorkerPool)
1093 : {
1094 : shm_mq_result res;
1095 : Size nbytes;
1096 : void *data;
1097 1 : ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
1098 :
1099 : /*
1100 : * The leader will detach from the error queue and set it to NULL
1101 : * before preparing to stop all parallel apply workers, so we don't
1102 : * need to handle error messages anymore. See
1103 : * logicalrep_worker_detach.
1104 : */
1105 1 : if (!winfo->error_mq_handle)
90 akapila 1106 UNC 0 : continue;
1107 :
90 akapila 1108 GNC 1 : res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1109 :
1110 1 : if (res == SHM_MQ_WOULD_BLOCK)
90 akapila 1111 UNC 0 : continue;
90 akapila 1112 GNC 1 : else if (res == SHM_MQ_SUCCESS)
1113 : {
1114 : StringInfoData msg;
1115 :
1116 1 : initStringInfo(&msg);
1117 1 : appendBinaryStringInfo(&msg, data, nbytes);
1118 1 : HandleParallelApplyMessage(&msg);
90 akapila 1119 UNC 0 : pfree(msg.data);
1120 : }
1121 : else
1122 0 : ereport(ERROR,
1123 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1124 : errmsg("lost connection to the logical replication parallel apply worker")));
1125 : }
1126 :
1127 0 : MemoryContextSwitchTo(oldcontext);
1128 :
1129 : /* Might as well clear the context on our way out */
1130 0 : MemoryContextReset(hpam_context);
1131 :
1132 0 : RESUME_INTERRUPTS();
1133 0 : }
1134 :
1135 : /*
1136 : * Send the data to the specified parallel apply worker via shared-memory
1137 : * queue.
1138 : *
1139 : * Returns false if the attempt to send data via shared memory times out, true
1140 : * otherwise.
1141 : */
1142 : bool
90 akapila 1143 GNC 68894 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
1144 : {
1145 : int rc;
1146 : shm_mq_result result;
1147 68894 : TimestampTz startTime = 0;
1148 :
1149 68894 : Assert(!IsTransactionState());
1150 68894 : Assert(!winfo->serialize_changes);
1151 :
1152 : /*
1153 : * We don't try to send data to parallel worker for 'immediate' mode. This
1154 : * is primarily used for testing purposes.
1155 : */
66 1156 68894 : if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE))
1157 4 : return false;
1158 :
1159 : /*
1160 : * This timeout is a bit arbitrary but testing revealed that it is sufficient
1161 : * to send the message unless the parallel apply worker is waiting on some
1162 : * lock or there is a serious resource crunch. See the comments atop this file
1163 : * to know why we are using a non-blocking way to send the message.
1164 : */
1165 : #define SHM_SEND_RETRY_INTERVAL_MS 1000
1166 : #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1167 :
1168 : for (;;)
1169 : {
90 1170 68890 : result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1171 :
1172 68890 : if (result == SHM_MQ_SUCCESS)
1173 68890 : return true;
90 akapila 1174 UNC 0 : else if (result == SHM_MQ_DETACHED)
1175 0 : ereport(ERROR,
1176 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1177 : errmsg("could not send data to shared-memory queue")));
1178 :
1179 0 : Assert(result == SHM_MQ_WOULD_BLOCK);
1180 :
1181 : /* Wait before retrying. */
1182 0 : rc = WaitLatch(MyLatch,
1183 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1184 : SHM_SEND_RETRY_INTERVAL_MS,
1185 : WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1186 :
1187 0 : if (rc & WL_LATCH_SET)
1188 : {
1189 0 : ResetLatch(MyLatch);
1190 0 : CHECK_FOR_INTERRUPTS();
1191 : }
1192 :
1193 0 : if (startTime == 0)
1194 0 : startTime = GetCurrentTimestamp();
1195 0 : else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1196 : SHM_SEND_TIMEOUT_MS))
1197 0 : return false;
1198 : }
1199 : }
1200 :
1201 : /*
1202 : * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
1203 : * that the current data and any subsequent data for this transaction will be
1204 : * serialized to a file. This is done to prevent possible deadlocks with
1205 : * another parallel apply worker (refer to the comments atop this file).
1206 : */
1207 : void
90 akapila 1208 GNC 4 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
1209 : bool stream_locked)
1210 : {
66 1211 4 : ereport(LOG,
1212 : (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1213 : winfo->shared->xid)));
1214 :
1215 : /*
1216 : * The parallel apply worker could be stuck for some reason (say waiting
1217 : * on some lock by other backend), so stop trying to send data directly to
1218 : * it and start serializing data to the file instead.
1219 : */
90 1220 4 : winfo->serialize_changes = true;
1221 :
1222 : /* Initialize the stream fileset. */
1223 4 : stream_start_internal(winfo->shared->xid, true);
1224 :
1225 : /*
1226 : * Acquires the stream lock if not already to make sure that the parallel
1227 : * apply worker will wait for the leader to release the stream lock until
1228 : * the end of the transaction.
1229 : */
1230 4 : if (!stream_locked)
1231 4 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1232 :
1233 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
1234 4 : }
1235 :
1236 : /*
1237 : * Wait until the parallel apply worker's transaction state has reached or
1238 : * exceeded the given xact_state.
1239 : */
1240 : static void
1241 25 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
1242 : ParallelTransState xact_state)
1243 : {
1244 : for (;;)
1245 : {
1246 : /*
1247 : * Stop if the transaction state has reached or exceeded the given
1248 : * xact_state.
1249 : */
1250 291 : if (pa_get_xact_state(winfo->shared) >= xact_state)
1251 25 : break;
1252 :
1253 : /* Wait to be signalled. */
1254 266 : (void) WaitLatch(MyLatch,
1255 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1256 : 10L,
1257 : WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
1258 :
1259 : /* Reset the latch so we don't spin. */
1260 266 : ResetLatch(MyLatch);
1261 :
1262 : /* An interrupt may have occurred while we were waiting. */
1263 266 : CHECK_FOR_INTERRUPTS();
1264 : }
1265 25 : }
1266 :
1267 : /*
1268 : * Wait until the parallel apply worker's transaction finishes.
1269 : */
1270 : static void
1271 25 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
1272 : {
1273 : /*
1274 : * Wait until the parallel apply worker set the state to
1275 : * PARALLEL_TRANS_STARTED which means it has acquired the transaction
1276 : * lock. This is to prevent leader apply worker from acquiring the
1277 : * transaction lock earlier than the parallel apply worker.
1278 : */
1279 25 : pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
1280 :
1281 : /*
1282 : * Wait for the transaction lock to be released. This is required to
1283 : * detect deadlock among leader and parallel apply workers. Refer to the
1284 : * comments atop this file.
1285 : */
1286 25 : pa_lock_transaction(winfo->shared->xid, AccessShareLock);
1287 24 : pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
1288 :
1289 : /*
1290 : * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
1291 : * apply worker failed while applying changes causing the lock to be
1292 : * released.
1293 : */
1294 24 : if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
90 akapila 1295 UNC 0 : ereport(ERROR,
1296 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1297 : errmsg("lost connection to the logical replication parallel apply worker")));
90 akapila 1298 GNC 24 : }
1299 :
1300 : /*
1301 : * Set the transaction state for a given parallel apply worker.
1302 : */
1303 : void
1304 51 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
1305 : ParallelTransState xact_state)
1306 : {
1307 51 : SpinLockAcquire(&wshared->mutex);
1308 51 : wshared->xact_state = xact_state;
1309 51 : SpinLockRelease(&wshared->mutex);
1310 51 : }
1311 :
1312 : /*
1313 : * Get the transaction state for a given parallel apply worker.
1314 : */
1315 : static ParallelTransState
1316 339 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
1317 : {
1318 : ParallelTransState xact_state;
1319 :
1320 339 : SpinLockAcquire(&wshared->mutex);
1321 339 : xact_state = wshared->xact_state;
1322 339 : SpinLockRelease(&wshared->mutex);
1323 :
1324 339 : return xact_state;
1325 : }
1326 :
1327 : /*
1328 : * Cache the parallel apply worker information.
1329 : */
1330 : void
1331 504 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
1332 : {
1333 504 : stream_apply_worker = winfo;
1334 504 : }
1335 :
1336 : /*
1337 : * Form a unique savepoint name for the streaming transaction.
1338 : *
1339 : * Note that different subscriptions for publications on different nodes can
1340 : * receive same remote xid, so we need to use subscription id along with it.
1341 : *
1342 : * Returns the name in the supplied buffer.
1343 : */
1344 : static void
1345 27 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
1346 : {
1347 27 : snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
1348 27 : }
1349 :
1350 : /*
1351 : * Define a savepoint for a subxact in parallel apply worker if needed.
1352 : *
1353 : * The parallel apply worker can figure out if a new subtransaction was
1354 : * started by checking if the new change arrived with a different xid. In that
1355 : * case define a named savepoint, so that we are able to rollback to it
1356 : * if required.
1357 : */
1358 : void
1359 68393 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
1360 : {
1361 68393 : if (current_xid != top_xid &&
1362 52 : !list_member_xid(subxactlist, current_xid))
1363 : {
1364 : MemoryContext oldctx;
1365 : char spname[NAMEDATALEN];
1366 :
1367 17 : pa_savepoint_name(MySubscription->oid, current_xid,
1368 : spname, sizeof(spname));
1369 :
1370 17 : elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1371 :
1372 : /* We must be in transaction block to define the SAVEPOINT. */
1373 17 : if (!IsTransactionBlock())
1374 : {
1375 5 : if (!IsTransactionState())
90 akapila 1376 UNC 0 : StartTransactionCommand();
1377 :
90 akapila 1378 GNC 5 : BeginTransactionBlock();
1379 5 : CommitTransactionCommand();
1380 : }
1381 :
1382 17 : DefineSavepoint(spname);
1383 :
1384 : /*
1385 : * CommitTransactionCommand is needed to start a subtransaction after
1386 : * issuing a SAVEPOINT inside a transaction block (see
1387 : * StartSubTransaction()).
1388 : */
1389 17 : CommitTransactionCommand();
1390 :
1391 17 : oldctx = MemoryContextSwitchTo(TopTransactionContext);
1392 17 : subxactlist = lappend_xid(subxactlist, current_xid);
1393 17 : MemoryContextSwitchTo(oldctx);
1394 : }
1395 68393 : }
1396 :
1397 : /* Reset the list that maintains subtransactions. */
1398 : void
1399 24 : pa_reset_subtrans(void)
1400 : {
1401 : /*
1402 : * We don't need to free this explicitly as the allocated memory will be
1403 : * freed at the transaction end.
1404 : */
1405 24 : subxactlist = NIL;
1406 24 : }
1407 :
1408 : /*
1409 : * Handle STREAM ABORT message when the transaction was applied in a parallel
1410 : * apply worker.
1411 : */
1412 : void
1413 12 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
1414 : {
1415 12 : TransactionId xid = abort_data->xid;
1416 12 : TransactionId subxid = abort_data->subxid;
1417 :
1418 : /*
1419 : * Update origin state so we can restart streaming from correct position
1420 : * in case of crash.
1421 : */
1422 12 : replorigin_session_origin_lsn = abort_data->abort_lsn;
1423 12 : replorigin_session_origin_timestamp = abort_data->abort_time;
1424 :
1425 : /*
1426 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1427 : * just free the subxactlist.
1428 : */
1429 12 : if (subxid == xid)
1430 : {
1431 2 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1432 :
1433 : /*
1434 : * Release the lock as we might be processing an empty streaming
1435 : * transaction in which case the lock won't be released during
1436 : * transaction rollback.
1437 : *
1438 : * Note that it's ok to release the transaction lock before aborting
1439 : * the transaction because even if the parallel apply worker dies due
1440 : * to crash or some other reason, such a transaction would still be
1441 : * considered aborted.
1442 : */
1443 2 : pa_unlock_transaction(xid, AccessExclusiveLock);
1444 :
1445 2 : AbortCurrentTransaction();
1446 :
1447 2 : if (IsTransactionBlock())
1448 : {
1449 1 : EndTransactionBlock(false);
1450 1 : CommitTransactionCommand();
1451 : }
1452 :
1453 2 : pa_reset_subtrans();
1454 :
1455 2 : pgstat_report_activity(STATE_IDLE, NULL);
1456 : }
1457 : else
1458 : {
1459 : /* OK, so it's a subxact. Rollback to the savepoint. */
1460 : int i;
1461 : char spname[NAMEDATALEN];
1462 :
1463 10 : pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1464 :
1465 10 : elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1466 :
1467 : /*
1468 : * Search the subxactlist, determine the offset tracked for the
1469 : * subxact, and truncate the list.
1470 : *
1471 : * Note that for an empty sub-transaction we won't find the subxid
1472 : * here.
1473 : */
1474 12 : for (i = list_length(subxactlist) - 1; i >= 0; i--)
1475 : {
1476 11 : TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
1477 :
1478 11 : if (xid_tmp == subxid)
1479 : {
1480 9 : RollbackToSavepoint(spname);
1481 9 : CommitTransactionCommand();
1482 9 : subxactlist = list_truncate(subxactlist, i);
1483 9 : break;
1484 : }
1485 : }
1486 : }
1487 12 : }
1488 :
1489 : /*
1490 : * Set the fileset state for a particular parallel apply worker. The fileset
1491 : * will be set once the leader worker serialized all changes to the file
1492 : * so that it can be used by parallel apply worker.
1493 : */
1494 : void
1495 16 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
1496 : PartialFileSetState fileset_state)
1497 : {
1498 16 : SpinLockAcquire(&wshared->mutex);
1499 16 : wshared->fileset_state = fileset_state;
1500 :
1501 16 : if (fileset_state == FS_SERIALIZE_DONE)
1502 : {
1503 4 : Assert(am_leader_apply_worker());
1504 4 : Assert(MyLogicalRepWorker->stream_fileset);
1505 4 : wshared->fileset = *MyLogicalRepWorker->stream_fileset;
1506 : }
1507 :
1508 16 : SpinLockRelease(&wshared->mutex);
1509 16 : }
1510 :
1511 : /*
1512 : * Get the fileset state for the current parallel apply worker.
1513 : */
1514 : static PartialFileSetState
1515 68 : pa_get_fileset_state(void)
1516 : {
1517 : PartialFileSetState fileset_state;
1518 :
1519 68 : Assert(am_parallel_apply_worker());
1520 :
1521 68 : SpinLockAcquire(&MyParallelShared->mutex);
1522 68 : fileset_state = MyParallelShared->fileset_state;
1523 68 : SpinLockRelease(&MyParallelShared->mutex);
1524 :
1525 68 : return fileset_state;
1526 : }
1527 :
1528 : /*
1529 : * Helper functions to acquire and release a lock for each stream block.
1530 : *
1531 : * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
1532 : * stream lock.
1533 : *
1534 : * Refer to the comments atop this file to see how the stream lock is used.
1535 : */
1536 : void
1537 275 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
1538 : {
1539 275 : LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1540 : PARALLEL_APPLY_LOCK_STREAM, lockmode);
1541 273 : }
1542 :
1543 : void
1544 271 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
1545 : {
1546 271 : UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1547 : PARALLEL_APPLY_LOCK_STREAM, lockmode);
1548 271 : }
1549 :
1550 : /*
1551 : * Helper functions to acquire and release a lock for each local transaction
1552 : * apply.
1553 : *
1554 : * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
1555 : * transaction lock.
1556 : *
1557 : * Note that all the callers must pass a remote transaction ID instead of a
1558 : * local transaction ID as xid. This is because the local transaction ID will
1559 : * only be assigned while applying the first change in the parallel apply but
1560 : * it's possible that the first change in the parallel apply worker is blocked
1561 : * by a concurrently executing transaction in another parallel apply worker. We
1562 : * can only communicate the local transaction id to the leader after applying
1563 : * the first change so it won't be able to wait after sending the xact finish
1564 : * command using this lock.
1565 : *
1566 : * Refer to the comments atop this file to see how the transaction lock is
1567 : * used.
1568 : */
1569 : void
1570 52 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
1571 : {
1572 52 : LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1573 : PARALLEL_APPLY_LOCK_XACT, lockmode);
1574 51 : }
1575 :
1576 : void
1577 48 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
1578 : {
1579 48 : UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1580 : PARALLEL_APPLY_LOCK_XACT, lockmode);
1581 48 : }
1582 :
1583 : /*
1584 : * Decrement the number of pending streaming blocks and wait on the stream lock
1585 : * if there is no pending block available.
1586 : */
1587 : void
1588 251 : pa_decr_and_wait_stream_block(void)
1589 : {
1590 251 : Assert(am_parallel_apply_worker());
1591 :
1592 : /*
1593 : * It is only possible to not have any pending stream chunks when we are
1594 : * applying spooled messages.
1595 : */
1596 251 : if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
1597 : {
1598 16 : if (pa_has_spooled_message_pending())
1599 16 : return;
1600 :
90 akapila 1601 UNC 0 : elog(ERROR, "invalid pending streaming chunk 0");
1602 : }
1603 :
90 akapila 1604 GNC 235 : if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
1605 : {
1606 25 : pa_lock_stream(MyParallelShared->xid, AccessShareLock);
1607 23 : pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
1608 : }
1609 : }
1610 :
1611 : /*
1612 : * Finish processing the streaming transaction in the leader apply worker.
1613 : */
1614 : void
1615 25 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
1616 : {
1617 25 : Assert(am_leader_apply_worker());
1618 :
1619 : /*
1620 : * Unlock the shared object lock so that parallel apply worker can
1621 : * continue to receive and apply changes.
1622 : */
1623 25 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1624 :
1625 : /*
1626 : * Wait for that worker to finish. This is necessary to maintain commit
1627 : * order which avoids failures due to transaction dependencies and
1628 : * deadlocks.
1629 : */
1630 25 : pa_wait_for_xact_finish(winfo);
1631 :
1632 24 : if (!XLogRecPtrIsInvalid(remote_lsn))
1633 22 : store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1634 :
1635 24 : pa_free_worker(winfo);
1636 24 : }
|