Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * worker.c
3 : * PostgreSQL logical replication worker (apply)
4 : *
5 : * Copyright (c) 2016-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/backend/replication/logical/worker.c
9 : *
10 : * NOTES
11 : * This file contains the worker which applies logical changes as they come
12 : * from remote logical replication stream.
13 : *
14 : * The main worker (apply) is started by logical replication worker
15 : * launcher for every enabled subscription in a database. It uses
16 : * walsender protocol to communicate with publisher.
17 : *
18 : * This module includes server facing code and shares libpqwalreceiver
19 : * module with walreceiver for providing the libpq specific functionality.
20 : *
21 : *
22 : * STREAMED TRANSACTIONS
23 : * ---------------------
24 : * Streamed transactions (large transactions exceeding a memory limit on the
25 : * upstream) are applied using one of two approaches:
26 : *
27 : * 1) Write to temporary files and apply when the final commit arrives
28 : *
29 : * This approach is used when the user has set the subscription's streaming
30 : * option as on.
31 : *
32 : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : * is achieved by tracking offsets for subtransactions, which is then used
35 : * to truncate the file with serialized changes.
36 : *
37 : * The files are placed in tmp file directory by default, and the filenames
38 : * include both the XID of the toplevel transaction and OID of the
39 : * subscription. This is necessary so that different workers processing a
40 : * remote transaction with the same XID doesn't interfere.
41 : *
42 : * We use BufFiles instead of using normal temporary files because (a) the
43 : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : * limit, (b) provides a way for automatic clean up on the error and (c) provides
45 : * a way for these files to survive across local transactions and to be opened and
46 : * closed at stream start and stop. We decided to use FileSet
47 : * infrastructure as without that it deletes the files on the closure of the
48 : * file and if we decide to keep stream files open across the start/stop stream
49 : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : * there could be multiple such BufFiles as the subscriber could receive
51 : * multiple start/stop streams for different transactions before getting the
52 : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : * the file we desired across multiple stream-open calls for the same
55 : * transaction.
56 : *
57 : * 2) Parallel apply workers.
58 : *
59 : * This approach is used when the user has set the subscription's streaming
60 : * option as parallel. See logical/applyparallelworker.c for information about
61 : * this approach.
62 : *
63 : * TWO_PHASE TRANSACTIONS
64 : * ----------------------
65 : * Two phase transactions are replayed at prepare and then committed or
66 : * rolled back at commit prepared and rollback prepared respectively. It is
67 : * possible to have a prepared transaction that arrives at the apply worker
68 : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : * tablesync worker might not get such a prepared transaction because say it
72 : * was prior to the initial consistent point but might have got some later
73 : * commits. Now, the tablesync worker will exit without doing anything for the
74 : * prepared transaction skipped by the apply worker as the sync location for it
75 : * will be already ahead of the apply worker's current location. This would lead
76 : * to an "empty prepare", because later when the apply worker does the commit
77 : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : *
79 : * To avoid this and similar prepare confusions, the subscription's two_phase
80 : * commit is enabled only after the initial sync is over. The two_phase option
81 : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : * ENABLED.
83 : *
84 : * Even if the user specifies they want a subscription with two_phase = on,
85 : * internally it will start with a tri-state of PENDING which only becomes
86 : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : * tablesync workers have reached their READY state. In other words, the value
88 : * PENDING is only a temporary state for subscription start-up.
89 : *
90 : * Until the two_phase is properly available (ENABLED) the subscription will
91 : * behave as if two_phase = off. When the apply worker detects that all
92 : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : * restart the apply worker process. This happens in
94 : * process_syncing_tables_for_apply.
95 : *
96 : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : * two_phase tri-state of PENDING it starts streaming messages with the
98 : * two_phase option which in turn enables the decoding of two-phase commits at
99 : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : * Now, it is possible that during the time we have not enabled two_phase, the
101 : * publisher (replication server) would have skipped some prepares but we
102 : * ensure that such prepares are sent along with commit prepare, see
103 : * ReorderBufferFinishPrepared.
104 : *
105 : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : * PUBLICATION which might otherwise be disallowed (see below).
108 : *
109 : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : * from the pg_subscription catalog (see column subtwophasestate).
111 : *
112 : * We don't allow toggling the two_phase option of a subscription because it can
113 : * lead to an inconsistent replica. Consider, initially, it was on and we have
114 : * received some prepare then we turn it off, now at commit time the server
115 : * will send the entire transaction data along with the commit. With some more
116 : * analysis, we can allow changing this option from off to on but not sure if
117 : * that alone would be useful.
118 : *
119 : * Finally, to avoid problems mentioned in previous paragraphs from any
120 : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
121 : * to 'off' and then again back to 'on') there is a restriction for
122 : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
123 : * the two_phase tri-state is ENABLED, except when copy_data = false.
124 : *
125 : * We can get prepare of the same GID more than once for the genuine cases
126 : * where we have defined multiple subscriptions for publications on the same
127 : * server and prepared transaction has operations on tables subscribed to those
128 : * subscriptions. For such cases, if we use the GID sent by publisher one of
129 : * the prepares will be successful and others will fail, in which case the
130 : * server will send them again. Now, this can lead to a deadlock if user has
131 : * set synchronous_standby_names for all the subscriptions on subscriber. To
132 : * avoid such deadlocks, we generate a unique GID (consisting of the
133 : * subscription oid and the xid of the prepared transaction) for each prepare
134 : * transaction on the subscriber.
135 : *-------------------------------------------------------------------------
136 : */
137 :
138 : #include "postgres.h"
139 :
140 : #include <sys/stat.h>
141 : #include <unistd.h>
142 :
143 : #include "access/table.h"
144 : #include "access/tableam.h"
145 : #include "access/twophase.h"
146 : #include "access/xact.h"
147 : #include "access/xlog_internal.h"
148 : #include "catalog/catalog.h"
149 : #include "catalog/indexing.h"
150 : #include "catalog/namespace.h"
151 : #include "catalog/partition.h"
152 : #include "catalog/pg_inherits.h"
153 : #include "catalog/pg_subscription.h"
154 : #include "catalog/pg_subscription_rel.h"
155 : #include "catalog/pg_tablespace.h"
156 : #include "commands/tablecmds.h"
157 : #include "commands/tablespace.h"
158 : #include "commands/trigger.h"
159 : #include "executor/executor.h"
160 : #include "executor/execPartition.h"
161 : #include "executor/nodeModifyTable.h"
162 : #include "funcapi.h"
163 : #include "libpq/pqformat.h"
164 : #include "libpq/pqsignal.h"
165 : #include "mb/pg_wchar.h"
166 : #include "miscadmin.h"
167 : #include "nodes/makefuncs.h"
168 : #include "optimizer/optimizer.h"
169 : #include "parser/parse_relation.h"
170 : #include "pgstat.h"
171 : #include "postmaster/bgworker.h"
172 : #include "postmaster/interrupt.h"
173 : #include "postmaster/postmaster.h"
174 : #include "postmaster/walwriter.h"
175 : #include "replication/decode.h"
176 : #include "replication/logical.h"
177 : #include "replication/logicallauncher.h"
178 : #include "replication/logicalproto.h"
179 : #include "replication/logicalrelation.h"
180 : #include "replication/logicalworker.h"
181 : #include "replication/origin.h"
182 : #include "replication/reorderbuffer.h"
183 : #include "replication/snapbuild.h"
184 : #include "replication/walreceiver.h"
185 : #include "replication/worker_internal.h"
186 : #include "rewrite/rewriteHandler.h"
187 : #include "storage/buffile.h"
188 : #include "storage/bufmgr.h"
189 : #include "storage/fd.h"
190 : #include "storage/ipc.h"
191 : #include "storage/lmgr.h"
192 : #include "storage/proc.h"
193 : #include "storage/procarray.h"
194 : #include "tcop/tcopprot.h"
195 : #include "utils/acl.h"
196 : #include "utils/builtins.h"
197 : #include "utils/catcache.h"
198 : #include "utils/dynahash.h"
199 : #include "utils/datum.h"
200 : #include "utils/fmgroids.h"
201 : #include "utils/guc.h"
202 : #include "utils/inval.h"
203 : #include "utils/lsyscache.h"
204 : #include "utils/memutils.h"
205 : #include "utils/pg_lsn.h"
206 : #include "utils/rel.h"
207 : #include "utils/rls.h"
208 : #include "utils/syscache.h"
209 : #include "utils/timeout.h"
210 : #include "utils/usercontext.h"
211 :
212 : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
213 :
214 : typedef struct FlushPosition
215 : {
216 : dlist_node node;
217 : XLogRecPtr local_end;
218 : XLogRecPtr remote_end;
219 : } FlushPosition;
220 :
221 : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
222 :
223 : typedef struct ApplyExecutionData
224 : {
225 : EState *estate; /* executor state, used to track resources */
226 :
227 : LogicalRepRelMapEntry *targetRel; /* replication target rel */
228 : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
229 :
230 : /* These fields are used when the target relation is partitioned: */
231 : ModifyTableState *mtstate; /* dummy ModifyTable state */
232 : PartitionTupleRouting *proute; /* partition routing info */
233 : } ApplyExecutionData;
234 :
235 : /* Struct for saving and restoring apply errcontext information */
236 : typedef struct ApplyErrorCallbackArg
237 : {
238 : LogicalRepMsgType command; /* 0 if invalid */
239 : LogicalRepRelMapEntry *rel;
240 :
241 : /* Remote node information */
242 : int remote_attnum; /* -1 if invalid */
243 : TransactionId remote_xid;
244 : XLogRecPtr finish_lsn;
245 : char *origin_name;
246 : } ApplyErrorCallbackArg;
247 :
/*
 * How the current worker must treat the changes of the transaction it is
 * processing.
 *
 * TRANS_LEADER_APPLY
 *		Running in the leader apply worker or a table sync worker.  Changes
 *		are applied directly, or -- for streamed transactions -- read back
 *		from temporary files and then applied by this worker.
 *
 * TRANS_LEADER_SERIALIZE
 *		Running in the leader apply worker or a table sync worker.  Changes
 *		are written to temporary files and applied once the final commit
 *		arrives.
 *
 * TRANS_LEADER_SEND_TO_PARALLEL
 *		Running in the leader apply worker.  Changes must be sent on to the
 *		parallel apply worker.
 *
 * TRANS_LEADER_PARTIAL_SERIALIZE
 *		Running in the leader apply worker.  Some changes have already been
 *		sent straight to the parallel apply worker, but a timeout while
 *		sending forced the remaining ones to be serialized to a file; the
 *		parallel apply worker applies those when the final commit arrives.
 *
 *		TRANS_LEADER_SERIALIZE is not reused for this.  Besides the changes,
 *		the leader also has to serialize the STREAM_XXX messages to the file
 *		and, when handling the transaction finish command, wait for the
 *		parallel apply worker to finish the transaction.  A dedicated action
 *		keeps that code and logic clear.
 *
 * TRANS_PARALLEL_APPLY
 *		Running in the parallel apply worker.  Changes of the transaction are
 *		applied directly by this worker.
 */
typedef enum
{
	/* Non-streamed transactions. */
	TRANS_LEADER_APPLY,

	/* Streamed transactions. */
	TRANS_LEADER_SERIALIZE,
	TRANS_LEADER_SEND_TO_PARALLEL,
	TRANS_LEADER_PARTIAL_SERIALIZE,
	TRANS_PARALLEL_APPLY
} TransApplyAction;
293 :
294 : /* errcontext tracker */
295 : ApplyErrorCallbackArg apply_error_callback_arg =
296 : {
297 : .command = 0,
298 : .rel = NULL,
299 : .remote_attnum = -1,
300 : .remote_xid = InvalidTransactionId,
301 : .finish_lsn = InvalidXLogRecPtr,
302 : .origin_name = NULL,
303 : };
304 :
305 : ErrorContextCallback *apply_error_context_stack = NULL;
306 :
307 : MemoryContext ApplyMessageContext = NULL;
308 : MemoryContext ApplyContext = NULL;
309 :
310 : /* per stream context for streaming transactions */
311 : static MemoryContext LogicalStreamingContext = NULL;
312 :
313 : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
314 :
315 : Subscription *MySubscription = NULL;
316 : static bool MySubscriptionValid = false;
317 :
318 : static List *on_commit_wakeup_workers_subids = NIL;
319 :
320 : bool in_remote_transaction = false;
321 : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
322 :
323 : /* fields valid only when processing streamed transaction */
324 : static bool in_streamed_transaction = false;
325 :
326 : static TransactionId stream_xid = InvalidTransactionId;
327 :
328 : /*
329 : * The number of changes applied by parallel apply worker during one streaming
330 : * block.
331 : */
332 : static uint32 parallel_stream_nchanges = 0;
333 :
334 : /*
335 : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
336 : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
337 : * Once we start skipping changes, we don't stop it until we skip all changes of
338 : * the transaction even if pg_subscription is updated and MySubscription->skiplsn
339 : * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
340 : * we don't skip receiving and spooling the changes since we decide whether or not
341 : * to skip applying the changes when starting to apply changes. The subskiplsn is
342 : * cleared after successfully skipping the transaction or applying non-empty
343 : * transaction. The latter prevents the mistakenly specified subskiplsn from
344 : * being left. Note that we cannot skip the streaming transactions when using
345 : * parallel apply workers because we cannot get the finish LSN before applying
346 : * the changes. So, we don't start parallel apply worker when finish LSN is set
347 : * by the user.
348 : */
349 : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
350 : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
351 :
352 : /* BufFile handle of the current streaming file */
353 : static BufFile *stream_fd = NULL;
354 :
355 : typedef struct SubXactInfo
356 : {
357 : TransactionId xid; /* XID of the subxact */
358 : int fileno; /* file number in the buffile */
359 : off_t offset; /* offset in the file */
360 : } SubXactInfo;
361 :
362 : /* Sub-transaction data for the current streaming transaction */
363 : typedef struct ApplySubXactData
364 : {
365 : uint32 nsubxacts; /* number of sub-transactions */
366 : uint32 nsubxacts_max; /* current capacity of subxacts */
367 : TransactionId subxact_last; /* xid of the last sub-transaction */
368 : SubXactInfo *subxacts; /* sub-xact offset in changes file */
369 : } ApplySubXactData;
370 :
371 : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
372 :
373 : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
374 : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
375 :
376 : /*
377 : * Information about subtransactions of a given toplevel transaction.
378 : */
379 : static void subxact_info_write(Oid subid, TransactionId xid);
380 : static void subxact_info_read(Oid subid, TransactionId xid);
381 : static void subxact_info_add(TransactionId xid);
382 : static inline void cleanup_subxact_info(void);
383 :
384 : /*
385 : * Serialize and deserialize changes for a toplevel transaction.
386 : */
387 : static void stream_open_file(Oid subid, TransactionId xid,
388 : bool first_segment);
389 : static void stream_write_change(char action, StringInfo s);
390 : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
391 : static void stream_close_file(void);
392 :
393 : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
394 :
395 : static void DisableSubscriptionAndExit(void);
396 :
397 : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
398 : static void apply_handle_insert_internal(ApplyExecutionData *edata,
399 : ResultRelInfo *relinfo,
400 : TupleTableSlot *remoteslot);
401 : static void apply_handle_update_internal(ApplyExecutionData *edata,
402 : ResultRelInfo *relinfo,
403 : TupleTableSlot *remoteslot,
404 : LogicalRepTupleData *newtup,
405 : Oid localindexoid);
406 : static void apply_handle_delete_internal(ApplyExecutionData *edata,
407 : ResultRelInfo *relinfo,
408 : TupleTableSlot *remoteslot,
409 : Oid localindexoid);
410 : static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
411 : LogicalRepRelation *remoterel,
412 : Oid localidxoid,
413 : TupleTableSlot *remoteslot,
414 : TupleTableSlot **localslot);
415 : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
416 : TupleTableSlot *remoteslot,
417 : LogicalRepTupleData *newtup,
418 : CmdType operation);
419 :
420 : /* Compute GID for two_phase transactions */
421 : static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
422 :
423 : /* Functions for skipping changes */
424 : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
425 : static void stop_skipping_changes(void);
426 : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
427 :
428 : /* Functions for apply error callback */
429 : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
430 : static inline void reset_apply_error_context_info(void);
431 :
432 : static TransApplyAction get_transaction_apply_action(TransactionId xid,
433 : ParallelApplyWorkerInfo **winfo);
434 :
/*
 * get_worker_name
 *		Return the translated, human-readable name of the kind of logical
 *		replication worker the current process is.
 */
static const char *
get_worker_name(void)
{
	const char *worker_name;

	if (am_tablesync_worker())
		worker_name = _("logical replication table synchronization worker");
	else if (am_parallel_apply_worker())
		worker_name = _("logical replication parallel apply worker");
	else
		worker_name = _("logical replication apply worker");

	return worker_name;
}
448 :
449 : /*
450 : * Form the origin name for the subscription.
451 : *
452 : * This is a common function for tablesync and other workers. Tablesync workers
453 : * must pass a valid relid. Other callers must pass relid = InvalidOid.
454 : *
455 : * Return the name in the supplied buffer.
456 : */
457 : void
180 458 940 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
459 : char *originname, Size szoriginname)
460 : {
461 940 : if (OidIsValid(relid))
462 : {
463 : /* Replication origin name for tablesync workers. */
464 598 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
465 : }
466 : else
467 : {
468 : /* Replication origin name for non-tablesync workers. */
469 342 : snprintf(originname, szoriginname, "pg_%u", suboid);
470 : }
471 940 : }
472 :
473 : /*
474 : * Should this worker apply changes for given relation.
475 : *
476 : * This is mainly needed for initial relation data sync as that runs in
477 : * separate worker process running in parallel and we need some way to skip
478 : * changes coming to the leader apply worker during the sync of a table.
479 : *
480 : * Note we need to do smaller or equals comparison for SYNCDONE state because
481 : * it might hold position of end of initial slot consistent point WAL
482 : * record + 1 (ie start of next record) and next record can be COMMIT of
483 : * transaction we are now processing (which is what we set remote_final_lsn
484 : * to in apply_handle_begin).
485 : *
486 : * Note that for streaming transactions that are being applied in the parallel
487 : * apply worker, we disallow applying changes if the target table in the
488 : * subscription is not in the READY state, because we cannot decide whether to
489 : * apply the change as we won't know remote_final_lsn by that time.
490 : *
491 : * We already checked this in pa_can_start() before assigning the
492 : * streaming transaction to the parallel worker, but it also needs to be
493 : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
494 : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
495 : * while applying this transaction.
496 : */
497 : static bool
2208 peter_e 498 GIC 147877 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
499 : {
500 147877 : if (am_tablesync_worker())
2208 peter_e 501 UIC 0 : return MyLogicalRepWorker->relid == rel->localreloid;
90 akapila 502 GNC 147877 : else if (am_parallel_apply_worker())
503 : {
504 : /* We don't synchronize rel's that are in unknown state. */
505 68365 : if (rel->state != SUBREL_STATE_READY &&
90 akapila 506 UNC 0 : rel->state != SUBREL_STATE_UNKNOWN)
507 0 : ereport(ERROR,
508 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
509 : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
510 : MySubscription->name),
511 : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
512 :
90 akapila 513 GNC 68365 : return rel->state == SUBREL_STATE_READY;
514 : }
515 : else
2208 peter_e 516 GIC 79554 : return (rel->state == SUBREL_STATE_READY ||
517 42 : (rel->state == SUBREL_STATE_SYNCDONE &&
518 4 : rel->statelsn <= remote_final_lsn));
519 : }
520 :
521 : /*
522 : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
523 : *
524 : * Start a transaction, if this is the first step (else we keep using the
525 : * existing transaction).
526 : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
527 : */
528 : static void
668 tgl 529 148324 : begin_replication_step(void)
530 : {
531 148324 : SetCurrentStatementStartTimestamp();
532 :
533 148324 : if (!IsTransactionState())
534 : {
535 849 : StartTransactionCommand();
536 849 : maybe_reread_subscription();
537 : }
538 :
539 148324 : PushActiveSnapshot(GetTransactionSnapshot());
540 :
2161 peter_e 541 148324 : MemoryContextSwitchTo(ApplyMessageContext);
668 tgl 542 148324 : }
543 :
/*
 * end_replication_step
 *		Finish one step started by begin_replication_step(); every caller of
 *		begin_replication_step() must call this too.
 *
 * The transaction is deliberately left open.  The command counter is
 * bumped so that this step's effects are visible to subsequent steps.
 */
static void
end_replication_step(void)
{
	PopActiveSnapshot();		/* undo begin_replication_step's push */
	CommandCounterIncrement();
}
558 :
559 : /*
560 : * Handle streamed transactions for both the leader apply worker and the
561 : * parallel apply workers.
562 : *
563 : * In the streaming case (receiving a block of the streamed transaction), for
564 : * serialize mode, simply redirect it to a file for the proper toplevel
565 : * transaction, and for parallel mode, the leader apply worker will send the
566 : * changes to parallel apply workers and the parallel apply worker will define
567 : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
568 : * messages will be applied by both leader apply worker and parallel apply
569 : * workers).
570 : *
571 : * Returns true for streamed transactions (when the change is either serialized
572 : * to file or sent to parallel apply worker), false otherwise (regular mode or
573 : * needs to be processed by parallel apply worker).
574 : *
575 : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
576 : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
577 : * to a parallel apply worker.
578 : */
579 : static bool
864 akapila 580 324124 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
581 : {
582 : TransactionId current_xid;
583 : ParallelApplyWorkerInfo *winfo;
584 : TransApplyAction apply_action;
585 : StringInfoData original_msg;
586 :
90 akapila 587 GNC 324124 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
588 :
948 akapila 589 ECB : /* not in streaming mode */
90 akapila 590 GNC 324124 : if (apply_action == TRANS_LEADER_APPLY)
948 akapila 591 CBC 79826 : return false;
948 akapila 592 ECB :
948 akapila 593 CBC 244298 : Assert(TransactionIdIsValid(stream_xid));
594 :
595 : /*
596 : * The parallel apply worker needs the xid in this message to decide
597 : * whether to define a savepoint, so save the original message that has
598 : * not moved the cursor after the xid. We will serialize this message to a
599 : * file in PARTIAL_SERIALIZE mode.
600 : */
90 akapila 601 GNC 244298 : original_msg = *s;
602 :
603 : /*
604 : * We should have received XID of the subxact as the first part of the
605 : * message, so extract it.
606 : */
607 244298 : current_xid = pq_getmsgint(s, 4);
608 :
609 244298 : if (!TransactionIdIsValid(current_xid))
666 tgl 610 UIC 0 : ereport(ERROR,
611 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
612 : errmsg_internal("invalid transaction ID in streamed replication transaction")));
948 akapila 613 ECB :
90 akapila 614 GNC 244298 : switch (apply_action)
615 : {
616 102513 : case TRANS_LEADER_SERIALIZE:
617 102513 : Assert(stream_fd);
948 akapila 618 ECB :
619 : /* Add the new subxact to the array (unless already there). */
90 akapila 620 GNC 102513 : subxact_info_add(current_xid);
90 akapila 621 ECB :
622 : /* Write the change to the current file */
90 akapila 623 GNC 102513 : stream_write_change(action, s);
624 102513 : return true;
625 :
626 68386 : case TRANS_LEADER_SEND_TO_PARALLEL:
627 68386 : Assert(winfo);
628 :
629 : /*
630 : * XXX The publisher side doesn't always send relation/type update
631 : * messages after the streaming transaction, so also update the
632 : * relation/type in leader apply worker. See function
633 : * cleanup_rel_sync_cache.
634 : */
635 68386 : if (pa_send_data(winfo, s->len, s->data))
636 68386 : return (action != LOGICAL_REP_MSG_RELATION &&
637 : action != LOGICAL_REP_MSG_TYPE);
638 :
639 : /*
640 : * Switch to serialize mode when we are not able to send the
641 : * change to parallel apply worker.
642 : */
90 akapila 643 UNC 0 : pa_switch_to_partial_serialize(winfo, false);
644 :
645 : /* fall through */
90 akapila 646 GNC 5006 : case TRANS_LEADER_PARTIAL_SERIALIZE:
647 5006 : stream_write_change(action, &original_msg);
648 :
649 : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
650 5006 : return (action != LOGICAL_REP_MSG_RELATION &&
651 : action != LOGICAL_REP_MSG_TYPE);
652 :
653 68393 : case TRANS_PARALLEL_APPLY:
654 68393 : parallel_stream_nchanges += 1;
655 :
656 : /* Define a savepoint for a subxact if needed. */
657 68393 : pa_start_subtrans(current_xid, stream_xid);
658 68393 : return false;
659 :
90 akapila 660 UNC 0 : default:
661 0 : Assert(false);
662 : return false; /* silence compiler warning */
663 : }
664 : }
665 :
666 : /*
2271 peter_e 667 ECB : * Executor state preparation for evaluation of constraint expressions,
668 : * indexes and triggers for the specified relation.
669 : *
670 : * Note that the caller must open and close any indexes to be updated.
671 : */
672 : static ApplyExecutionData *
687 tgl 673 GIC 147811 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
674 : {
675 : ApplyExecutionData *edata;
676 : EState *estate;
677 : RangeTblEntry *rte;
34 tgl 678 GNC 147811 : List *perminfos = NIL;
679 : ResultRelInfo *resultRelInfo;
680 :
687 tgl 681 GIC 147811 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
682 147811 : edata->targetRel = rel;
683 :
684 147811 : edata->estate = estate = CreateExecutorState();
685 :
2271 peter_e 686 147811 : rte = makeNode(RangeTblEntry);
687 147811 : rte->rtekind = RTE_RELATION;
688 147811 : rte->relid = RelationGetRelid(rel->localrel);
689 147811 : rte->relkind = rel->localrel->rd_rel->relkind;
1652 tgl 690 147811 : rte->rellockmode = AccessShareLock;
691 :
34 tgl 692 GNC 147811 : addRTEPermissionInfo(&perminfos, rte);
693 :
694 147811 : ExecInitRangeTable(estate, list_make1(rte), perminfos);
695 :
687 tgl 696 GIC 147811 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
697 :
698 : /*
699 : * Use Relation opened by logicalrep_rel_open() instead of opening it
717 michael 700 ECB : * again.
701 : */
687 tgl 702 CBC 147811 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
717 michael 703 EUB :
717 michael 704 ECB : /*
705 : * We put the ResultRelInfo in the es_opened_result_relations list, even
706 : * though we don't populate the es_result_relations array. That's a bit
707 : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
717 michael 708 EUB : *
709 : * ExecOpenIndices() is not called here either, each execution path doing
710 : * an apply operation being responsible for that.
711 : */
717 michael 712 GIC 147811 : estate->es_opened_result_relations =
687 tgl 713 147811 : lappend(estate->es_opened_result_relations, resultRelInfo);
714 :
1964 simon 715 CBC 147811 : estate->es_output_cid = GetCurrentCommandId(true);
716 :
717 : /* Prepare to catch AFTER triggers. */
2228 peter_e 718 147811 : AfterTriggerBeginQuery();
2228 peter_e 719 ECB :
687 tgl 720 : /* other fields of edata remain NULL for now */
721 :
687 tgl 722 GIC 147811 : return edata;
723 : }
724 :
725 : /*
726 : * Finish any operations related to the executor state created by
727 : * create_edata_for_relation().
728 : */
729 : static void
730 147792 : finish_edata(ApplyExecutionData *edata)
717 michael 731 ECB : {
687 tgl 732 GIC 147792 : EState *estate = edata->estate;
687 tgl 733 ECB :
734 : /* Handle any queued AFTER triggers. */
717 michael 735 CBC 147792 : AfterTriggerEndQuery(estate);
736 :
687 tgl 737 ECB : /* Shut down tuple routing, if any was done. */
687 tgl 738 CBC 147792 : if (edata->proute)
687 tgl 739 GIC 73 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
740 :
687 tgl 741 ECB : /*
742 : * Cleanup. It might seem that we should call ExecCloseResultRelations()
743 : * here, but we intentionally don't. It would close the rel we added to
744 : * es_opened_result_relations above, which is wrong because we took no
745 : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
746 : * any other relations opened during execution.
747 : */
717 michael 748 GIC 147792 : ExecResetTupleTable(estate->es_tupleTable, false);
749 147792 : FreeExecutorState(estate);
687 tgl 750 147792 : pfree(edata);
717 michael 751 147792 : }
752 :
753 : /*
2271 peter_e 754 ECB : * Executes default values for columns for which we can't map to remote
755 : * relation columns.
756 : *
757 : * This allows us to support tables which have more columns on the downstream
758 : * than on the upstream.
759 : */
760 : static void
2271 peter_e 761 GIC 75593 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
762 : TupleTableSlot *slot)
763 : {
764 75593 : TupleDesc desc = RelationGetDescr(rel->localrel);
765 75593 : int num_phys_attrs = desc->natts;
766 : int i;
767 : int attnum,
768 75593 : num_defaults = 0;
769 : int *defmap;
770 : ExprState **defexprs;
771 : ExprContext *econtext;
772 :
773 75593 : econtext = GetPerTupleExprContext(estate);
774 :
775 : /* We got all the data via replication, no need to evaluate anything. */
776 75593 : if (num_phys_attrs == rel->remoterel.natts)
777 35459 : return;
778 :
779 40134 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
780 40134 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
781 :
1208 michael 782 CBC 40134 : Assert(rel->attrmap->maplen == num_phys_attrs);
2271 peter_e 783 GIC 210619 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
784 : {
785 : Expr *defexpr;
786 :
1471 peter 787 170485 : if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
2271 peter_e 788 UIC 0 : continue;
2271 peter_e 789 ECB :
1208 michael 790 GIC 170485 : if (rel->attrmap->attnums[attnum] >= 0)
2271 peter_e 791 92256 : continue;
2271 peter_e 792 ECB :
2271 peter_e 793 CBC 78229 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
794 :
795 78229 : if (defexpr != NULL)
796 : {
797 : /* Run the expression through planner */
2271 peter_e 798 GIC 70131 : defexpr = expression_planner(defexpr);
799 :
800 : /* Initialize executable expression in copycontext */
801 70131 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
802 70131 : defmap[num_defaults] = attnum;
2271 peter_e 803 CBC 70131 : num_defaults++;
804 : }
805 : }
806 :
2271 peter_e 807 GIC 110265 : for (i = 0; i < num_defaults; i++)
808 70131 : slot->tts_values[defmap[i]] =
2271 peter_e 809 CBC 70131 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
810 : }
2271 peter_e 811 ECB :
2271 peter_e 812 EUB : /*
813 : * Store tuple data into slot.
814 : *
815 : * Incoming data can be either text or binary format.
2271 peter_e 816 ECB : */
817 : static void
995 tgl 818 CBC 147811 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
995 tgl 819 ECB : LogicalRepTupleData *tupleData)
820 : {
2153 bruce 821 GIC 147811 : int natts = slot->tts_tupleDescriptor->natts;
2153 bruce 822 ECB : int i;
823 :
2271 peter_e 824 GIC 147811 : ExecClearTuple(slot);
2271 peter_e 825 ECB :
995 tgl 826 : /* Call the "in" function for each non-dropped, non-null attribute */
1208 michael 827 GIC 147811 : Assert(natts == rel->attrmap->maplen);
2271 peter_e 828 CBC 656938 : for (i = 0; i < natts; i++)
2271 peter_e 829 ECB : {
2058 andres 830 GIC 509127 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1208 michael 831 509127 : int remoteattnum = rel->attrmap->attnums[i];
832 :
995 tgl 833 509127 : if (!att->attisdropped && remoteattnum >= 0)
2271 peter_e 834 302237 : {
995 tgl 835 302237 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
836 :
993 tgl 837 CBC 302237 : Assert(remoteattnum < tupleData->ncols);
993 tgl 838 ECB :
839 : /* Set attnum for error callback */
590 akapila 840 GIC 302237 : apply_error_callback_arg.remote_attnum = remoteattnum;
841 :
995 tgl 842 302237 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
843 : {
844 : Oid typinput;
995 tgl 845 EUB : Oid typioparam;
846 :
995 tgl 847 GIC 141940 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
995 tgl 848 CBC 283880 : slot->tts_values[i] =
849 141940 : OidInputFunctionCall(typinput, colvalue->data,
850 : typioparam, att->atttypmod);
995 tgl 851 GIC 141940 : slot->tts_isnull[i] = false;
995 tgl 852 ECB : }
995 tgl 853 GIC 160297 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
854 : {
995 tgl 855 ECB : Oid typreceive;
856 : Oid typioparam;
857 :
858 : /*
859 : * In some code paths we may be asked to re-parse the same
860 : * tuple data. Reset the StringInfo's cursor so that works.
861 : */
995 tgl 862 GBC 109980 : colvalue->cursor = 0;
995 tgl 863 EUB :
995 tgl 864 GIC 109980 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
865 219960 : slot->tts_values[i] =
866 109980 : OidReceiveFunctionCall(typreceive, colvalue,
867 : typioparam, att->atttypmod);
868 :
869 : /* Trouble if it didn't eat the whole buffer */
870 109980 : if (colvalue->cursor != colvalue->len)
995 tgl 871 UIC 0 : ereport(ERROR,
872 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
873 : errmsg("incorrect binary data format in logical replication column %d",
874 : remoteattnum + 1)));
995 tgl 875 CBC 109980 : slot->tts_isnull[i] = false;
876 : }
877 : else
878 : {
879 : /*
995 tgl 880 ECB : * NULL value from remote. (We don't expect to see
881 : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
882 : * NULL.)
883 : */
995 tgl 884 CBC 50317 : slot->tts_values[i] = (Datum) 0;
995 tgl 885 GIC 50317 : slot->tts_isnull[i] = true;
995 tgl 886 ECB : }
887 :
590 akapila 888 : /* Reset attnum for error callback */
590 akapila 889 CBC 302237 : apply_error_callback_arg.remote_attnum = -1;
2271 peter_e 890 ECB : }
891 : else
892 : {
893 : /*
995 tgl 894 : * We assign NULL to dropped attributes and missing values
895 : * (missing values should be later filled using
2271 peter_e 896 : * slot_fill_defaults).
897 : */
2271 peter_e 898 CBC 206890 : slot->tts_values[i] = (Datum) 0;
2271 peter_e 899 GIC 206890 : slot->tts_isnull[i] = true;
900 : }
901 : }
902 :
903 147811 : ExecStoreVirtualTuple(slot);
2271 peter_e 904 CBC 147811 : }
905 :
906 : /*
907 : * Replace updated columns with data from the LogicalRepTupleData struct.
908 : * This is somewhat similar to heap_modify_tuple but also calls the type
909 : * input functions on the user data.
910 : *
911 : * "slot" is filled with a copy of the tuple in "srcslot", replacing
912 : * columns provided in "tupleData" and leaving others as-is.
913 : *
1234 tgl 914 ECB : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
915 : * storage for "srcslot". This is OK for current usage, but someday we may
916 : * need to materialize "slot" at the end to make it independent of "srcslot".
2271 peter_e 917 : */
918 : static void
995 tgl 919 GIC 31908 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
995 tgl 920 ECB : LogicalRepRelMapEntry *rel,
921 : LogicalRepTupleData *tupleData)
922 : {
2153 bruce 923 GIC 31908 : int natts = slot->tts_tupleDescriptor->natts;
2153 bruce 924 ECB : int i;
925 :
926 : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
2271 peter_e 927 GIC 31908 : ExecClearTuple(slot);
928 :
929 : /*
930 : * Copy all the column data from srcslot, so that we'll have valid values
931 : * for unreplaced columns.
1234 tgl 932 ECB : */
1234 tgl 933 GIC 31908 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
1234 tgl 934 CBC 31908 : slot_getallattrs(srcslot);
1234 tgl 935 GIC 31908 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
936 31908 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
1234 tgl 937 ECB :
938 : /* Call the "in" function for each replaced attribute */
1208 michael 939 GIC 31908 : Assert(natts == rel->attrmap->maplen);
2271 peter_e 940 CBC 159221 : for (i = 0; i < natts; i++)
2271 peter_e 941 ECB : {
2058 andres 942 GIC 127313 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1208 michael 943 127313 : int remoteattnum = rel->attrmap->attnums[i];
944 :
1983 peter_e 945 127313 : if (remoteattnum < 0)
2271 946 58514 : continue;
947 :
993 tgl 948 68799 : Assert(remoteattnum < tupleData->ncols);
949 :
995 tgl 950 CBC 68799 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
2271 peter_e 951 ECB : {
995 tgl 952 CBC 68796 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
2271 peter_e 953 ECB :
954 : /* Set attnum for error callback */
590 akapila 955 GIC 68796 : apply_error_callback_arg.remote_attnum = remoteattnum;
956 :
995 tgl 957 68796 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
958 : {
959 : Oid typinput;
960 : Oid typioparam;
961 :
962 25393 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
995 tgl 963 CBC 50786 : slot->tts_values[i] =
995 tgl 964 GIC 25393 : OidInputFunctionCall(typinput, colvalue->data,
965 : typioparam, att->atttypmod);
995 tgl 966 CBC 25393 : slot->tts_isnull[i] = false;
995 tgl 967 ECB : }
995 tgl 968 GIC 43403 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
969 : {
995 tgl 970 ECB : Oid typreceive;
971 : Oid typioparam;
972 :
973 : /*
974 : * In some code paths we may be asked to re-parse the same
975 : * tuple data. Reset the StringInfo's cursor so that works.
976 : */
995 tgl 977 GIC 43356 : colvalue->cursor = 0;
995 tgl 978 ECB :
995 tgl 979 CBC 43356 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
995 tgl 980 GIC 86712 : slot->tts_values[i] =
995 tgl 981 CBC 43356 : OidReceiveFunctionCall(typreceive, colvalue,
995 tgl 982 ECB : typioparam, att->atttypmod);
983 :
984 : /* Trouble if it didn't eat the whole buffer */
995 tgl 985 CBC 43356 : if (colvalue->cursor != colvalue->len)
995 tgl 986 UIC 0 : ereport(ERROR,
987 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
988 : errmsg("incorrect binary data format in logical replication column %d",
995 tgl 989 ECB : remoteattnum + 1)));
995 tgl 990 GBC 43356 : slot->tts_isnull[i] = false;
991 : }
995 tgl 992 ECB : else
993 : {
994 : /* must be LOGICALREP_COLUMN_NULL */
995 tgl 995 CBC 47 : slot->tts_values[i] = (Datum) 0;
995 tgl 996 GIC 47 : slot->tts_isnull[i] = true;
995 tgl 997 ECB : }
998 :
999 : /* Reset attnum for error callback */
590 akapila 1000 CBC 68796 : apply_error_callback_arg.remote_attnum = -1;
1001 : }
1002 : }
2271 peter_e 1003 ECB :
1234 tgl 1004 : /* And finally, declare that "slot" contains a valid virtual tuple */
2271 peter_e 1005 CBC 31908 : ExecStoreVirtualTuple(slot);
2271 peter_e 1006 GIC 31908 : }
1007 :
1008 : /*
2271 peter_e 1009 ECB : * Handle BEGIN message.
1010 : */
1011 : static void
2271 peter_e 1012 GIC 380 : apply_handle_begin(StringInfo s)
1013 : {
1014 : LogicalRepBeginData begin_data;
1015 :
1016 : /* There must not be an active streaming transaction. */
82 akapila 1017 GNC 380 : Assert(!TransactionIdIsValid(stream_xid));
1018 :
2271 peter_e 1019 GIC 380 : logicalrep_read_begin(s, &begin_data);
397 akapila 1020 380 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1021 :
2208 peter_e 1022 380 : remote_final_lsn = begin_data.final_lsn;
2208 peter_e 1023 ECB :
383 akapila 1024 GIC 380 : maybe_start_skipping_changes(begin_data.final_lsn);
1025 :
2271 peter_e 1026 CBC 380 : in_remote_transaction = true;
1027 :
2271 peter_e 1028 GIC 380 : pgstat_report_activity(STATE_RUNNING, NULL);
2271 peter_e 1029 CBC 380 : }
1030 :
1031 : /*
2271 peter_e 1032 ECB : * Handle COMMIT message.
1033 : *
1034 : * TODO, support tracking of multiple origins
1035 : */
1036 : static void
2271 peter_e 1037 GIC 355 : apply_handle_commit(StringInfo s)
2271 peter_e 1038 ECB : {
2153 bruce 1039 : LogicalRepCommitData commit_data;
2271 peter_e 1040 :
2271 peter_e 1041 GIC 355 : logicalrep_read_commit(s, &commit_data);
2271 peter_e 1042 ECB :
666 tgl 1043 GIC 355 : if (commit_data.commit_lsn != remote_final_lsn)
666 tgl 1044 UIC 0 : ereport(ERROR,
666 tgl 1045 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1046 : errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1047 : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1048 : LSN_FORMAT_ARGS(remote_final_lsn))));
1049 :
618 akapila 1050 GIC 355 : apply_handle_commit_internal(&commit_data);
1051 :
2208 peter_e 1052 ECB : /* Process any tables that are being synchronized in parallel. */
2208 peter_e 1053 CBC 355 : process_syncing_tables(commit_data.end_lsn);
2208 peter_e 1054 ECB :
2271 peter_e 1055 GIC 355 : pgstat_report_activity(STATE_IDLE, NULL);
590 akapila 1056 CBC 355 : reset_apply_error_context_info();
2271 peter_e 1057 GIC 355 : }
2271 peter_e 1058 ECB :
1059 : /*
1060 : * Handle BEGIN PREPARE message.
1061 : */
1062 : static void
634 akapila 1063 GIC 14 : apply_handle_begin_prepare(StringInfo s)
1064 : {
1065 : LogicalRepPreparedTxnData begin_data;
1066 :
634 akapila 1067 ECB : /* Tablesync should never receive prepare. */
634 akapila 1068 GIC 14 : if (am_tablesync_worker())
634 akapila 1069 LBC 0 : ereport(ERROR,
634 akapila 1070 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1071 : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1072 :
1073 : /* There must not be an active streaming transaction. */
82 akapila 1074 GNC 14 : Assert(!TransactionIdIsValid(stream_xid));
1075 :
634 akapila 1076 GIC 14 : logicalrep_read_begin_prepare(s, &begin_data);
397 1077 14 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
634 akapila 1078 ECB :
634 akapila 1079 GBC 14 : remote_final_lsn = begin_data.prepare_lsn;
1080 :
383 akapila 1081 GIC 14 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1082 :
634 akapila 1083 CBC 14 : in_remote_transaction = true;
1084 :
634 akapila 1085 GIC 14 : pgstat_report_activity(STATE_RUNNING, NULL);
1086 14 : }
1087 :
1088 : /*
1089 : * Common function to prepare the GID.
1090 : */
1091 : static void
619 akapila 1092 CBC 21 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
619 akapila 1093 ECB : {
1094 : char gid[GIDSIZE];
1095 :
1096 : /*
1097 : * Compute unique GID for two_phase transactions. We don't use GID of
1098 : * prepared transaction sent by server as that can lead to deadlock when
1099 : * we have multiple subscriptions from same node point to publications on
1100 : * the same node. See comments atop worker.c
1101 : */
619 akapila 1102 GIC 21 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1103 : gid, sizeof(gid));
1104 :
1105 : /*
619 akapila 1106 ECB : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1107 : * called within the PrepareTransactionBlock below.
1108 : */
90 akapila 1109 GNC 21 : if (!IsTransactionBlock())
1110 : {
1111 21 : BeginTransactionBlock();
1112 21 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1113 : }
619 akapila 1114 ECB :
1115 : /*
1116 : * Update origin state so we can restart streaming from correct position
1117 : * in case of crash.
1118 : */
619 akapila 1119 GIC 21 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1120 21 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1121 :
1122 21 : PrepareTransactionBlock(gid);
1123 21 : }
1124 :
1125 : /*
1126 : * Handle PREPARE message.
1127 : */
1128 : static void
634 1129 13 : apply_handle_prepare(StringInfo s)
634 akapila 1130 ECB : {
1131 : LogicalRepPreparedTxnData prepare_data;
1132 :
634 akapila 1133 GIC 13 : logicalrep_read_prepare(s, &prepare_data);
634 akapila 1134 ECB :
634 akapila 1135 GIC 13 : if (prepare_data.prepare_lsn != remote_final_lsn)
634 akapila 1136 UIC 0 : ereport(ERROR,
1137 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
634 akapila 1138 ECB : errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1139 : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1140 : LSN_FORMAT_ARGS(remote_final_lsn))));
1141 :
1142 : /*
1143 : * Unlike commit, here, we always prepare the transaction even though no
383 1144 : * change has happened in this transaction or all changes are skipped. It
1145 : * is done this way because at commit prepared time, we won't know whether
1146 : * we have skipped preparing a transaction because of those reasons.
634 1147 : *
1148 : * XXX, We can optimize such that at commit prepared time, we first check
1149 : * whether we have prepared the transaction or not but that doesn't seem
1150 : * worthwhile because such cases shouldn't be common.
1151 : */
634 akapila 1152 GIC 13 : begin_replication_step();
634 akapila 1153 ECB :
619 akapila 1154 CBC 13 : apply_handle_prepare_internal(&prepare_data);
1155 :
634 1156 13 : end_replication_step();
1157 13 : CommitTransactionCommand();
634 akapila 1158 GIC 13 : pgstat_report_stat(false);
634 akapila 1159 ECB :
90 akapila 1160 GNC 13 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
634 akapila 1161 ECB :
634 akapila 1162 GIC 13 : in_remote_transaction = false;
634 akapila 1163 ECB :
1164 : /* Process any tables that are being synchronized in parallel. */
634 akapila 1165 GIC 13 : process_syncing_tables(prepare_data.end_lsn);
634 akapila 1166 ECB :
1167 : /*
383 1168 : * Since we have already prepared the transaction, in a case where the
1169 : * server crashes before clearing the subskiplsn, it will be left but the
1170 : * transaction won't be resent. But that's okay because it's a rare case
1171 : * and the subskiplsn will be cleared when finishing the next transaction.
1172 : */
383 akapila 1173 CBC 13 : stop_skipping_changes();
1174 13 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
383 akapila 1175 ECB :
634 akapila 1176 GIC 13 : pgstat_report_activity(STATE_IDLE, NULL);
590 akapila 1177 CBC 13 : reset_apply_error_context_info();
634 akapila 1178 GIC 13 : }
634 akapila 1179 ECB :
1180 : /*
1181 : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1182 : *
1183 : * Note that we don't need to wait here if the transaction was prepared in a
1184 : * parallel apply worker. In that case, we have already waited for the prepare
1185 : * to finish in apply_handle_stream_prepare() which will ensure all the
1186 : * operations in that transaction have happened in the subscriber, so no
1187 : * concurrent transaction can cause deadlock or transaction dependency issues.
1188 : */
1189 : static void
634 akapila 1190 GIC 19 : apply_handle_commit_prepared(StringInfo s)
1191 : {
1192 : LogicalRepCommitPreparedTxnData prepare_data;
1193 : char gid[GIDSIZE];
634 akapila 1194 ECB :
634 akapila 1195 GIC 19 : logicalrep_read_commit_prepared(s, &prepare_data);
397 akapila 1196 CBC 19 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
634 akapila 1197 ECB :
1198 : /* Compute GID for two_phase transactions. */
634 akapila 1199 GIC 19 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1200 : gid, sizeof(gid));
1201 :
634 akapila 1202 ECB : /* There is no transaction when COMMIT PREPARED is called */
634 akapila 1203 GBC 19 : begin_replication_step();
1204 :
1205 : /*
1206 : * Update origin state so we can restart streaming from correct position
634 akapila 1207 ECB : * in case of crash.
1208 : */
634 akapila 1209 GIC 19 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1210 19 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1211 :
634 akapila 1212 CBC 19 : FinishPreparedTransaction(gid, true);
1213 19 : end_replication_step();
634 akapila 1214 GIC 19 : CommitTransactionCommand();
1215 19 : pgstat_report_stat(false);
1216 :
90 akapila 1217 GNC 19 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
634 akapila 1218 GIC 19 : in_remote_transaction = false;
1219 :
1220 : /* Process any tables that are being synchronized in parallel. */
1221 19 : process_syncing_tables(prepare_data.end_lsn);
634 akapila 1222 ECB :
383 akapila 1223 CBC 19 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1224 :
634 akapila 1225 GIC 19 : pgstat_report_activity(STATE_IDLE, NULL);
590 1226 19 : reset_apply_error_context_info();
634 1227 19 : }
1228 :
634 akapila 1229 ECB : /*
1230 : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1231 : *
1232 : * Note that we don't need to wait here if the transaction was prepared in a
1233 : * parallel apply worker. In that case, we have already waited for the prepare
1234 : * to finish in apply_handle_stream_prepare() which will ensure all the
1235 : * operations in that transaction have happened in the subscriber, so no
1236 : * concurrent transaction can cause deadlock or transaction dependency issues.
1237 : */
1238 : static void
634 akapila 1239 GIC 5 : apply_handle_rollback_prepared(StringInfo s)
634 akapila 1240 ECB : {
1241 : LogicalRepRollbackPreparedTxnData rollback_data;
1242 : char gid[GIDSIZE];
1243 :
634 akapila 1244 GIC 5 : logicalrep_read_rollback_prepared(s, &rollback_data);
397 akapila 1245 CBC 5 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1246 :
634 akapila 1247 ECB : /* Compute GID for two_phase transactions. */
634 akapila 1248 GIC 5 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
634 akapila 1249 ECB : gid, sizeof(gid));
1250 :
1251 : /*
1252 : * It is possible that we haven't received prepare because it occurred
1253 : * before walsender reached a consistent point or the two_phase was still
1254 : * not enabled by that time, so in such cases, we need to skip rollback
1255 : * prepared.
1256 : */
634 akapila 1257 GIC 5 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1258 : rollback_data.prepare_time))
1259 : {
634 akapila 1260 ECB : /*
1261 : * Update origin state so we can restart streaming from correct
1262 : * position in case of crash.
1263 : */
634 akapila 1264 CBC 5 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
634 akapila 1265 GIC 5 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
634 akapila 1266 ECB :
634 akapila 1267 EUB : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
634 akapila 1268 GIC 5 : begin_replication_step();
1269 5 : FinishPreparedTransaction(gid, false);
1270 5 : end_replication_step();
1271 5 : CommitTransactionCommand();
1272 :
383 akapila 1273 CBC 5 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1274 : }
1275 :
634 1276 5 : pgstat_report_stat(false);
1277 :
90 akapila 1278 GNC 5 : store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
634 akapila 1279 CBC 5 : in_remote_transaction = false;
634 akapila 1280 ECB :
1281 : /* Process any tables that are being synchronized in parallel. */
634 akapila 1282 GIC 5 : process_syncing_tables(rollback_data.rollback_end_lsn);
1283 :
1284 5 : pgstat_report_activity(STATE_IDLE, NULL);
590 1285 5 : reset_apply_error_context_info();
634 akapila 1286 CBC 5 : }
1287 :
1288 : /*
1289 : * Handle STREAM PREPARE.
1290 : */
1291 : static void
613 akapila 1292 GIC 11 : apply_handle_stream_prepare(StringInfo s)
613 akapila 1293 ECB : {
1294 : LogicalRepPreparedTxnData prepare_data;
1295 : ParallelApplyWorkerInfo *winfo;
1296 : TransApplyAction apply_action;
1297 :
1298 : /* Save the message before it is consumed. */
90 akapila 1299 GNC 11 : StringInfoData original_msg = *s;
613 akapila 1300 ECB :
613 akapila 1301 CBC 11 : if (in_streamed_transaction)
613 akapila 1302 UIC 0 : ereport(ERROR,
613 akapila 1303 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1304 : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1305 :
1306 : /* Tablesync should never receive prepare. */
613 akapila 1307 CBC 11 : if (am_tablesync_worker())
613 akapila 1308 UIC 0 : ereport(ERROR,
613 akapila 1309 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1310 : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1311 :
613 akapila 1312 GIC 11 : logicalrep_read_stream_prepare(s, &prepare_data);
397 1313 11 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1314 :
90 akapila 1315 GNC 11 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
613 akapila 1316 ECB :
90 akapila 1317 GNC 11 : switch (apply_action)
1318 : {
82 1319 5 : case TRANS_LEADER_APPLY:
1320 :
1321 : /*
1322 : * The transaction has been serialized to file, so replay all the
1323 : * spooled operations.
1324 : */
90 1325 5 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1326 : prepare_data.xid, prepare_data.prepare_lsn);
1327 :
1328 : /* Mark the transaction as prepared. */
1329 5 : apply_handle_prepare_internal(&prepare_data);
1330 :
1331 5 : CommitTransactionCommand();
1332 :
1333 5 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1334 :
1335 5 : in_remote_transaction = false;
1336 :
1337 : /* Unlink the files with serialized changes and subxact info. */
1338 5 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1339 :
1340 5 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1341 5 : break;
1342 :
1343 2 : case TRANS_LEADER_SEND_TO_PARALLEL:
1344 2 : Assert(winfo);
1345 :
1346 2 : if (pa_send_data(winfo, s->len, s->data))
1347 : {
1348 : /* Finish processing the streaming transaction. */
1349 2 : pa_xact_finish(winfo, prepare_data.end_lsn);
1350 2 : break;
1351 : }
1352 :
1353 : /*
1354 : * Switch to serialize mode when we are not able to send the
1355 : * change to parallel apply worker.
1356 : */
90 akapila 1357 UNC 0 : pa_switch_to_partial_serialize(winfo, true);
1358 :
1359 : /* fall through */
90 akapila 1360 GNC 1 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1361 1 : Assert(winfo);
1362 :
1363 1 : stream_open_and_write_change(prepare_data.xid,
1364 : LOGICAL_REP_MSG_STREAM_PREPARE,
1365 : &original_msg);
1366 :
1367 1 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1368 :
1369 : /* Finish processing the streaming transaction. */
1370 1 : pa_xact_finish(winfo, prepare_data.end_lsn);
1371 1 : break;
1372 :
1373 3 : case TRANS_PARALLEL_APPLY:
1374 :
1375 : /*
1376 : * If the parallel apply worker is applying spooled messages then
1377 : * close the file before preparing.
1378 : */
1379 3 : if (stream_fd)
1380 1 : stream_close_file();
1381 :
1382 3 : begin_replication_step();
1383 :
1384 : /* Mark the transaction as prepared. */
1385 3 : apply_handle_prepare_internal(&prepare_data);
1386 :
1387 3 : end_replication_step();
1388 :
1389 3 : CommitTransactionCommand();
1390 :
1391 3 : MyParallelShared->last_commit_end = XactLastCommitEnd;
1392 :
1393 3 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1394 3 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1395 :
1396 3 : pa_reset_subtrans();
1397 :
1398 3 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1399 3 : break;
1400 :
90 akapila 1401 UNC 0 : default:
82 1402 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1403 : break;
1404 : }
1405 :
90 akapila 1406 GIC 11 : pgstat_report_stat(false);
613 akapila 1407 ECB :
1408 : /* Process any tables that are being synchronized in parallel. */
613 akapila 1409 CBC 11 : process_syncing_tables(prepare_data.end_lsn);
613 akapila 1410 ECB :
1411 : /*
1412 : * Similar to prepare case, the subskiplsn could be left in a case of
1413 : * server crash but it's okay. See the comments in apply_handle_prepare().
1414 : */
383 akapila 1415 GIC 11 : stop_skipping_changes();
1416 11 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
383 akapila 1417 ECB :
613 akapila 1418 CBC 11 : pgstat_report_activity(STATE_IDLE, NULL);
1419 :
590 1420 11 : reset_apply_error_context_info();
613 1421 11 : }
1422 :
1423 : /*
1424 : * Handle ORIGIN message.
1425 : *
1426 : * TODO, support tracking of multiple origins
2271 peter_e 1427 ECB : */
1428 : static void
2271 peter_e 1429 GIC 7 : apply_handle_origin(StringInfo s)
1430 : {
2271 peter_e 1431 ECB : /*
1432 : * ORIGIN message can only come inside streaming transaction or inside
948 akapila 1433 : * remote transaction and before any actual writes.
2271 peter_e 1434 EUB : */
948 akapila 1435 GIC 7 : if (!in_streamed_transaction &&
1436 10 : (!in_remote_transaction ||
1437 5 : (IsTransactionState() && !am_tablesync_worker())))
2271 peter_e 1438 UIC 0 : ereport(ERROR,
1439 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1440 : errmsg_internal("ORIGIN message sent out of order")));
2271 peter_e 1441 GIC 7 : }
1442 :
1443 : /*
1444 : * Initialize fileset (if not already done).
1445 : *
1446 : * Create a new file when first_segment is true, otherwise open the existing
1447 : * file.
1448 : */
1449 : void
90 akapila 1450 GNC 361 : stream_start_internal(TransactionId xid, bool first_segment)
1451 : {
1452 361 : begin_replication_step();
1453 :
1454 : /*
1455 : * Initialize the worker's stream_fileset if we haven't yet. This will be
1456 : * used for the entire duration of the worker so create it in a permanent
1457 : * context. We create this on the very first streaming message from any
1458 : * transaction and then use it for this and other streaming transactions.
1459 : * Now, we could create a fileset at the start of the worker as well but
1460 : * then we won't be sure that it will ever be used.
1461 : */
1462 361 : if (!MyLogicalRepWorker->stream_fileset)
1463 : {
1464 : MemoryContext oldctx;
1465 :
1466 14 : oldctx = MemoryContextSwitchTo(ApplyContext);
1467 :
1468 14 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1469 14 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1470 :
1471 14 : MemoryContextSwitchTo(oldctx);
1472 : }
1473 :
1474 : /* Open the spool file for this transaction. */
1475 361 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1476 :
1477 : /* If this is not the first segment, open existing subxact file. */
1478 361 : if (!first_segment)
1479 329 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1480 :
1481 361 : end_replication_step();
1482 361 : }
1483 :
1484 : /*
1485 : * Handle STREAM START message.
1486 : */
1487 : static void
948 akapila 1488 GIC 835 : apply_handle_stream_start(StringInfo s)
1489 : {
1490 : bool first_segment;
1491 : ParallelApplyWorkerInfo *winfo;
1492 : TransApplyAction apply_action;
1493 :
1494 : /* Save the message before it is consumed. */
90 akapila 1495 GNC 835 : StringInfoData original_msg = *s;
948 akapila 1496 ECB :
666 tgl 1497 GIC 835 : if (in_streamed_transaction)
666 tgl 1498 LBC 0 : ereport(ERROR,
1499 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
666 tgl 1500 ECB : errmsg_internal("duplicate STREAM START message")));
948 akapila 1501 :
1502 : /* There must not be an active streaming transaction. */
82 akapila 1503 GNC 835 : Assert(!TransactionIdIsValid(stream_xid));
1504 :
1505 : /* notify handle methods we're processing a remote transaction */
948 akapila 1506 GIC 835 : in_streamed_transaction = true;
1507 :
1508 : /* extract XID of the top-level transaction */
1509 835 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1510 :
666 tgl 1511 CBC 835 : if (!TransactionIdIsValid(stream_xid))
666 tgl 1512 LBC 0 : ereport(ERROR,
1513 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
666 tgl 1514 ECB : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1515 :
397 akapila 1516 CBC 835 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1517 :
1518 : /* Try to allocate a worker for the streaming transaction. */
90 akapila 1519 GNC 835 : if (first_segment)
1520 82 : pa_allocate_worker(stream_xid);
1521 :
1522 835 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1523 :
1524 835 : switch (apply_action)
1525 : {
1526 341 : case TRANS_LEADER_SERIALIZE:
1527 :
1528 : /*
1529 : * Function stream_start_internal starts a transaction. This
1530 : * transaction will be committed on the stream stop unless it is a
1531 : * tablesync worker in which case it will be committed after
1532 : * processing all the messages. We need this transaction for
1533 : * handling the BufFile, used for serializing the streaming data
1534 : * and subxact info.
1535 : */
1536 341 : stream_start_internal(stream_xid, first_segment);
1537 341 : break;
1538 :
1539 241 : case TRANS_LEADER_SEND_TO_PARALLEL:
1540 241 : Assert(winfo);
584 akapila 1541 ECB :
1542 : /*
1543 : * Once we start serializing the changes, the parallel apply
1544 : * worker will wait for the leader to release the stream lock
1545 : * until the end of the transaction. So, we don't need to release
1546 : * the lock or increment the stream count in that case.
1547 : */
90 akapila 1548 GNC 241 : if (pa_send_data(winfo, s->len, s->data))
1549 : {
1550 : /*
1551 : * Unlock the shared object lock so that the parallel apply
1552 : * worker can continue to receive changes.
1553 : */
1554 237 : if (!first_segment)
1555 214 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1556 :
1557 : /*
1558 : * Increment the number of streaming blocks waiting to be
1559 : * processed by parallel apply worker.
1560 : */
1561 237 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1562 :
1563 : /* Cache the parallel apply worker for this transaction. */
1564 237 : pa_set_stream_apply_worker(winfo);
1565 237 : break;
1566 : }
1567 :
1568 : /*
1569 : * Switch to serialize mode when we are not able to send the
1570 : * change to parallel apply worker.
1571 : */
1572 4 : pa_switch_to_partial_serialize(winfo, !first_segment);
1573 :
1574 : /* fall through */
1575 15 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1576 15 : Assert(winfo);
1577 :
1578 : /*
1579 : * Open the spool file unless it was already opened when switching
1580 : * to serialize mode. The transaction started in
1581 : * stream_start_internal will be committed on the stream stop.
1582 : */
1583 15 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1584 11 : stream_start_internal(stream_xid, first_segment);
1585 :
1586 15 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1587 :
1588 : /* Cache the parallel apply worker for this transaction. */
1589 15 : pa_set_stream_apply_worker(winfo);
1590 15 : break;
1591 :
1592 242 : case TRANS_PARALLEL_APPLY:
1593 242 : if (first_segment)
1594 : {
1595 : /* Hold the lock until the end of the transaction. */
1596 27 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1597 27 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1598 :
1599 : /*
1600 : * Signal the leader apply worker, as it may be waiting for
1601 : * us.
1602 : */
1603 27 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
1604 : }
1605 :
1606 242 : parallel_stream_nchanges = 0;
1607 242 : break;
1608 :
90 akapila 1609 UNC 0 : default:
82 1610 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1611 : break;
1612 : }
90 akapila 1613 ECB :
90 akapila 1614 GIC 835 : pgstat_report_activity(STATE_RUNNING, NULL);
948 akapila 1615 CBC 835 : }
1616 :
948 akapila 1617 ECB : /*
1618 : * Update the information about subxacts and close the file.
1619 : *
1620 : * This function should be called when the stream_start_internal function has
1621 : * been called.
1622 : */
1623 : void
90 akapila 1624 GNC 361 : stream_stop_internal(TransactionId xid)
948 akapila 1625 ECB : {
1626 : /*
1627 : * Serialize information about subxacts for the toplevel transaction, then
1628 : * close the stream messages spool file.
1629 : */
90 akapila 1630 GNC 361 : subxact_info_write(MyLogicalRepWorker->subid, xid);
948 akapila 1631 GIC 361 : stream_close_file();
1632 :
1633 : /* We must be in a valid transaction state */
1634 361 : Assert(IsTransactionState());
1635 :
1636 : /* Commit the per-stream transaction */
786 1637 361 : CommitTransactionCommand();
1638 :
1639 : /* Reset per-stream context */
948 akapila 1640 CBC 361 : MemoryContextReset(LogicalStreamingContext);
948 akapila 1641 GNC 361 : }
1642 :
1643 : /*
1644 : * Handle STREAM STOP message.
1645 : */
1646 : static void
90 1647 834 : apply_handle_stream_stop(StringInfo s)
1648 : {
1649 : ParallelApplyWorkerInfo *winfo;
1650 : TransApplyAction apply_action;
1651 :
1652 834 : if (!in_streamed_transaction)
666 tgl 1653 UNC 0 : ereport(ERROR,
1654 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1655 : errmsg_internal("STREAM STOP message without STREAM START")));
1656 :
90 akapila 1657 GNC 834 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1658 :
1659 834 : switch (apply_action)
1660 : {
1661 341 : case TRANS_LEADER_SERIALIZE:
1662 341 : stream_stop_internal(stream_xid);
1663 341 : break;
1664 :
1665 237 : case TRANS_LEADER_SEND_TO_PARALLEL:
1666 237 : Assert(winfo);
1667 :
1668 : /*
1669 : * Lock before sending the STREAM_STOP message so that the leader
1670 : * can hold the lock first and the parallel apply worker will wait
1671 : * for leader to release the lock. See Locking Considerations atop
1672 : * applyparallelworker.c.
1673 : */
1674 237 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1675 :
1676 237 : if (pa_send_data(winfo, s->len, s->data))
1677 : {
1678 237 : pa_set_stream_apply_worker(NULL);
1679 237 : break;
1680 : }
1681 :
1682 : /*
1683 : * Switch to serialize mode when we are not able to send the
1684 : * change to parallel apply worker.
1685 : */
90 akapila 1686 UNC 0 : pa_switch_to_partial_serialize(winfo, true);
1687 :
1688 : /* fall through */
90 akapila 1689 GNC 15 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1690 15 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1691 15 : stream_stop_internal(stream_xid);
1692 15 : pa_set_stream_apply_worker(NULL);
1693 15 : break;
1694 :
1695 241 : case TRANS_PARALLEL_APPLY:
1696 241 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1697 : parallel_stream_nchanges);
1698 :
1699 : /*
1700 : * By the time parallel apply worker is processing the changes in
1701 : * the current streaming block, the leader apply worker may have
1702 : * sent multiple streaming blocks. This can lead to parallel apply
1703 : * worker start waiting even when there are more chunk of streams
1704 : * in the queue. So, try to lock only if there is no message left
1705 : * in the queue. See Locking Considerations atop
1706 : * applyparallelworker.c.
1707 : *
1708 : * Note that here we have a race condition where we can start
1709 : * waiting even when there are pending streaming chunks. This can
1710 : * happen if the leader sends another streaming block and acquires
1711 : * the stream lock again after the parallel apply worker checks
1712 : * that there is no pending streaming block and before it actually
1713 : * starts waiting on a lock. We can handle this case by not
1714 : * allowing the leader to increment the stream block count during
1715 : * the time parallel apply worker acquires the lock but it is not
1716 : * clear whether that is worth the complexity.
1717 : *
1718 : * Now, if this missed chunk contains rollback to savepoint, then
1719 : * there is a risk of deadlock which probably shouldn't happen
1720 : * after restart.
1721 : */
1722 241 : pa_decr_and_wait_stream_block();
1723 239 : break;
1724 :
90 akapila 1725 UNC 0 : default:
82 1726 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1727 : break;
1728 : }
1729 :
90 akapila 1730 GNC 832 : in_streamed_transaction = false;
82 1731 832 : stream_xid = InvalidTransactionId;
1732 :
1733 : /*
1734 : * The parallel apply worker could be in a transaction in which case we
1735 : * need to report the state as STATE_IDLEINTRANSACTION.
1736 : */
90 1737 832 : if (IsTransactionOrTransactionBlock())
1738 239 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1739 : else
1740 593 : pgstat_report_activity(STATE_IDLE, NULL);
1741 :
90 akapila 1742 GIC 832 : reset_apply_error_context_info();
1743 832 : }
948 akapila 1744 ECB :
90 1745 : /*
1746 : * Helper function to handle STREAM ABORT message when the transaction was
1747 : * serialized to file.
1748 : */
1749 : static void
90 akapila 1750 GNC 14 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1751 : {
1752 : /*
1753 : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1754 : * just delete the files with serialized info.
948 akapila 1755 ECB : */
948 akapila 1756 CBC 14 : if (xid == subxid)
1757 1 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
948 akapila 1758 ECB : else
1759 : {
1760 : /*
1761 : * OK, so it's a subxact. We need to read the subxact file for the
1762 : * toplevel transaction, determine the offset tracked for the subxact,
1763 : * and truncate the file with changes. We also remove the subxacts
1764 : * with higher offsets (or rather higher XIDs).
1765 : *
1766 : * We intentionally scan the array from the tail, because we're likely
1767 : * aborting a change for the most recent subtransactions.
1768 : *
1769 : * We can't use the binary search here as subxact XIDs won't
1770 : * necessarily arrive in sorted order, consider the case where we have
1771 : * released the savepoint for multiple subtransactions and then
1772 : * performed rollback to savepoint for one of the earlier
1773 : * sub-transaction.
1774 : */
1775 : int64 i;
1776 : int64 subidx;
1777 : BufFile *fd;
948 akapila 1778 GIC 13 : bool found = false;
1779 : char path[MAXPGPATH];
948 akapila 1780 ECB :
948 akapila 1781 GIC 13 : subidx = -1;
668 tgl 1782 13 : begin_replication_step();
948 akapila 1783 13 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1784 :
948 akapila 1785 CBC 15 : for (i = subxact_data.nsubxacts; i > 0; i--)
1786 : {
1787 11 : if (subxact_data.subxacts[i - 1].xid == subxid)
948 akapila 1788 EUB : {
948 akapila 1789 GIC 9 : subidx = (i - 1);
1790 9 : found = true;
1791 9 : break;
1792 : }
948 akapila 1793 ECB : }
948 akapila 1794 EUB :
1795 : /*
1796 : * If it's an empty sub-transaction then we will not find the subxid
1797 : * here so just cleanup the subxact info and return.
948 akapila 1798 ECB : */
948 akapila 1799 CBC 13 : if (!found)
1800 : {
948 akapila 1801 ECB : /* Cleanup the subxact info */
948 akapila 1802 GIC 4 : cleanup_subxact_info();
668 tgl 1803 CBC 4 : end_replication_step();
786 akapila 1804 GIC 4 : CommitTransactionCommand();
948 1805 4 : return;
1806 : }
1807 :
1808 : /* open the changes file */
90 1809 9 : changes_filename(path, MyLogicalRepWorker->subid, xid);
90 akapila 1810 CBC 9 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
1811 : O_RDWR, false);
1812 :
1813 : /* OK, truncate the file at the right offset */
1814 9 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
90 akapila 1815 GIC 9 : subxact_data.subxacts[subidx].offset);
90 akapila 1816 CBC 9 : BufFileClose(fd);
1817 :
90 akapila 1818 ECB : /* discard the subxacts added later */
90 akapila 1819 GIC 9 : subxact_data.nsubxacts = subidx;
90 akapila 1820 ECB :
1821 : /* write the updated subxact list */
90 akapila 1822 GIC 9 : subxact_info_write(MyLogicalRepWorker->subid, xid);
90 akapila 1823 ECB :
90 akapila 1824 GIC 9 : end_replication_step();
90 akapila 1825 CBC 9 : CommitTransactionCommand();
90 akapila 1826 ECB : }
1827 : }
1828 :
1829 : /*
1830 : * Handle STREAM ABORT message.
1831 : */
1832 : static void
90 akapila 1833 GNC 38 : apply_handle_stream_abort(StringInfo s)
1834 : {
1835 : TransactionId xid;
1836 : TransactionId subxid;
1837 : LogicalRepStreamAbortData abort_data;
1838 : ParallelApplyWorkerInfo *winfo;
1839 : TransApplyAction apply_action;
1840 :
1841 : /* Save the message before it is consumed. */
1842 38 : StringInfoData original_msg = *s;
1843 : bool toplevel_xact;
1844 :
1845 38 : if (in_streamed_transaction)
90 akapila 1846 UNC 0 : ereport(ERROR,
1847 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1848 : errmsg_internal("STREAM ABORT message without STREAM STOP")));
1849 :
1850 : /* We receive abort information only when we can apply in parallel. */
90 akapila 1851 GNC 38 : logicalrep_read_stream_abort(s, &abort_data,
1852 38 : MyLogicalRepWorker->parallel_apply);
1853 :
1854 38 : xid = abort_data.xid;
1855 38 : subxid = abort_data.subxid;
1856 38 : toplevel_xact = (xid == subxid);
1857 :
1858 38 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1859 :
1860 38 : apply_action = get_transaction_apply_action(xid, &winfo);
1861 :
1862 38 : switch (apply_action)
1863 : {
82 1864 14 : case TRANS_LEADER_APPLY:
1865 :
1866 : /*
1867 : * We are in the leader apply worker and the transaction has been
1868 : * serialized to file.
1869 : */
90 1870 14 : stream_abort_internal(xid, subxid);
1871 :
1872 14 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1873 14 : break;
1874 :
1875 10 : case TRANS_LEADER_SEND_TO_PARALLEL:
1876 10 : Assert(winfo);
1877 :
1878 : /*
1879 : * For the case of aborting the subtransaction, we increment the
1880 : * number of streaming blocks and take the lock again before
1881 : * sending the STREAM_ABORT to ensure that the parallel apply
1882 : * worker will wait on the lock for the next set of changes after
1883 : * processing the STREAM_ABORT message if it is not already
1884 : * waiting for STREAM_STOP message.
1885 : *
1886 : * It is important to perform this locking before sending the
1887 : * STREAM_ABORT message so that the leader can hold the lock first
1888 : * and the parallel apply worker will wait for the leader to
1889 : * release the lock. This is the same as what we do in
1890 : * apply_handle_stream_stop. See Locking Considerations atop
1891 : * applyparallelworker.c.
1892 : */
1893 10 : if (!toplevel_xact)
1894 : {
1895 9 : pa_unlock_stream(xid, AccessExclusiveLock);
1896 9 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1897 9 : pa_lock_stream(xid, AccessExclusiveLock);
1898 : }
1899 :
1900 10 : if (pa_send_data(winfo, s->len, s->data))
1901 : {
1902 : /*
1903 : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1904 : * wait here for the parallel apply worker to finish as that
1905 : * is not required to maintain the commit order and won't have
1906 : * the risk of failures due to transaction dependencies and
1907 : * deadlocks. However, it is possible that before the parallel
1908 : * worker finishes and we clear the worker info, the xid
1909 : * wraparound happens on the upstream and a new transaction
1910 : * with the same xid can appear and that can lead to duplicate
1911 : * entries in ParallelApplyTxnHash. Yet another problem could
1912 : * be that we may have serialized the changes in partial
1913 : * serialize mode and the file containing xact changes may
1914 : * already exist, and after xid wraparound trying to create
1915 : * the file for the same xid can lead to an error. To avoid
1916 : * these problems, we decide to wait for the aborts to finish.
1917 : *
1918 : * Note, it is okay to not update the flush location position
1919 : * for aborts as in worst case that means such a transaction
1920 : * won't be sent again after restart.
1921 : */
1922 10 : if (toplevel_xact)
1923 1 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1924 :
1925 10 : break;
1926 : }
1927 :
1928 : /*
1929 : * Switch to serialize mode when we are not able to send the
1930 : * change to parallel apply worker.
1931 : */
90 akapila 1932 UNC 0 : pa_switch_to_partial_serialize(winfo, true);
1933 :
1934 : /* fall through */
90 akapila 1935 GNC 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1936 2 : Assert(winfo);
1937 :
1938 : /*
1939 : * Parallel apply worker might have applied some changes, so write
1940 : * the STREAM_ABORT message so that it can rollback the
1941 : * subtransaction if needed.
1942 : */
1943 2 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
1944 : &original_msg);
1945 :
1946 2 : if (toplevel_xact)
1947 : {
1948 1 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1949 1 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1950 : }
1951 2 : break;
1952 :
1953 12 : case TRANS_PARALLEL_APPLY:
1954 :
1955 : /*
1956 : * If the parallel apply worker is applying spooled messages then
1957 : * close the file before aborting.
1958 : */
1959 12 : if (toplevel_xact && stream_fd)
1960 1 : stream_close_file();
1961 :
1962 12 : pa_stream_abort(&abort_data);
1963 :
1964 : /*
1965 : * We need to wait after processing rollback to savepoint for the
1966 : * next set of changes.
1967 : *
1968 : * We have a race condition here due to which we can start waiting
1969 : * here when there are more chunk of streams in the queue. See
1970 : * apply_handle_stream_stop.
1971 : */
1972 12 : if (!toplevel_xact)
1973 10 : pa_decr_and_wait_stream_block();
1974 :
1975 12 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1976 12 : break;
1977 :
90 akapila 1978 UNC 0 : default:
82 1979 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1980 : break;
1981 : }
1982 :
90 akapila 1983 CBC 38 : reset_apply_error_context_info();
1984 38 : }
1985 :
90 akapila 1986 ECB : /*
1987 : * Ensure that the passed location is fileset's end.
1988 : */
1989 : static void
90 akapila 1990 GNC 4 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1991 : off_t offset)
1992 : {
1993 : char path[MAXPGPATH];
1994 : BufFile *fd;
1995 : int last_fileno;
1996 : off_t last_offset;
1997 :
1998 4 : Assert(!IsTransactionState());
1999 :
2000 4 : begin_replication_step();
2001 :
2002 4 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2003 :
2004 4 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2005 :
2006 4 : BufFileSeek(fd, 0, 0, SEEK_END);
2007 4 : BufFileTell(fd, &last_fileno, &last_offset);
2008 :
2009 4 : BufFileClose(fd);
2010 :
2011 4 : end_replication_step();
2012 :
2013 4 : if (last_fileno != fileno || last_offset != offset)
90 akapila 2014 UNC 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
2015 : path);
948 akapila 2016 GNC 4 : }
2017 :
2018 : /*
2019 : * Common spoolfile processing.
2020 : */
2021 : void
90 2022 31 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2023 : XLogRecPtr lsn)
2024 : {
2025 : StringInfoData s2;
2026 : int nchanges;
2027 : char path[MAXPGPATH];
948 akapila 2028 GIC 31 : char *buffer = NULL;
2029 : MemoryContext oldcxt;
2030 : ResourceOwner oldowner;
2031 : int fileno;
2032 : off_t offset;
2033 :
90 akapila 2034 GNC 31 : if (!am_parallel_apply_worker())
2035 27 : maybe_start_skipping_changes(lsn);
383 akapila 2036 ECB :
668 tgl 2037 : /* Make sure we have an open transaction */
668 tgl 2038 GIC 31 : begin_replication_step();
948 akapila 2039 ECB :
2040 : /*
2041 : * Allocate file handle and memory required to process all the messages in
2042 : * TopTransactionContext to avoid them getting reset after each message is
2043 : * processed.
2044 : */
948 akapila 2045 GIC 31 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
948 akapila 2046 ECB :
613 2047 : /* Open the spool file for the committed/prepared transaction */
948 akapila 2048 GIC 31 : changes_filename(path, MyLogicalRepWorker->subid, xid);
948 akapila 2049 CBC 31 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2050 :
2051 : /*
2052 : * Make sure the file is owned by the toplevel transaction so that the
2053 : * file will not be accidentally closed when aborting a subtransaction.
2054 : */
90 akapila 2055 GNC 31 : oldowner = CurrentResourceOwner;
2056 31 : CurrentResourceOwner = TopTransactionResourceOwner;
2057 :
2058 31 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2059 :
2060 31 : CurrentResourceOwner = oldowner;
2061 :
948 akapila 2062 GIC 31 : buffer = palloc(BLCKSZ);
948 akapila 2063 CBC 31 : initStringInfo(&s2);
948 akapila 2064 ECB :
948 akapila 2065 GIC 31 : MemoryContextSwitchTo(oldcxt);
948 akapila 2066 ECB :
619 akapila 2067 GIC 31 : remote_final_lsn = lsn;
2068 :
948 akapila 2069 ECB : /*
2070 : * Make sure the handle apply_dispatch methods are aware we're in a remote
2071 : * transaction.
2072 : */
948 akapila 2073 CBC 31 : in_remote_transaction = true;
948 akapila 2074 GIC 31 : pgstat_report_activity(STATE_RUNNING, NULL);
948 akapila 2075 ECB :
668 tgl 2076 GIC 31 : end_replication_step();
668 tgl 2077 ECB :
948 akapila 2078 : /*
2079 : * Read the entries one by one and pass them through the same logic as in
2080 : * apply_dispatch.
2081 : */
948 akapila 2082 CBC 31 : nchanges = 0;
948 akapila 2083 ECB : while (true)
948 akapila 2084 GIC 88470 : {
83 peter 2085 EUB : size_t nbytes;
948 akapila 2086 : int len;
2087 :
948 akapila 2088 GIC 88501 : CHECK_FOR_INTERRUPTS();
2089 :
948 akapila 2090 ECB : /* read length of the on-disk record */
83 peter 2091 GNC 88501 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2092 :
948 akapila 2093 ECB : /* have we reached end of the file? */
948 akapila 2094 GIC 88501 : if (nbytes == 0)
2095 26 : break;
2096 :
2097 : /* do we have a correct length? */
666 tgl 2098 CBC 88475 : if (len <= 0)
666 tgl 2099 LBC 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2100 : len, path);
2101 :
2102 : /* make sure we have sufficiently large buffer */
948 akapila 2103 GIC 88475 : buffer = repalloc(buffer, len);
2104 :
2105 : /* and finally read the data into the buffer */
83 peter 2106 GNC 88475 : BufFileReadExact(stream_fd, buffer, len);
2107 :
90 akapila 2108 88475 : BufFileTell(stream_fd, &fileno, &offset);
2109 :
948 akapila 2110 ECB : /* copy the buffer to the stringinfo and call apply_dispatch */
948 akapila 2111 CBC 88475 : resetStringInfo(&s2);
2112 88475 : appendBinaryStringInfo(&s2, buffer, len);
948 akapila 2113 EUB :
2114 : /* Ensure we are reading the data into our memory context. */
948 akapila 2115 GIC 88475 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
948 akapila 2116 ECB :
948 akapila 2117 GIC 88475 : apply_dispatch(&s2);
2118 :
2119 88474 : MemoryContextReset(ApplyMessageContext);
2120 :
2121 88474 : MemoryContextSwitchTo(oldcxt);
2122 :
2123 88474 : nchanges++;
2124 :
2125 : /*
2126 : * It is possible the file has been closed because we have processed
2127 : * the transaction end message like stream_commit in which case that
2128 : * must be the last message.
2129 : */
90 akapila 2130 GNC 88474 : if (!stream_fd)
2131 : {
2132 4 : ensure_last_message(stream_fileset, xid, fileno, offset);
2133 4 : break;
2134 : }
2135 :
948 akapila 2136 CBC 88470 : if (nchanges % 1000 == 0)
666 tgl 2137 GIC 84 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
948 akapila 2138 ECB : nchanges, path);
2139 : }
2140 :
90 akapila 2141 GNC 30 : if (stream_fd)
2142 26 : stream_close_file();
2143 :
948 akapila 2144 GIC 30 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2145 : nchanges, path);
948 akapila 2146 ECB :
619 akapila 2147 GIC 30 : return;
2148 : }
2149 :
619 akapila 2150 ECB : /*
2151 : * Handle STREAM COMMIT message.
2152 : */
2153 : static void
619 akapila 2154 GIC 61 : apply_handle_stream_commit(StringInfo s)
619 akapila 2155 ECB : {
2156 : TransactionId xid;
2157 : LogicalRepCommitData commit_data;
2158 : ParallelApplyWorkerInfo *winfo;
2159 : TransApplyAction apply_action;
2160 :
2161 : /* Save the message before it is consumed. */
90 akapila 2162 GNC 61 : StringInfoData original_msg = *s;
2163 :
619 akapila 2164 CBC 61 : if (in_streamed_transaction)
619 akapila 2165 UIC 0 : ereport(ERROR,
2166 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
619 akapila 2167 ECB : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2168 :
619 akapila 2169 GIC 61 : xid = logicalrep_read_stream_commit(s, &commit_data);
397 akapila 2170 CBC 61 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
619 akapila 2171 ECB :
90 akapila 2172 GNC 61 : apply_action = get_transaction_apply_action(xid, &winfo);
2173 :
2174 61 : switch (apply_action)
2175 : {
82 2176 22 : case TRANS_LEADER_APPLY:
2177 :
2178 : /*
2179 : * The transaction has been serialized to file, so replay all the
2180 : * spooled operations.
2181 : */
90 2182 22 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2183 : commit_data.commit_lsn);
90 akapila 2184 ECB :
90 akapila 2185 GNC 21 : apply_handle_commit_internal(&commit_data);
2186 :
2187 : /* Unlink the files with serialized changes and subxact info. */
2188 21 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2189 :
2190 21 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2191 21 : break;
2192 :
2193 18 : case TRANS_LEADER_SEND_TO_PARALLEL:
2194 18 : Assert(winfo);
2195 :
2196 18 : if (pa_send_data(winfo, s->len, s->data))
2197 : {
2198 : /* Finish processing the streaming transaction. */
2199 18 : pa_xact_finish(winfo, commit_data.end_lsn);
2200 17 : break;
2201 : }
2202 :
2203 : /*
2204 : * Switch to serialize mode when we are not able to send the
2205 : * change to parallel apply worker.
2206 : */
90 akapila 2207 UNC 0 : pa_switch_to_partial_serialize(winfo, true);
2208 :
2209 : /* fall through */
90 akapila 2210 GNC 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2211 2 : Assert(winfo);
2212 :
2213 2 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2214 : &original_msg);
2215 :
2216 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2217 :
2218 : /* Finish processing the streaming transaction. */
2219 2 : pa_xact_finish(winfo, commit_data.end_lsn);
2220 2 : break;
2221 :
2222 19 : case TRANS_PARALLEL_APPLY:
2223 :
2224 : /*
2225 : * If the parallel apply worker is applying spooled messages then
2226 : * close the file before committing.
2227 : */
2228 19 : if (stream_fd)
2229 2 : stream_close_file();
2230 :
2231 19 : apply_handle_commit_internal(&commit_data);
2232 :
2233 19 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2234 :
2235 : /*
2236 : * It is important to set the transaction state as finished before
2237 : * releasing the lock. See pa_wait_for_xact_finish.
2238 : */
2239 19 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2240 19 : pa_unlock_transaction(xid, AccessExclusiveLock);
2241 :
2242 19 : pa_reset_subtrans();
2243 :
2244 19 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2245 19 : break;
2246 :
90 akapila 2247 UNC 0 : default:
82 2248 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2249 : break;
2250 : }
2251 :
2252 : /* Process any tables that are being synchronized in parallel. */
863 akapila 2253 GIC 59 : process_syncing_tables(commit_data.end_lsn);
2254 :
948 akapila 2255 CBC 59 : pgstat_report_activity(STATE_IDLE, NULL);
2256 :
590 2257 59 : reset_apply_error_context_info();
948 akapila 2258 GBC 59 : }
2259 :
2260 : /*
2261 : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2262 : */
863 akapila 2263 ECB : static void
618 akapila 2264 GIC 395 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2265 : {
383 akapila 2266 CBC 395 : if (is_skipping_changes())
2267 : {
383 akapila 2268 GIC 2 : stop_skipping_changes();
383 akapila 2269 ECB :
2270 : /*
2271 : * Start a new transaction to clear the subskiplsn, if not started
383 akapila 2272 EUB : * yet.
2273 : */
383 akapila 2274 GIC 2 : if (!IsTransactionState())
2275 1 : StartTransactionCommand();
383 akapila 2276 ECB : }
2277 :
786 akapila 2278 GIC 395 : if (IsTransactionState())
863 akapila 2279 ECB : {
383 2280 : /*
2281 : * The transaction is either non-empty or skipped, so we clear the
2282 : * subskiplsn.
2283 : */
383 akapila 2284 CBC 395 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2285 :
863 akapila 2286 ECB : /*
2287 : * Update origin state so we can restart streaming from correct
2288 : * position in case of crash.
2289 : */
863 akapila 2290 GIC 395 : replorigin_session_origin_lsn = commit_data->end_lsn;
2291 395 : replorigin_session_origin_timestamp = commit_data->committime;
2292 :
2293 395 : CommitTransactionCommand();
2294 :
90 akapila 2295 GNC 395 : if (IsTransactionBlock())
2296 : {
2297 4 : EndTransactionBlock(false);
2298 4 : CommitTransactionCommand();
2299 : }
2300 :
863 akapila 2301 GIC 395 : pgstat_report_stat(false);
2302 :
90 akapila 2303 GNC 395 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
863 akapila 2304 ECB : }
2305 : else
2306 : {
2307 : /* Process any invalidation messages that might have accumulated. */
863 akapila 2308 UIC 0 : AcceptInvalidationMessages();
2309 0 : maybe_reread_subscription();
2310 : }
2311 :
863 akapila 2312 GIC 395 : in_remote_transaction = false;
2313 395 : }
2314 :
2271 peter_e 2315 ECB : /*
2316 : * Handle RELATION message.
2317 : *
2318 : * Note we don't do validation against local schema here. The validation
2319 : * against local schema is postponed until first change for given relation
2320 : * comes as we only care about it when applying changes for it anyway and we
2321 : * do less locking this way.
2322 : */
2323 : static void
2271 peter_e 2324 GIC 390 : apply_handle_relation(StringInfo s)
2325 : {
2326 : LogicalRepRelation *rel;
2327 :
864 akapila 2328 CBC 390 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
948 akapila 2329 GIC 36 : return;
2330 :
2271 peter_e 2331 CBC 354 : rel = logicalrep_read_rel(s);
2332 354 : logicalrep_relmap_update(rel);
2333 :
2334 : /* Also reset all entries in the partition map that refer to remoterel. */
297 akapila 2335 GIC 354 : logicalrep_partmap_reset_relmap(rel);
2336 : }
2337 :
2338 : /*
2271 peter_e 2339 ECB : * Handle TYPE message.
2340 : *
2341 : * This implementation pays no attention to TYPE messages; we expect the user
515 tgl 2342 : * to have set things up so that the incoming data is acceptable to the input
2343 : * functions for the locally subscribed tables. Hence, we just read and
2344 : * discard the message.
2345 : */
2346 : static void
2271 peter_e 2347 GIC 18 : apply_handle_type(StringInfo s)
2348 : {
2349 : LogicalRepTyp typ;
2271 peter_e 2350 ECB :
864 akapila 2351 CBC 18 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
948 akapila 2352 UIC 0 : return;
948 akapila 2353 ECB :
2271 peter_e 2354 GIC 18 : logicalrep_read_typ(s, &typ);
2355 : }
2271 peter_e 2356 ECB :
2357 : /*
457 jdavis 2358 EUB : * Check that we (the subscription owner) have sufficient privileges on the
2359 : * target relation to perform the given operation.
2360 : */
2361 : static void
457 jdavis 2362 GIC 220052 : TargetPrivilegesCheck(Relation rel, AclMode mode)
457 jdavis 2363 ECB : {
332 tgl 2364 : Oid relid;
2365 : AclResult aclresult;
2366 :
457 jdavis 2367 GIC 220052 : relid = RelationGetRelid(rel);
2368 220052 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2369 220052 : if (aclresult != ACLCHECK_OK)
2370 7 : aclcheck_error(aclresult,
2371 7 : get_relkind_objtype(rel->rd_rel->relkind),
2372 7 : get_rel_name(relid));
457 jdavis 2373 ECB :
2374 : /*
2375 : * We lack the infrastructure to honor RLS policies. It might be possible
2376 : * to add such infrastructure here, but tablesync workers lack it, too, so
2377 : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2378 : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2379 : * replicate subsequent INSERTs, so we forbid all commands the same.
2380 : */
457 jdavis 2381 GIC 220045 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2382 3 : ereport(ERROR,
457 jdavis 2383 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2384 : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2385 : GetUserNameFromId(GetUserId(), true),
2386 : RelationGetRelationName(rel))));
457 jdavis 2387 GIC 220042 : }
2388 :
2271 peter_e 2389 ECB : /*
2390 : * Handle INSERT message.
2391 : */
2392 :
2393 : static void
2271 peter_e 2394 GIC 185646 : apply_handle_insert(StringInfo s)
2395 : {
2271 peter_e 2396 ECB : LogicalRepRelMapEntry *rel;
2397 : LogicalRepTupleData newtup;
2398 : LogicalRepRelId relid;
2399 : UserContext ucxt;
2400 : ApplyExecutionData *edata;
2401 : EState *estate;
2153 bruce 2402 : TupleTableSlot *remoteslot;
2153 bruce 2403 EUB : MemoryContext oldctx;
2404 : bool run_as_owner;
2405 :
2406 : /*
2407 : * Quick return if we are skipping data modification changes or handling
383 akapila 2408 ECB : * streamed transactions.
2409 : */
383 akapila 2410 CBC 361290 : if (is_skipping_changes() ||
383 akapila 2411 GIC 175644 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
948 akapila 2412 CBC 110046 : return;
948 akapila 2413 ECB :
668 tgl 2414 CBC 75638 : begin_replication_step();
2415 :
2271 peter_e 2416 75638 : relid = logicalrep_read_insert(s, &newtup);
2417 75638 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2208 peter_e 2418 GIC 75631 : if (!should_apply_changes_for_rel(rel))
2419 : {
2420 : /*
2421 : * The relation can't become interesting in the middle of the
2422 : * transaction so it's safe to unlock it.
2423 : */
2424 38 : logicalrep_rel_close(rel, RowExclusiveLock);
668 tgl 2425 CBC 38 : end_replication_step();
2208 peter_e 2426 GIC 38 : return;
2208 peter_e 2427 ECB : }
2428 :
2429 : /*
2430 : * Make sure that any user-supplied code runs as the table owner, unless
2431 : * the user has opted out of that behavior.
2432 : */
5 rhaas 2433 GNC 75593 : run_as_owner = MySubscription->runasowner;
2434 75593 : if (!run_as_owner)
2435 75585 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2436 :
590 akapila 2437 ECB : /* Set relation for error callback */
590 akapila 2438 CBC 75593 : apply_error_callback_arg.rel = rel;
2439 :
2440 : /* Initialize the executor state. */
687 tgl 2441 GIC 75593 : edata = create_edata_for_relation(rel);
2442 75593 : estate = edata->estate;
1878 andres 2443 75593 : remoteslot = ExecInitExtraTupleSlot(estate,
1606 2444 75593 : RelationGetDescr(rel->localrel),
1500 andres 2445 EUB : &TTSOpsVirtual);
2446 :
2447 : /* Process and store remote tuple in the slot */
2271 peter_e 2448 CBC 75593 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
995 tgl 2449 75593 : slot_store_data(remoteslot, rel, &newtup);
2271 peter_e 2450 75593 : slot_fill_defaults(rel, estate, remoteslot);
2451 75593 : MemoryContextSwitchTo(oldctx);
2271 peter_e 2452 ECB :
2453 : /* For a partitioned table, insert the tuple into a partition. */
1098 peter 2454 CBC 75593 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
687 tgl 2455 44 : apply_handle_tuple_routing(edata,
2456 : remoteslot, NULL, CMD_INSERT);
2457 : else
687 tgl 2458 GIC 75549 : apply_handle_insert_internal(edata, edata->targetRelInfo,
2459 : remoteslot);
2460 :
2461 75578 : finish_edata(edata);
2462 :
2463 : /* Reset relation for error callback */
590 akapila 2464 75578 : apply_error_callback_arg.rel = NULL;
2465 :
5 rhaas 2466 GNC 75578 : if (!run_as_owner)
2467 75573 : RestoreUserContext(&ucxt);
2468 :
2271 peter_e 2469 GIC 75578 : logicalrep_rel_close(rel, NoLock);
2470 :
668 tgl 2471 75578 : end_replication_step();
2472 : }
2473 :
2474 : /*
2475 : * Workhorse for apply_handle_insert()
2476 : * relinfo is for the relation we're actually inserting into
2477 : * (could be a child partition of edata->targetRelInfo)
2478 : */
2479 : static void
687 2480 75594 : apply_handle_insert_internal(ApplyExecutionData *edata,
2481 : ResultRelInfo *relinfo,
2482 : TupleTableSlot *remoteslot)
2483 : {
687 tgl 2484 CBC 75594 : EState *estate = edata->estate;
687 tgl 2485 ECB :
2486 : /* We must open indexes here. */
1111 peter 2487 GBC 75594 : ExecOpenIndices(relinfo, false);
1111 peter 2488 EUB :
2489 : /* Do the insert. */
457 jdavis 2490 GIC 75594 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
907 heikki.linnakangas 2491 75588 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
1111 peter 2492 ECB :
2493 : /* Cleanup. */
1111 peter 2494 GIC 75579 : ExecCloseIndices(relinfo);
2495 75579 : }
2496 :
2497 : /*
2498 : * Check if the logical replication relation is updatable and throw
2271 peter_e 2499 ECB : * appropriate error if it isn't.
2500 : */
2501 : static void
2271 peter_e 2502 CBC 72249 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2503 : {
292 akapila 2504 ECB : /*
2505 : * For partitioned tables, we only need to care if the target partition is
2506 : * updatable (aka has PK or RI defined for it).
2507 : */
292 akapila 2508 GIC 72249 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2509 29 : return;
2510 :
2511 : /* Updatable, no error. */
2271 peter_e 2512 CBC 72220 : if (rel->updatable)
2271 peter_e 2513 GIC 72220 : return;
2514 :
2515 : /*
2516 : * We are in error mode so it's fine this is somewhat slow. It's better to
2517 : * give user correct error.
2271 peter_e 2518 ECB : */
2271 peter_e 2519 LBC 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2520 : {
2271 peter_e 2521 UIC 0 : ereport(ERROR,
2522 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2523 : errmsg("publisher did not send replica identity column "
2524 : "expected by the logical replication target relation \"%s.%s\"",
2525 : rel->remoterel.nspname, rel->remoterel.relname)));
2526 : }
2527 :
2528 0 : ereport(ERROR,
2529 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2530 : errmsg("logical replication target relation \"%s.%s\" has "
2531 : "neither REPLICA IDENTITY index nor PRIMARY "
2532 : "KEY and published relation does not have "
2533 : "REPLICA IDENTITY FULL",
2534 : rel->remoterel.nspname, rel->remoterel.relname)));
2535 : }
2536 :
2537 : /*
2538 : * Handle UPDATE message.
2539 : *
2271 peter_e 2540 ECB : * TODO: FDW support
2541 : */
2542 : static void
2271 peter_e 2543 CBC 66137 : apply_handle_update(StringInfo s)
2271 peter_e 2544 ECB : {
2545 : LogicalRepRelMapEntry *rel;
2546 : LogicalRepRelId relid;
2547 : UserContext ucxt;
687 tgl 2548 : ApplyExecutionData *edata;
2549 : EState *estate;
2153 bruce 2550 : LogicalRepTupleData oldtup;
2551 : LogicalRepTupleData newtup;
2552 : bool has_oldtup;
2553 : TupleTableSlot *remoteslot;
2554 : RTEPermissionInfo *target_perminfo;
2555 : MemoryContext oldctx;
2556 : bool run_as_owner;
2557 :
2558 : /*
2559 : * Quick return if we are skipping data modification changes or handling
2560 : * streamed transactions.
2561 : */
383 akapila 2562 GIC 132274 : if (is_skipping_changes() ||
383 akapila 2563 CBC 66137 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
948 akapila 2564 GIC 34220 : return;
2565 :
668 tgl 2566 CBC 31917 : begin_replication_step();
2271 peter_e 2567 ECB :
2271 peter_e 2568 CBC 31917 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2271 peter_e 2569 ECB : &newtup);
2271 peter_e 2570 GIC 31917 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2208 2571 31917 : if (!should_apply_changes_for_rel(rel))
2572 : {
2208 peter_e 2573 ECB : /*
2574 : * The relation can't become interesting in the middle of the
2575 : * transaction so it's safe to unlock it.
2576 : */
2208 peter_e 2577 UIC 0 : logicalrep_rel_close(rel, RowExclusiveLock);
668 tgl 2578 LBC 0 : end_replication_step();
2208 peter_e 2579 0 : return;
2208 peter_e 2580 ECB : }
2581 :
2582 : /* Set relation for error callback */
590 akapila 2583 CBC 31917 : apply_error_callback_arg.rel = rel;
2584 :
2585 : /* Check if we can do the update. */
2271 peter_e 2586 31917 : check_relation_updatable(rel);
2587 :
2588 : /*
2589 : * Make sure that any user-supplied code runs as the table owner, unless
2590 : * the user has opted out of that behavior.
2591 : */
5 rhaas 2592 GNC 31917 : run_as_owner = MySubscription->runasowner;
2593 31917 : if (!run_as_owner)
2594 31914 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2595 :
2271 peter_e 2596 ECB : /* Initialize the executor state. */
687 tgl 2597 CBC 31915 : edata = create_edata_for_relation(rel);
687 tgl 2598 GIC 31915 : estate = edata->estate;
1878 andres 2599 31915 : remoteslot = ExecInitExtraTupleSlot(estate,
1606 2600 31915 : RelationGetDescr(rel->localrel),
2601 : &TTSOpsVirtual);
2602 :
2603 : /*
2604 : * Populate updatedCols so that per-column triggers can fire, and so
816 pg 2605 ECB : * executor can correctly pass down indexUnchanged hint. This could
2606 : * include more columns than were actually changed on the publisher
2607 : * because the logical replication protocol doesn't contain that
2608 : * information. But it would for example exclude columns that only exist
2609 : * on the subscriber, since we are not touching those.
2610 : */
124 alvherre 2611 GNC 31915 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
1189 peter 2612 GIC 159237 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2613 : {
993 tgl 2614 CBC 127322 : Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
993 tgl 2615 GIC 127322 : int remoteattnum = rel->attrmap->attnums[i];
2616 :
993 tgl 2617 CBC 127322 : if (!att->attisdropped && remoteattnum >= 0)
993 tgl 2618 EUB : {
993 tgl 2619 GIC 68810 : Assert(remoteattnum < newtup.ncols);
2620 68810 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
124 alvherre 2621 GNC 68807 : target_perminfo->updatedCols =
2622 68807 : bms_add_member(target_perminfo->updatedCols,
993 tgl 2623 ECB : i + 1 - FirstLowInvalidHeapAttributeNumber);
2624 : }
2625 : }
1189 peter 2626 :
2271 peter_e 2627 : /* Build the search tuple. */
2271 peter_e 2628 CBC 31915 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
995 tgl 2629 GIC 31915 : slot_store_data(remoteslot, rel,
995 tgl 2630 CBC 31915 : has_oldtup ? &oldtup : &newtup);
2271 peter_e 2631 GIC 31915 : MemoryContextSwitchTo(oldctx);
2271 peter_e 2632 ECB :
2633 : /* For a partitioned table, apply update to correct partition. */
1098 peter 2634 CBC 31915 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
687 tgl 2635 GIC 12 : apply_handle_tuple_routing(edata,
687 tgl 2636 ECB : remoteslot, &newtup, CMD_UPDATE);
2637 : else
687 tgl 2638 GIC 31903 : apply_handle_update_internal(edata, edata->targetRelInfo,
2639 : remoteslot, &newtup, rel->localindexoid);
2640 :
2641 31911 : finish_edata(edata);
1111 peter 2642 ECB :
2643 : /* Reset relation for error callback */
590 akapila 2644 CBC 31911 : apply_error_callback_arg.rel = NULL;
590 akapila 2645 ECB :
5 rhaas 2646 GNC 31911 : if (!run_as_owner)
2647 31909 : RestoreUserContext(&ucxt);
2648 :
1111 peter 2649 GIC 31911 : logicalrep_rel_close(rel, NoLock);
1111 peter 2650 ECB :
668 tgl 2651 CBC 31911 : end_replication_step();
2652 : }
2653 :
2654 : /*
2655 : * Workhorse for apply_handle_update()
2656 : * relinfo is for the relation we're actually updating in
2657 : * (could be a child partition of edata->targetRelInfo)
2658 : */
2659 : static void
687 tgl 2660 GIC 31903 : apply_handle_update_internal(ApplyExecutionData *edata,
2661 : ResultRelInfo *relinfo,
2662 : TupleTableSlot *remoteslot,
2663 : LogicalRepTupleData *newtup,
2664 : Oid localindexoid)
2665 : {
2666 31903 : EState *estate = edata->estate;
2667 31903 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1111 peter 2668 31903 : Relation localrel = relinfo->ri_RelationDesc;
1111 peter 2669 ECB : EPQState epqstate;
2670 : TupleTableSlot *localslot;
2671 : bool found;
2672 : MemoryContext oldctx;
2673 :
1111 peter 2674 GIC 31903 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2675 31903 : ExecOpenIndices(relinfo, false);
1111 peter 2676 ECB :
1103 peter 2677 GIC 31903 : found = FindReplTupleInLocalRel(estate, localrel,
2678 : &relmapentry->remoterel,
2679 : localindexoid,
2680 : remoteslot, &localslot);
2271 peter_e 2681 31899 : ExecClearTuple(remoteslot);
2682 :
2683 : /*
2684 : * Tuple found.
2685 : *
2686 : * Note this will fail if there are other conflicting unique indexes.
2687 : */
2688 31899 : if (found)
2689 : {
2690 : /* Process and store remote tuple in the slot */
2691 31898 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
995 tgl 2692 31898 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2271 peter_e 2693 31898 : MemoryContextSwitchTo(oldctx);
2694 :
2695 31898 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2696 :
2697 : /* Do the actual update. */
457 jdavis 2698 31898 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
907 heikki.linnakangas 2699 CBC 31898 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
907 heikki.linnakangas 2700 ECB : remoteslot);
2701 : }
2271 peter_e 2702 : else
2703 : {
2704 : /*
2705 : * The tuple to be updated could not be found. Do nothing except for
2706 : * emitting a log message.
2707 : *
2708 : * XXX should this be promoted to ereport(LOG) perhaps?
2271 peter_e 2709 EUB : */
2271 peter_e 2710 GIC 1 : elog(DEBUG1,
2711 : "logical replication did not find row to be updated "
2271 peter_e 2712 ECB : "in replication target relation \"%s\"",
1111 peter 2713 : RelationGetRelationName(localrel));
2714 : }
2715 :
2716 : /* Cleanup. */
1111 peter 2717 GIC 31899 : ExecCloseIndices(relinfo);
2271 peter_e 2718 31899 : EvalPlanQualEnd(&epqstate);
2719 31899 : }
2271 peter_e 2720 ECB :
2721 : /*
2722 : * Handle DELETE message.
2723 : *
2724 : * TODO: FDW support
2725 : */
2726 : static void
2271 peter_e 2727 GIC 81918 : apply_handle_delete(StringInfo s)
2271 peter_e 2728 ECB : {
2729 : LogicalRepRelMapEntry *rel;
2153 bruce 2730 : LogicalRepTupleData oldtup;
2731 : LogicalRepRelId relid;
2732 : UserContext ucxt;
2733 : ApplyExecutionData *edata;
2734 : EState *estate;
2735 : TupleTableSlot *remoteslot;
2736 : MemoryContext oldctx;
2737 : bool run_as_owner;
2271 peter_e 2738 :
383 akapila 2739 : /*
2740 : * Quick return if we are skipping data modification changes or handling
2741 : * streamed transactions.
2742 : */
383 akapila 2743 GIC 163836 : if (is_skipping_changes() ||
2744 81918 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
948 2745 41615 : return;
2746 :
668 tgl 2747 40303 : begin_replication_step();
2748 :
2271 peter_e 2749 40303 : relid = logicalrep_read_delete(s, &oldtup);
2750 40303 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2208 peter_e 2751 CBC 40303 : if (!should_apply_changes_for_rel(rel))
2208 peter_e 2752 ECB : {
2753 : /*
2754 : * The relation can't become interesting in the middle of the
2755 : * transaction so it's safe to unlock it.
2756 : */
2208 peter_e 2757 UBC 0 : logicalrep_rel_close(rel, RowExclusiveLock);
668 tgl 2758 0 : end_replication_step();
2208 peter_e 2759 UIC 0 : return;
2760 : }
2761 :
590 akapila 2762 ECB : /* Set relation for error callback */
590 akapila 2763 CBC 40303 : apply_error_callback_arg.rel = rel;
2764 :
2765 : /* Check if we can do the delete. */
2271 peter_e 2766 GIC 40303 : check_relation_updatable(rel);
2767 :
2768 : /*
2769 : * Make sure that any user-supplied code runs as the table owner, unless
2770 : * the user has opted out of that behavior.
2771 : */
5 rhaas 2772 GNC 40303 : run_as_owner = MySubscription->runasowner;
2773 40303 : if (!run_as_owner)
2774 40301 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2775 :
2776 : /* Initialize the executor state. */
687 tgl 2777 CBC 40303 : edata = create_edata_for_relation(rel);
687 tgl 2778 GIC 40303 : estate = edata->estate;
1878 andres 2779 40303 : remoteslot = ExecInitExtraTupleSlot(estate,
1606 2780 40303 : RelationGetDescr(rel->localrel),
2781 : &TTSOpsVirtual);
2782 :
2783 : /* Build the search tuple. */
2271 peter_e 2784 40303 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
995 tgl 2785 CBC 40303 : slot_store_data(remoteslot, rel, &oldtup);
2271 peter_e 2786 GIC 40303 : MemoryContextSwitchTo(oldctx);
2271 peter_e 2787 ECB :
2788 : /* For a partitioned table, apply delete to correct partition. */
1098 peter 2789 CBC 40303 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
687 tgl 2790 GIC 17 : apply_handle_tuple_routing(edata,
687 tgl 2791 ECB : remoteslot, NULL, CMD_DELETE);
2792 : else
687 tgl 2793 CBC 40286 : apply_handle_delete_internal(edata, edata->targetRelInfo,
2794 : remoteslot, rel->localindexoid);
2795 :
2796 40303 : finish_edata(edata);
2797 :
590 akapila 2798 ECB : /* Reset relation for error callback */
590 akapila 2799 GIC 40303 : apply_error_callback_arg.rel = NULL;
590 akapila 2800 ECB :
5 rhaas 2801 GNC 40303 : if (!run_as_owner)
2802 40301 : RestoreUserContext(&ucxt);
2803 :
1111 peter 2804 GBC 40303 : logicalrep_rel_close(rel, NoLock);
2805 :
668 tgl 2806 CBC 40303 : end_replication_step();
2807 : }
2808 :
2809 : /*
2810 : * Workhorse for apply_handle_delete()
2811 : * relinfo is for the relation we're actually deleting from
687 tgl 2812 ECB : * (could be a child partition of edata->targetRelInfo)
2813 : */
2814 : static void
687 tgl 2815 GIC 40304 : apply_handle_delete_internal(ApplyExecutionData *edata,
2816 : ResultRelInfo *relinfo,
2817 : TupleTableSlot *remoteslot,
2818 : Oid localindexoid)
1111 peter 2819 ECB : {
687 tgl 2820 GIC 40304 : EState *estate = edata->estate;
1111 peter 2821 40304 : Relation localrel = relinfo->ri_RelationDesc;
687 tgl 2822 40304 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2823 : EPQState epqstate;
2824 : TupleTableSlot *localslot;
1111 peter 2825 ECB : bool found;
2826 :
1111 peter 2827 GIC 40304 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
2828 40304 : ExecOpenIndices(relinfo, false);
1111 peter 2829 ECB :
25 akapila 2830 GNC 40304 : found = FindReplTupleInLocalRel(estate, localrel, remoterel, localindexoid,
2831 : remoteslot, &localslot);
2832 :
2833 : /* If found delete it. */
2271 peter_e 2834 GIC 40304 : if (found)
2835 : {
2271 peter_e 2836 CBC 40299 : EvalPlanQualSetSlot(&epqstate, localslot);
2837 :
2838 : /* Do the actual delete. */
457 jdavis 2839 40299 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
907 heikki.linnakangas 2840 40299 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2841 : }
2842 : else
2843 : {
2844 : /*
2845 : * The tuple to be deleted could not be found. Do nothing except for
667 tgl 2846 ECB : * emitting a log message.
2847 : *
2848 : * XXX should this be promoted to ereport(LOG) perhaps?
2849 : */
1762 peter_e 2850 GIC 5 : elog(DEBUG1,
667 tgl 2851 ECB : "logical replication did not find row to be deleted "
2852 : "in replication target relation \"%s\"",
1111 peter 2853 : RelationGetRelationName(localrel));
2271 peter_e 2854 : }
2855 :
2856 : /* Cleanup. */
1111 peter 2857 GIC 40304 : ExecCloseIndices(relinfo);
2271 peter_e 2858 CBC 40304 : EvalPlanQualEnd(&epqstate);
2271 peter_e 2859 GIC 40304 : }
2860 :
2861 : /*
2862 : * Try to find a tuple received from the publication side (in 'remoteslot') in
2863 : * the corresponding local relation using either replica identity index,
2864 : * primary key, index or if needed, sequential scan.
1103 peter 2865 ECB : *
2866 : * Local tuple, if found, is returned in '*localslot'.
2867 : */
2868 : static bool
1103 peter 2869 GIC 72219 : FindReplTupleInLocalRel(EState *estate, Relation localrel,
2870 : LogicalRepRelation *remoterel,
2871 : Oid localidxoid,
2872 : TupleTableSlot *remoteslot,
2873 : TupleTableSlot **localslot)
1103 peter 2874 ECB : {
2875 : bool found;
2876 :
2877 : /*
2878 : * Regardless of the top-level operation, we're performing a read here, so
456 jdavis 2879 : * check for SELECT privileges.
2880 : */
456 jdavis 2881 GIC 72219 : TargetPrivilegesCheck(localrel, ACL_SELECT);
456 jdavis 2882 ECB :
1103 peter 2883 GIC 72215 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2884 :
25 akapila 2885 GNC 72215 : Assert(OidIsValid(localidxoid) ||
2886 : (remoterel->replident == REPLICA_IDENTITY_FULL));
2887 :
2888 72215 : if (OidIsValid(localidxoid))
2889 72071 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
2890 : LockTupleExclusive,
2891 : remoteslot, *localslot);
2892 : else
1103 peter 2893 CBC 144 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2894 : remoteslot, *localslot);
2895 :
2896 72215 : return found;
2897 : }
1103 peter 2898 ECB :
2899 : /*
2900 : * This handles insert, update, delete on a partitioned table.
1098 2901 : */
2902 : static void
687 tgl 2903 GIC 73 : apply_handle_tuple_routing(ApplyExecutionData *edata,
2904 : TupleTableSlot *remoteslot,
1098 peter 2905 ECB : LogicalRepTupleData *newtup,
2906 : CmdType operation)
2907 : {
687 tgl 2908 GIC 73 : EState *estate = edata->estate;
687 tgl 2909 CBC 73 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
687 tgl 2910 GIC 73 : ResultRelInfo *relinfo = edata->targetRelInfo;
1098 peter 2911 CBC 73 : Relation parentrel = relinfo->ri_RelationDesc;
2912 : ModifyTableState *mtstate;
687 tgl 2913 ECB : PartitionTupleRouting *proute;
2914 : ResultRelInfo *partrelinfo;
2915 : Relation partrel;
2916 : TupleTableSlot *remoteslot_part;
2917 : TupleConversionMap *map;
2918 : MemoryContext oldctx;
292 akapila 2919 GIC 73 : LogicalRepRelMapEntry *part_entry = NULL;
292 akapila 2920 CBC 73 : AttrMap *attrmap = NULL;
2921 :
1098 peter 2922 ECB : /* ModifyTableState is needed for ExecFindPartition(). */
687 tgl 2923 CBC 73 : edata->mtstate = mtstate = makeNode(ModifyTableState);
1098 peter 2924 GIC 73 : mtstate->ps.plan = NULL;
2925 73 : mtstate->ps.state = estate;
1098 peter 2926 CBC 73 : mtstate->operation = operation;
2927 73 : mtstate->resultRelInfo = relinfo;
2928 :
2929 : /* ... as is PartitionTupleRouting. */
687 tgl 2930 GIC 73 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
1098 peter 2931 ECB :
2932 : /*
2933 : * Find the partition to which the "search tuple" belongs.
2934 : */
1098 peter 2935 GIC 73 : Assert(remoteslot != NULL);
2936 73 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1098 peter 2937 CBC 73 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2938 : remoteslot, estate);
1098 peter 2939 GIC 73 : Assert(partrelinfo != NULL);
2940 73 : partrel = partrelinfo->ri_RelationDesc;
2941 :
2942 : /*
2943 : * Check for supported relkind. We need this since partitions might be of
158 tgl 2944 ECB : * unsupported relkinds; and the set of partitions can change, so checking
2945 : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2946 : */
158 tgl 2947 GIC 73 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2948 73 : get_namespace_name(RelationGetNamespace(partrel)),
2949 73 : RelationGetRelationName(partrel));
2950 :
2951 : /*
1098 peter 2952 ECB : * To perform any of the operations below, the tuple must match the
2953 : * partition's rowtype. Convert if needed or just copy, using a dedicated
2954 : * slot to store the tuple in any case.
1098 peter 2955 EUB : */
902 heikki.linnakangas 2956 GIC 73 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
1098 peter 2957 73 : if (remoteslot_part == NULL)
2958 41 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
128 alvherre 2959 GNC 73 : map = ExecGetRootToChildMap(partrelinfo, estate);
1098 peter 2960 CBC 73 : if (map != NULL)
2961 : {
292 akapila 2962 32 : attrmap = map->attrMap;
292 akapila 2963 GIC 32 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
1098 peter 2964 ECB : remoteslot_part);
2965 : }
2966 : else
2967 : {
1098 peter 2968 GIC 41 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2969 41 : slot_getallattrs(remoteslot_part);
2970 : }
2971 73 : MemoryContextSwitchTo(oldctx);
1098 peter 2972 ECB :
2973 : /* Check if we can do the update or delete on the leaf partition. */
292 akapila 2974 GIC 73 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
292 akapila 2975 ECB : {
292 akapila 2976 GIC 29 : part_entry = logicalrep_partition_open(relmapentry, partrel,
2977 : attrmap);
292 akapila 2978 CBC 29 : check_relation_updatable(part_entry);
2979 : }
292 akapila 2980 ECB :
1098 peter 2981 CBC 73 : switch (operation)
2982 : {
2983 44 : case CMD_INSERT:
687 tgl 2984 44 : apply_handle_insert_internal(edata, partrelinfo,
2985 : remoteslot_part);
1098 peter 2986 44 : break;
2987 :
1098 peter 2988 GIC 17 : case CMD_DELETE:
687 tgl 2989 CBC 17 : apply_handle_delete_internal(edata, partrelinfo,
2990 : remoteslot_part,
2991 : part_entry->localindexoid);
1098 peter 2992 GIC 17 : break;
2993 :
2994 12 : case CMD_UPDATE:
2995 :
2996 : /*
2997 : * For UPDATE, depending on whether or not the updated tuple
1098 peter 2998 EUB : * satisfies the partition's constraint, perform a simple UPDATE
2999 : * of the partition or move the updated tuple into a different
3000 : * suitable partition.
1098 peter 3001 ECB : */
3002 : {
3003 : TupleTableSlot *localslot;
3004 : ResultRelInfo *partrelinfo_new;
3005 : Relation partrel_new;
3006 : bool found;
3007 :
3008 : /* Get the matching local tuple from the partition. */
1098 peter 3009 GIC 12 : found = FindReplTupleInLocalRel(estate, partrel,
1098 peter 3010 ECB : &part_entry->remoterel,
3011 : part_entry->localindexoid,
3012 : remoteslot_part, &localslot);
667 tgl 3013 GIC 12 : if (!found)
1098 peter 3014 ECB : {
3015 : /*
3016 : * The tuple to be updated could not be found. Do nothing
3017 : * except for emitting a log message.
3018 : *
3019 : * XXX should this be promoted to ereport(LOG) perhaps?
3020 : */
1098 peter 3021 CBC 2 : elog(DEBUG1,
3022 : "logical replication did not find row to be updated "
667 tgl 3023 ECB : "in replication target relation's partition \"%s\"",
3024 : RelationGetRelationName(partrel));
667 tgl 3025 CBC 2 : return;
3026 : }
3027 :
3028 : /*
3029 : * Apply the update to the local tuple, putting the result in
3030 : * remoteslot_part.
667 tgl 3031 ECB : */
667 tgl 3032 CBC 10 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
667 tgl 3033 GIC 10 : slot_modify_data(remoteslot_part, localslot, part_entry,
667 tgl 3034 ECB : newtup);
667 tgl 3035 GIC 10 : MemoryContextSwitchTo(oldctx);
667 tgl 3036 ECB :
1098 peter 3037 : /*
3038 : * Does the updated tuple still satisfy the current
1098 peter 3039 EUB : * partition's constraint?
3040 : */
935 tgl 3041 GIC 20 : if (!partrel->rd_rel->relispartition ||
1098 peter 3042 10 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3043 : false))
3044 9 : {
1098 peter 3045 ECB : /*
3046 : * Yes, so simply UPDATE the partition. We don't call
3047 : * apply_handle_update_internal() here, which would
3048 : * normally do the following work, to avoid repeating some
3049 : * work already done above to find the local tuple in the
3050 : * partition.
3051 : */
3052 : EPQState epqstate;
3053 :
1098 peter 3054 GIC 9 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
3055 9 : ExecOpenIndices(partrelinfo, false);
1098 peter 3056 ECB :
1098 peter 3057 GIC 9 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
457 jdavis 3058 CBC 9 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3059 : ACL_UPDATE);
907 heikki.linnakangas 3060 9 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3061 : localslot, remoteslot_part);
1098 peter 3062 GIC 9 : ExecCloseIndices(partrelinfo);
3063 9 : EvalPlanQualEnd(&epqstate);
3064 : }
3065 : else
1098 peter 3066 ECB : {
3067 : /* Move the tuple into the new partition. */
3068 :
3069 : /*
3070 : * New partition will be found using tuple routing, which
3071 : * can only occur via the parent table. We might need to
3072 : * convert the tuple to the parent's rowtype. Note that
3073 : * this is the tuple found in the partition, not the
3074 : * original search tuple received by this function.
3075 : */
1098 peter 3076 CBC 1 : if (map)
3077 : {
3078 : TupleConversionMap *PartitionToRootMap =
1098 peter 3079 GIC 1 : convert_tuples_by_name(RelationGetDescr(partrel),
3080 : RelationGetDescr(parentrel));
3081 :
1098 peter 3082 ECB : remoteslot =
1098 peter 3083 CBC 1 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3084 : remoteslot_part, remoteslot);
1098 peter 3085 ECB : }
3086 : else
3087 : {
1098 peter 3088 UIC 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
1098 peter 3089 LBC 0 : slot_getallattrs(remoteslot);
1098 peter 3090 ECB : }
3091 :
3092 : /* Find the new partition. */
1098 peter 3093 CBC 1 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1098 peter 3094 GIC 1 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
1098 peter 3095 ECB : proute, remoteslot,
3096 : estate);
1098 peter 3097 GIC 1 : MemoryContextSwitchTo(oldctx);
3098 1 : Assert(partrelinfo_new != partrelinfo);
158 tgl 3099 1 : partrel_new = partrelinfo_new->ri_RelationDesc;
158 tgl 3100 EUB :
3101 : /* Check that new partition also has supported relkind. */
158 tgl 3102 GIC 1 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3103 1 : get_namespace_name(RelationGetNamespace(partrel_new)),
158 tgl 3104 CBC 1 : RelationGetRelationName(partrel_new));
1098 peter 3105 ECB :
3106 : /* DELETE old tuple found in the old partition. */
687 tgl 3107 GIC 1 : apply_handle_delete_internal(edata, partrelinfo,
3108 : localslot,
3109 : part_entry->localindexoid);
3110 :
3111 : /* INSERT new tuple into the new partition. */
3112 :
3113 : /*
3114 : * Convert the replacement tuple to match the destination
3115 : * partition rowtype.
3116 : */
1098 peter 3117 CBC 1 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
902 heikki.linnakangas 3118 GIC 1 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
1098 peter 3119 1 : if (remoteslot_part == NULL)
158 tgl 3120 1 : remoteslot_part = table_slot_create(partrel_new,
1098 peter 3121 ECB : &estate->es_tupleTable);
128 alvherre 3122 GNC 1 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
1098 peter 3123 GIC 1 : if (map != NULL)
1098 peter 3124 ECB : {
1098 peter 3125 LBC 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3126 : remoteslot,
3127 : remoteslot_part);
1098 peter 3128 ECB : }
3129 : else
3130 : {
1098 peter 3131 GIC 1 : remoteslot_part = ExecCopySlot(remoteslot_part,
3132 : remoteslot);
3133 1 : slot_getallattrs(remoteslot);
3134 : }
3135 1 : MemoryContextSwitchTo(oldctx);
687 tgl 3136 1 : apply_handle_insert_internal(edata, partrelinfo_new,
3137 : remoteslot_part);
3138 : }
3139 : }
1098 peter 3140 CBC 10 : break;
3141 :
1098 peter 3142 UIC 0 : default:
3143 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
1098 peter 3144 ECB : break;
1098 peter 3145 EUB : }
3146 : }
1098 peter 3147 ECB :
3148 : /*
3149 : * Handle TRUNCATE message.
3150 : *
3151 : * TODO: FDW support
3152 : */
3153 : static void
1828 peter_e 3154 GIC 17 : apply_handle_truncate(StringInfo s)
1828 peter_e 3155 ECB : {
1809 tgl 3156 GIC 17 : bool cascade = false;
3157 17 : bool restart_seqs = false;
3158 17 : List *remote_relids = NIL;
3159 17 : List *remote_rels = NIL;
1809 tgl 3160 CBC 17 : List *rels = NIL;
1098 peter 3161 17 : List *part_rels = NIL;
1809 tgl 3162 17 : List *relids = NIL;
3163 17 : List *relids_logged = NIL;
1809 tgl 3164 ECB : ListCell *lc;
688 tgl 3165 CBC 17 : LOCKMODE lockmode = AccessExclusiveLock;
3166 :
3167 : /*
3168 : * Quick return if we are skipping data modification changes or handling
3169 : * streamed transactions.
3170 : */
383 akapila 3171 GIC 34 : if (is_skipping_changes() ||
3172 17 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
948 akapila 3173 UIC 0 : return;
948 akapila 3174 ECB :
668 tgl 3175 CBC 17 : begin_replication_step();
3176 :
1828 peter_e 3177 GIC 17 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3178 :
3179 43 : foreach(lc, remote_relids)
1828 peter_e 3180 ECB : {
1828 peter_e 3181 GIC 26 : LogicalRepRelId relid = lfirst_oid(lc);
3182 : LogicalRepRelMapEntry *rel;
3183 :
688 akapila 3184 26 : rel = logicalrep_rel_open(relid, lockmode);
1828 peter_e 3185 26 : if (!should_apply_changes_for_rel(rel))
3186 : {
1828 peter_e 3187 ECB : /*
3188 : * The relation can't become interesting in the middle of the
3189 : * transaction so it's safe to unlock it.
3190 : */
688 akapila 3191 UIC 0 : logicalrep_rel_close(rel, lockmode);
1828 peter_e 3192 0 : continue;
3193 : }
3194 :
1828 peter_e 3195 GIC 26 : remote_rels = lappend(remote_rels, rel);
457 jdavis 3196 26 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
1828 peter_e 3197 26 : rels = lappend(rels, rel->localrel);
3198 26 : relids = lappend_oid(relids, rel->localreloid);
3199 26 : if (RelationIsLogicallyLogged(rel->localrel))
1812 3200 26 : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3201 :
3202 : /*
1098 peter 3203 ECB : * Truncate partitions if we got a message to truncate a partitioned
3204 : * table.
3205 : */
1098 peter 3206 GIC 26 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1098 peter 3207 ECB : {
3208 : ListCell *child;
1098 peter 3209 CBC 4 : List *children = find_all_inheritors(rel->localreloid,
688 akapila 3210 ECB : lockmode,
1098 peter 3211 : NULL);
3212 :
1098 peter 3213 GIC 15 : foreach(child, children)
3214 : {
3215 11 : Oid childrelid = lfirst_oid(child);
3216 : Relation childrel;
1098 peter 3217 ECB :
1098 peter 3218 CBC 11 : if (list_member_oid(relids, childrelid))
3219 4 : continue;
3220 :
3221 : /* find_all_inheritors already got lock */
1098 peter 3222 GIC 7 : childrel = table_open(childrelid, NoLock);
3223 :
3224 : /*
3225 : * Ignore temp tables of other backends. See similar code in
1098 peter 3226 ECB : * ExecuteTruncate().
3227 : */
1098 peter 3228 CBC 7 : if (RELATION_IS_OTHER_TEMP(childrel))
3229 : {
688 akapila 3230 UIC 0 : table_close(childrel, lockmode);
1098 peter 3231 LBC 0 : continue;
3232 : }
3233 :
457 jdavis 3234 CBC 7 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
1098 peter 3235 7 : rels = lappend(rels, childrel);
3236 7 : part_rels = lappend(part_rels, childrel);
3237 7 : relids = lappend_oid(relids, childrelid);
3238 : /* Log this relation only if needed for logical decoding */
1098 peter 3239 GIC 7 : if (RelationIsLogicallyLogged(childrel))
3240 7 : relids_logged = lappend_oid(relids_logged, childrelid);
1098 peter 3241 ECB : }
3242 : }
1828 peter_e 3243 : }
3244 :
3245 : /*
3246 : * Even if we used CASCADE on the upstream primary we explicitly default
995 tgl 3247 : * to replaying changes without further cascading. This might be later
1809 3248 : * changeable with a user specified option.
3249 : *
3250 : * MySubscription->runasowner tells us whether we want to execute
3251 : * replication actions as the subscription owner; the last argument to
3252 : * TruncateGuts tells it whether we want to switch to the table owner.
3253 : * Those are exactly opposite conditions.
3254 : */
731 fujii 3255 GIC 17 : ExecuteTruncateGuts(rels,
731 fujii 3256 ECB : relids,
3257 : relids_logged,
3258 : DROP_RESTRICT,
3259 : restart_seqs,
5 rhaas 3260 GNC 17 : !MySubscription->runasowner);
1828 peter_e 3261 GIC 43 : foreach(lc, remote_rels)
3262 : {
1828 peter_e 3263 CBC 26 : LogicalRepRelMapEntry *rel = lfirst(lc);
3264 :
3265 26 : logicalrep_rel_close(rel, NoLock);
1828 peter_e 3266 ECB : }
1098 peter 3267 GIC 24 : foreach(lc, part_rels)
1098 peter 3268 ECB : {
1098 peter 3269 GIC 7 : Relation rel = lfirst(lc);
1098 peter 3270 ECB :
1098 peter 3271 GIC 7 : table_close(rel, NoLock);
3272 : }
3273 :
668 tgl 3274 17 : end_replication_step();
3275 : }
3276 :
3277 :
3278 : /*
2271 peter_e 3279 ECB : * Logical replication protocol message dispatcher.
3280 : */
3281 : void
2271 peter_e 3282 GIC 336698 : apply_dispatch(StringInfo s)
2271 peter_e 3283 ECB : {
888 akapila 3284 GIC 336698 : LogicalRepMsgType action = pq_getmsgbyte(s);
3285 : LogicalRepMsgType saved_command;
590 akapila 3286 ECB :
3287 : /*
3288 : * Set the current command being applied. Since this function can be
439 michael 3289 : * called recursively when applying spooled changes, save the current
590 akapila 3290 : * command.
3291 : */
590 akapila 3292 GIC 336698 : saved_command = apply_error_callback_arg.command;
590 akapila 3293 CBC 336698 : apply_error_callback_arg.command = action;
2271 peter_e 3294 ECB :
2271 peter_e 3295 GIC 336698 : switch (action)
3296 : {
888 akapila 3297 380 : case LOGICAL_REP_MSG_BEGIN:
2271 peter_e 3298 380 : apply_handle_begin(s);
590 akapila 3299 380 : break;
3300 :
888 akapila 3301 CBC 355 : case LOGICAL_REP_MSG_COMMIT:
2271 peter_e 3302 GIC 355 : apply_handle_commit(s);
590 akapila 3303 355 : break;
3304 :
888 3305 185646 : case LOGICAL_REP_MSG_INSERT:
2271 peter_e 3306 185646 : apply_handle_insert(s);
590 akapila 3307 CBC 185624 : break;
888 akapila 3308 ECB :
888 akapila 3309 GIC 66137 : case LOGICAL_REP_MSG_UPDATE:
2271 peter_e 3310 66137 : apply_handle_update(s);
590 akapila 3311 CBC 66131 : break;
888 akapila 3312 ECB :
888 akapila 3313 GIC 81918 : case LOGICAL_REP_MSG_DELETE:
2271 peter_e 3314 81918 : apply_handle_delete(s);
590 akapila 3315 81918 : break;
3316 :
888 3317 17 : case LOGICAL_REP_MSG_TRUNCATE:
1828 peter_e 3318 GBC 17 : apply_handle_truncate(s);
590 akapila 3319 GIC 17 : break;
888 akapila 3320 EUB :
888 akapila 3321 GIC 390 : case LOGICAL_REP_MSG_RELATION:
2271 peter_e 3322 390 : apply_handle_relation(s);
590 akapila 3323 390 : break;
3324 :
888 3325 18 : case LOGICAL_REP_MSG_TYPE:
2271 peter_e 3326 18 : apply_handle_type(s);
590 akapila 3327 GBC 18 : break;
3328 :
888 akapila 3329 GIC 7 : case LOGICAL_REP_MSG_ORIGIN:
2271 peter_e 3330 7 : apply_handle_origin(s);
590 akapila 3331 7 : break;
3332 :
733 akapila 3333 UIC 0 : case LOGICAL_REP_MSG_MESSAGE:
3334 :
3335 : /*
3336 : * Logical replication does not use generic logical messages yet.
3337 : * Although, it could be used by other applications that use this
3338 : * output plugin.
3339 : */
590 3340 0 : break;
3341 :
888 akapila 3342 CBC 835 : case LOGICAL_REP_MSG_STREAM_START:
948 akapila 3343 GIC 835 : apply_handle_stream_start(s);
590 3344 835 : break;
3345 :
598 3346 834 : case LOGICAL_REP_MSG_STREAM_STOP:
948 3347 834 : apply_handle_stream_stop(s);
590 3348 832 : break;
3349 :
888 3350 38 : case LOGICAL_REP_MSG_STREAM_ABORT:
948 3351 38 : apply_handle_stream_abort(s);
590 3352 38 : break;
3353 :
888 3354 61 : case LOGICAL_REP_MSG_STREAM_COMMIT:
948 3355 61 : apply_handle_stream_commit(s);
590 3356 59 : break;
3357 :
634 3358 14 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3359 14 : apply_handle_begin_prepare(s);
590 3360 14 : break;
634 akapila 3361 ECB :
634 akapila 3362 CBC 13 : case LOGICAL_REP_MSG_PREPARE:
3363 13 : apply_handle_prepare(s);
590 akapila 3364 GIC 13 : break;
634 akapila 3365 ECB :
634 akapila 3366 GIC 19 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
634 akapila 3367 CBC 19 : apply_handle_commit_prepared(s);
590 akapila 3368 GIC 19 : break;
634 akapila 3369 ECB :
634 akapila 3370 CBC 5 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
634 akapila 3371 GIC 5 : apply_handle_rollback_prepared(s);
590 3372 5 : break;
3373 :
613 3374 11 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3375 11 : apply_handle_stream_prepare(s);
590 akapila 3376 GBC 11 : break;
590 akapila 3377 EUB :
590 akapila 3378 UBC 0 : default:
590 akapila 3379 UIC 0 : ereport(ERROR,
3380 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3381 : errmsg("invalid logical replication message type \"%c\"", action)));
2271 peter_e 3382 ECB : }
3383 :
3384 : /* Reset the current command */
590 akapila 3385 CBC 336666 : apply_error_callback_arg.command = saved_command;
2271 peter_e 3386 GIC 336666 : }
3387 :
3388 : /*
3389 : * Figure out which write/flush positions to report to the walsender process.
3390 : *
2271 peter_e 3391 ECB : * We can't simply report back the last LSN the walsender sent us because the
3392 : * local transaction might not yet be flushed to disk locally. Instead we
3393 : * build a list that associates local with remote LSNs for every commit. When
3394 : * reporting back the flush position to the sender we iterate that list and
3395 : * check which entries on it are already locally flushed. Those we can report
3396 : * as having been flushed.
3397 : *
3398 : * The have_pending_txes is true if there are outstanding transactions that
3399 : * need to be flushed.
3400 : */
3401 : static void
2271 peter_e 3402 GIC 58743 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3403 : bool *have_pending_txes)
3404 : {
3405 : dlist_mutable_iter iter;
520 rhaas 3406 58743 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3407 :
2271 peter_e 3408 58743 : *write = InvalidXLogRecPtr;
3409 58743 : *flush = InvalidXLogRecPtr;
2271 peter_e 3410 ECB :
2271 peter_e 3411 CBC 59146 : dlist_foreach_modify(iter, &lsn_mapping)
3412 : {
3413 7251 : FlushPosition *pos =
2153 bruce 3414 7251 : dlist_container(FlushPosition, node, iter.cur);
3415 :
2271 peter_e 3416 7251 : *write = pos->remote_end;
3417 :
3418 7251 : if (pos->local_end <= local_flush)
2271 peter_e 3419 ECB : {
2271 peter_e 3420 CBC 403 : *flush = pos->remote_end;
3421 403 : dlist_delete(iter.cur);
2271 peter_e 3422 GIC 403 : pfree(pos);
3423 : }
3424 : else
3425 : {
3426 : /*
2271 peter_e 3427 ECB : * Don't want to uselessly iterate over the rest of the list which
3428 : * could potentially be long. Instead get the last element and
3429 : * grab the write position from there.
3430 : */
2271 peter_e 3431 GIC 6848 : pos = dlist_tail_element(FlushPosition, node,
3432 : &lsn_mapping);
2271 peter_e 3433 CBC 6848 : *write = pos->remote_end;
3434 6848 : *have_pending_txes = true;
2271 peter_e 3435 GIC 6848 : return;
3436 : }
2271 peter_e 3437 ECB : }
3438 :
2271 peter_e 3439 GIC 51895 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
2271 peter_e 3440 ECB : }
3441 :
3442 : /*
3443 : * Store current remote/local lsn pair in the tracking list.
3444 : */
3445 : void
90 akapila 3446 GNC 459 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3447 : {
2271 peter_e 3448 ECB : FlushPosition *flushpos;
3449 :
3450 : /*
3451 : * Skip for parallel apply workers, because the lsn_mapping is maintained
3452 : * by the leader apply worker.
3453 : */
90 akapila 3454 GNC 459 : if (am_parallel_apply_worker())
3455 19 : return;
3456 :
2271 peter_e 3457 ECB : /* Need to do this in permanent context */
2161 peter_e 3458 GIC 440 : MemoryContextSwitchTo(ApplyContext);
3459 :
3460 : /* Track commit lsn */
2271 3461 440 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
90 akapila 3462 GNC 440 : flushpos->local_end = local_lsn;
2271 peter_e 3463 GIC 440 : flushpos->remote_end = remote_lsn;
3464 :
3465 440 : dlist_push_tail(&lsn_mapping, &flushpos->node);
2161 peter_e 3466 CBC 440 : MemoryContextSwitchTo(ApplyMessageContext);
3467 : }
3468 :
3469 :
3470 : /* Update statistics of the worker. */
3471 : static void
2271 3472 185649 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
2271 peter_e 3473 ECB : {
2271 peter_e 3474 CBC 185649 : MyLogicalRepWorker->last_lsn = last_lsn;
2271 peter_e 3475 GIC 185649 : MyLogicalRepWorker->last_send_time = send_time;
3476 185649 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3477 185649 : if (reply)
3478 : {
3479 1295 : MyLogicalRepWorker->reply_lsn = last_lsn;
2271 peter_e 3480 CBC 1295 : MyLogicalRepWorker->reply_time = send_time;
2271 peter_e 3481 ECB : }
2271 peter_e 3482 GIC 185649 : }
2271 peter_e 3483 ECB :
3484 : /*
3485 : * Apply main loop.
3486 : */
3487 : static void
2208 peter_e 3488 GIC 282 : LogicalRepApplyLoop(XLogRecPtr last_received)
3489 : {
1269 michael 3490 282 : TimestampTz last_recv_timestamp = GetCurrentTimestamp();
947 tgl 3491 282 : bool ping_sent = false;
3492 : TimeLineID tli;
3493 : ErrorContextCallback errcallback;
1269 michael 3494 ECB :
3495 : /*
3496 : * Init the ApplyMessageContext which we clean up after each replication
2153 bruce 3497 : * protocol message.
2161 peter_e 3498 : */
2161 peter_e 3499 CBC 282 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
3500 : "ApplyMessageContext",
2153 bruce 3501 ECB : ALLOCSET_DEFAULT_SIZES);
3502 :
3503 : /*
948 akapila 3504 : * This memory context is used for per-stream data when the streaming mode
3505 : * is enabled. This context is reset on each stream stop.
3506 : */
948 akapila 3507 GIC 282 : LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
3508 : "LogicalStreamingContext",
3509 : ALLOCSET_DEFAULT_SIZES);
3510 :
3511 : /* mark as idle, before starting to loop */
2271 peter_e 3512 282 : pgstat_report_activity(STATE_IDLE, NULL);
3513 :
3514 : /*
3515 : * Push apply error context callback. Fields will be filled while applying
439 michael 3516 ECB : * a change.
3517 : */
590 akapila 3518 GIC 282 : errcallback.callback = apply_error_callback;
3519 282 : errcallback.previous = error_context_stack;
3520 282 : error_context_stack = &errcallback;
90 akapila 3521 GNC 282 : apply_error_context_stack = error_context_stack;
3522 :
3523 : /* This outer loop iterates once per wait. */
2137 peter_e 3524 ECB : for (;;)
2271 peter_e 3525 CBC 57102 : {
3526 57384 : pgsocket fd = PGINVALID_SOCKET;
3527 : int rc;
3528 : int len;
2271 peter_e 3529 GIC 57384 : char *buf = NULL;
3530 57384 : bool endofstream = false;
3531 : long wait_time;
3532 :
2137 3533 57384 : CHECK_FOR_INTERRUPTS();
2137 peter_e 3534 ECB :
2161 peter_e 3535 GIC 57384 : MemoryContextSwitchTo(ApplyMessageContext);
3536 :
697 alvherre 3537 57384 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3538 :
2271 peter_e 3539 57371 : if (len != 0)
3540 : {
3541 : /* Loop to process all available data (without blocking). */
3542 : for (;;)
3543 : {
3544 242244 : CHECK_FOR_INTERRUPTS();
3545 :
3546 242244 : if (len == 0)
3547 : {
3548 56592 : break;
3549 : }
2271 peter_e 3550 CBC 185652 : else if (len < 0)
2271 peter_e 3551 ECB : {
2271 peter_e 3552 CBC 3 : ereport(LOG,
3553 : (errmsg("data stream from publisher has ended")));
3554 3 : endofstream = true;
2271 peter_e 3555 GIC 3 : break;
2271 peter_e 3556 ECB : }
3557 : else
3558 : {
3559 : int c;
3560 : StringInfoData s;
3561 :
3562 : /* Reset timeout. */
2271 peter_e 3563 GIC 185649 : last_recv_timestamp = GetCurrentTimestamp();
2271 peter_e 3564 GBC 185649 : ping_sent = false;
2271 peter_e 3565 EUB :
3566 : /* Ensure we are reading the data into our memory context. */
2161 peter_e 3567 GIC 185649 : MemoryContextSwitchTo(ApplyMessageContext);
3568 :
2271 3569 185649 : s.data = buf;
2271 peter_e 3570 CBC 185649 : s.len = len;
2271 peter_e 3571 GIC 185649 : s.cursor = 0;
3572 185649 : s.maxlen = -1;
2271 peter_e 3573 ECB :
2271 peter_e 3574 GIC 185649 : c = pq_getmsgbyte(&s);
3575 :
3576 185649 : if (c == 'w')
3577 : {
3578 : XLogRecPtr start_lsn;
2271 peter_e 3579 ECB : XLogRecPtr end_lsn;
2236 tgl 3580 : TimestampTz send_time;
2271 peter_e 3581 :
2271 peter_e 3582 GIC 184354 : start_lsn = pq_getmsgint64(&s);
3583 184354 : end_lsn = pq_getmsgint64(&s);
2236 tgl 3584 CBC 184354 : send_time = pq_getmsgint64(&s);
2271 peter_e 3585 ECB :
2271 peter_e 3586 CBC 184354 : if (last_received < start_lsn)
3587 148772 : last_received = start_lsn;
3588 :
2271 peter_e 3589 GIC 184354 : if (last_received < end_lsn)
2271 peter_e 3590 UIC 0 : last_received = end_lsn;
2271 peter_e 3591 ECB :
2271 peter_e 3592 CBC 184354 : UpdateWorkerStats(last_received, send_time, false);
2271 peter_e 3593 ECB :
2271 peter_e 3594 GIC 184354 : apply_dispatch(&s);
3595 : }
2271 peter_e 3596 CBC 1295 : else if (c == 'k')
2271 peter_e 3597 ECB : {
3598 : XLogRecPtr end_lsn;
3599 : TimestampTz timestamp;
2236 tgl 3600 : bool reply_requested;
3601 :
2208 peter_e 3602 GIC 1295 : end_lsn = pq_getmsgint64(&s);
2236 tgl 3603 CBC 1295 : timestamp = pq_getmsgint64(&s);
2271 peter_e 3604 GIC 1295 : reply_requested = pq_getmsgbyte(&s);
3605 :
2208 peter_e 3606 CBC 1295 : if (last_received < end_lsn)
2208 peter_e 3607 GIC 678 : last_received = end_lsn;
2208 peter_e 3608 ECB :
2208 peter_e 3609 CBC 1295 : send_feedback(last_received, reply_requested, false);
2271 peter_e 3610 GIC 1295 : UpdateWorkerStats(last_received, timestamp, true);
2271 peter_e 3611 ECB : }
3612 : /* other message types are purposefully ignored */
2161 3613 :
2161 peter_e 3614 GIC 185621 : MemoryContextReset(ApplyMessageContext);
3615 : }
3616 :
697 alvherre 3617 185621 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3618 : }
3619 : }
3620 :
3621 : /* confirm all writes so far */
2108 tgl 3622 CBC 57343 : send_feedback(last_received, false, false);
3623 :
948 akapila 3624 GIC 57343 : if (!in_remote_transaction && !in_streamed_transaction)
3625 : {
3626 : /*
2271 peter_e 3627 ECB : * If we didn't get any transactions for a while there might be
2153 bruce 3628 : * unconsumed invalidation messages in the queue, consume them
3629 : * now.
3630 : */
2208 peter_e 3631 GIC 2237 : AcceptInvalidationMessages();
2136 3632 2237 : maybe_reread_subscription();
3633 :
2208 peter_e 3634 ECB : /* Process any table synchronization changes. */
2208 peter_e 3635 CBC 2206 : process_syncing_tables(last_received);
3636 : }
2271 peter_e 3637 ECB :
3638 : /* Cleanup the memory. */
2161 peter_e 3639 GIC 57158 : MemoryContextResetAndDeleteChildren(ApplyMessageContext);
2271 3640 57158 : MemoryContextSwitchTo(TopMemoryContext);
2271 peter_e 3641 ECB :
3642 : /* Check if we need to exit the streaming loop. */
2271 peter_e 3643 CBC 57158 : if (endofstream)
2271 peter_e 3644 GIC 3 : break;
3645 :
2271 peter_e 3646 ECB : /*
2108 tgl 3647 : * Wait for more data or latch. If we have unflushed transactions,
3648 : * wake up after WalWriterDelay to see if they've been flushed yet (in
3649 : * which case we should send a feedback message). Otherwise, there's
3650 : * no particular urgency about waking up unless we get data or a
3651 : * signal.
3652 : */
2108 tgl 3653 GIC 57155 : if (!dlist_is_empty(&lsn_mapping))
3654 6346 : wait_time = WalWriterDelay;
3655 : else
3656 50809 : wait_time = NAPTIME_PER_CYCLE;
2108 tgl 3657 ECB :
2133 andres 3658 GIC 57155 : rc = WaitLatchOrSocket(MyLatch,
3659 : WL_SOCKET_READABLE | WL_LATCH_SET |
3660 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
3661 : fd, wait_time,
3662 : WAIT_EVENT_LOGICAL_APPLY_MAIN);
3663 :
2133 andres 3664 CBC 57155 : if (rc & WL_LATCH_SET)
2133 andres 3665 ECB : {
2133 andres 3666 CBC 431 : ResetLatch(MyLatch);
2133 andres 3667 GIC 431 : CHECK_FOR_INTERRUPTS();
3668 : }
3669 :
1209 rhaas 3670 57102 : if (ConfigReloadPending)
3671 : {
3672 13 : ConfigReloadPending = false;
2190 peter_e 3673 13 : ProcessConfigFile(PGC_SIGHUP);
3674 : }
3675 :
2271 peter_e 3676 CBC 57102 : if (rc & WL_TIMEOUT)
3677 : {
3678 : /*
3679 : * We didn't receive anything new. If we haven't heard anything
3680 : * from the server for more than wal_receiver_timeout / 2, ping
3681 : * the server. Also, if it's been longer than
3682 : * wal_receiver_status_interval since the last update we sent,
3683 : * send a status update to the primary anyway, to report any
3684 : * progress in applying WAL.
3685 : */
2271 peter_e 3686 GIC 105 : bool requestReply = false;
3687 :
2271 peter_e 3688 ECB : /*
3689 : * Check if time since last receive from primary has reached the
2153 bruce 3690 : * configured limit.
3691 : */
2271 peter_e 3692 CBC 105 : if (wal_receiver_timeout > 0)
3693 : {
2271 peter_e 3694 GIC 105 : TimestampTz now = GetCurrentTimestamp();
2271 peter_e 3695 ECB : TimestampTz timeout;
3696 :
2271 peter_e 3697 GIC 105 : timeout =
3698 105 : TimestampTzPlusMilliseconds(last_recv_timestamp,
3699 : wal_receiver_timeout);
2271 peter_e 3700 ECB :
2271 peter_e 3701 GIC 105 : if (now >= timeout)
2271 peter_e 3702 UIC 0 : ereport(ERROR,
662 tgl 3703 ECB : (errcode(ERRCODE_CONNECTION_FAILURE),
3704 : errmsg("terminating logical replication worker due to timeout")));
3705 :
3706 : /* Check to see if it's time for a ping. */
2271 peter_e 3707 GIC 105 : if (!ping_sent)
3708 : {
3709 105 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
2118 tgl 3710 ECB : (wal_receiver_timeout / 2));
2271 peter_e 3711 GIC 105 : if (now >= timeout)
3712 : {
2271 peter_e 3713 UIC 0 : requestReply = true;
3714 0 : ping_sent = true;
2271 peter_e 3715 ECB : }
3716 : }
3717 : }
3718 :
2271 peter_e 3719 GIC 105 : send_feedback(last_received, requestReply, requestReply);
3720 :
3721 : /*
3722 : * Force reporting to ensure long idle periods don't lead to
3723 : * arbitrarily delayed stats. Stats can only be reported outside
3724 : * of (implicit or explicit) transactions. That shouldn't lead to
3725 : * stats being delayed for long, because transactions are either
332 andres 3726 ECB : * sent as a whole on commit or streamed. Streamed transactions
3727 : * are spilled to disk and applied on commit.
3728 : */
332 andres 3729 GIC 105 : if (!IsTransactionState())
332 andres 3730 CBC 105 : pgstat_report_stat(true);
2271 peter_e 3731 ECB : }
3732 : }
906 alvherre 3733 :
590 akapila 3734 : /* Pop the error context stack */
590 akapila 3735 GIC 3 : error_context_stack = errcallback.previous;
90 akapila 3736 GNC 3 : apply_error_context_stack = error_context_stack;
3737 :
906 alvherre 3738 ECB : /* All done */
697 alvherre 3739 GIC 3 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
2271 peter_e 3740 UIC 0 : }
3741 :
3742 : /*
2271 peter_e 3743 ECB : * Send a Standby Status Update message to server.
3744 : *
3745 : * 'recvpos' is the latest LSN we've received data to, force is set if we need
3746 : * to send a response to avoid timeouts.
3747 : */
3748 : static void
2271 peter_e 3749 GIC 58743 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3750 : {
3751 : static StringInfo reply_message = NULL;
3752 : static TimestampTz send_time = 0;
3753 :
3754 : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
2271 peter_e 3755 ECB : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3756 : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3757 :
3758 : XLogRecPtr writepos;
3759 : XLogRecPtr flushpos;
3760 : TimestampTz now;
3761 : bool have_pending_txes;
3762 :
3763 : /*
3764 : * If the user doesn't want status to be reported to the publisher, be
3765 : * sure to exit before doing anything at all.
3766 : */
2271 peter_e 3767 CBC 58743 : if (!force && wal_receiver_status_interval <= 0)
3768 15273 : return;
3769 :
2271 peter_e 3770 ECB : /* It's legal to not pass a recvpos */
2271 peter_e 3771 CBC 58743 : if (recvpos < last_recvpos)
2271 peter_e 3772 UIC 0 : recvpos = last_recvpos;
3773 :
2271 peter_e 3774 GIC 58743 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
3775 :
2271 peter_e 3776 ECB : /*
2153 bruce 3777 : * No outstanding transactions to flush, we can report the latest received
3778 : * position. This is important for synchronous replication.
2271 peter_e 3779 : */
2271 peter_e 3780 GIC 58743 : if (!have_pending_txes)
3781 51895 : flushpos = writepos = recvpos;
2271 peter_e 3782 ECB :
2271 peter_e 3783 GIC 58743 : if (writepos < last_writepos)
2271 peter_e 3784 LBC 0 : writepos = last_writepos;
3785 :
2271 peter_e 3786 CBC 58743 : if (flushpos < last_flushpos)
2271 peter_e 3787 GIC 6812 : flushpos = last_flushpos;
3788 :
2271 peter_e 3789 CBC 58743 : now = GetCurrentTimestamp();
3790 :
2271 peter_e 3791 ECB : /* if we've already reported everything we're good */
2271 peter_e 3792 CBC 58743 : if (!force &&
2271 peter_e 3793 GIC 58742 : writepos == last_writepos &&
2271 peter_e 3794 CBC 15447 : flushpos == last_flushpos &&
2271 peter_e 3795 GIC 15350 : !TimestampDifferenceExceeds(send_time, now,
2271 peter_e 3796 ECB : wal_receiver_status_interval * 1000))
2271 peter_e 3797 CBC 15273 : return;
2271 peter_e 3798 GIC 43470 : send_time = now;
3799 :
2271 peter_e 3800 CBC 43470 : if (!reply_message)
3801 : {
2153 bruce 3802 282 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
3803 :
2271 peter_e 3804 GIC 282 : reply_message = makeStringInfo();
3805 282 : MemoryContextSwitchTo(oldctx);
3806 : }
3807 : else
3808 43188 : resetStringInfo(reply_message);
3809 :
3810 43470 : pq_sendbyte(reply_message, 'r');
2118 tgl 3811 43470 : pq_sendint64(reply_message, recvpos); /* write */
3812 43470 : pq_sendint64(reply_message, flushpos); /* flush */
3813 43470 : pq_sendint64(reply_message, writepos); /* apply */
2153 bruce 3814 43470 : pq_sendint64(reply_message, now); /* sendTime */
2271 peter_e 3815 43470 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
3816 :
2271 peter_e 3817 CBC 43470 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3818 : force,
3819 : LSN_FORMAT_ARGS(recvpos),
3820 : LSN_FORMAT_ARGS(writepos),
775 peter 3821 ECB : LSN_FORMAT_ARGS(flushpos));
3822 :
697 alvherre 3823 GIC 43470 : walrcv_send(LogRepWorkerWalRcvConn,
3824 : reply_message->data, reply_message->len);
3825 :
2271 peter_e 3826 43470 : if (recvpos > last_recvpos)
3827 43295 : last_recvpos = recvpos;
3828 43470 : if (writepos > last_writepos)
2271 peter_e 3829 CBC 43295 : last_writepos = writepos;
2271 peter_e 3830 GIC 43470 : if (flushpos > last_flushpos)
3831 43070 : last_flushpos = flushpos;
3832 : }
2271 peter_e 3833 ECB :
3834 : /*
3835 : * Exit routine for apply workers due to subscription parameter changes.
3836 : */
3837 : static void
90 akapila 3838 GNC 31 : apply_worker_exit(void)
3839 : {
3840 31 : if (am_parallel_apply_worker())
3841 : {
3842 : /*
3843 : * Don't stop the parallel apply worker as the leader will detect the
3844 : * subscription parameter change and restart logical replication later
3845 : * anyway. This also prevents the leader from reporting errors when
3846 : * trying to communicate with a stopped parallel apply worker, which
3847 : * would accidentally disable subscriptions if disable_on_error was
3848 : * set.
3849 : */
90 akapila 3850 UNC 0 : return;
3851 : }
3852 :
3853 : /*
3854 : * Reset the last-start time for this apply worker so that the launcher
3855 : * will restart it without waiting for wal_retrieve_retry_interval if the
3856 : * subscription is still active, and so that we won't leak that hash table
3857 : * entry if it isn't.
3858 : */
77 tgl 3859 GNC 31 : if (!am_tablesync_worker())
3860 31 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
3861 :
90 akapila 3862 31 : proc_exit(0);
3863 : }
3864 :
3865 : /*
3866 : * Reread subscription info if needed. Most changes will be exit.
3867 : */
3868 : void
2136 peter_e 3869 GIC 3141 : maybe_reread_subscription(void)
3870 : {
2153 bruce 3871 ECB : MemoryContext oldctx;
3872 : Subscription *newsub;
2153 bruce 3873 GIC 3141 : bool started_tx = false;
2208 peter_e 3874 ECB :
3875 : /* When cache state is valid there is nothing to do here. */
2136 peter_e 3876 GIC 3141 : if (MySubscriptionValid)
3877 3084 : return;
3878 :
3879 : /* This function might be called inside or outside of transaction. */
2208 peter_e 3880 CBC 57 : if (!IsTransactionState())
2208 peter_e 3881 ECB : {
2208 peter_e 3882 GIC 55 : StartTransactionCommand();
2208 peter_e 3883 CBC 55 : started_tx = true;
3884 : }
3885 :
3886 : /* Ensure allocations in permanent context. */
2161 peter_e 3887 GIC 57 : oldctx = MemoryContextSwitchTo(ApplyContext);
3888 :
2271 3889 57 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
3890 :
3891 : /*
3892 : * Exit if the subscription was removed. This normally should not happen
2153 bruce 3893 ECB : * as the worker gets killed during DROP SUBSCRIPTION.
2271 peter_e 3894 : */
2267 peter_e 3895 GIC 57 : if (!newsub)
2271 peter_e 3896 ECB : {
2271 peter_e 3897 LBC 0 : ereport(LOG,
3898 : /* translator: first %s is the name of logical replication worker */
3899 : (errmsg("%s for subscription \"%s\" will stop because the subscription was removed",
3900 : get_worker_name(), MySubscription->name)));
2271 peter_e 3901 ECB :
3902 : /* Ensure we remove no-longer-useful entry for worker's start time */
77 tgl 3903 UNC 0 : if (!am_tablesync_worker() && !am_parallel_apply_worker())
3904 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
2271 peter_e 3905 LBC 0 : proc_exit(0);
3906 : }
3907 :
3908 : /* Exit if the subscription was disabled. */
2161 peter_e 3909 GIC 57 : if (!newsub->enabled)
3910 : {
3911 2 : ereport(LOG,
3912 : /* translator: first %s is the name of logical replication worker */
3913 : (errmsg("%s for subscription \"%s\" will stop because the subscription was disabled",
3914 : get_worker_name(), MySubscription->name)));
3915 :
90 akapila 3916 GNC 2 : apply_worker_exit();
3917 : }
2161 peter_e 3918 ECB :
3919 : /* !slotname should never happen when enabled is true. */
2161 peter_e 3920 GIC 55 : Assert(newsub->slotname);
2161 peter_e 3921 ECB :
3922 : /* two-phase should not be altered */
634 akapila 3923 GIC 55 : Assert(newsub->twophasestate == MySubscription->twophasestate);
3924 :
2197 peter_e 3925 ECB : /*
3926 : * Exit if any parameter that affects the remote connection was changed.
3927 : * The launcher will start a new worker but note that the parallel apply
3928 : * worker won't restart if the streaming option's value is changed from
3929 : * 'parallel' to any other value or the server decides not to stream the
3930 : * in-progress transaction.
3931 : */
995 tgl 3932 GIC 55 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
995 tgl 3933 GBC 54 : strcmp(newsub->name, MySubscription->name) != 0 ||
3934 53 : strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
995 tgl 3935 GIC 53 : newsub->binary != MySubscription->binary ||
948 akapila 3936 47 : newsub->stream != MySubscription->stream ||
262 akapila 3937 GNC 42 : strcmp(newsub->origin, MySubscription->origin) != 0 ||
457 jdavis 3938 GIC 42 : newsub->owner != MySubscription->owner ||
995 tgl 3939 CBC 42 : !equal(newsub->publications, MySubscription->publications))
2271 peter_e 3940 ECB : {
90 akapila 3941 GNC 29 : if (am_parallel_apply_worker())
90 akapila 3942 UNC 0 : ereport(LOG,
3943 : (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3944 : MySubscription->name)));
3945 : else
90 akapila 3946 GNC 29 : ereport(LOG,
3947 : /* translator: first %s is the name of logical replication worker */
3948 : (errmsg("%s for subscription \"%s\" will restart because of a parameter change",
3949 : get_worker_name(), MySubscription->name)));
2271 peter_e 3950 ECB :
90 akapila 3951 GNC 29 : apply_worker_exit();
3952 : }
3953 :
2271 peter_e 3954 ECB : /* Check for other changes that should never happen too. */
2197 peter_e 3955 CBC 26 : if (newsub->dbid != MySubscription->dbid)
2271 peter_e 3956 ECB : {
2271 peter_e 3957 UIC 0 : elog(ERROR, "subscription %u changed unexpectedly",
3958 : MyLogicalRepWorker->subid);
2271 peter_e 3959 ECB : }
3960 :
3961 : /* Clean old subscription info and switch to new one. */
2271 peter_e 3962 GIC 26 : FreeSubscription(MySubscription);
3963 26 : MySubscription = newsub;
3964 :
3965 26 : MemoryContextSwitchTo(oldctx);
3966 :
3967 : /* Change synchronous commit according to the user's wishes */
2186 3968 26 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
2186 peter_e 3969 ECB : PGC_BACKEND, PGC_S_OVERRIDE);
3970 :
2208 peter_e 3971 CBC 26 : if (started_tx)
3972 24 : CommitTransactionCommand();
3973 :
2271 3974 26 : MySubscriptionValid = true;
2271 peter_e 3975 ECB : }
3976 :
2271 peter_e 3977 EUB : /*
3978 : * Callback from subscription syscache invalidation.
3979 : */
3980 : static void
2271 peter_e 3981 GIC 58 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
3982 : {
2271 peter_e 3983 CBC 58 : MySubscriptionValid = false;
2271 peter_e 3984 GIC 58 : }
2271 peter_e 3985 ECB :
3986 : /*
948 akapila 3987 : * subxact_info_write
3988 : * Store information about subxacts for a toplevel transaction.
3989 : *
3990 : * For each subxact we store offset of it's first change in the main file.
3991 : * The file is always over-written as a whole.
3992 : *
3993 : * XXX We should only store subxacts that were not aborted yet.
948 akapila 3994 EUB : */
3995 : static void
948 akapila 3996 GIC 370 : subxact_info_write(Oid subid, TransactionId xid)
3997 : {
3998 : char path[MAXPGPATH];
3999 : Size len;
4000 : BufFile *fd;
4001 :
4002 370 : Assert(TransactionIdIsValid(xid));
4003 :
4004 : /* construct the subxact filename */
584 4005 370 : subxact_filename(path, subid, xid);
948 akapila 4006 ECB :
4007 : /* Delete the subxacts file, if exists. */
948 akapila 4008 CBC 370 : if (subxact_data.nsubxacts == 0)
948 akapila 4009 ECB : {
584 akapila 4010 CBC 288 : cleanup_subxact_info();
4011 288 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
584 akapila 4012 ECB :
948 akapila 4013 CBC 288 : return;
948 akapila 4014 ECB : }
4015 :
4016 : /*
4017 : * Create the subxact file if it not already created, otherwise open the
4018 : * existing file.
4019 : */
584 akapila 4020 GIC 82 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
4021 : true);
4022 82 : if (fd == NULL)
584 akapila 4023 CBC 8 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
948 akapila 4024 ECB :
948 akapila 4025 GBC 82 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4026 :
948 akapila 4027 ECB : /* Write the subxact count and subxact info */
948 akapila 4028 GIC 82 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
948 akapila 4029 CBC 82 : BufFileWrite(fd, subxact_data.subxacts, len);
4030 :
4031 82 : BufFileClose(fd);
4032 :
948 akapila 4033 ECB : /* free the memory allocated for subxact info */
948 akapila 4034 GIC 82 : cleanup_subxact_info();
4035 : }
948 akapila 4036 ECB :
4037 : /*
4038 : * subxact_info_read
4039 : * Restore information about subxacts of a streamed transaction.
4040 : *
4041 : * Read information about subxacts into the structure subxact_data that can be
4042 : * used later.
948 akapila 4043 EUB : */
4044 : static void
948 akapila 4045 GIC 342 : subxact_info_read(Oid subid, TransactionId xid)
4046 : {
948 akapila 4047 ECB : char path[MAXPGPATH];
4048 : Size len;
4049 : BufFile *fd;
4050 : MemoryContext oldctx;
4051 :
948 akapila 4052 GIC 342 : Assert(!subxact_data.subxacts);
4053 342 : Assert(subxact_data.nsubxacts == 0);
4054 342 : Assert(subxact_data.nsubxacts_max == 0);
4055 :
4056 : /*
584 akapila 4057 ECB : * If the subxact file doesn't exist that means we don't have any subxact
4058 : * info.
4059 : */
948 akapila 4060 CBC 342 : subxact_filename(path, subid, xid);
584 akapila 4061 GIC 342 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
4062 : true);
4063 342 : if (fd == NULL)
584 akapila 4064 CBC 263 : return;
4065 :
948 akapila 4066 ECB : /* read number of subxact items */
83 peter 4067 GNC 79 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
948 akapila 4068 ECB :
948 akapila 4069 GIC 79 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4070 :
4071 : /* we keep the maximum as a power of 2 */
4072 79 : subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
4073 :
948 akapila 4074 ECB : /*
4075 : * Allocate subxact information in the logical streaming context. We need
948 akapila 4076 EUB : * this information during the complete stream so that we can add the sub
4077 : * transaction info to this. On stream stop we will flush this information
4078 : * to the subxact file and reset the logical streaming context.
4079 : */
948 akapila 4080 CBC 79 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4081 79 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
948 akapila 4082 ECB : sizeof(SubXactInfo));
948 akapila 4083 CBC 79 : MemoryContextSwitchTo(oldctx);
4084 :
83 peter 4085 79 : if (len > 0)
83 peter 4086 GNC 79 : BufFileReadExact(fd, subxact_data.subxacts, len);
4087 :
948 akapila 4088 GIC 79 : BufFileClose(fd);
4089 : }
4090 :
4091 : /*
4092 : * subxact_info_add
4093 : * Add information about a subxact (offset in the main file).
948 akapila 4094 ECB : */
4095 : static void
948 akapila 4096 GIC 102513 : subxact_info_add(TransactionId xid)
4097 : {
4098 102513 : SubXactInfo *subxacts = subxact_data.subxacts;
948 akapila 4099 ECB : int64 i;
4100 :
4101 : /* We must have a valid top level stream xid and a stream fd. */
948 akapila 4102 CBC 102513 : Assert(TransactionIdIsValid(stream_xid));
948 akapila 4103 GIC 102513 : Assert(stream_fd != NULL);
948 akapila 4104 ECB :
4105 : /*
4106 : * If the XID matches the toplevel transaction, we don't want to add it.
4107 : */
948 akapila 4108 CBC 102513 : if (stream_xid == xid)
948 akapila 4109 GIC 92389 : return;
948 akapila 4110 ECB :
4111 : /*
4112 : * In most cases we're checking the same subxact as we've already seen in
4113 : * the last call, so make sure to ignore it (this change comes later).
4114 : */
948 akapila 4115 GIC 10124 : if (subxact_data.subxact_last == xid)
4116 10048 : return;
4117 :
4118 : /* OK, remember we're processing this XID. */
4119 76 : subxact_data.subxact_last = xid;
4120 :
948 akapila 4121 ECB : /*
4122 : * Check if the transaction is already present in the array of subxact. We
4123 : * intentionally scan the array from the tail, because we're likely adding
4124 : * a change for the most recent subtransactions.
4125 : *
4126 : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4127 : * would allow us to use binary search here.
4128 : */
948 akapila 4129 GIC 95 : for (i = subxact_data.nsubxacts; i > 0; i--)
4130 : {
948 akapila 4131 ECB : /* found, so we're done */
948 akapila 4132 CBC 76 : if (subxacts[i - 1].xid == xid)
948 akapila 4133 GIC 57 : return;
948 akapila 4134 ECB : }
4135 :
4136 : /* This is a new subxact, so we need to add it to the array. */
948 akapila 4137 CBC 19 : if (subxact_data.nsubxacts == 0)
948 akapila 4138 ECB : {
4139 : MemoryContext oldctx;
4140 :
948 akapila 4141 CBC 8 : subxact_data.nsubxacts_max = 128;
948 akapila 4142 ECB :
4143 : /*
4144 : * Allocate this memory for subxacts in per-stream context, see
4145 : * subxact_info_read.
4146 : */
948 akapila 4147 GIC 8 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
948 akapila 4148 CBC 8 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4149 8 : MemoryContextSwitchTo(oldctx);
948 akapila 4150 ECB : }
948 akapila 4151 GIC 11 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
948 akapila 4152 ECB : {
948 akapila 4153 CBC 10 : subxact_data.nsubxacts_max *= 2;
4154 10 : subxacts = repalloc(subxacts,
948 akapila 4155 GIC 10 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
948 akapila 4156 ECB : }
4157 :
948 akapila 4158 CBC 19 : subxacts[subxact_data.nsubxacts].xid = xid;
4159 :
948 akapila 4160 ECB : /*
4161 : * Get the current offset of the stream file and store it as offset of
4162 : * this subxact.
4163 : */
948 akapila 4164 CBC 19 : BufFileTell(stream_fd,
4165 19 : &subxacts[subxact_data.nsubxacts].fileno,
4166 19 : &subxacts[subxact_data.nsubxacts].offset);
4167 :
4168 19 : subxact_data.nsubxacts++;
4169 19 : subxact_data.subxacts = subxacts;
948 akapila 4170 ECB : }
4171 :
948 akapila 4172 EUB : /* format filename for file containing the info about subxacts */
4173 : static inline void
948 akapila 4174 GIC 743 : subxact_filename(char *path, Oid subid, TransactionId xid)
4175 : {
4176 743 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4177 743 : }
4178 :
948 akapila 4179 EUB : /* format filename for file containing serialized changes */
4180 : static inline void
948 akapila 4181 CBC 436 : changes_filename(char *path, Oid subid, TransactionId xid)
948 akapila 4182 ECB : {
948 akapila 4183 CBC 436 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
948 akapila 4184 GIC 436 : }
948 akapila 4185 ECB :
4186 : /*
4187 : * stream_cleanup_files
4188 : * Cleanup files for a subscription / toplevel transaction.
4189 : *
4190 : * Remove files with serialized changes and subxact info for a particular
584 4191 : * toplevel transaction. Each subscription has a separate set of files
4192 : * for any toplevel transaction.
948 4193 : */
4194 : void
948 akapila 4195 CBC 31 : stream_cleanup_files(Oid subid, TransactionId xid)
4196 : {
948 akapila 4197 ECB : char path[MAXPGPATH];
4198 :
584 4199 : /* Delete the changes file. */
948 akapila 4200 GIC 31 : changes_filename(path, subid, xid);
584 akapila 4201 CBC 31 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
948 akapila 4202 ECB :
584 4203 : /* Delete the subxact file, if it exists. */
584 akapila 4204 GIC 31 : subxact_filename(path, subid, xid);
584 akapila 4205 CBC 31 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
948 4206 31 : }
948 akapila 4207 ECB :
4208 : /*
4209 : * stream_open_file
4210 : * Open a file that we'll use to serialize changes for a toplevel
4211 : * transaction.
4212 : *
4213 : * Open a file for streamed changes from a toplevel transaction identified
4214 : * by stream_xid (global variable). If it's the first chunk of streamed
584 4215 : * changes for this transaction, create the buffile, otherwise open the
4216 : * previously created file.
4217 : */
4218 : static void
948 akapila 4219 GIC 361 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4220 : {
948 akapila 4221 ECB : char path[MAXPGPATH];
4222 : MemoryContext oldcxt;
4223 :
948 akapila 4224 GIC 361 : Assert(OidIsValid(subid));
4225 361 : Assert(TransactionIdIsValid(xid));
4226 361 : Assert(stream_fd == NULL);
4227 :
4228 :
4229 361 : changes_filename(path, subid, xid);
4230 361 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4231 :
4232 : /*
4233 : * Create/open the buffiles under the logical streaming context so that we
4234 : * have those files until stream stop.
4235 : */
4236 361 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
948 akapila 4237 ECB :
4238 : /*
4239 : * If this is the first streamed segment, create the changes file.
4240 : * Otherwise, just open the file for writing, in append mode.
4241 : */
948 akapila 4242 GIC 361 : if (first_segment)
584 akapila 4243 CBC 32 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
584 akapila 4244 ECB : path);
4245 : else
948 4246 : {
4247 : /*
4248 : * Open the file and seek to the end of the file because we always
4249 : * append the changes file.
4250 : */
584 akapila 4251 CBC 329 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
4252 : path, O_RDWR, false);
948 4253 329 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
4254 : }
948 akapila 4255 ECB :
948 akapila 4256 CBC 361 : MemoryContextSwitchTo(oldcxt);
4257 361 : }
4258 :
4259 : /*
4260 : * stream_close_file
4261 : * Close the currently open file with streamed changes.
4262 : */
948 akapila 4263 ECB : static void
948 akapila 4264 GIC 391 : stream_close_file(void)
948 akapila 4265 ECB : {
948 akapila 4266 GIC 391 : Assert(stream_fd != NULL);
4267 :
4268 391 : BufFileClose(stream_fd);
948 akapila 4269 ECB :
948 akapila 4270 GIC 391 : stream_fd = NULL;
4271 391 : }
4272 :
4273 : /*
4274 : * stream_write_change
948 akapila 4275 ECB : * Serialize a change to a file for the current toplevel transaction.
4276 : *
4277 : * The change is serialized in a simple format, with length (not including
4278 : * the length), action code (identifying the message type) and message
4279 : * contents (without the subxact TransactionId value).
4280 : */
4281 : static void
948 akapila 4282 GIC 107554 : stream_write_change(char action, StringInfo s)
948 akapila 4283 ECB : {
4284 : int len;
4285 :
948 akapila 4286 GIC 107554 : Assert(stream_fd != NULL);
4287 :
948 akapila 4288 ECB : /* total on-disk size, including the action type character */
948 akapila 4289 CBC 107554 : len = (s->len - s->cursor) + sizeof(char);
948 akapila 4290 ECB :
4291 : /* first write the size */
948 akapila 4292 CBC 107554 : BufFileWrite(stream_fd, &len, sizeof(len));
948 akapila 4293 ECB :
4294 : /* then the action */
948 akapila 4295 GIC 107554 : BufFileWrite(stream_fd, &action, sizeof(action));
4296 :
4297 : /* and finally the remaining part of the buffer (after the XID) */
4298 107554 : len = (s->len - s->cursor);
948 akapila 4299 ECB :
948 akapila 4300 GIC 107554 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
948 akapila 4301 CBC 107554 : }
948 akapila 4302 ECB :
4303 : /*
4304 : * stream_open_and_write_change
4305 : * Serialize a message to a file for the given transaction.
4306 : *
4307 : * This function is similar to stream_write_change except that it will open the
4308 : * target file if not already before writing the message and close the file at
4309 : * the end.
4310 : */
4311 : static void
90 akapila 4312 GNC 5 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
4313 : {
4314 5 : Assert(!in_streamed_transaction);
4315 :
4316 5 : if (!stream_fd)
4317 5 : stream_start_internal(xid, false);
4318 :
4319 5 : stream_write_change(action, s);
4320 5 : stream_stop_internal(xid);
4321 5 : }
4322 :
948 akapila 4323 ECB : /*
4324 : * Cleanup the memory for subxacts and reset the related variables.
4325 : */
4326 : static inline void
948 akapila 4327 CBC 374 : cleanup_subxact_info()
4328 : {
4329 374 : if (subxact_data.subxacts)
948 akapila 4330 GIC 87 : pfree(subxact_data.subxacts);
4331 :
4332 374 : subxact_data.subxacts = NULL;
4333 374 : subxact_data.subxact_last = InvalidTransactionId;
4334 374 : subxact_data.nsubxacts = 0;
948 akapila 4335 CBC 374 : subxact_data.nsubxacts_max = 0;
948 akapila 4336 GIC 374 : }
948 akapila 4337 ECB :
634 4338 : /*
4339 : * Form the prepared transaction GID for two_phase transactions.
4340 : *
4341 : * Return the GID in the supplied buffer.
4342 : */
4343 : static void
634 akapila 4344 GIC 45 : TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
4345 : {
634 akapila 4346 CBC 45 : Assert(subid != InvalidRepOriginId);
4347 :
634 akapila 4348 GIC 45 : if (!TransactionIdIsValid(xid))
634 akapila 4349 UIC 0 : ereport(ERROR,
4350 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
4351 : errmsg_internal("invalid two-phase transaction ID")));
4352 :
634 akapila 4353 GIC 45 : snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
634 akapila 4354 CBC 45 : }
4355 :
4356 : /*
4357 : * Execute the initial sync with error handling. Disable the subscription,
4358 : * if it's required.
391 akapila 4359 ECB : *
4360 : * Allocate the slot name in long-lived context on return. Note that we don't
4361 : * handle FATAL errors which are probably because of system resource error and
4362 : * are not repeatable.
4363 : */
4364 : static void
391 akapila 4365 CBC 156 : start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
391 akapila 4366 ECB : {
390 akapila 4367 CBC 156 : char *syncslotname = NULL;
391 akapila 4368 ECB :
391 akapila 4369 GIC 156 : Assert(am_tablesync_worker());
4370 :
4371 156 : PG_TRY();
391 akapila 4372 ECB : {
4373 : /* Call initial sync. */
391 akapila 4374 GIC 156 : syncslotname = LogicalRepSyncTableStart(origin_startpos);
4375 : }
391 akapila 4376 CBC 7 : PG_CATCH();
391 akapila 4377 ECB : {
391 akapila 4378 GIC 7 : if (MySubscription->disableonerr)
4379 1 : DisableSubscriptionAndExit();
391 akapila 4380 ECB : else
4381 : {
4382 : /*
4383 : * Report the worker failed during table synchronization. Abort
4384 : * the current transaction so that the stats message is sent in an
4385 : * idle state.
4386 : */
391 akapila 4387 GIC 6 : AbortOutOfAnyTransaction();
4388 6 : pgstat_report_subscription_error(MySubscription->oid, false);
4389 :
4390 6 : PG_RE_THROW();
391 akapila 4391 ECB : }
4392 : }
391 akapila 4393 CBC 149 : PG_END_TRY();
4394 :
391 akapila 4395 ECB : /* allocate slot name in long-lived context */
391 akapila 4396 GIC 149 : *myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
391 akapila 4397 CBC 149 : pfree(syncslotname);
391 akapila 4398 GIC 149 : }
391 akapila 4399 ECB :
4400 : /*
4401 : * Run the apply loop with error handling. Disable the subscription,
4402 : * if necessary.
4403 : *
4404 : * Note that we don't handle FATAL errors which are probably because
4405 : * of system resource error and are not repeatable.
4406 : */
4407 : static void
391 akapila 4408 GIC 282 : start_apply(XLogRecPtr origin_startpos)
4409 : {
391 akapila 4410 CBC 282 : PG_TRY();
391 akapila 4411 ECB : {
391 akapila 4412 GIC 282 : LogicalRepApplyLoop(origin_startpos);
4413 : }
391 akapila 4414 CBC 45 : PG_CATCH();
4415 : {
4416 45 : if (MySubscription->disableonerr)
4417 3 : DisableSubscriptionAndExit();
391 akapila 4418 ECB : else
4419 : {
4420 : /*
4421 : * Report the worker failed while applying changes. Abort the
4422 : * current transaction so that the stats message is sent in an
4423 : * idle state.
4424 : */
391 akapila 4425 GIC 42 : AbortOutOfAnyTransaction();
4426 42 : pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
4427 :
4428 42 : PG_RE_THROW();
391 akapila 4429 ECB : }
4430 : }
391 akapila 4431 LBC 0 : PG_END_TRY();
391 akapila 4432 UIC 0 : }
391 akapila 4433 ECB :
4434 : /*
4435 : * Common initialization for leader apply worker and parallel apply worker.
4436 : *
4437 : * Initialize the database connection, in-memory subscription and necessary
4438 : * config options.
4439 : */
4440 : void
90 akapila 4441 GNC 315 : InitializeApplyWorker(void)
2271 peter_e 4442 EUB : {
2153 bruce 4443 ECB : MemoryContext oldctx;
4444 :
4445 : /* Run as replica session replication role. */
2271 peter_e 4446 GIC 315 : SetConfigOption("session_replication_role", "replica",
4447 : PGC_SUSET, PGC_S_OVERRIDE);
2271 peter_e 4448 ECB :
4449 : /* Connect to our database. */
2271 peter_e 4450 CBC 315 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
1830 magnus 4451 GIC 315 : MyLogicalRepWorker->userid,
4452 : 0);
4453 :
4454 : /*
4455 : * Set always-secure search path, so malicious users can't redirect user
4456 : * code (e.g. pg_index.indexprs).
972 noah 4457 ECB : */
972 noah 4458 CBC 312 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4459 :
4460 : /* Load the subscription into persistent memory context. */
2161 peter_e 4461 312 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
4462 : "ApplyContext",
4463 : ALLOCSET_DEFAULT_SIZES);
2271 peter_e 4464 GIC 312 : StartTransactionCommand();
2161 peter_e 4465 CBC 312 : oldctx = MemoryContextSwitchTo(ApplyContext);
1829 peter_e 4466 ECB :
1829 peter_e 4467 GIC 312 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
4468 312 : if (!MySubscription)
1829 peter_e 4469 ECB : {
1829 peter_e 4470 LBC 0 : ereport(LOG,
4471 : /* translator: %s is the name of logical replication worker */
4472 : (errmsg("%s for subscription %u will not start because the subscription was removed during startup",
4473 : get_worker_name(), MyLogicalRepWorker->subid)));
4474 :
4475 : /* Ensure we remove no-longer-useful entry for worker's start time */
77 tgl 4476 UNC 0 : if (!am_tablesync_worker() && !am_parallel_apply_worker())
4477 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
1829 peter_e 4478 UIC 0 : proc_exit(0);
4479 : }
4480 :
2271 peter_e 4481 GIC 312 : MySubscriptionValid = true;
4482 312 : MemoryContextSwitchTo(oldctx);
2271 peter_e 4483 ECB :
2271 peter_e 4484 CBC 312 : if (!MySubscription->enabled)
4485 : {
2271 peter_e 4486 LBC 0 : ereport(LOG,
4487 : /* translator: first %s is the name of logical replication worker */
4488 : (errmsg("%s for subscription \"%s\" will not start because the subscription was disabled during startup",
4489 : get_worker_name(), MySubscription->name)));
4490 :
90 akapila 4491 UNC 0 : apply_worker_exit();
4492 : }
4493 :
1829 peter_e 4494 ECB : /* Setup synchronous commit according to the user's wishes */
1829 peter_e 4495 GIC 312 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
1829 peter_e 4496 ECB : PGC_BACKEND, PGC_S_OVERRIDE);
4497 :
4498 : /* Keep us informed about subscription changes. */
2271 peter_e 4499 GIC 312 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
2271 peter_e 4500 ECB : subscription_change_cb,
4501 : (Datum) 0);
4502 :
2208 peter_e 4503 CBC 312 : if (am_tablesync_worker())
2146 peter_e 4504 GIC 156 : ereport(LOG,
4505 : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4506 : MySubscription->name,
4507 : get_rel_name(MyLogicalRepWorker->relid))));
4508 : else
4509 156 : ereport(LOG,
4510 : /* translator: first %s is the name of logical replication worker */
4511 : (errmsg("%s for subscription \"%s\" has started",
4512 : get_worker_name(), MySubscription->name)));
4513 :
2271 4514 312 : CommitTransactionCommand();
90 akapila 4515 GNC 312 : }
4516 :
4517 : /* Logical Replication Apply worker entry point */
4518 : void
4519 305 : ApplyWorkerMain(Datum main_arg)
4520 : {
4521 305 : int worker_slot = DatumGetInt32(main_arg);
4522 : char originname[NAMEDATALEN];
4523 305 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4524 305 : char *myslotname = NULL;
4525 : WalRcvStreamOptions options;
4526 : int server_version;
4527 :
4528 : /* Attach to slot */
4529 305 : logicalrep_worker_attach(worker_slot);
4530 :
4531 : /* Setup signal handling */
4532 305 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
4533 305 : pqsignal(SIGTERM, die);
4534 305 : BackgroundWorkerUnblockSignals();
4535 :
4536 : /*
4537 : * We don't currently need any ResourceOwner in a walreceiver process, but
4538 : * if we did, we could call CreateAuxProcessResourceOwner here.
4539 : */
4540 :
4541 : /* Initialise stats to a sanish value */
4542 305 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
4543 305 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
4544 :
4545 : /* Load the libpq-specific functions */
4546 305 : load_file("libpqwalreceiver", false);
4547 :
4548 305 : InitializeApplyWorker();
4549 :
4550 : /* Connect to the origin and start the replication. */
2271 peter_e 4551 GIC 302 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
2271 peter_e 4552 ECB : MySubscription->conninfo);
4553 :
2208 peter_e 4554 GIC 302 : if (am_tablesync_worker())
4555 : {
391 akapila 4556 156 : start_table_sync(&origin_startpos, &myslotname);
4557 :
180 akapila 4558 GNC 149 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
4559 149 : MyLogicalRepWorker->relid,
4560 : originname,
4561 : sizeof(originname));
90 4562 149 : set_apply_error_context_origin(originname);
2208 peter_e 4563 EUB : }
4564 : else
4565 : {
4566 : /* This is the leader apply worker */
4567 : RepOriginId originid;
2153 bruce 4568 ECB : TimeLineID startpointTLI;
4569 : char *err;
4570 : bool must_use_password;
2208 peter_e 4571 :
2208 peter_e 4572 GIC 146 : myslotname = MySubscription->slotname;
2208 peter_e 4573 ECB :
4574 : /*
2153 peter_e 4575 EUB : * This shouldn't happen if the subscription is enabled, but guard
2126 tgl 4576 : * against DDL bugs or manual catalog changes. (libpqwalreceiver will
4577 : * crash if slot is NULL.)
4578 : */
2153 peter_e 4579 GIC 146 : if (!myslotname)
2153 peter_e 4580 UIC 0 : ereport(ERROR,
666 tgl 4581 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4582 : errmsg("subscription has no replication slot set")));
4583 :
4584 : /* Setup replication origin tracking. */
2208 peter_e 4585 GIC 146 : StartTransactionCommand();
180 akapila 4586 GNC 146 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
4587 : originname, sizeof(originname));
2208 peter_e 4588 GIC 146 : originid = replorigin_by_name(originname, true);
4589 146 : if (!OidIsValid(originid))
2208 peter_e 4590 UIC 0 : originid = replorigin_create(originname);
90 akapila 4591 GNC 146 : replorigin_session_setup(originid, 0);
2208 peter_e 4592 CBC 146 : replorigin_session_origin = originid;
4593 146 : origin_startpos = replorigin_session_get_progress(false);
4594 :
4595 : /* Is the use of a password mandatory? */
10 rhaas 4596 GNC 278 : must_use_password = MySubscription->passwordrequired &&
4597 132 : !superuser_arg(MySubscription->owner);
4598 :
4599 : /* Note that the superuser_arg call can access the DB */
6 rhaas 4600 GIC 146 : CommitTransactionCommand();
4601 :
697 alvherre 4602 146 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
4603 : must_use_password,
4604 : MySubscription->name, &err);
697 alvherre 4605 CBC 145 : if (LogRepWorkerWalRcvConn == NULL)
2208 peter_e 4606 12 : ereport(ERROR,
4607 : (errcode(ERRCODE_CONNECTION_FAILURE),
4608 : errmsg("could not connect to the publisher: %s", err)));
2208 peter_e 4609 ECB :
2208 peter_e 4610 EUB : /*
4611 : * We don't really use the output identify_system for anything but it
4612 : * does some initializations on the upstream so let's still call it.
4613 : */
697 alvherre 4614 GIC 133 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4615 :
90 akapila 4616 GNC 133 : set_apply_error_context_origin(originname);
4617 : }
4618 :
4619 : /*
4620 : * Setup callback for syscache so that we know when something changes in
4621 : * the subscription relation state.
4622 : */
2208 peter_e 4623 GIC 282 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4624 : invalidate_syncing_table_states,
4625 : (Datum) 0);
4626 :
4627 : /* Build logical replication streaming options. */
2271 4628 282 : options.logical = true;
4629 282 : options.startpoint = origin_startpos;
2208 4630 282 : options.slotname = myslotname;
4631 :
634 akapila 4632 CBC 282 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
925 4633 282 : options.proto.logical.proto_version =
90 akapila 4634 GNC 282 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
4635 : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
4636 : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
634 akapila 4637 ECB : LOGICALREP_PROTO_VERSION_NUM;
634 akapila 4638 EUB :
2271 peter_e 4639 GIC 282 : options.proto.logical.publication_names = MySubscription->publications;
995 tgl 4640 CBC 282 : options.proto.logical.binary = MySubscription->binary;
4641 :
4642 : /*
4643 : * Assign the appropriate option value for streaming option according to
4644 : * the 'streaming' mode and the publisher's ability to support that mode.
4645 : */
90 akapila 4646 GNC 282 : if (server_version >= 160000 &&
4647 282 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4648 : {
4649 7 : options.proto.logical.streaming_str = "parallel";
4650 7 : MyLogicalRepWorker->parallel_apply = true;
4651 : }
4652 275 : else if (server_version >= 140000 &&
4653 275 : MySubscription->stream != LOGICALREP_STREAM_OFF)
4654 : {
4655 26 : options.proto.logical.streaming_str = "on";
4656 26 : MyLogicalRepWorker->parallel_apply = false;
4657 : }
4658 : else
4659 : {
4660 249 : options.proto.logical.streaming_str = NULL;
4661 249 : MyLogicalRepWorker->parallel_apply = false;
4662 : }
4663 :
634 akapila 4664 GIC 282 : options.proto.logical.twophase = false;
262 akapila 4665 GNC 282 : options.proto.logical.origin = pstrdup(MySubscription->origin);
4666 :
634 akapila 4667 GIC 282 : if (!am_tablesync_worker())
4668 : {
634 akapila 4669 ECB : /*
4670 : * Even when the two_phase mode is requested by the user, it remains
4671 : * as the tri-state PENDING until all tablesyncs have reached READY
4672 : * state. Only then, can it become ENABLED.
634 akapila 4673 EUB : *
4674 : * Note: If the subscription has no tables then leave the state as
634 akapila 4675 ECB : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4676 : * work.
4677 : */
634 akapila 4678 CBC 142 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
634 akapila 4679 GIC 9 : AllTablesyncsReady())
4680 : {
634 akapila 4681 ECB : /* Start streaming with two_phase enabled */
634 akapila 4682 CBC 3 : options.proto.logical.twophase = true;
4683 3 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
2271 peter_e 4684 ECB :
634 akapila 4685 GIC 3 : StartTransactionCommand();
634 akapila 4686 CBC 3 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4687 3 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
634 akapila 4688 GIC 3 : CommitTransactionCommand();
634 akapila 4689 ECB : }
4690 : else
4691 : {
634 akapila 4692 GIC 130 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
634 akapila 4693 ECB : }
4694 :
634 akapila 4695 GIC 133 : ereport(DEBUG1,
4696 : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
634 akapila 4697 ECB : MySubscription->name,
4698 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4699 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4700 : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4701 : "?")));
4702 : }
4703 : else
4704 : {
4705 : /* Start normal logical streaming replication. */
634 akapila 4706 CBC 149 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4707 : }
4708 :
4709 : /* Run the main loop. */
391 akapila 4710 GIC 282 : start_apply(origin_startpos);
4711 :
391 akapila 4712 LBC 0 : proc_exit(0);
4713 : }
4714 :
391 akapila 4715 ECB : /*
4716 : * After error recovery, disable the subscription in a new transaction
4717 : * and exit cleanly.
4718 : */
4719 : static void
391 akapila 4720 CBC 4 : DisableSubscriptionAndExit(void)
4721 : {
4722 : /*
4723 : * Emit the error message, and recover from the error state to an idle
4724 : * state
4725 : */
391 akapila 4726 GIC 4 : HOLD_INTERRUPTS();
391 akapila 4727 ECB :
391 akapila 4728 GIC 4 : EmitErrorReport();
391 akapila 4729 CBC 4 : AbortOutOfAnyTransaction();
391 akapila 4730 GIC 4 : FlushErrorState();
4731 :
4732 4 : RESUME_INTERRUPTS();
4733 :
4734 : /* Report the worker failed during either table synchronization or apply */
4735 4 : pgstat_report_subscription_error(MyLogicalRepWorker->subid,
4736 4 : !am_tablesync_worker());
4737 :
4738 : /* Disable the subscription */
391 akapila 4739 GBC 4 : StartTransactionCommand();
391 akapila 4740 GIC 4 : DisableSubscription(MySubscription->oid);
4741 4 : CommitTransactionCommand();
4742 :
4743 : /* Ensure we remove no-longer-useful entry for worker's start time */
77 tgl 4744 GNC 4 : if (!am_tablesync_worker() && !am_parallel_apply_worker())
4745 3 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4746 :
4747 : /* Notify the subscription has been disabled and exit */
391 akapila 4748 GIC 4 : ereport(LOG,
4749 : errmsg("subscription \"%s\" has been disabled because of an error",
4750 : MySubscription->name));
4751 :
2271 peter_e 4752 CBC 4 : proc_exit(0);
2271 peter_e 4753 ECB : }
4754 :
2137 4755 : /*
4756 : * Is current process a logical replication worker?
4757 : */
4758 : bool
2137 peter_e 4759 GIC 3003 : IsLogicalWorker(void)
4760 : {
4761 3003 : return MyLogicalRepWorker != NULL;
2137 peter_e 4762 ECB : }
4763 :
4764 : /*
4765 : * Is current process a logical replication parallel apply worker?
4766 : */
4767 : bool
90 akapila 4768 GNC 2606 : IsLogicalParallelApplyWorker(void)
4769 : {
4770 2606 : return IsLogicalWorker() && am_parallel_apply_worker();
4771 : }
4772 :
4773 : /*
4774 : * Start skipping changes of the transaction if the given LSN matches the
383 akapila 4775 ECB : * LSN specified by subscription's skiplsn.
4776 : */
4777 : static void
383 akapila 4778 CBC 421 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
383 akapila 4779 ECB : {
383 akapila 4780 GIC 421 : Assert(!is_skipping_changes());
4781 421 : Assert(!in_remote_transaction);
383 akapila 4782 CBC 421 : Assert(!in_streamed_transaction);
4783 :
383 akapila 4784 ECB : /*
4785 : * Quick return if it's not requested to skip this transaction. This
4786 : * function is called for every remote transaction and we assume that
4787 : * skipping the transaction is not used often.
4788 : */
383 akapila 4789 CBC 421 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
4790 : MySubscription->skiplsn != finish_lsn))
4791 418 : return;
4792 :
4793 : /* Start skipping all changes of this transaction */
383 akapila 4794 GIC 3 : skip_xact_finish_lsn = finish_lsn;
4795 :
4796 3 : ereport(LOG,
214 alvherre 4797 ECB : errmsg("logical replication starts skipping transaction at LSN %X/%X",
4798 : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
383 akapila 4799 EUB : }
4800 :
4801 : /*
4802 : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4803 : */
4804 : static void
383 akapila 4805 GBC 26 : stop_skipping_changes(void)
383 akapila 4806 EUB : {
383 akapila 4807 GBC 26 : if (!is_skipping_changes())
383 akapila 4808 GIC 23 : return;
4809 :
4810 3 : ereport(LOG,
214 alvherre 4811 ECB : (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4812 : LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
383 akapila 4813 :
4814 : /* Stop skipping changes */
383 akapila 4815 GIC 3 : skip_xact_finish_lsn = InvalidXLogRecPtr;
4816 : }
4817 :
383 akapila 4818 ECB : /*
4819 : * Clear subskiplsn of pg_subscription catalog.
4820 : *
4821 : * finish_lsn is the transaction's finish LSN that is used to check if the
4822 : * subskiplsn matches it. If not matched, we raise a warning when clearing the
4823 : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
4824 : * specified the wrong subskiplsn.
4825 : */
4826 : static void
383 akapila 4827 GIC 443 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
4828 : {
4829 : Relation rel;
4830 : Form_pg_subscription subform;
4831 : HeapTuple tup;
4832 443 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
4833 443 : bool started_tx = false;
383 akapila 4834 ECB :
90 akapila 4835 GNC 443 : if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
383 akapila 4836 CBC 440 : return;
383 akapila 4837 ECB :
383 akapila 4838 CBC 3 : if (!IsTransactionState())
383 akapila 4839 ECB : {
383 akapila 4840 CBC 1 : StartTransactionCommand();
4841 1 : started_tx = true;
4842 : }
383 akapila 4843 ECB :
383 akapila 4844 EUB : /*
4845 : * Protect subskiplsn of pg_subscription from being concurrently updated
4846 : * while clearing it.
4847 : */
383 akapila 4848 CBC 3 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4849 : AccessShareLock);
4850 :
383 akapila 4851 GIC 3 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4852 :
383 akapila 4853 ECB : /* Fetch the existing tuple. */
383 akapila 4854 GIC 3 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4855 : ObjectIdGetDatum(MySubscription->oid));
4856 :
383 akapila 4857 CBC 3 : if (!HeapTupleIsValid(tup))
383 akapila 4858 UIC 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
383 akapila 4859 EUB :
383 akapila 4860 GIC 3 : subform = (Form_pg_subscription) GETSTRUCT(tup);
4861 :
4862 : /*
4863 : * Clear the subskiplsn. If the user has already changed subskiplsn before
383 akapila 4864 ECB : * clearing it we don't update the catalog and the replication origin
4865 : * state won't get advanced. So in the worst case, if the server crashes
4866 : * before sending an acknowledgment of the flush position the transaction
4867 : * will be sent again and the user needs to set subskiplsn again. We can
4868 : * reduce the possibility by logging a replication origin WAL record to
4869 : * advance the origin LSN instead but there is no way to advance the
4870 : * origin timestamp and it doesn't seem to be worth doing anything about
4871 : * it since it's a very rare case.
4872 : */
383 akapila 4873 CBC 3 : if (subform->subskiplsn == myskiplsn)
383 akapila 4874 ECB : {
4875 : bool nulls[Natts_pg_subscription];
4876 : bool replaces[Natts_pg_subscription];
4877 : Datum values[Natts_pg_subscription];
4878 :
383 akapila 4879 GIC 3 : memset(values, 0, sizeof(values));
4880 3 : memset(nulls, false, sizeof(nulls));
4881 3 : memset(replaces, false, sizeof(replaces));
4882 :
383 akapila 4883 ECB : /* reset subskiplsn */
383 akapila 4884 GIC 3 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
383 akapila 4885 CBC 3 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
383 akapila 4886 ECB :
383 akapila 4887 GIC 3 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4888 : replaces);
4889 3 : CatalogTupleUpdate(rel, &tup->t_self, tup);
4890 :
4891 3 : if (myskiplsn != finish_lsn)
383 akapila 4892 UIC 0 : ereport(WARNING,
4893 : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4894 : errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4895 : LSN_FORMAT_ARGS(finish_lsn),
4896 : LSN_FORMAT_ARGS(myskiplsn)));
4897 : }
383 akapila 4898 ECB :
383 akapila 4899 GIC 3 : heap_freetuple(tup);
4900 3 : table_close(rel, NoLock);
4901 :
4902 3 : if (started_tx)
4903 1 : CommitTransactionCommand();
383 akapila 4904 ECB : }
4905 :
4906 : /* Error callback to give more context info about the change being applied */
4907 : void
590 akapila 4908 GIC 599 : apply_error_callback(void *arg)
4909 : {
590 akapila 4910 CBC 599 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
4911 :
4912 599 : if (apply_error_callback_arg.command == 0)
4913 261 : return;
4914 :
397 4915 338 : Assert(errarg->origin_name);
4916 :
398 akapila 4917 GIC 338 : if (errarg->rel == NULL)
4918 : {
4919 309 : if (!TransactionIdIsValid(errarg->remote_xid))
197 peter 4920 UIC 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4921 : errarg->origin_name,
398 akapila 4922 ECB : logicalrep_message_type(errarg->command));
397 akapila 4923 GIC 309 : else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
197 peter 4924 CBC 257 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
397 akapila 4925 ECB : errarg->origin_name,
4926 : logicalrep_message_type(errarg->command),
398 4927 : errarg->remote_xid);
4928 : else
197 peter 4929 GIC 104 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
397 akapila 4930 ECB : errarg->origin_name,
4931 : logicalrep_message_type(errarg->command),
4932 : errarg->remote_xid,
397 akapila 4933 CBC 52 : LSN_FORMAT_ARGS(errarg->finish_lsn));
4934 : }
4935 : else
4936 : {
90 akapila 4937 GNC 29 : if (errarg->remote_attnum < 0)
4938 : {
4939 29 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4940 2 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4941 : errarg->origin_name,
4942 : logicalrep_message_type(errarg->command),
4943 1 : errarg->rel->remoterel.nspname,
4944 1 : errarg->rel->remoterel.relname,
4945 : errarg->remote_xid);
4946 : else
4947 56 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4948 : errarg->origin_name,
4949 : logicalrep_message_type(errarg->command),
4950 28 : errarg->rel->remoterel.nspname,
4951 28 : errarg->rel->remoterel.relname,
4952 : errarg->remote_xid,
4953 28 : LSN_FORMAT_ARGS(errarg->finish_lsn));
4954 : }
4955 : else
4956 : {
90 akapila 4957 UNC 0 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4958 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4959 : errarg->origin_name,
4960 : logicalrep_message_type(errarg->command),
4961 0 : errarg->rel->remoterel.nspname,
4962 0 : errarg->rel->remoterel.relname,
4963 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
4964 : errarg->remote_xid);
4965 : else
4966 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4967 : errarg->origin_name,
4968 : logicalrep_message_type(errarg->command),
4969 0 : errarg->rel->remoterel.nspname,
4970 0 : errarg->rel->remoterel.relname,
4971 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
4972 : errarg->remote_xid,
4973 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
4974 : }
4975 : }
4976 : }
4977 :
590 akapila 4978 ECB : /* Set transaction information of apply error callback */
4979 : static inline void
397 akapila 4980 CBC 2695 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
4981 : {
590 akapila 4982 GIC 2695 : apply_error_callback_arg.remote_xid = xid;
397 4983 2695 : apply_error_callback_arg.finish_lsn = lsn;
590 4984 2695 : }
4985 :
590 akapila 4986 ECB : /* Reset all information of apply error callback */
4987 : static inline void
590 akapila 4988 GIC 1332 : reset_apply_error_context_info(void)
590 akapila 4989 ECB : {
590 akapila 4990 CBC 1332 : apply_error_callback_arg.command = 0;
590 akapila 4991 GIC 1332 : apply_error_callback_arg.rel = NULL;
4992 1332 : apply_error_callback_arg.remote_attnum = -1;
397 akapila 4993 CBC 1332 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
590 akapila 4994 GIC 1332 : }
4995 :
4996 : /*
4997 : * Request wakeup of the workers for the given subscription OID
4998 : * at commit of the current transaction.
4999 : *
5000 : * This is used to ensure that the workers process assorted changes
5001 : * as soon as possible.
5002 : */
5003 : void
93 tgl 5004 GNC 160 : LogicalRepWorkersWakeupAtCommit(Oid subid)
5005 : {
5006 : MemoryContext oldcxt;
5007 :
5008 160 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
5009 160 : on_commit_wakeup_workers_subids =
5010 160 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
5011 160 : MemoryContextSwitchTo(oldcxt);
5012 160 : }
5013 :
5014 : /*
5015 : * Wake up the workers of any subscriptions that were changed in this xact.
5016 : */
5017 : void
5018 485916 : AtEOXact_LogicalRepWorkers(bool isCommit)
5019 : {
5020 485916 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
5021 : {
5022 : ListCell *lc;
5023 :
5024 156 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5025 312 : foreach(lc, on_commit_wakeup_workers_subids)
5026 : {
5027 156 : Oid subid = lfirst_oid(lc);
5028 : List *workers;
5029 : ListCell *lc2;
5030 :
5031 156 : workers = logicalrep_workers_find(subid, true);
5032 207 : foreach(lc2, workers)
5033 : {
5034 51 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5035 :
5036 51 : logicalrep_worker_wakeup_ptr(worker);
5037 : }
5038 : }
5039 156 : LWLockRelease(LogicalRepWorkerLock);
5040 : }
5041 :
5042 : /* The List storage will be reclaimed automatically in xact cleanup. */
5043 485916 : on_commit_wakeup_workers_subids = NIL;
5044 485916 : }
5045 :
5046 : /*
5047 : * Allocate the origin name in long-lived context for error context message.
5048 : */
5049 : void
90 akapila 5050 292 : set_apply_error_context_origin(char *originname)
5051 : {
5052 292 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
5053 : originname);
5054 292 : }
5055 :
5056 : /*
5057 : * Return the action to be taken for the given transaction. See
5058 : * TransApplyAction for information on each of the actions.
5059 : *
5060 : * *winfo is assigned to the destination parallel worker info when the leader
5061 : * apply worker has to pass all the transaction's changes to the parallel
5062 : * apply worker.
5063 : */
5064 : static TransApplyAction
5065 325903 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5066 : {
5067 325903 : *winfo = NULL;
5068 :
5069 325903 : if (am_parallel_apply_worker())
5070 : {
5071 68910 : return TRANS_PARALLEL_APPLY;
5072 : }
5073 :
5074 : /*
5075 : * If we are processing this transaction using a parallel apply worker then
5076 : * either we send the changes to the parallel worker or if the worker is busy
5077 : * then serialize the changes to the file which will later be processed by
5078 : * the parallel worker.
5079 : */
5080 256993 : *winfo = pa_find_worker(xid);
5081 :
82 5082 256993 : if (*winfo && (*winfo)->serialize_changes)
5083 : {
5084 5037 : return TRANS_LEADER_PARTIAL_SERIALIZE;
5085 : }
5086 251956 : else if (*winfo)
5087 : {
5088 68894 : return TRANS_LEADER_SEND_TO_PARALLEL;
5089 : }
5090 :
5091 : /*
5092 : * If there is no parallel worker involved to process this transaction then
5093 : * we either directly apply the change or serialize it to a file which will
5094 : * later be applied when the transaction finish message is processed.
5095 : */
5096 183062 : else if (in_streamed_transaction)
5097 : {
5098 103195 : return TRANS_LEADER_SERIALIZE;
5099 : }
5100 : else
5101 : {
5102 79867 : return TRANS_LEADER_APPLY;
5103 : }
5104 : }
|