Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * worker.c
3 : : * PostgreSQL logical replication worker (apply)
4 : : *
5 : : * Copyright (c) 2016-2024, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/worker.c
9 : : *
10 : : * NOTES
11 : : * This file contains the worker which applies logical changes as they come
12 : : * from remote logical replication stream.
13 : : *
14 : : * The main worker (apply) is started by logical replication worker
15 : : * launcher for every enabled subscription in a database. It uses
16 : : * walsender protocol to communicate with publisher.
17 : : *
18 : : * This module includes server facing code and shares libpqwalreceiver
19 : : * module with walreceiver for providing the libpq specific functionality.
20 : : *
21 : : *
22 : : * STREAMED TRANSACTIONS
23 : : * ---------------------
24 : : * Streamed transactions (large transactions exceeding a memory limit on the
25 : : * upstream) are applied using one of two approaches:
26 : : *
27 : : * 1) Write to temporary files and apply when the final commit arrives
28 : : *
29 : : * This approach is used when the user has set the subscription's streaming
30 : : * option as on.
31 : : *
32 : : * Unlike the regular (non-streamed) case, handling streamed transactions has
33 : : * to handle aborts of both the toplevel transaction and subtransactions. This
34 : : * is achieved by tracking offsets for subtransactions, which is then used
35 : : * to truncate the file with serialized changes.
36 : : *
37 : : * The files are placed in tmp file directory by default, and the filenames
38 : : * include both the XID of the toplevel transaction and OID of the
39 : : * subscription. This is necessary so that different workers processing a
40 : : * remote transaction with the same XID doesn't interfere.
41 : : *
42 : : * We use BufFiles instead of using normal temporary files because (a) the
43 : : * BufFile infrastructure supports temporary files that exceed the OS file size
44 : : * limit, (b) provides a way for automatic clean up on the error and (c) provides
45 : : * a way to survive these files across local transactions and allow to open and
46 : : * close at stream start and close. We decided to use FileSet
47 : : * infrastructure as without that it deletes the files on the closure of the
48 : : * file and if we decide to keep stream files open across the start/stop stream
49 : : * then it will consume a lot of memory (more than 8K for each BufFile and
50 : : * there could be multiple such BufFiles as the subscriber could receive
51 : : * multiple start/stop streams for different transactions before getting the
52 : : * commit). Moreover, if we don't use FileSet then we also need to invent
53 : : * a new way to pass filenames to BufFile APIs so that we are allowed to open
54 : : * the file we desired across multiple stream-open calls for the same
55 : : * transaction.
56 : : *
57 : : * 2) Parallel apply workers.
58 : : *
59 : : * This approach is used when the user has set the subscription's streaming
60 : : * option as parallel. See logical/applyparallelworker.c for information about
61 : : * this approach.
62 : : *
63 : : * TWO_PHASE TRANSACTIONS
64 : : * ----------------------
65 : : * Two phase transactions are replayed at prepare and then committed or
66 : : * rolled back at commit prepared and rollback prepared respectively. It is
67 : : * possible to have a prepared transaction that arrives at the apply worker
68 : : * when the tablesync is busy doing the initial copy. In this case, the apply
69 : : * worker skips all the prepared operations [e.g. inserts] while the tablesync
70 : : * is still busy (see the condition of should_apply_changes_for_rel). The
71 : : * tablesync worker might not get such a prepared transaction because say it
72 : : * was prior to the initial consistent point but might have got some later
73 : : * commits. Now, the tablesync worker will exit without doing anything for the
74 : : * prepared transaction skipped by the apply worker as the sync location for it
75 : : * will be already ahead of the apply worker's current location. This would lead
76 : : * to an "empty prepare", because later when the apply worker does the commit
77 : : * prepare, there is nothing in it (the inserts were skipped earlier).
78 : : *
79 : : * To avoid this, and similar prepare confusions the subscription's two_phase
80 : : * commit is enabled only after the initial sync is over. The two_phase option
81 : : * has been implemented as a tri-state with values DISABLED, PENDING, and
82 : : * ENABLED.
83 : : *
84 : : * Even if the user specifies they want a subscription with two_phase = on,
85 : : * internally it will start with a tri-state of PENDING which only becomes
86 : : * ENABLED after all tablesync initializations are completed - i.e. when all
87 : : * tablesync workers have reached their READY state. In other words, the value
88 : : * PENDING is only a temporary state for subscription start-up.
89 : : *
90 : : * Until the two_phase is properly available (ENABLED) the subscription will
91 : : * behave as if two_phase = off. When the apply worker detects that all
92 : : * tablesyncs have become READY (while the tri-state was PENDING) it will
93 : : * restart the apply worker process. This happens in
94 : : * process_syncing_tables_for_apply.
95 : : *
96 : : * When the (re-started) apply worker finds that all tablesyncs are READY for a
97 : : * two_phase tri-state of PENDING it start streaming messages with the
98 : : * two_phase option which in turn enables the decoding of two-phase commits at
99 : : * the publisher. Then, it updates the tri-state value from PENDING to ENABLED.
100 : : * Now, it is possible that during the time we have not enabled two_phase, the
101 : : * publisher (replication server) would have skipped some prepares but we
102 : : * ensure that such prepares are sent along with commit prepare, see
103 : : * ReorderBufferFinishPrepared.
104 : : *
105 : : * If the subscription has no tables then a two_phase tri-state PENDING is
106 : : * left unchanged. This lets the user still do an ALTER SUBSCRIPTION REFRESH
107 : : * PUBLICATION which might otherwise be disallowed (see below).
108 : : *
109 : : * If ever a user needs to be aware of the tri-state value, they can fetch it
110 : : * from the pg_subscription catalog (see column subtwophasestate).
111 : : *
112 : : * We don't allow to toggle two_phase option of a subscription because it can
113 : : * lead to an inconsistent replica. Consider, initially, it was on and we have
114 : : * received some prepare then we turn it off, now at commit time the server
115 : : * will send the entire transaction data along with the commit. With some more
116 : : * analysis, we can allow changing this option from off to on but not sure if
117 : : * that alone would be useful.
118 : : *
119 : : * Finally, to avoid problems mentioned in previous paragraphs from any
120 : : * subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
121 : : * to 'off' and then again back to 'on') there is a restriction for
122 : : * ALTER SUBSCRIPTION REFRESH PUBLICATION. This command is not permitted when
123 : : * the two_phase tri-state is ENABLED, except when copy_data = false.
124 : : *
125 : : * We can get prepare of the same GID more than once for the genuine cases
126 : : * where we have defined multiple subscriptions for publications on the same
127 : : * server and prepared transaction has operations on tables subscribed to those
128 : : * subscriptions. For such cases, if we use the GID sent by publisher one of
129 : : * the prepares will be successful and others will fail, in which case the
130 : : * server will send them again. Now, this can lead to a deadlock if user has
131 : : * set synchronous_standby_names for all the subscriptions on subscriber. To
132 : : * avoid such deadlocks, we generate a unique GID (consisting of the
133 : : * subscription oid and the xid of the prepared transaction) for each prepare
134 : : * transaction on the subscriber.
135 : : *
136 : : * FAILOVER
137 : : * ----------------------
138 : : * The logical slot on the primary can be synced to the standby by specifying
139 : : * failover = true when creating the subscription. Enabling failover allows us
140 : : * to smoothly transition to the promoted standby, ensuring that we can
141 : : * subscribe to the new primary without losing any data.
142 : : *-------------------------------------------------------------------------
143 : : */
144 : :
145 : : #include "postgres.h"
146 : :
147 : : #include <sys/stat.h>
148 : : #include <unistd.h>
149 : :
150 : : #include "access/table.h"
151 : : #include "access/tableam.h"
152 : : #include "access/twophase.h"
153 : : #include "access/xact.h"
154 : : #include "catalog/indexing.h"
155 : : #include "catalog/pg_inherits.h"
156 : : #include "catalog/pg_subscription.h"
157 : : #include "catalog/pg_subscription_rel.h"
158 : : #include "commands/tablecmds.h"
159 : : #include "commands/trigger.h"
160 : : #include "executor/executor.h"
161 : : #include "executor/execPartition.h"
162 : : #include "libpq/pqformat.h"
163 : : #include "miscadmin.h"
164 : : #include "optimizer/optimizer.h"
165 : : #include "parser/parse_relation.h"
166 : : #include "pgstat.h"
167 : : #include "postmaster/bgworker.h"
168 : : #include "postmaster/interrupt.h"
169 : : #include "postmaster/walwriter.h"
170 : : #include "replication/logicallauncher.h"
171 : : #include "replication/logicalproto.h"
172 : : #include "replication/logicalrelation.h"
173 : : #include "replication/logicalworker.h"
174 : : #include "replication/origin.h"
175 : : #include "replication/walreceiver.h"
176 : : #include "replication/worker_internal.h"
177 : : #include "rewrite/rewriteHandler.h"
178 : : #include "storage/buffile.h"
179 : : #include "storage/ipc.h"
180 : : #include "storage/lmgr.h"
181 : : #include "tcop/tcopprot.h"
182 : : #include "utils/acl.h"
183 : : #include "utils/dynahash.h"
184 : : #include "utils/guc.h"
185 : : #include "utils/inval.h"
186 : : #include "utils/lsyscache.h"
187 : : #include "utils/memutils.h"
188 : : #include "utils/pg_lsn.h"
189 : : #include "utils/rel.h"
190 : : #include "utils/rls.h"
191 : : #include "utils/snapmgr.h"
192 : : #include "utils/syscache.h"
193 : : #include "utils/usercontext.h"
194 : :
195 : : #define NAPTIME_PER_CYCLE 1000 /* max sleep time between cycles (1s) */
196 : :
197 : : typedef struct FlushPosition
198 : : {
199 : : dlist_node node;
200 : : XLogRecPtr local_end;
201 : : XLogRecPtr remote_end;
202 : : } FlushPosition;
203 : :
204 : : static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
205 : :
206 : : typedef struct ApplyExecutionData
207 : : {
208 : : EState *estate; /* executor state, used to track resources */
209 : :
210 : : LogicalRepRelMapEntry *targetRel; /* replication target rel */
211 : : ResultRelInfo *targetRelInfo; /* ResultRelInfo for same */
212 : :
213 : : /* These fields are used when the target relation is partitioned: */
214 : : ModifyTableState *mtstate; /* dummy ModifyTable state */
215 : : PartitionTupleRouting *proute; /* partition routing info */
216 : : } ApplyExecutionData;
217 : :
218 : : /* Struct for saving and restoring apply errcontext information */
219 : : typedef struct ApplyErrorCallbackArg
220 : : {
221 : : LogicalRepMsgType command; /* 0 if invalid */
222 : : LogicalRepRelMapEntry *rel;
223 : :
224 : : /* Remote node information */
225 : : int remote_attnum; /* -1 if invalid */
226 : : TransactionId remote_xid;
227 : : XLogRecPtr finish_lsn;
228 : : char *origin_name;
229 : : } ApplyErrorCallbackArg;
230 : :
231 : : /*
232 : : * The action to be taken for the changes in the transaction.
233 : : *
234 : : * TRANS_LEADER_APPLY:
235 : : * This action means that we are in the leader apply worker or table sync
236 : : * worker. The changes of the transaction are either directly applied or
237 : : * are read from temporary files (for streaming transactions) and then
238 : : * applied by the worker.
239 : : *
240 : : * TRANS_LEADER_SERIALIZE:
241 : : * This action means that we are in the leader apply worker or table sync
242 : : * worker. Changes are written to temporary files and then applied when the
243 : : * final commit arrives.
244 : : *
245 : : * TRANS_LEADER_SEND_TO_PARALLEL:
246 : : * This action means that we are in the leader apply worker and need to send
247 : : * the changes to the parallel apply worker.
248 : : *
249 : : * TRANS_LEADER_PARTIAL_SERIALIZE:
250 : : * This action means that we are in the leader apply worker and have sent some
251 : : * changes directly to the parallel apply worker and the remaining changes are
252 : : * serialized to a file, due to timeout while sending data. The parallel apply
253 : : * worker will apply these serialized changes when the final commit arrives.
254 : : *
255 : : * We can't use TRANS_LEADER_SERIALIZE for this case because, in addition to
256 : : * serializing changes, the leader worker also needs to serialize the
257 : : * STREAM_XXX message to a file, and wait for the parallel apply worker to
258 : : * finish the transaction when processing the transaction finish command. So
259 : : * this new action was introduced to keep the code and logic clear.
260 : : *
261 : : * TRANS_PARALLEL_APPLY:
262 : : * This action means that we are in the parallel apply worker and changes of
263 : : * the transaction are applied directly by the worker.
264 : : */
265 : : typedef enum
266 : : {
267 : : /* The action for non-streaming transactions. */
268 : : TRANS_LEADER_APPLY,
269 : :
270 : : /* Actions for streaming transactions. */
271 : : TRANS_LEADER_SERIALIZE,
272 : : TRANS_LEADER_SEND_TO_PARALLEL,
273 : : TRANS_LEADER_PARTIAL_SERIALIZE,
274 : : TRANS_PARALLEL_APPLY,
275 : : } TransApplyAction;
276 : :
277 : : /* errcontext tracker */
278 : : ApplyErrorCallbackArg apply_error_callback_arg =
279 : : {
280 : : .command = 0,
281 : : .rel = NULL,
282 : : .remote_attnum = -1,
283 : : .remote_xid = InvalidTransactionId,
284 : : .finish_lsn = InvalidXLogRecPtr,
285 : : .origin_name = NULL,
286 : : };
287 : :
288 : : ErrorContextCallback *apply_error_context_stack = NULL;
289 : :
290 : : MemoryContext ApplyMessageContext = NULL;
291 : : MemoryContext ApplyContext = NULL;
292 : :
293 : : /* per stream context for streaming transactions */
294 : : static MemoryContext LogicalStreamingContext = NULL;
295 : :
296 : : WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
297 : :
298 : : Subscription *MySubscription = NULL;
299 : : static bool MySubscriptionValid = false;
300 : :
301 : : static List *on_commit_wakeup_workers_subids = NIL;
302 : :
303 : : bool in_remote_transaction = false;
304 : : static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
305 : :
306 : : /* fields valid only when processing streamed transaction */
307 : : static bool in_streamed_transaction = false;
308 : :
309 : : static TransactionId stream_xid = InvalidTransactionId;
310 : :
311 : : /*
312 : : * The number of changes applied by parallel apply worker during one streaming
313 : : * block.
314 : : */
315 : : static uint32 parallel_stream_nchanges = 0;
316 : :
317 : : /* Are we initializing an apply worker? */
318 : : bool InitializingApplyWorker = false;
319 : :
320 : : /*
321 : : * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
322 : : * the subscription if the remote transaction's finish LSN matches the subskiplsn.
323 : : * Once we start skipping changes, we don't stop it until we skip all changes of
324 : : * the transaction even if pg_subscription is updated and MySubscription->skiplsn
325 : : * gets changed or reset during that. Also, in streaming transaction cases (streaming = on),
326 : : * we don't skip receiving and spooling the changes since we decide whether or not
327 : : * to skip applying the changes when starting to apply changes. The subskiplsn is
328 : : * cleared after successfully skipping the transaction or applying non-empty
329 : : * transaction. The latter prevents the mistakenly specified subskiplsn from
330 : : * being left. Note that we cannot skip the streaming transactions when using
331 : : * parallel apply workers because we cannot get the finish LSN before applying
332 : : * the changes. So, we don't start parallel apply worker when finish LSN is set
333 : : * by the user.
334 : : */
335 : : static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
336 : : #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
337 : :
338 : : /* BufFile handle of the current streaming file */
339 : : static BufFile *stream_fd = NULL;
340 : :
341 : : typedef struct SubXactInfo
342 : : {
343 : : TransactionId xid; /* XID of the subxact */
344 : : int fileno; /* file number in the buffile */
345 : : off_t offset; /* offset in the file */
346 : : } SubXactInfo;
347 : :
348 : : /* Sub-transaction data for the current streaming transaction */
349 : : typedef struct ApplySubXactData
350 : : {
351 : : uint32 nsubxacts; /* number of sub-transactions */
352 : : uint32 nsubxacts_max; /* current capacity of subxacts */
353 : : TransactionId subxact_last; /* xid of the last sub-transaction */
354 : : SubXactInfo *subxacts; /* sub-xact offset in changes file */
355 : : } ApplySubXactData;
356 : :
357 : : static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
358 : :
359 : : static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
360 : : static inline void changes_filename(char *path, Oid subid, TransactionId xid);
361 : :
362 : : /*
363 : : * Information about subtransactions of a given toplevel transaction.
364 : : */
365 : : static void subxact_info_write(Oid subid, TransactionId xid);
366 : : static void subxact_info_read(Oid subid, TransactionId xid);
367 : : static void subxact_info_add(TransactionId xid);
368 : : static inline void cleanup_subxact_info(void);
369 : :
370 : : /*
371 : : * Serialize and deserialize changes for a toplevel transaction.
372 : : */
373 : : static void stream_open_file(Oid subid, TransactionId xid,
374 : : bool first_segment);
375 : : static void stream_write_change(char action, StringInfo s);
376 : : static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
377 : : static void stream_close_file(void);
378 : :
379 : : static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
380 : :
381 : : static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
382 : : static void apply_handle_insert_internal(ApplyExecutionData *edata,
383 : : ResultRelInfo *relinfo,
384 : : TupleTableSlot *remoteslot);
385 : : static void apply_handle_update_internal(ApplyExecutionData *edata,
386 : : ResultRelInfo *relinfo,
387 : : TupleTableSlot *remoteslot,
388 : : LogicalRepTupleData *newtup,
389 : : Oid localindexoid);
390 : : static void apply_handle_delete_internal(ApplyExecutionData *edata,
391 : : ResultRelInfo *relinfo,
392 : : TupleTableSlot *remoteslot,
393 : : Oid localindexoid);
394 : : static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
395 : : LogicalRepRelation *remoterel,
396 : : Oid localidxoid,
397 : : TupleTableSlot *remoteslot,
398 : : TupleTableSlot **localslot);
399 : : static void apply_handle_tuple_routing(ApplyExecutionData *edata,
400 : : TupleTableSlot *remoteslot,
401 : : LogicalRepTupleData *newtup,
402 : : CmdType operation);
403 : :
404 : : /* Compute GID for two_phase transactions */
405 : : static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
406 : :
407 : : /* Functions for skipping changes */
408 : : static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
409 : : static void stop_skipping_changes(void);
410 : : static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn);
411 : :
412 : : /* Functions for apply error callback */
413 : : static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
414 : : static inline void reset_apply_error_context_info(void);
415 : :
416 : : static TransApplyAction get_transaction_apply_action(TransactionId xid,
417 : : ParallelApplyWorkerInfo **winfo);
418 : :
419 : : /*
420 : : * Form the origin name for the subscription.
421 : : *
422 : : * This is a common function for tablesync and other workers. Tablesync workers
423 : : * must pass a valid relid. Other callers must pass relid = InvalidOid.
424 : : *
425 : : * Return the name in the supplied buffer.
426 : : */
427 : : void
551 akapila@postgresql.o 428 :CBC 1163 : ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
429 : : char *originname, Size szoriginname)
430 : : {
431 [ + + ]: 1163 : if (OidIsValid(relid))
432 : : {
433 : : /* Replication origin name for tablesync workers. */
434 : 650 : snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
435 : : }
436 : : else
437 : : {
438 : : /* Replication origin name for non-tablesync workers. */
439 : 513 : snprintf(originname, szoriginname, "pg_%u", suboid);
440 : : }
441 : 1163 : }
442 : :
443 : : /*
444 : : * Should this worker apply changes for given relation.
445 : : *
446 : : * This is mainly needed for initial relation data sync as that runs in
447 : : * separate worker process running in parallel and we need some way to skip
448 : : * changes coming to the leader apply worker during the sync of a table.
449 : : *
450 : : * Note we need to do smaller or equals comparison for SYNCDONE state because
451 : : * it might hold position of end of initial slot consistent point WAL
452 : : * record + 1 (ie start of next record) and next record can be COMMIT of
453 : : * transaction we are now processing (which is what we set remote_final_lsn
454 : : * to in apply_handle_begin).
455 : : *
456 : : * Note that for streaming transactions that are being applied in the parallel
457 : : * apply worker, we disallow applying changes if the target table in the
458 : : * subscription is not in the READY state, because we cannot decide whether to
459 : : * apply the change as we won't know remote_final_lsn by that time.
460 : : *
461 : : * We already checked this in pa_can_start() before assigning the
462 : : * streaming transaction to the parallel worker, but it also needs to be
463 : : * checked here because if the user executes ALTER SUBSCRIPTION ... REFRESH
464 : : * PUBLICATION in parallel, the new table can be added to pg_subscription_rel
465 : : * while applying this transaction.
466 : : */
467 : : static bool
2579 peter_e@gmx.net 468 : 151087 : should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
469 : : {
236 akapila@postgresql.o 470 [ + + + - :GNC 151087 : switch (MyLogicalRepWorker->type)
- ]
471 : : {
472 : 10 : case WORKERTYPE_TABLESYNC:
473 : 10 : return MyLogicalRepWorker->relid == rel->localreloid;
474 : :
475 : 68385 : case WORKERTYPE_PARALLEL_APPLY:
476 : : /* We don't synchronize rel's that are in unknown state. */
477 [ - + ]: 68385 : if (rel->state != SUBREL_STATE_READY &&
236 akapila@postgresql.o 478 [ # # ]:UNC 0 : rel->state != SUBREL_STATE_UNKNOWN)
479 [ # # ]: 0 : ereport(ERROR,
480 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
481 : : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
482 : : MySubscription->name),
483 : : errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
484 : :
236 akapila@postgresql.o 485 :GNC 68385 : return rel->state == SUBREL_STATE_READY;
486 : :
487 : 82692 : case WORKERTYPE_APPLY:
488 [ + + ]: 85756 : return (rel->state == SUBREL_STATE_READY ||
489 [ + + ]: 3064 : (rel->state == SUBREL_STATE_SYNCDONE &&
490 [ + - ]: 4 : rel->statelsn <= remote_final_lsn));
491 : :
236 akapila@postgresql.o 492 :UNC 0 : case WORKERTYPE_UNKNOWN:
493 : : /* Should never happen. */
494 [ # # ]: 0 : elog(ERROR, "Unknown worker type");
495 : : }
496 : :
497 : 0 : return false; /* dummy for compiler */
498 : : }
499 : :
500 : : /*
501 : : * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
502 : : *
503 : : * Start a transaction, if this is the first step (else we keep using the
504 : : * existing transaction).
505 : : * Also provide a global snapshot and ensure we run in ApplyMessageContext.
506 : : */
507 : : static void
1039 tgl@sss.pgh.pa.us 508 :CBC 151581 : begin_replication_step(void)
509 : : {
510 : 151581 : SetCurrentStatementStartTimestamp();
511 : :
512 [ + + ]: 151581 : if (!IsTransactionState())
513 : : {
514 : 925 : StartTransactionCommand();
515 : 925 : maybe_reread_subscription();
516 : : }
517 : :
518 : 151578 : PushActiveSnapshot(GetTransactionSnapshot());
519 : :
2532 peter_e@gmx.net 520 : 151578 : MemoryContextSwitchTo(ApplyMessageContext);
1039 tgl@sss.pgh.pa.us 521 : 151578 : }
522 : :
523 : : /*
524 : : * Finish up one step of a replication transaction.
525 : : * Callers of begin_replication_step() must also call this.
526 : : *
527 : : * We don't close out the transaction here, but we should increment
528 : : * the command counter to make the effects of this step visible.
529 : : */
530 : : static void
531 : 151548 : end_replication_step(void)
532 : : {
533 : 151548 : PopActiveSnapshot();
534 : :
535 : 151548 : CommandCounterIncrement();
2642 peter_e@gmx.net 536 : 151548 : }
537 : :
538 : : /*
539 : : * Handle streamed transactions for both the leader apply worker and the
540 : : * parallel apply workers.
541 : : *
542 : : * In the streaming case (receiving a block of the streamed transaction), for
543 : : * serialize mode, simply redirect it to a file for the proper toplevel
544 : : * transaction, and for parallel mode, the leader apply worker will send the
545 : : * changes to parallel apply workers and the parallel apply worker will define
546 : : * savepoints if needed. (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE
547 : : * messages will be applied by both leader apply worker and parallel apply
548 : : * workers).
549 : : *
550 : : * Returns true for streamed transactions (when the change is either serialized
551 : : * to file or sent to parallel apply worker), false otherwise (regular mode or
552 : : * needs to be processed by parallel apply worker).
553 : : *
554 : : * Exception: If the message being processed is LOGICAL_REP_MSG_RELATION
555 : : * or LOGICAL_REP_MSG_TYPE, return false even if the message needs to be sent
556 : : * to a parallel apply worker.
557 : : */
558 : : static bool
1235 akapila@postgresql.o 559 : 327428 : handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
560 : : {
561 : : TransactionId current_xid;
562 : : ParallelApplyWorkerInfo *winfo;
563 : : TransApplyAction apply_action;
564 : : StringInfoData original_msg;
565 : :
461 566 : 327428 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
567 : :
568 : : /* not in streaming mode */
569 [ + + ]: 327428 : if (apply_action == TRANS_LEADER_APPLY)
1319 570 : 83057 : return false;
571 : :
572 [ - + ]: 244371 : Assert(TransactionIdIsValid(stream_xid));
573 : :
574 : : /*
575 : : * The parallel apply worker needs the xid in this message to decide
576 : : * whether to define a savepoint, so save the original message that has
577 : : * not moved the cursor after the xid. We will serialize this message to a
578 : : * file in PARTIAL_SERIALIZE mode.
579 : : */
461 580 : 244371 : original_msg = *s;
581 : :
582 : : /*
583 : : * We should have received XID of the subxact as the first part of the
584 : : * message, so extract it.
585 : : */
586 : 244371 : current_xid = pq_getmsgint(s, 4);
587 : :
588 [ - + ]: 244371 : if (!TransactionIdIsValid(current_xid))
1037 tgl@sss.pgh.pa.us 589 [ # # ]:UBC 0 : ereport(ERROR,
590 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
591 : : errmsg_internal("invalid transaction ID in streamed replication transaction")));
592 : :
461 akapila@postgresql.o 593 [ + + + + :CBC 244371 : switch (apply_action)
- ]
594 : : {
595 : 102538 : case TRANS_LEADER_SERIALIZE:
596 [ - + ]: 102538 : Assert(stream_fd);
597 : :
598 : : /* Add the new subxact to the array (unless already there). */
599 : 102538 : subxact_info_add(current_xid);
600 : :
601 : : /* Write the change to the current file */
602 : 102538 : stream_write_change(action, s);
603 : 102538 : return true;
604 : :
605 : 68410 : case TRANS_LEADER_SEND_TO_PARALLEL:
606 [ - + ]: 68410 : Assert(winfo);
607 : :
608 : : /*
609 : : * XXX The publisher side doesn't always send relation/type update
610 : : * messages after the streaming transaction, so also update the
611 : : * relation/type in leader apply worker. See function
612 : : * cleanup_rel_sync_cache.
613 : : */
614 [ + - ]: 68410 : if (pa_send_data(winfo, s->len, s->data))
615 [ + + + - ]: 68410 : return (action != LOGICAL_REP_MSG_RELATION &&
616 : : action != LOGICAL_REP_MSG_TYPE);
617 : :
618 : : /*
619 : : * Switch to serialize mode when we are not able to send the
620 : : * change to parallel apply worker.
621 : : */
461 akapila@postgresql.o 622 :UBC 0 : pa_switch_to_partial_serialize(winfo, false);
623 : :
624 : : /* fall through */
461 akapila@postgresql.o 625 :CBC 5006 : case TRANS_LEADER_PARTIAL_SERIALIZE:
626 : 5006 : stream_write_change(action, &original_msg);
627 : :
628 : : /* Same reason as TRANS_LEADER_SEND_TO_PARALLEL case. */
629 [ + + + - ]: 5006 : return (action != LOGICAL_REP_MSG_RELATION &&
630 : : action != LOGICAL_REP_MSG_TYPE);
631 : :
632 : 68417 : case TRANS_PARALLEL_APPLY:
633 : 68417 : parallel_stream_nchanges += 1;
634 : :
635 : : /* Define a savepoint for a subxact if needed. */
636 : 68417 : pa_start_subtrans(current_xid, stream_xid);
637 : 68417 : return false;
638 : :
461 akapila@postgresql.o 639 :UBC 0 : default:
356 msawada@postgresql.o 640 [ # # ]: 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
641 : : return false; /* silence compiler warning */
642 : : }
643 : : }
644 : :
645 : : /*
646 : : * Executor state preparation for evaluation of constraint expressions,
647 : : * indexes and triggers for the specified relation.
648 : : *
649 : : * Note that the caller must open and close any indexes to be updated.
650 : : */
651 : : static ApplyExecutionData *
1058 tgl@sss.pgh.pa.us 652 :CBC 147998 : create_edata_for_relation(LogicalRepRelMapEntry *rel)
653 : : {
654 : : ApplyExecutionData *edata;
655 : : EState *estate;
656 : : RangeTblEntry *rte;
405 657 : 147998 : List *perminfos = NIL;
658 : : ResultRelInfo *resultRelInfo;
659 : :
1058 660 : 147998 : edata = (ApplyExecutionData *) palloc0(sizeof(ApplyExecutionData));
661 : 147998 : edata->targetRel = rel;
662 : :
663 : 147998 : edata->estate = estate = CreateExecutorState();
664 : :
2642 peter_e@gmx.net 665 : 147998 : rte = makeNode(RangeTblEntry);
666 : 147998 : rte->rtekind = RTE_RELATION;
667 : 147998 : rte->relid = RelationGetRelid(rel->localrel);
668 : 147998 : rte->relkind = rel->localrel->rd_rel->relkind;
2023 tgl@sss.pgh.pa.us 669 : 147998 : rte->rellockmode = AccessShareLock;
670 : :
405 671 : 147998 : addRTEPermissionInfo(&perminfos, rte);
672 : :
673 : 147998 : ExecInitRangeTable(estate, list_make1(rte), perminfos);
674 : :
1058 675 : 147998 : edata->targetRelInfo = resultRelInfo = makeNode(ResultRelInfo);
676 : :
677 : : /*
678 : : * Use Relation opened by logicalrep_rel_open() instead of opening it
679 : : * again.
680 : : */
681 : 147998 : InitResultRelInfo(resultRelInfo, rel->localrel, 1, NULL, 0);
682 : :
683 : : /*
684 : : * We put the ResultRelInfo in the es_opened_result_relations list, even
685 : : * though we don't populate the es_result_relations array. That's a bit
686 : : * bogus, but it's enough to make ExecGetTriggerResultRel() find them.
687 : : *
688 : : * ExecOpenIndices() is not called here either, each execution path doing
689 : : * an apply operation being responsible for that.
690 : : */
1088 michael@paquier.xyz 691 : 147998 : estate->es_opened_result_relations =
1058 tgl@sss.pgh.pa.us 692 : 147998 : lappend(estate->es_opened_result_relations, resultRelInfo);
693 : :
2335 simon@2ndQuadrant.co 694 : 147998 : estate->es_output_cid = GetCurrentCommandId(true);
695 : :
696 : : /* Prepare to catch AFTER triggers. */
2599 peter_e@gmx.net 697 : 147998 : AfterTriggerBeginQuery();
698 : :
699 : : /* other fields of edata remain NULL for now */
700 : :
1058 tgl@sss.pgh.pa.us 701 : 147998 : return edata;
702 : : }
703 : :
704 : : /*
705 : : * Finish any operations related to the executor state created by
706 : : * create_edata_for_relation().
707 : : */
708 : : static void
709 : 147978 : finish_edata(ApplyExecutionData *edata)
710 : : {
711 : 147978 : EState *estate = edata->estate;
712 : :
713 : : /* Handle any queued AFTER triggers. */
1088 michael@paquier.xyz 714 : 147978 : AfterTriggerEndQuery(estate);
715 : :
716 : : /* Shut down tuple routing, if any was done. */
1058 tgl@sss.pgh.pa.us 717 [ + + ]: 147978 : if (edata->proute)
718 : 73 : ExecCleanupTupleRouting(edata->mtstate, edata->proute);
719 : :
720 : : /*
721 : : * Cleanup. It might seem that we should call ExecCloseResultRelations()
722 : : * here, but we intentionally don't. It would close the rel we added to
723 : : * es_opened_result_relations above, which is wrong because we took no
724 : : * corresponding refcount. We rely on ExecCleanupTupleRouting() to close
725 : : * any other relations opened during execution.
726 : : */
1088 michael@paquier.xyz 727 : 147978 : ExecResetTupleTable(estate->es_tupleTable, false);
728 : 147978 : FreeExecutorState(estate);
1058 tgl@sss.pgh.pa.us 729 : 147978 : pfree(edata);
1088 michael@paquier.xyz 730 : 147978 : }
731 : :
732 : : /*
733 : : * Executes default values for columns for which we can't map to remote
734 : : * relation columns.
735 : : *
736 : : * This allows us to support tables which have more columns on the downstream
737 : : * than on the upstream.
738 : : */
739 : : static void
2642 peter_e@gmx.net 740 : 75749 : slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
741 : : TupleTableSlot *slot)
742 : : {
743 : 75749 : TupleDesc desc = RelationGetDescr(rel->localrel);
744 : 75749 : int num_phys_attrs = desc->natts;
745 : : int i;
746 : : int attnum,
747 : 75749 : num_defaults = 0;
748 : : int *defmap;
749 : : ExprState **defexprs;
750 : : ExprContext *econtext;
751 : :
752 [ + - ]: 75749 : econtext = GetPerTupleExprContext(estate);
753 : :
754 : : /* We got all the data via replication, no need to evaluate anything. */
755 [ + + ]: 75749 : if (num_phys_attrs == rel->remoterel.natts)
756 : 35593 : return;
757 : :
758 : 40156 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
759 : 40156 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
760 : :
1579 michael@paquier.xyz 761 [ - + ]: 40156 : Assert(rel->attrmap->maplen == num_phys_attrs);
2642 peter_e@gmx.net 762 [ + + ]: 210725 : for (attnum = 0; attnum < num_phys_attrs; attnum++)
763 : : {
764 : : Expr *defexpr;
765 : :
1842 peter@eisentraut.org 766 [ + - + + ]: 170569 : if (TupleDescAttr(desc, attnum)->attisdropped || TupleDescAttr(desc, attnum)->attgenerated)
2642 peter_e@gmx.net 767 : 4 : continue;
768 : :
1579 michael@paquier.xyz 769 [ + + ]: 170565 : if (rel->attrmap->attnums[attnum] >= 0)
2642 peter_e@gmx.net 770 : 92296 : continue;
771 : :
772 : 78269 : defexpr = (Expr *) build_column_default(rel->localrel, attnum + 1);
773 : :
774 [ + + ]: 78269 : if (defexpr != NULL)
775 : : {
776 : : /* Run the expression through planner */
777 : 70167 : defexpr = expression_planner(defexpr);
778 : :
779 : : /* Initialize executable expression in copycontext */
780 : 70167 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
781 : 70167 : defmap[num_defaults] = attnum;
782 : 70167 : num_defaults++;
783 : : }
784 : : }
785 : :
786 [ + + ]: 110323 : for (i = 0; i < num_defaults; i++)
787 : 70167 : slot->tts_values[defmap[i]] =
788 : 70167 : ExecEvalExpr(defexprs[i], econtext, &slot->tts_isnull[defmap[i]]);
789 : : }
790 : :
791 : : /*
792 : : * Store tuple data into slot.
793 : : *
794 : : * Incoming data can be either text or binary format.
795 : : */
796 : : static void
1366 tgl@sss.pgh.pa.us 797 : 147998 : slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
798 : : LogicalRepTupleData *tupleData)
799 : : {
2524 bruce@momjian.us 800 : 147998 : int natts = slot->tts_tupleDescriptor->natts;
801 : : int i;
802 : :
2642 peter_e@gmx.net 803 : 147998 : ExecClearTuple(slot);
804 : :
805 : : /* Call the "in" function for each non-dropped, non-null attribute */
1579 michael@paquier.xyz 806 [ - + ]: 147998 : Assert(natts == rel->attrmap->maplen);
2642 peter_e@gmx.net 807 [ + + ]: 657449 : for (i = 0; i < natts; i++)
808 : : {
2429 andres@anarazel.de 809 : 509451 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1579 michael@paquier.xyz 810 : 509451 : int remoteattnum = rel->attrmap->attnums[i];
811 : :
1366 tgl@sss.pgh.pa.us 812 [ + + + + ]: 509451 : if (!att->attisdropped && remoteattnum >= 0)
2642 peter_e@gmx.net 813 : 302469 : {
1366 tgl@sss.pgh.pa.us 814 : 302469 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
815 : :
1364 816 [ - + ]: 302469 : Assert(remoteattnum < tupleData->ncols);
817 : :
818 : : /* Set attnum for error callback */
961 akapila@postgresql.o 819 : 302469 : apply_error_callback_arg.remote_attnum = remoteattnum;
820 : :
1366 tgl@sss.pgh.pa.us 821 [ + + ]: 302469 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
822 : : {
823 : : Oid typinput;
824 : : Oid typioparam;
825 : :
826 : 142162 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
827 : 284324 : slot->tts_values[i] =
828 : 142162 : OidInputFunctionCall(typinput, colvalue->data,
829 : : typioparam, att->atttypmod);
830 : 142162 : slot->tts_isnull[i] = false;
831 : : }
832 [ + + ]: 160307 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
833 : : {
834 : : Oid typreceive;
835 : : Oid typioparam;
836 : :
837 : : /*
838 : : * In some code paths we may be asked to re-parse the same
839 : : * tuple data. Reset the StringInfo's cursor so that works.
840 : : */
841 : 109980 : colvalue->cursor = 0;
842 : :
843 : 109980 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
844 : 219960 : slot->tts_values[i] =
845 : 109980 : OidReceiveFunctionCall(typreceive, colvalue,
846 : : typioparam, att->atttypmod);
847 : :
848 : : /* Trouble if it didn't eat the whole buffer */
849 [ - + ]: 109980 : if (colvalue->cursor != colvalue->len)
1366 tgl@sss.pgh.pa.us 850 [ # # ]:UBC 0 : ereport(ERROR,
851 : : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
852 : : errmsg("incorrect binary data format in logical replication column %d",
853 : : remoteattnum + 1)));
1366 tgl@sss.pgh.pa.us 854 :CBC 109980 : slot->tts_isnull[i] = false;
855 : : }
856 : : else
857 : : {
858 : : /*
859 : : * NULL value from remote. (We don't expect to see
860 : : * LOGICALREP_COLUMN_UNCHANGED here, but if we do, treat it as
861 : : * NULL.)
862 : : */
863 : 50327 : slot->tts_values[i] = (Datum) 0;
864 : 50327 : slot->tts_isnull[i] = true;
865 : : }
866 : :
867 : : /* Reset attnum for error callback */
961 akapila@postgresql.o 868 : 302469 : apply_error_callback_arg.remote_attnum = -1;
869 : : }
870 : : else
871 : : {
872 : : /*
873 : : * We assign NULL to dropped attributes and missing values
874 : : * (missing values should be later filled using
875 : : * slot_fill_defaults).
876 : : */
2642 peter_e@gmx.net 877 : 206982 : slot->tts_values[i] = (Datum) 0;
878 : 206982 : slot->tts_isnull[i] = true;
879 : : }
880 : : }
881 : :
882 : 147998 : ExecStoreVirtualTuple(slot);
883 : 147998 : }
884 : :
885 : : /*
886 : : * Replace updated columns with data from the LogicalRepTupleData struct.
887 : : * This is somewhat similar to heap_modify_tuple but also calls the type
888 : : * input functions on the user data.
889 : : *
890 : : * "slot" is filled with a copy of the tuple in "srcslot", replacing
891 : : * columns provided in "tupleData" and leaving others as-is.
892 : : *
893 : : * Caution: unreplaced pass-by-ref columns in "slot" will point into the
894 : : * storage for "srcslot". This is OK for current usage, but someday we may
895 : : * need to materialize "slot" at the end to make it independent of "srcslot".
896 : : */
897 : : static void
1366 tgl@sss.pgh.pa.us 898 : 31926 : slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
899 : : LogicalRepRelMapEntry *rel,
900 : : LogicalRepTupleData *tupleData)
901 : : {
2524 bruce@momjian.us 902 : 31926 : int natts = slot->tts_tupleDescriptor->natts;
903 : : int i;
904 : :
905 : : /* We'll fill "slot" with a virtual tuple, so we must start with ... */
2642 peter_e@gmx.net 906 : 31926 : ExecClearTuple(slot);
907 : :
908 : : /*
909 : : * Copy all the column data from srcslot, so that we'll have valid values
910 : : * for unreplaced columns.
911 : : */
1605 tgl@sss.pgh.pa.us 912 [ - + ]: 31926 : Assert(natts == srcslot->tts_tupleDescriptor->natts);
913 : 31926 : slot_getallattrs(srcslot);
914 : 31926 : memcpy(slot->tts_values, srcslot->tts_values, natts * sizeof(Datum));
915 : 31926 : memcpy(slot->tts_isnull, srcslot->tts_isnull, natts * sizeof(bool));
916 : :
917 : : /* Call the "in" function for each replaced attribute */
1579 michael@paquier.xyz 918 [ - + ]: 31926 : Assert(natts == rel->attrmap->maplen);
2642 peter_e@gmx.net 919 [ + + ]: 159301 : for (i = 0; i < natts; i++)
920 : : {
2429 andres@anarazel.de 921 : 127375 : Form_pg_attribute att = TupleDescAttr(slot->tts_tupleDescriptor, i);
1579 michael@paquier.xyz 922 : 127375 : int remoteattnum = rel->attrmap->attnums[i];
923 : :
2354 peter_e@gmx.net 924 [ + + ]: 127375 : if (remoteattnum < 0)
2642 925 : 58542 : continue;
926 : :
1364 tgl@sss.pgh.pa.us 927 [ - + ]: 68833 : Assert(remoteattnum < tupleData->ncols);
928 : :
1366 929 [ + + ]: 68833 : if (tupleData->colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
930 : : {
931 : 68830 : StringInfo colvalue = &tupleData->colvalues[remoteattnum];
932 : :
933 : : /* Set attnum for error callback */
961 akapila@postgresql.o 934 : 68830 : apply_error_callback_arg.remote_attnum = remoteattnum;
935 : :
1366 tgl@sss.pgh.pa.us 936 [ + + ]: 68830 : if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_TEXT)
937 : : {
938 : : Oid typinput;
939 : : Oid typioparam;
940 : :
941 : 25427 : getTypeInputInfo(att->atttypid, &typinput, &typioparam);
942 : 50854 : slot->tts_values[i] =
943 : 25427 : OidInputFunctionCall(typinput, colvalue->data,
944 : : typioparam, att->atttypmod);
945 : 25427 : slot->tts_isnull[i] = false;
946 : : }
947 [ + + ]: 43403 : else if (tupleData->colstatus[remoteattnum] == LOGICALREP_COLUMN_BINARY)
948 : : {
949 : : Oid typreceive;
950 : : Oid typioparam;
951 : :
952 : : /*
953 : : * In some code paths we may be asked to re-parse the same
954 : : * tuple data. Reset the StringInfo's cursor so that works.
955 : : */
956 : 43356 : colvalue->cursor = 0;
957 : :
958 : 43356 : getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam);
959 : 86712 : slot->tts_values[i] =
960 : 43356 : OidReceiveFunctionCall(typreceive, colvalue,
961 : : typioparam, att->atttypmod);
962 : :
963 : : /* Trouble if it didn't eat the whole buffer */
964 [ - + ]: 43356 : if (colvalue->cursor != colvalue->len)
1366 tgl@sss.pgh.pa.us 965 [ # # ]:UBC 0 : ereport(ERROR,
966 : : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
967 : : errmsg("incorrect binary data format in logical replication column %d",
968 : : remoteattnum + 1)));
1366 tgl@sss.pgh.pa.us 969 :CBC 43356 : slot->tts_isnull[i] = false;
970 : : }
971 : : else
972 : : {
973 : : /* must be LOGICALREP_COLUMN_NULL */
974 : 47 : slot->tts_values[i] = (Datum) 0;
975 : 47 : slot->tts_isnull[i] = true;
976 : : }
977 : :
978 : : /* Reset attnum for error callback */
961 akapila@postgresql.o 979 : 68830 : apply_error_callback_arg.remote_attnum = -1;
980 : : }
981 : : }
982 : :
983 : : /* And finally, declare that "slot" contains a valid virtual tuple */
2642 peter_e@gmx.net 984 : 31926 : ExecStoreVirtualTuple(slot);
985 : 31926 : }
986 : :
987 : : /*
988 : : * Handle BEGIN message.
989 : : */
990 : : static void
991 : 412 : apply_handle_begin(StringInfo s)
992 : : {
993 : : LogicalRepBeginData begin_data;
994 : :
995 : : /* There must not be an active streaming transaction. */
453 akapila@postgresql.o 996 [ - + ]: 412 : Assert(!TransactionIdIsValid(stream_xid));
997 : :
2642 peter_e@gmx.net 998 : 412 : logicalrep_read_begin(s, &begin_data);
768 akapila@postgresql.o 999 : 412 : set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
1000 : :
2579 peter_e@gmx.net 1001 : 412 : remote_final_lsn = begin_data.final_lsn;
1002 : :
754 akapila@postgresql.o 1003 : 412 : maybe_start_skipping_changes(begin_data.final_lsn);
1004 : :
2642 peter_e@gmx.net 1005 : 412 : in_remote_transaction = true;
1006 : :
1007 : 412 : pgstat_report_activity(STATE_RUNNING, NULL);
1008 : 412 : }
1009 : :
1010 : : /*
1011 : : * Handle COMMIT message.
1012 : : *
1013 : : * TODO, support tracking of multiple origins
1014 : : */
1015 : : static void
1016 : 382 : apply_handle_commit(StringInfo s)
1017 : : {
1018 : : LogicalRepCommitData commit_data;
1019 : :
1020 : 382 : logicalrep_read_commit(s, &commit_data);
1021 : :
1037 tgl@sss.pgh.pa.us 1022 [ - + ]: 382 : if (commit_data.commit_lsn != remote_final_lsn)
1037 tgl@sss.pgh.pa.us 1023 [ # # ]:UBC 0 : ereport(ERROR,
1024 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1025 : : errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1026 : : LSN_FORMAT_ARGS(commit_data.commit_lsn),
1027 : : LSN_FORMAT_ARGS(remote_final_lsn))));
1028 : :
989 akapila@postgresql.o 1029 :CBC 382 : apply_handle_commit_internal(&commit_data);
1030 : :
1031 : : /* Process any tables that are being synchronized in parallel. */
2579 peter_e@gmx.net 1032 : 382 : process_syncing_tables(commit_data.end_lsn);
1033 : :
2642 1034 : 382 : pgstat_report_activity(STATE_IDLE, NULL);
961 akapila@postgresql.o 1035 : 382 : reset_apply_error_context_info();
2642 peter_e@gmx.net 1036 : 382 : }
1037 : :
1038 : : /*
1039 : : * Handle BEGIN PREPARE message.
1040 : : */
1041 : : static void
1005 akapila@postgresql.o 1042 : 19 : apply_handle_begin_prepare(StringInfo s)
1043 : : {
1044 : : LogicalRepPreparedTxnData begin_data;
1045 : :
1046 : : /* Tablesync should never receive prepare. */
1047 [ - + ]: 19 : if (am_tablesync_worker())
1005 akapila@postgresql.o 1048 [ # # ]:UBC 0 : ereport(ERROR,
1049 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1050 : : errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
1051 : :
1052 : : /* There must not be an active streaming transaction. */
453 akapila@postgresql.o 1053 [ - + ]:CBC 19 : Assert(!TransactionIdIsValid(stream_xid));
1054 : :
1005 1055 : 19 : logicalrep_read_begin_prepare(s, &begin_data);
768 1056 : 19 : set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
1057 : :
1005 1058 : 19 : remote_final_lsn = begin_data.prepare_lsn;
1059 : :
754 1060 : 19 : maybe_start_skipping_changes(begin_data.prepare_lsn);
1061 : :
1005 1062 : 19 : in_remote_transaction = true;
1063 : :
1064 : 19 : pgstat_report_activity(STATE_RUNNING, NULL);
1065 : 19 : }
1066 : :
1067 : : /*
1068 : : * Common function to prepare the GID.
1069 : : */
1070 : : static void
990 1071 : 32 : apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
1072 : : {
1073 : : char gid[GIDSIZE];
1074 : :
1075 : : /*
1076 : : * Compute unique GID for two_phase transactions. We don't use GID of
1077 : : * prepared transaction sent by server as that can lead to deadlock when
1078 : : * we have multiple subscriptions from same node point to publications on
1079 : : * the same node. See comments atop worker.c
1080 : : */
1081 : 32 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
1082 : : gid, sizeof(gid));
1083 : :
1084 : : /*
1085 : : * BeginTransactionBlock is necessary to balance the EndTransactionBlock
1086 : : * called within the PrepareTransactionBlock below.
1087 : : */
461 1088 [ + - ]: 32 : if (!IsTransactionBlock())
1089 : : {
1090 : 32 : BeginTransactionBlock();
1091 : 32 : CommitTransactionCommand(); /* Completes the preceding Begin command. */
1092 : : }
1093 : :
1094 : : /*
1095 : : * Update origin state so we can restart streaming from correct position
1096 : : * in case of crash.
1097 : : */
990 1098 : 32 : replorigin_session_origin_lsn = prepare_data->end_lsn;
1099 : 32 : replorigin_session_origin_timestamp = prepare_data->prepare_time;
1100 : :
1101 : 32 : PrepareTransactionBlock(gid);
1102 : 32 : }
1103 : :
1104 : : /*
1105 : : * Handle PREPARE message.
1106 : : */
1107 : : static void
1005 1108 : 18 : apply_handle_prepare(StringInfo s)
1109 : : {
1110 : : LogicalRepPreparedTxnData prepare_data;
1111 : :
1112 : 18 : logicalrep_read_prepare(s, &prepare_data);
1113 : :
1114 [ - + ]: 18 : if (prepare_data.prepare_lsn != remote_final_lsn)
1005 akapila@postgresql.o 1115 [ # # ]:UBC 0 : ereport(ERROR,
1116 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1117 : : errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1118 : : LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
1119 : : LSN_FORMAT_ARGS(remote_final_lsn))));
1120 : :
1121 : : /*
1122 : : * Unlike commit, here, we always prepare the transaction even though no
1123 : : * change has happened in this transaction or all changes are skipped. It
1124 : : * is done this way because at commit prepared time, we won't know whether
1125 : : * we have skipped preparing a transaction because of those reasons.
1126 : : *
1127 : : * XXX, We can optimize such that at commit prepared time, we first check
1128 : : * whether we have prepared the transaction or not but that doesn't seem
1129 : : * worthwhile because such cases shouldn't be common.
1130 : : */
1005 akapila@postgresql.o 1131 :CBC 18 : begin_replication_step();
1132 : :
990 1133 : 18 : apply_handle_prepare_internal(&prepare_data);
1134 : :
1005 1135 : 18 : end_replication_step();
1136 : 18 : CommitTransactionCommand();
1137 : 18 : pgstat_report_stat(false);
1138 : :
461 1139 : 18 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1140 : :
1005 1141 : 18 : in_remote_transaction = false;
1142 : :
1143 : : /* Process any tables that are being synchronized in parallel. */
1144 : 18 : process_syncing_tables(prepare_data.end_lsn);
1145 : :
1146 : : /*
1147 : : * Since we have already prepared the transaction, in a case where the
1148 : : * server crashes before clearing the subskiplsn, it will be left but the
1149 : : * transaction won't be resent. But that's okay because it's a rare case
1150 : : * and the subskiplsn will be cleared when finishing the next transaction.
1151 : : */
754 1152 : 18 : stop_skipping_changes();
1153 : 18 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1154 : :
1005 1155 : 18 : pgstat_report_activity(STATE_IDLE, NULL);
961 1156 : 18 : reset_apply_error_context_info();
1005 1157 : 18 : }
1158 : :
1159 : : /*
1160 : : * Handle a COMMIT PREPARED of a previously PREPARED transaction.
1161 : : *
1162 : : * Note that we don't need to wait here if the transaction was prepared in a
1163 : : * parallel apply worker. In that case, we have already waited for the prepare
1164 : : * to finish in apply_handle_stream_prepare() which will ensure all the
1165 : : * operations in that transaction have happened in the subscriber, so no
1166 : : * concurrent transaction can cause deadlock or transaction dependency issues.
1167 : : */
1168 : : static void
1169 : 23 : apply_handle_commit_prepared(StringInfo s)
1170 : : {
1171 : : LogicalRepCommitPreparedTxnData prepare_data;
1172 : : char gid[GIDSIZE];
1173 : :
1174 : 23 : logicalrep_read_commit_prepared(s, &prepare_data);
768 1175 : 23 : set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
1176 : :
1177 : : /* Compute GID for two_phase transactions. */
1005 1178 : 23 : TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
1179 : : gid, sizeof(gid));
1180 : :
1181 : : /* There is no transaction when COMMIT PREPARED is called */
1182 : 23 : begin_replication_step();
1183 : :
1184 : : /*
1185 : : * Update origin state so we can restart streaming from correct position
1186 : : * in case of crash.
1187 : : */
1188 : 23 : replorigin_session_origin_lsn = prepare_data.end_lsn;
1189 : 23 : replorigin_session_origin_timestamp = prepare_data.commit_time;
1190 : :
1191 : 23 : FinishPreparedTransaction(gid, true);
1192 : 23 : end_replication_step();
1193 : 23 : CommitTransactionCommand();
1194 : 23 : pgstat_report_stat(false);
1195 : :
461 1196 : 23 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1005 1197 : 23 : in_remote_transaction = false;
1198 : :
1199 : : /* Process any tables that are being synchronized in parallel. */
1200 : 23 : process_syncing_tables(prepare_data.end_lsn);
1201 : :
754 1202 : 23 : clear_subscription_skip_lsn(prepare_data.end_lsn);
1203 : :
1005 1204 : 23 : pgstat_report_activity(STATE_IDLE, NULL);
961 1205 : 23 : reset_apply_error_context_info();
1005 1206 : 23 : }
1207 : :
1208 : : /*
1209 : : * Handle a ROLLBACK PREPARED of a previously PREPARED TRANSACTION.
1210 : : *
1211 : : * Note that we don't need to wait here if the transaction was prepared in a
1212 : : * parallel apply worker. In that case, we have already waited for the prepare
1213 : : * to finish in apply_handle_stream_prepare() which will ensure all the
1214 : : * operations in that transaction have happened in the subscriber, so no
1215 : : * concurrent transaction can cause deadlock or transaction dependency issues.
1216 : : */
1217 : : static void
1218 : 9 : apply_handle_rollback_prepared(StringInfo s)
1219 : : {
1220 : : LogicalRepRollbackPreparedTxnData rollback_data;
1221 : : char gid[GIDSIZE];
1222 : :
1223 : 9 : logicalrep_read_rollback_prepared(s, &rollback_data);
768 1224 : 9 : set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
1225 : :
1226 : : /* Compute GID for two_phase transactions. */
1005 1227 : 9 : TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
1228 : : gid, sizeof(gid));
1229 : :
1230 : : /*
1231 : : * It is possible that we haven't received prepare because it occurred
1232 : : * before walsender reached a consistent point or the two_phase was still
1233 : : * not enabled by that time, so in such cases, we need to skip rollback
1234 : : * prepared.
1235 : : */
1236 [ + - ]: 9 : if (LookupGXact(gid, rollback_data.prepare_end_lsn,
1237 : : rollback_data.prepare_time))
1238 : : {
1239 : : /*
1240 : : * Update origin state so we can restart streaming from correct
1241 : : * position in case of crash.
1242 : : */
1243 : 9 : replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
1244 : 9 : replorigin_session_origin_timestamp = rollback_data.rollback_time;
1245 : :
1246 : : /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
1247 : 9 : begin_replication_step();
1248 : 9 : FinishPreparedTransaction(gid, false);
1249 : 9 : end_replication_step();
1250 : 9 : CommitTransactionCommand();
1251 : :
754 1252 : 9 : clear_subscription_skip_lsn(rollback_data.rollback_end_lsn);
1253 : : }
1254 : :
1005 1255 : 9 : pgstat_report_stat(false);
1256 : :
461 1257 : 9 : store_flush_position(rollback_data.rollback_end_lsn, XactLastCommitEnd);
1005 1258 : 9 : in_remote_transaction = false;
1259 : :
1260 : : /* Process any tables that are being synchronized in parallel. */
1261 : 9 : process_syncing_tables(rollback_data.rollback_end_lsn);
1262 : :
1263 : 9 : pgstat_report_activity(STATE_IDLE, NULL);
961 1264 : 9 : reset_apply_error_context_info();
1005 1265 : 9 : }
1266 : :
1267 : : /*
1268 : : * Handle STREAM PREPARE.
1269 : : */
1270 : : static void
984 1271 : 20 : apply_handle_stream_prepare(StringInfo s)
1272 : : {
1273 : : LogicalRepPreparedTxnData prepare_data;
1274 : : ParallelApplyWorkerInfo *winfo;
1275 : : TransApplyAction apply_action;
1276 : :
1277 : : /* Save the message before it is consumed. */
461 1278 : 20 : StringInfoData original_msg = *s;
1279 : :
984 1280 [ - + ]: 20 : if (in_streamed_transaction)
984 akapila@postgresql.o 1281 [ # # ]:UBC 0 : ereport(ERROR,
1282 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1283 : : errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1284 : :
1285 : : /* Tablesync should never receive prepare. */
984 akapila@postgresql.o 1286 [ - + ]:CBC 20 : if (am_tablesync_worker())
984 akapila@postgresql.o 1287 [ # # ]:UBC 0 : ereport(ERROR,
1288 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1289 : : errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1290 : :
984 akapila@postgresql.o 1291 :CBC 20 : logicalrep_read_stream_prepare(s, &prepare_data);
768 1292 : 20 : set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
1293 : :
461 1294 : 20 : apply_action = get_transaction_apply_action(prepare_data.xid, &winfo);
1295 : :
1296 [ + + + + : 20 : switch (apply_action)
- ]
1297 : : {
453 1298 : 8 : case TRANS_LEADER_APPLY:
1299 : :
1300 : : /*
1301 : : * The transaction has been serialized to file, so replay all the
1302 : : * spooled operations.
1303 : : */
461 1304 : 8 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
1305 : : prepare_data.xid, prepare_data.prepare_lsn);
1306 : :
1307 : : /* Mark the transaction as prepared. */
1308 : 8 : apply_handle_prepare_internal(&prepare_data);
1309 : :
1310 : 8 : CommitTransactionCommand();
1311 : :
1312 : 8 : store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
1313 : :
1314 : 8 : in_remote_transaction = false;
1315 : :
1316 : : /* Unlink the files with serialized changes and subxact info. */
1317 : 8 : stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1318 : :
1319 [ - + ]: 8 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1320 : 8 : break;
1321 : :
1322 : 5 : case TRANS_LEADER_SEND_TO_PARALLEL:
1323 [ - + ]: 5 : Assert(winfo);
1324 : :
1325 [ + - ]: 5 : if (pa_send_data(winfo, s->len, s->data))
1326 : : {
1327 : : /* Finish processing the streaming transaction. */
1328 : 5 : pa_xact_finish(winfo, prepare_data.end_lsn);
1329 : 5 : break;
1330 : : }
1331 : :
1332 : : /*
1333 : : * Switch to serialize mode when we are not able to send the
1334 : : * change to parallel apply worker.
1335 : : */
461 akapila@postgresql.o 1336 :UBC 0 : pa_switch_to_partial_serialize(winfo, true);
1337 : :
1338 : : /* fall through */
461 akapila@postgresql.o 1339 :CBC 1 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1340 [ - + ]: 1 : Assert(winfo);
1341 : :
1342 : 1 : stream_open_and_write_change(prepare_data.xid,
1343 : : LOGICAL_REP_MSG_STREAM_PREPARE,
1344 : : &original_msg);
1345 : :
1346 : 1 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1347 : :
1348 : : /* Finish processing the streaming transaction. */
1349 : 1 : pa_xact_finish(winfo, prepare_data.end_lsn);
1350 : 1 : break;
1351 : :
1352 : 6 : case TRANS_PARALLEL_APPLY:
1353 : :
1354 : : /*
1355 : : * If the parallel apply worker is applying spooled messages then
1356 : : * close the file before preparing.
1357 : : */
1358 [ + + ]: 6 : if (stream_fd)
1359 : 1 : stream_close_file();
1360 : :
1361 : 6 : begin_replication_step();
1362 : :
1363 : : /* Mark the transaction as prepared. */
1364 : 6 : apply_handle_prepare_internal(&prepare_data);
1365 : :
1366 : 6 : end_replication_step();
1367 : :
1368 : 6 : CommitTransactionCommand();
1369 : :
1370 : 6 : MyParallelShared->last_commit_end = XactLastCommitEnd;
1371 : :
1372 : 6 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1373 : 6 : pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1374 : :
1375 : 6 : pa_reset_subtrans();
1376 : :
1377 [ + + ]: 6 : elog(DEBUG1, "finished processing the STREAM PREPARE command");
1378 : 6 : break;
1379 : :
461 akapila@postgresql.o 1380 :UBC 0 : default:
453 1381 [ # # ]: 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1382 : : break;
1383 : : }
1384 : :
461 akapila@postgresql.o 1385 :CBC 20 : pgstat_report_stat(false);
1386 : :
1387 : : /* Process any tables that are being synchronized in parallel. */
984 1388 : 20 : process_syncing_tables(prepare_data.end_lsn);
1389 : :
1390 : : /*
1391 : : * Similar to prepare case, the subskiplsn could be left in a case of
1392 : : * server crash but it's okay. See the comments in apply_handle_prepare().
1393 : : */
754 1394 : 20 : stop_skipping_changes();
1395 : 20 : clear_subscription_skip_lsn(prepare_data.prepare_lsn);
1396 : :
984 1397 : 20 : pgstat_report_activity(STATE_IDLE, NULL);
1398 : :
961 1399 : 20 : reset_apply_error_context_info();
984 1400 : 20 : }
1401 : :
1402 : : /*
1403 : : * Handle ORIGIN message.
1404 : : *
1405 : : * TODO, support tracking of multiple origins
1406 : : */
1407 : : static void
2642 peter_e@gmx.net 1408 : 8 : apply_handle_origin(StringInfo s)
1409 : : {
1410 : : /*
1411 : : * ORIGIN message can only come inside streaming transaction or inside
1412 : : * remote transaction and before any actual writes.
1413 : : */
1319 akapila@postgresql.o 1414 [ + + ]: 8 : if (!in_streamed_transaction &&
1415 [ + - - + ]: 12 : (!in_remote_transaction ||
1416 [ - - ]: 6 : (IsTransactionState() && !am_tablesync_worker())))
2642 peter_e@gmx.net 1417 [ # # ]:UBC 0 : ereport(ERROR,
1418 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1419 : : errmsg_internal("ORIGIN message sent out of order")));
2642 peter_e@gmx.net 1420 :CBC 8 : }
1421 : :
1422 : : /*
1423 : : * Initialize fileset (if not already done).
1424 : : *
1425 : : * Create a new file when first_segment is true, otherwise open the existing
1426 : : * file.
1427 : : */
1428 : : void
461 akapila@postgresql.o 1429 : 384 : stream_start_internal(TransactionId xid, bool first_segment)
1430 : : {
1431 : 384 : begin_replication_step();
1432 : :
1433 : : /*
1434 : : * Initialize the worker's stream_fileset if we haven't yet. This will be
1435 : : * used for the entire duration of the worker so create it in a permanent
1436 : : * context. We create this on the very first streaming message from any
1437 : : * transaction and then use it for this and other streaming transactions.
1438 : : * Now, we could create a fileset at the start of the worker as well but
1439 : : * then we won't be sure that it will ever be used.
1440 : : */
1441 [ + + ]: 384 : if (!MyLogicalRepWorker->stream_fileset)
1442 : : {
1443 : : MemoryContext oldctx;
1444 : :
1445 : 15 : oldctx = MemoryContextSwitchTo(ApplyContext);
1446 : :
1447 : 15 : MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
1448 : 15 : FileSetInit(MyLogicalRepWorker->stream_fileset);
1449 : :
1450 : 15 : MemoryContextSwitchTo(oldctx);
1451 : : }
1452 : :
1453 : : /* Open the spool file for this transaction. */
1454 : 384 : stream_open_file(MyLogicalRepWorker->subid, xid, first_segment);
1455 : :
1456 : : /* If this is not the first segment, open existing subxact file. */
1457 [ + + ]: 383 : if (!first_segment)
1458 : 347 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1459 : :
1460 : 383 : end_replication_step();
1461 : 383 : }
1462 : :
1463 : : /*
1464 : : * Handle STREAM START message.
1465 : : */
1466 : : static void
1319 1467 : 916 : apply_handle_stream_start(StringInfo s)
1468 : : {
1469 : : bool first_segment;
1470 : : ParallelApplyWorkerInfo *winfo;
1471 : : TransApplyAction apply_action;
1472 : :
1473 : : /* Save the message before it is consumed. */
461 1474 : 916 : StringInfoData original_msg = *s;
1475 : :
1037 tgl@sss.pgh.pa.us 1476 [ - + ]: 916 : if (in_streamed_transaction)
1037 tgl@sss.pgh.pa.us 1477 [ # # ]:UBC 0 : ereport(ERROR,
1478 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1479 : : errmsg_internal("duplicate STREAM START message")));
1480 : :
1481 : : /* There must not be an active streaming transaction. */
453 akapila@postgresql.o 1482 [ - + ]:CBC 916 : Assert(!TransactionIdIsValid(stream_xid));
1483 : :
1484 : : /* notify handle methods we're processing a remote transaction */
1319 1485 : 916 : in_streamed_transaction = true;
1486 : :
1487 : : /* extract XID of the top-level transaction */
1488 : 916 : stream_xid = logicalrep_read_stream_start(s, &first_segment);
1489 : :
1037 tgl@sss.pgh.pa.us 1490 [ - + ]: 916 : if (!TransactionIdIsValid(stream_xid))
1037 tgl@sss.pgh.pa.us 1491 [ # # ]:UBC 0 : ereport(ERROR,
1492 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1493 : : errmsg_internal("invalid transaction ID in streamed replication transaction")));
1494 : :
768 akapila@postgresql.o 1495 :CBC 916 : set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
1496 : :
1497 : : /* Try to allocate a worker for the streaming transaction. */
461 1498 [ + + ]: 916 : if (first_segment)
1499 : 95 : pa_allocate_worker(stream_xid);
1500 : :
1501 : 916 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1502 : :
1503 [ + + + + : 916 : switch (apply_action)
- ]
1504 : : {
1505 : 364 : case TRANS_LEADER_SERIALIZE:
1506 : :
1507 : : /*
1508 : : * Function stream_start_internal starts a transaction. This
1509 : : * transaction will be committed on the stream stop unless it is a
1510 : : * tablesync worker in which case it will be committed after
1511 : : * processing all the messages. We need this transaction for
1512 : : * handling the BufFile, used for serializing the streaming data
1513 : : * and subxact info.
1514 : : */
1515 : 364 : stream_start_internal(stream_xid, first_segment);
1516 : 363 : break;
1517 : :
1518 : 270 : case TRANS_LEADER_SEND_TO_PARALLEL:
1519 [ - + ]: 270 : Assert(winfo);
1520 : :
1521 : : /*
1522 : : * Once we start serializing the changes, the parallel apply
1523 : : * worker will wait for the leader to release the stream lock
1524 : : * until the end of the transaction. So, we don't need to release
1525 : : * the lock or increment the stream count in that case.
1526 : : */
1527 [ + + ]: 270 : if (pa_send_data(winfo, s->len, s->data))
1528 : : {
1529 : : /*
1530 : : * Unlock the shared object lock so that the parallel apply
1531 : : * worker can continue to receive changes.
1532 : : */
1533 [ + + ]: 266 : if (!first_segment)
1534 : 239 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1535 : :
1536 : : /*
1537 : : * Increment the number of streaming blocks waiting to be
1538 : : * processed by parallel apply worker.
1539 : : */
1540 : 266 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1541 : :
1542 : : /* Cache the parallel apply worker for this transaction. */
1543 : 266 : pa_set_stream_apply_worker(winfo);
1544 : 266 : break;
1545 : : }
1546 : :
1547 : : /*
1548 : : * Switch to serialize mode when we are not able to send the
1549 : : * change to parallel apply worker.
1550 : : */
1551 : 4 : pa_switch_to_partial_serialize(winfo, !first_segment);
1552 : :
1553 : : /* fall through */
1554 : 15 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1555 [ - + ]: 15 : Assert(winfo);
1556 : :
1557 : : /*
1558 : : * Open the spool file unless it was already opened when switching
1559 : : * to serialize mode. The transaction started in
1560 : : * stream_start_internal will be committed on the stream stop.
1561 : : */
1562 [ + + ]: 15 : if (apply_action != TRANS_LEADER_SEND_TO_PARALLEL)
1563 : 11 : stream_start_internal(stream_xid, first_segment);
1564 : :
1565 : 15 : stream_write_change(LOGICAL_REP_MSG_STREAM_START, &original_msg);
1566 : :
1567 : : /* Cache the parallel apply worker for this transaction. */
1568 : 15 : pa_set_stream_apply_worker(winfo);
1569 : 15 : break;
1570 : :
1571 : 271 : case TRANS_PARALLEL_APPLY:
1572 [ + + ]: 271 : if (first_segment)
1573 : : {
1574 : : /* Hold the lock until the end of the transaction. */
1575 : 31 : pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
1576 : 31 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_STARTED);
1577 : :
1578 : : /*
1579 : : * Signal the leader apply worker, as it may be waiting for
1580 : : * us.
1581 : : */
1582 : 31 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
1583 : : }
1584 : :
1585 : 271 : parallel_stream_nchanges = 0;
1586 : 271 : break;
1587 : :
461 akapila@postgresql.o 1588 :UBC 0 : default:
453 1589 [ # # ]: 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1590 : : break;
1591 : : }
1592 : :
461 akapila@postgresql.o 1593 :CBC 915 : pgstat_report_activity(STATE_RUNNING, NULL);
1319 1594 : 915 : }
1595 : :
1596 : : /*
1597 : : * Update the information about subxacts and close the file.
1598 : : *
1599 : : * This function should be called when the stream_start_internal function has
1600 : : * been called.
1601 : : */
1602 : : void
461 1603 : 383 : stream_stop_internal(TransactionId xid)
1604 : : {
1605 : : /*
1606 : : * Serialize information about subxacts for the toplevel transaction, then
1607 : : * close the stream messages spool file.
1608 : : */
1609 : 383 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1319 1610 : 383 : stream_close_file();
1611 : :
1612 : : /* We must be in a valid transaction state */
1613 [ - + ]: 383 : Assert(IsTransactionState());
1614 : :
1615 : : /* Commit the per-stream transaction */
1157 1616 : 383 : CommitTransactionCommand();
1617 : :
1618 : : /* Reset per-stream context */
1319 1619 : 383 : MemoryContextReset(LogicalStreamingContext);
1620 : 383 : }
1621 : :
1622 : : /*
1623 : : * Handle STREAM STOP message.
1624 : : */
1625 : : static void
461 1626 : 914 : apply_handle_stream_stop(StringInfo s)
1627 : : {
1628 : : ParallelApplyWorkerInfo *winfo;
1629 : : TransApplyAction apply_action;
1630 : :
1631 [ - + ]: 914 : if (!in_streamed_transaction)
1037 tgl@sss.pgh.pa.us 1632 [ # # ]:UBC 0 : ereport(ERROR,
1633 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1634 : : errmsg_internal("STREAM STOP message without STREAM START")));
1635 : :
461 akapila@postgresql.o 1636 :CBC 914 : apply_action = get_transaction_apply_action(stream_xid, &winfo);
1637 : :
1638 [ + + + + : 914 : switch (apply_action)
- ]
1639 : : {
1640 : 363 : case TRANS_LEADER_SERIALIZE:
1641 : 363 : stream_stop_internal(stream_xid);
1642 : 363 : break;
1643 : :
1644 : 266 : case TRANS_LEADER_SEND_TO_PARALLEL:
1645 [ - + ]: 266 : Assert(winfo);
1646 : :
1647 : : /*
1648 : : * Lock before sending the STREAM_STOP message so that the leader
1649 : : * can hold the lock first and the parallel apply worker will wait
1650 : : * for leader to release the lock. See Locking Considerations atop
1651 : : * applyparallelworker.c.
1652 : : */
1653 : 266 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1654 : :
1655 [ + - ]: 266 : if (pa_send_data(winfo, s->len, s->data))
1656 : : {
1657 : 266 : pa_set_stream_apply_worker(NULL);
1658 : 266 : break;
1659 : : }
1660 : :
1661 : : /*
1662 : : * Switch to serialize mode when we are not able to send the
1663 : : * change to parallel apply worker.
1664 : : */
461 akapila@postgresql.o 1665 :UBC 0 : pa_switch_to_partial_serialize(winfo, true);
1666 : :
1667 : : /* fall through */
461 akapila@postgresql.o 1668 :CBC 15 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1669 : 15 : stream_write_change(LOGICAL_REP_MSG_STREAM_STOP, s);
1670 : 15 : stream_stop_internal(stream_xid);
1671 : 15 : pa_set_stream_apply_worker(NULL);
1672 : 15 : break;
1673 : :
1674 : 270 : case TRANS_PARALLEL_APPLY:
1675 [ + + ]: 270 : elog(DEBUG1, "applied %u changes in the streaming chunk",
1676 : : parallel_stream_nchanges);
1677 : :
1678 : : /*
1679 : : * By the time parallel apply worker is processing the changes in
1680 : : * the current streaming block, the leader apply worker may have
1681 : : * sent multiple streaming blocks. This can lead to parallel apply
1682 : : * worker start waiting even when there are more chunk of streams
1683 : : * in the queue. So, try to lock only if there is no message left
1684 : : * in the queue. See Locking Considerations atop
1685 : : * applyparallelworker.c.
1686 : : *
1687 : : * Note that here we have a race condition where we can start
1688 : : * waiting even when there are pending streaming chunks. This can
1689 : : * happen if the leader sends another streaming block and acquires
1690 : : * the stream lock again after the parallel apply worker checks
1691 : : * that there is no pending streaming block and before it actually
1692 : : * starts waiting on a lock. We can handle this case by not
1693 : : * allowing the leader to increment the stream block count during
1694 : : * the time parallel apply worker acquires the lock but it is not
1695 : : * clear whether that is worth the complexity.
1696 : : *
1697 : : * Now, if this missed chunk contains rollback to savepoint, then
1698 : : * there is a risk of deadlock which probably shouldn't happen
1699 : : * after restart.
1700 : : */
1701 : 270 : pa_decr_and_wait_stream_block();
1702 : 268 : break;
1703 : :
461 akapila@postgresql.o 1704 :UBC 0 : default:
453 1705 [ # # ]: 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1706 : : break;
1707 : : }
1708 : :
461 akapila@postgresql.o 1709 :CBC 912 : in_streamed_transaction = false;
453 1710 : 912 : stream_xid = InvalidTransactionId;
1711 : :
1712 : : /*
1713 : : * The parallel apply worker could be in a transaction in which case we
1714 : : * need to report the state as STATE_IDLEINTRANSACTION.
1715 : : */
461 1716 [ + + ]: 912 : if (IsTransactionOrTransactionBlock())
1717 : 268 : pgstat_report_activity(STATE_IDLEINTRANSACTION, NULL);
1718 : : else
1719 : 644 : pgstat_report_activity(STATE_IDLE, NULL);
1720 : :
1721 : 912 : reset_apply_error_context_info();
1722 : 912 : }
1723 : :
1724 : : /*
1725 : : * Helper function to handle STREAM ABORT message when the transaction was
1726 : : * serialized to file.
1727 : : */
1728 : : static void
1729 : 14 : stream_abort_internal(TransactionId xid, TransactionId subxid)
1730 : : {
1731 : : /*
1732 : : * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1733 : : * just delete the files with serialized info.
1734 : : */
1319 1735 [ + + ]: 14 : if (xid == subxid)
1736 : 1 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
1737 : : else
1738 : : {
1739 : : /*
1740 : : * OK, so it's a subxact. We need to read the subxact file for the
1741 : : * toplevel transaction, determine the offset tracked for the subxact,
1742 : : * and truncate the file with changes. We also remove the subxacts
1743 : : * with higher offsets (or rather higher XIDs).
1744 : : *
1745 : : * We intentionally scan the array from the tail, because we're likely
1746 : : * aborting a change for the most recent subtransactions.
1747 : : *
1748 : : * We can't use the binary search here as subxact XIDs won't
1749 : : * necessarily arrive in sorted order, consider the case where we have
1750 : : * released the savepoint for multiple subtransactions and then
1751 : : * performed rollback to savepoint for one of the earlier
1752 : : * sub-transaction.
1753 : : */
1754 : : int64 i;
1755 : : int64 subidx;
1756 : : BufFile *fd;
1757 : 13 : bool found = false;
1758 : : char path[MAXPGPATH];
1759 : :
1760 : 13 : subidx = -1;
1039 tgl@sss.pgh.pa.us 1761 : 13 : begin_replication_step();
1319 akapila@postgresql.o 1762 : 13 : subxact_info_read(MyLogicalRepWorker->subid, xid);
1763 : :
1764 [ + + ]: 15 : for (i = subxact_data.nsubxacts; i > 0; i--)
1765 : : {
1766 [ + + ]: 11 : if (subxact_data.subxacts[i - 1].xid == subxid)
1767 : : {
1768 : 9 : subidx = (i - 1);
1769 : 9 : found = true;
1770 : 9 : break;
1771 : : }
1772 : : }
1773 : :
1774 : : /*
1775 : : * If it's an empty sub-transaction then we will not find the subxid
1776 : : * here so just cleanup the subxact info and return.
1777 : : */
1778 [ + + ]: 13 : if (!found)
1779 : : {
1780 : : /* Cleanup the subxact info */
1781 : 4 : cleanup_subxact_info();
1039 tgl@sss.pgh.pa.us 1782 : 4 : end_replication_step();
1157 akapila@postgresql.o 1783 : 4 : CommitTransactionCommand();
1319 1784 : 4 : return;
1785 : : }
1786 : :
1787 : : /* open the changes file */
461 1788 : 9 : changes_filename(path, MyLogicalRepWorker->subid, xid);
1789 : 9 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
1790 : : O_RDWR, false);
1791 : :
1792 : : /* OK, truncate the file at the right offset */
1793 : 9 : BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
1794 : 9 : subxact_data.subxacts[subidx].offset);
1795 : 9 : BufFileClose(fd);
1796 : :
1797 : : /* discard the subxacts added later */
1798 : 9 : subxact_data.nsubxacts = subidx;
1799 : :
1800 : : /* write the updated subxact list */
1801 : 9 : subxact_info_write(MyLogicalRepWorker->subid, xid);
1802 : :
1803 : 9 : end_replication_step();
1804 : 9 : CommitTransactionCommand();
1805 : : }
1806 : : }
1807 : :
1808 : : /*
1809 : : * Handle STREAM ABORT message.
1810 : : */
1811 : : static void
1812 : 38 : apply_handle_stream_abort(StringInfo s)
1813 : : {
1814 : : TransactionId xid;
1815 : : TransactionId subxid;
1816 : : LogicalRepStreamAbortData abort_data;
1817 : : ParallelApplyWorkerInfo *winfo;
1818 : : TransApplyAction apply_action;
1819 : :
1820 : : /* Save the message before it is consumed. */
1821 : 38 : StringInfoData original_msg = *s;
1822 : : bool toplevel_xact;
1823 : :
1824 [ - + ]: 38 : if (in_streamed_transaction)
461 akapila@postgresql.o 1825 [ # # ]:UBC 0 : ereport(ERROR,
1826 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1827 : : errmsg_internal("STREAM ABORT message without STREAM STOP")));
1828 : :
1829 : : /* We receive abort information only when we can apply in parallel. */
461 akapila@postgresql.o 1830 :CBC 38 : logicalrep_read_stream_abort(s, &abort_data,
1831 : 38 : MyLogicalRepWorker->parallel_apply);
1832 : :
1833 : 38 : xid = abort_data.xid;
1834 : 38 : subxid = abort_data.subxid;
1835 : 38 : toplevel_xact = (xid == subxid);
1836 : :
1837 : 38 : set_apply_error_context_xact(subxid, abort_data.abort_lsn);
1838 : :
1839 : 38 : apply_action = get_transaction_apply_action(xid, &winfo);
1840 : :
1841 [ + + + + : 38 : switch (apply_action)
- ]
1842 : : {
453 1843 : 14 : case TRANS_LEADER_APPLY:
1844 : :
1845 : : /*
1846 : : * We are in the leader apply worker and the transaction has been
1847 : : * serialized to file.
1848 : : */
461 1849 : 14 : stream_abort_internal(xid, subxid);
1850 : :
1851 [ - + ]: 14 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1852 : 14 : break;
1853 : :
1854 : 10 : case TRANS_LEADER_SEND_TO_PARALLEL:
1855 [ - + ]: 10 : Assert(winfo);
1856 : :
1857 : : /*
1858 : : * For the case of aborting the subtransaction, we increment the
1859 : : * number of streaming blocks and take the lock again before
1860 : : * sending the STREAM_ABORT to ensure that the parallel apply
1861 : : * worker will wait on the lock for the next set of changes after
1862 : : * processing the STREAM_ABORT message if it is not already
1863 : : * waiting for STREAM_STOP message.
1864 : : *
1865 : : * It is important to perform this locking before sending the
1866 : : * STREAM_ABORT message so that the leader can hold the lock first
1867 : : * and the parallel apply worker will wait for the leader to
1868 : : * release the lock. This is the same as what we do in
1869 : : * apply_handle_stream_stop. See Locking Considerations atop
1870 : : * applyparallelworker.c.
1871 : : */
1872 [ + + ]: 10 : if (!toplevel_xact)
1873 : : {
1874 : 9 : pa_unlock_stream(xid, AccessExclusiveLock);
1875 : 9 : pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
1876 : 9 : pa_lock_stream(xid, AccessExclusiveLock);
1877 : : }
1878 : :
1879 [ + - ]: 10 : if (pa_send_data(winfo, s->len, s->data))
1880 : : {
1881 : : /*
1882 : : * Unlike STREAM_COMMIT and STREAM_PREPARE, we don't need to
1883 : : * wait here for the parallel apply worker to finish as that
1884 : : * is not required to maintain the commit order and won't have
1885 : : * the risk of failures due to transaction dependencies and
1886 : : * deadlocks. However, it is possible that before the parallel
1887 : : * worker finishes and we clear the worker info, the xid
1888 : : * wraparound happens on the upstream and a new transaction
1889 : : * with the same xid can appear and that can lead to duplicate
1890 : : * entries in ParallelApplyTxnHash. Yet another problem could
1891 : : * be that we may have serialized the changes in partial
1892 : : * serialize mode and the file containing xact changes may
1893 : : * already exist, and after xid wraparound trying to create
1894 : : * the file for the same xid can lead to an error. To avoid
1895 : : * these problems, we decide to wait for the aborts to finish.
1896 : : *
1897 : : * Note, it is okay to not update the flush location position
1898 : : * for aborts as in worst case that means such a transaction
1899 : : * won't be sent again after restart.
1900 : : */
1901 [ + + ]: 10 : if (toplevel_xact)
1902 : 1 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1903 : :
1904 : 10 : break;
1905 : : }
1906 : :
1907 : : /*
1908 : : * Switch to serialize mode when we are not able to send the
1909 : : * change to parallel apply worker.
1910 : : */
461 akapila@postgresql.o 1911 :UBC 0 : pa_switch_to_partial_serialize(winfo, true);
1912 : :
1913 : : /* fall through */
461 akapila@postgresql.o 1914 :CBC 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
1915 [ - + ]: 2 : Assert(winfo);
1916 : :
1917 : : /*
1918 : : * Parallel apply worker might have applied some changes, so write
1919 : : * the STREAM_ABORT message so that it can rollback the
1920 : : * subtransaction if needed.
1921 : : */
1922 : 2 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_ABORT,
1923 : : &original_msg);
1924 : :
1925 [ + + ]: 2 : if (toplevel_xact)
1926 : : {
1927 : 1 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
1928 : 1 : pa_xact_finish(winfo, InvalidXLogRecPtr);
1929 : : }
1930 : 2 : break;
1931 : :
1932 : 12 : case TRANS_PARALLEL_APPLY:
1933 : :
1934 : : /*
1935 : : * If the parallel apply worker is applying spooled messages then
1936 : : * close the file before aborting.
1937 : : */
1938 [ + + + + ]: 12 : if (toplevel_xact && stream_fd)
1939 : 1 : stream_close_file();
1940 : :
1941 : 12 : pa_stream_abort(&abort_data);
1942 : :
1943 : : /*
1944 : : * We need to wait after processing rollback to savepoint for the
1945 : : * next set of changes.
1946 : : *
1947 : : * We have a race condition here due to which we can start waiting
1948 : : * here when there are more chunk of streams in the queue. See
1949 : : * apply_handle_stream_stop.
1950 : : */
1951 [ + + ]: 12 : if (!toplevel_xact)
1952 : 10 : pa_decr_and_wait_stream_block();
1953 : :
1954 [ + + ]: 12 : elog(DEBUG1, "finished processing the STREAM ABORT command");
1955 : 12 : break;
1956 : :
461 akapila@postgresql.o 1957 :UBC 0 : default:
453 1958 [ # # ]: 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
1959 : : break;
1960 : : }
1961 : :
461 akapila@postgresql.o 1962 :CBC 38 : reset_apply_error_context_info();
1963 : 38 : }
1964 : :
1965 : : /*
1966 : : * Ensure that the passed location is fileset's end.
1967 : : */
1968 : : static void
1969 : 4 : ensure_last_message(FileSet *stream_fileset, TransactionId xid, int fileno,
1970 : : off_t offset)
1971 : : {
1972 : : char path[MAXPGPATH];
1973 : : BufFile *fd;
1974 : : int last_fileno;
1975 : : off_t last_offset;
1976 : :
1977 [ - + ]: 4 : Assert(!IsTransactionState());
1978 : :
1979 : 4 : begin_replication_step();
1980 : :
1981 : 4 : changes_filename(path, MyLogicalRepWorker->subid, xid);
1982 : :
1983 : 4 : fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
1984 : :
1985 : 4 : BufFileSeek(fd, 0, 0, SEEK_END);
1986 : 4 : BufFileTell(fd, &last_fileno, &last_offset);
1987 : :
1988 : 4 : BufFileClose(fd);
1989 : :
1990 : 4 : end_replication_step();
1991 : :
1992 [ + - - + ]: 4 : if (last_fileno != fileno || last_offset != offset)
461 akapila@postgresql.o 1993 [ # # ]:UBC 0 : elog(ERROR, "unexpected message left in streaming transaction's changes file \"%s\"",
1994 : : path);
1319 akapila@postgresql.o 1995 :CBC 4 : }
1996 : :
1997 : : /*
1998 : : * Common spoolfile processing.
1999 : : */
2000 : : void
461 2001 : 35 : apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
2002 : : XLogRecPtr lsn)
2003 : : {
2004 : : int nchanges;
2005 : : char path[MAXPGPATH];
1319 2006 : 35 : char *buffer = NULL;
2007 : : MemoryContext oldcxt;
2008 : : ResourceOwner oldowner;
2009 : : int fileno;
2010 : : off_t offset;
2011 : :
461 2012 [ + + ]: 35 : if (!am_parallel_apply_worker())
2013 : 31 : maybe_start_skipping_changes(lsn);
2014 : :
2015 : : /* Make sure we have an open transaction */
1039 tgl@sss.pgh.pa.us 2016 : 35 : begin_replication_step();
2017 : :
2018 : : /*
2019 : : * Allocate file handle and memory required to process all the messages in
2020 : : * TopTransactionContext to avoid them getting reset after each message is
2021 : : * processed.
2022 : : */
1319 akapila@postgresql.o 2023 : 35 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
2024 : :
2025 : : /* Open the spool file for the committed/prepared transaction */
2026 : 35 : changes_filename(path, MyLogicalRepWorker->subid, xid);
2027 [ - + ]: 35 : elog(DEBUG1, "replaying changes from file \"%s\"", path);
2028 : :
2029 : : /*
2030 : : * Make sure the file is owned by the toplevel transaction so that the
2031 : : * file will not be accidentally closed when aborting a subtransaction.
2032 : : */
461 2033 : 35 : oldowner = CurrentResourceOwner;
2034 : 35 : CurrentResourceOwner = TopTransactionResourceOwner;
2035 : :
2036 : 35 : stream_fd = BufFileOpenFileSet(stream_fileset, path, O_RDONLY, false);
2037 : :
2038 : 35 : CurrentResourceOwner = oldowner;
2039 : :
1319 2040 : 35 : buffer = palloc(BLCKSZ);
2041 : :
2042 : 35 : MemoryContextSwitchTo(oldcxt);
2043 : :
990 2044 : 35 : remote_final_lsn = lsn;
2045 : :
2046 : : /*
2047 : : * Make sure the handle apply_dispatch methods are aware we're in a remote
2048 : : * transaction.
2049 : : */
1319 2050 : 35 : in_remote_transaction = true;
2051 : 35 : pgstat_report_activity(STATE_RUNNING, NULL);
2052 : :
1039 tgl@sss.pgh.pa.us 2053 : 35 : end_replication_step();
2054 : :
2055 : : /*
2056 : : * Read the entries one by one and pass them through the same logic as in
2057 : : * apply_dispatch.
2058 : : */
1319 akapila@postgresql.o 2059 : 35 : nchanges = 0;
2060 : : while (true)
2061 : 88495 : {
2062 : : StringInfoData s2;
2063 : : size_t nbytes;
2064 : : int len;
2065 : :
2066 [ - + ]: 88530 : CHECK_FOR_INTERRUPTS();
2067 : :
2068 : : /* read length of the on-disk record */
454 peter@eisentraut.org 2069 : 88530 : nbytes = BufFileReadMaybeEOF(stream_fd, &len, sizeof(len), true);
2070 : :
2071 : : /* have we reached end of the file? */
1319 akapila@postgresql.o 2072 [ + + ]: 88530 : if (nbytes == 0)
2073 : 30 : break;
2074 : :
2075 : : /* do we have a correct length? */
1037 tgl@sss.pgh.pa.us 2076 [ - + ]: 88500 : if (len <= 0)
1037 tgl@sss.pgh.pa.us 2077 [ # # ]:UBC 0 : elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
2078 : : len, path);
2079 : :
2080 : : /* make sure we have sufficiently large buffer */
1319 akapila@postgresql.o 2081 :CBC 88500 : buffer = repalloc(buffer, len);
2082 : :
2083 : : /* and finally read the data into the buffer */
454 peter@eisentraut.org 2084 : 88500 : BufFileReadExact(stream_fd, buffer, len);
2085 : :
461 akapila@postgresql.o 2086 : 88500 : BufFileTell(stream_fd, &fileno, &offset);
2087 : :
2088 : : /* init a stringinfo using the buffer and call apply_dispatch */
159 drowley@postgresql.o 2089 :GNC 88500 : initReadOnlyStringInfo(&s2, buffer, len);
2090 : :
2091 : : /* Ensure we are reading the data into our memory context. */
1319 akapila@postgresql.o 2092 :CBC 88500 : oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
2093 : :
2094 : 88500 : apply_dispatch(&s2);
2095 : :
2096 : 88499 : MemoryContextReset(ApplyMessageContext);
2097 : :
2098 : 88499 : MemoryContextSwitchTo(oldcxt);
2099 : :
2100 : 88499 : nchanges++;
2101 : :
2102 : : /*
2103 : : * It is possible the file has been closed because we have processed
2104 : : * the transaction end message like stream_commit in which case that
2105 : : * must be the last message.
2106 : : */
461 2107 [ + + ]: 88499 : if (!stream_fd)
2108 : : {
2109 : 4 : ensure_last_message(stream_fileset, xid, fileno, offset);
2110 : 4 : break;
2111 : : }
2112 : :
1319 2113 [ + + ]: 88495 : if (nchanges % 1000 == 0)
1037 tgl@sss.pgh.pa.us 2114 [ - + ]: 84 : elog(DEBUG1, "replayed %d changes from file \"%s\"",
2115 : : nchanges, path);
2116 : : }
2117 : :
461 akapila@postgresql.o 2118 [ + + ]: 34 : if (stream_fd)
2119 : 30 : stream_close_file();
2120 : :
1319 2121 [ - + ]: 34 : elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
2122 : : nchanges, path);
2123 : :
990 2124 : 34 : return;
2125 : : }
2126 : :
2127 : : /*
2128 : : * Handle STREAM COMMIT message.
2129 : : */
2130 : : static void
2131 : 64 : apply_handle_stream_commit(StringInfo s)
2132 : : {
2133 : : TransactionId xid;
2134 : : LogicalRepCommitData commit_data;
2135 : : ParallelApplyWorkerInfo *winfo;
2136 : : TransApplyAction apply_action;
2137 : :
2138 : : /* Save the message before it is consumed. */
461 2139 : 64 : StringInfoData original_msg = *s;
2140 : :
990 2141 [ - + ]: 64 : if (in_streamed_transaction)
990 akapila@postgresql.o 2142 [ # # ]:UBC 0 : ereport(ERROR,
2143 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2144 : : errmsg_internal("STREAM COMMIT message without STREAM STOP")));
2145 : :
990 akapila@postgresql.o 2146 :CBC 64 : xid = logicalrep_read_stream_commit(s, &commit_data);
768 2147 : 64 : set_apply_error_context_xact(xid, commit_data.commit_lsn);
2148 : :
461 2149 : 64 : apply_action = get_transaction_apply_action(xid, &winfo);
2150 : :
2151 [ + + + + : 64 : switch (apply_action)
- ]
2152 : : {
453 2153 : 23 : case TRANS_LEADER_APPLY:
2154 : :
2155 : : /*
2156 : : * The transaction has been serialized to file, so replay all the
2157 : : * spooled operations.
2158 : : */
461 2159 : 23 : apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
2160 : : commit_data.commit_lsn);
2161 : :
2162 : 22 : apply_handle_commit_internal(&commit_data);
2163 : :
2164 : : /* Unlink the files with serialized changes and subxact info. */
2165 : 22 : stream_cleanup_files(MyLogicalRepWorker->subid, xid);
2166 : :
2167 [ - + ]: 22 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2168 : 22 : break;
2169 : :
2170 : 19 : case TRANS_LEADER_SEND_TO_PARALLEL:
2171 [ - + ]: 19 : Assert(winfo);
2172 : :
2173 [ + - ]: 19 : if (pa_send_data(winfo, s->len, s->data))
2174 : : {
2175 : : /* Finish processing the streaming transaction. */
2176 : 19 : pa_xact_finish(winfo, commit_data.end_lsn);
2177 : 18 : break;
2178 : : }
2179 : :
2180 : : /*
2181 : : * Switch to serialize mode when we are not able to send the
2182 : : * change to parallel apply worker.
2183 : : */
461 akapila@postgresql.o 2184 :UBC 0 : pa_switch_to_partial_serialize(winfo, true);
2185 : :
2186 : : /* fall through */
461 akapila@postgresql.o 2187 :CBC 2 : case TRANS_LEADER_PARTIAL_SERIALIZE:
2188 [ - + ]: 2 : Assert(winfo);
2189 : :
2190 : 2 : stream_open_and_write_change(xid, LOGICAL_REP_MSG_STREAM_COMMIT,
2191 : : &original_msg);
2192 : :
2193 : 2 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
2194 : :
2195 : : /* Finish processing the streaming transaction. */
2196 : 2 : pa_xact_finish(winfo, commit_data.end_lsn);
2197 : 2 : break;
2198 : :
2199 : 20 : case TRANS_PARALLEL_APPLY:
2200 : :
2201 : : /*
2202 : : * If the parallel apply worker is applying spooled messages then
2203 : : * close the file before committing.
2204 : : */
2205 [ + + ]: 20 : if (stream_fd)
2206 : 2 : stream_close_file();
2207 : :
2208 : 20 : apply_handle_commit_internal(&commit_data);
2209 : :
2210 : 20 : MyParallelShared->last_commit_end = XactLastCommitEnd;
2211 : :
2212 : : /*
2213 : : * It is important to set the transaction state as finished before
2214 : : * releasing the lock. See pa_wait_for_xact_finish.
2215 : : */
2216 : 20 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
2217 : 20 : pa_unlock_transaction(xid, AccessExclusiveLock);
2218 : :
2219 : 20 : pa_reset_subtrans();
2220 : :
2221 [ + + ]: 20 : elog(DEBUG1, "finished processing the STREAM COMMIT command");
2222 : 20 : break;
2223 : :
461 akapila@postgresql.o 2224 :UBC 0 : default:
453 2225 [ # # ]: 0 : elog(ERROR, "unexpected apply action: %d", (int) apply_action);
2226 : : break;
2227 : : }
2228 : :
2229 : : /* Process any tables that are being synchronized in parallel. */
1234 akapila@postgresql.o 2230 :CBC 62 : process_syncing_tables(commit_data.end_lsn);
2231 : :
1319 2232 : 62 : pgstat_report_activity(STATE_IDLE, NULL);
2233 : :
961 2234 : 62 : reset_apply_error_context_info();
1319 2235 : 62 : }
2236 : :
2237 : : /*
2238 : : * Helper function for apply_handle_commit and apply_handle_stream_commit.
2239 : : */
2240 : : static void
989 2241 : 424 : apply_handle_commit_internal(LogicalRepCommitData *commit_data)
2242 : : {
754 2243 [ + + ]: 424 : if (is_skipping_changes())
2244 : : {
2245 : 2 : stop_skipping_changes();
2246 : :
2247 : : /*
2248 : : * Start a new transaction to clear the subskiplsn, if not started
2249 : : * yet.
2250 : : */
2251 [ + + ]: 2 : if (!IsTransactionState())
2252 : 1 : StartTransactionCommand();
2253 : : }
2254 : :
1157 2255 [ + - ]: 424 : if (IsTransactionState())
2256 : : {
2257 : : /*
2258 : : * The transaction is either non-empty or skipped, so we clear the
2259 : : * subskiplsn.
2260 : : */
754 2261 : 424 : clear_subscription_skip_lsn(commit_data->commit_lsn);
2262 : :
2263 : : /*
2264 : : * Update origin state so we can restart streaming from correct
2265 : : * position in case of crash.
2266 : : */
1234 2267 : 424 : replorigin_session_origin_lsn = commit_data->end_lsn;
2268 : 424 : replorigin_session_origin_timestamp = commit_data->committime;
2269 : :
2270 : 424 : CommitTransactionCommand();
2271 : :
461 2272 [ + + ]: 424 : if (IsTransactionBlock())
2273 : : {
2274 : 4 : EndTransactionBlock(false);
2275 : 4 : CommitTransactionCommand();
2276 : : }
2277 : :
1234 2278 : 424 : pgstat_report_stat(false);
2279 : :
461 2280 : 424 : store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
2281 : : }
2282 : : else
2283 : : {
2284 : : /* Process any invalidation messages that might have accumulated. */
1234 akapila@postgresql.o 2285 :UBC 0 : AcceptInvalidationMessages();
2286 : 0 : maybe_reread_subscription();
2287 : : }
2288 : :
1234 akapila@postgresql.o 2289 :CBC 424 : in_remote_transaction = false;
2290 : 424 : }
2291 : :
2292 : : /*
2293 : : * Handle RELATION message.
2294 : : *
2295 : : * Note we don't do validation against local schema here. The validation
2296 : : * against local schema is postponed until first change for given relation
2297 : : * comes as we only care about it when applying changes for it anyway and we
2298 : : * do less locking this way.
2299 : : */
2300 : : static void
2642 peter_e@gmx.net 2301 : 440 : apply_handle_relation(StringInfo s)
2302 : : {
2303 : : LogicalRepRelation *rel;
2304 : :
1235 akapila@postgresql.o 2305 [ + + ]: 440 : if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
1319 2306 : 41 : return;
2307 : :
2642 peter_e@gmx.net 2308 : 399 : rel = logicalrep_read_rel(s);
2309 : 399 : logicalrep_relmap_update(rel);
2310 : :
2311 : : /* Also reset all entries in the partition map that refer to remoterel. */
668 akapila@postgresql.o 2312 : 399 : logicalrep_partmap_reset_relmap(rel);
2313 : : }
2314 : :
2315 : : /*
2316 : : * Handle TYPE message.
2317 : : *
2318 : : * This implementation pays no attention to TYPE messages; we expect the user
2319 : : * to have set things up so that the incoming data is acceptable to the input
2320 : : * functions for the locally subscribed tables. Hence, we just read and
2321 : : * discard the message.
2322 : : */
2323 : : static void
2642 peter_e@gmx.net 2324 : 18 : apply_handle_type(StringInfo s)
2325 : : {
2326 : : LogicalRepTyp typ;
2327 : :
1235 akapila@postgresql.o 2328 [ - + ]: 18 : if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
1319 akapila@postgresql.o 2329 :UBC 0 : return;
2330 : :
2642 peter_e@gmx.net 2331 :CBC 18 : logicalrep_read_typ(s, &typ);
2332 : : }
2333 : :
2334 : : /*
2335 : : * Check that we (the subscription owner) have sufficient privileges on the
2336 : : * target relation to perform the given operation.
2337 : : */
2338 : : static void
828 jdavis@postgresql.or 2339 : 220271 : TargetPrivilegesCheck(Relation rel, AclMode mode)
2340 : : {
2341 : : Oid relid;
2342 : : AclResult aclresult;
2343 : :
2344 : 220271 : relid = RelationGetRelid(rel);
2345 : 220271 : aclresult = pg_class_aclcheck(relid, GetUserId(), mode);
2346 [ + + ]: 220271 : if (aclresult != ACLCHECK_OK)
2347 : 9 : aclcheck_error(aclresult,
2348 : 9 : get_relkind_objtype(rel->rd_rel->relkind),
2349 : 9 : get_rel_name(relid));
2350 : :
2351 : : /*
2352 : : * We lack the infrastructure to honor RLS policies. It might be possible
2353 : : * to add such infrastructure here, but tablesync workers lack it, too, so
2354 : : * we don't bother. RLS does not ordinarily apply to TRUNCATE commands,
2355 : : * but it seems dangerous to replicate a TRUNCATE and then refuse to
2356 : : * replicate subsequent INSERTs, so we forbid all commands the same.
2357 : : */
2358 [ + + ]: 220262 : if (check_enable_rls(relid, InvalidOid, false) == RLS_ENABLED)
2359 [ + - ]: 3 : ereport(ERROR,
2360 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2361 : : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2362 : : GetUserNameFromId(GetUserId(), true),
2363 : : RelationGetRelationName(rel))));
2364 : 220259 : }
2365 : :
2366 : : /*
2367 : : * Handle INSERT message.
2368 : : */
2369 : :
2370 : : static void
2642 peter_e@gmx.net 2371 : 188845 : apply_handle_insert(StringInfo s)
2372 : : {
2373 : : LogicalRepRelMapEntry *rel;
2374 : : LogicalRepTupleData newtup;
2375 : : LogicalRepRelId relid;
2376 : : UserContext ucxt;
2377 : : ApplyExecutionData *edata;
2378 : : EState *estate;
2379 : : TupleTableSlot *remoteslot;
2380 : : MemoryContext oldctx;
2381 : : bool run_as_owner;
2382 : :
2383 : : /*
2384 : : * Quick return if we are skipping data modification changes or handling
2385 : : * streamed transactions.
2386 : : */
754 akapila@postgresql.o 2387 [ + + + + ]: 367688 : if (is_skipping_changes() ||
2388 : 178843 : handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
1319 2389 : 113086 : return;
2390 : :
1039 tgl@sss.pgh.pa.us 2391 : 78819 : begin_replication_step();
2392 : :
2642 peter_e@gmx.net 2393 : 78817 : relid = logicalrep_read_insert(s, &newtup);
2394 : 78817 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2579 2395 [ + + ]: 78809 : if (!should_apply_changes_for_rel(rel))
2396 : : {
2397 : : /*
2398 : : * The relation can't become interesting in the middle of the
2399 : : * transaction so it's safe to unlock it.
2400 : : */
2401 : 3060 : logicalrep_rel_close(rel, RowExclusiveLock);
1039 tgl@sss.pgh.pa.us 2402 : 3060 : end_replication_step();
2579 peter_e@gmx.net 2403 : 3060 : return;
2404 : : }
2405 : :
2406 : : /*
2407 : : * Make sure that any user-supplied code runs as the table owner, unless
2408 : : * the user has opted out of that behavior.
2409 : : */
376 rhaas@postgresql.org 2410 : 75749 : run_as_owner = MySubscription->runasowner;
2411 [ + + ]: 75749 : if (!run_as_owner)
2412 : 75740 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2413 : :
2414 : : /* Set relation for error callback */
961 akapila@postgresql.o 2415 : 75749 : apply_error_callback_arg.rel = rel;
2416 : :
2417 : : /* Initialize the executor state. */
1058 tgl@sss.pgh.pa.us 2418 : 75749 : edata = create_edata_for_relation(rel);
2419 : 75749 : estate = edata->estate;
2249 andres@anarazel.de 2420 : 75749 : remoteslot = ExecInitExtraTupleSlot(estate,
1977 2421 : 75749 : RelationGetDescr(rel->localrel),
2422 : : &TTSOpsVirtual);
2423 : :
2424 : : /* Process and store remote tuple in the slot */
2642 peter_e@gmx.net 2425 [ - + ]: 75749 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1366 tgl@sss.pgh.pa.us 2426 : 75749 : slot_store_data(remoteslot, rel, &newtup);
2642 peter_e@gmx.net 2427 : 75749 : slot_fill_defaults(rel, estate, remoteslot);
2428 : 75749 : MemoryContextSwitchTo(oldctx);
2429 : :
2430 : : /* For a partitioned table, insert the tuple into a partition. */
1469 peter@eisentraut.org 2431 [ + + ]: 75749 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1058 tgl@sss.pgh.pa.us 2432 : 44 : apply_handle_tuple_routing(edata,
2433 : : remoteslot, NULL, CMD_INSERT);
2434 : : else
2435 : 75705 : apply_handle_insert_internal(edata, edata->targetRelInfo,
2436 : : remoteslot);
2437 : :
2438 : 75734 : finish_edata(edata);
2439 : :
2440 : : /* Reset relation for error callback */
961 akapila@postgresql.o 2441 : 75734 : apply_error_callback_arg.rel = NULL;
2442 : :
376 rhaas@postgresql.org 2443 [ + + ]: 75734 : if (!run_as_owner)
2444 : 75729 : RestoreUserContext(&ucxt);
2445 : :
2642 peter_e@gmx.net 2446 : 75734 : logicalrep_rel_close(rel, NoLock);
2447 : :
1039 tgl@sss.pgh.pa.us 2448 : 75734 : end_replication_step();
2449 : : }
2450 : :
2451 : : /*
2452 : : * Workhorse for apply_handle_insert()
2453 : : * relinfo is for the relation we're actually inserting into
2454 : : * (could be a child partition of edata->targetRelInfo)
2455 : : */
2456 : : static void
1058 2457 : 75750 : apply_handle_insert_internal(ApplyExecutionData *edata,
2458 : : ResultRelInfo *relinfo,
2459 : : TupleTableSlot *remoteslot)
2460 : : {
2461 : 75750 : EState *estate = edata->estate;
2462 : :
2463 : : /* We must open indexes here. */
1482 peter@eisentraut.org 2464 : 75750 : ExecOpenIndices(relinfo, false);
2465 : :
2466 : : /* Do the insert. */
828 jdavis@postgresql.or 2467 : 75750 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
1278 heikki.linnakangas@i 2468 : 75743 : ExecSimpleRelationInsert(relinfo, estate, remoteslot);
2469 : :
2470 : : /* Cleanup. */
1482 peter@eisentraut.org 2471 : 75735 : ExecCloseIndices(relinfo);
2472 : 75735 : }
2473 : :
2474 : : /*
2475 : : * Check if the logical replication relation is updatable and throw
2476 : : * appropriate error if it isn't.
2477 : : */
2478 : : static void
2642 peter_e@gmx.net 2479 : 72279 : check_relation_updatable(LogicalRepRelMapEntry *rel)
2480 : : {
2481 : : /*
2482 : : * For partitioned tables, we only need to care if the target partition is
2483 : : * updatable (aka has PK or RI defined for it).
2484 : : */
663 akapila@postgresql.o 2485 [ + + ]: 72279 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2486 : 29 : return;
2487 : :
2488 : : /* Updatable, no error. */
2642 peter_e@gmx.net 2489 [ + - ]: 72250 : if (rel->updatable)
2490 : 72250 : return;
2491 : :
2492 : : /*
2493 : : * We are in error mode so it's fine this is somewhat slow. It's better to
2494 : : * give user correct error.
2495 : : */
2642 peter_e@gmx.net 2496 [ # # ]:UBC 0 : if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
2497 : : {
2498 [ # # ]: 0 : ereport(ERROR,
2499 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2500 : : errmsg("publisher did not send replica identity column "
2501 : : "expected by the logical replication target relation \"%s.%s\"",
2502 : : rel->remoterel.nspname, rel->remoterel.relname)));
2503 : : }
2504 : :
2505 [ # # ]: 0 : ereport(ERROR,
2506 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2507 : : errmsg("logical replication target relation \"%s.%s\" has "
2508 : : "neither REPLICA IDENTITY index nor PRIMARY "
2509 : : "KEY and published relation does not have "
2510 : : "REPLICA IDENTITY FULL",
2511 : : rel->remoterel.nspname, rel->remoterel.relname)));
2512 : : }
2513 : :
2514 : : /*
2515 : : * Handle UPDATE message.
2516 : : *
2517 : : * TODO: FDW support
2518 : : */
2519 : : static void
2642 peter_e@gmx.net 2520 :CBC 66168 : apply_handle_update(StringInfo s)
2521 : : {
2522 : : LogicalRepRelMapEntry *rel;
2523 : : LogicalRepRelId relid;
2524 : : UserContext ucxt;
2525 : : ApplyExecutionData *edata;
2526 : : EState *estate;
2527 : : LogicalRepTupleData oldtup;
2528 : : LogicalRepTupleData newtup;
2529 : : bool has_oldtup;
2530 : : TupleTableSlot *remoteslot;
2531 : : RTEPermissionInfo *target_perminfo;
2532 : : MemoryContext oldctx;
2533 : : bool run_as_owner;
2534 : :
2535 : : /*
2536 : : * Quick return if we are skipping data modification changes or handling
2537 : : * streamed transactions.
2538 : : */
754 akapila@postgresql.o 2539 [ + - + + ]: 132336 : if (is_skipping_changes() ||
2540 : 66168 : handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
1319 2541 : 34232 : return;
2542 : :
1039 tgl@sss.pgh.pa.us 2543 : 31936 : begin_replication_step();
2544 : :
2642 peter_e@gmx.net 2545 : 31935 : relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
2546 : : &newtup);
2547 : 31935 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2579 2548 [ - + ]: 31935 : if (!should_apply_changes_for_rel(rel))
2549 : : {
2550 : : /*
2551 : : * The relation can't become interesting in the middle of the
2552 : : * transaction so it's safe to unlock it.
2553 : : */
2579 peter_e@gmx.net 2554 :UBC 0 : logicalrep_rel_close(rel, RowExclusiveLock);
1039 tgl@sss.pgh.pa.us 2555 : 0 : end_replication_step();
2579 peter_e@gmx.net 2556 : 0 : return;
2557 : : }
2558 : :
2559 : : /* Set relation for error callback */
961 akapila@postgresql.o 2560 :CBC 31935 : apply_error_callback_arg.rel = rel;
2561 : :
2562 : : /* Check if we can do the update. */
2642 peter_e@gmx.net 2563 : 31935 : check_relation_updatable(rel);
2564 : :
2565 : : /*
2566 : : * Make sure that any user-supplied code runs as the table owner, unless
2567 : : * the user has opted out of that behavior.
2568 : : */
376 rhaas@postgresql.org 2569 : 31935 : run_as_owner = MySubscription->runasowner;
2570 [ + + ]: 31935 : if (!run_as_owner)
2571 : 31931 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2572 : :
2573 : : /* Initialize the executor state. */
1058 tgl@sss.pgh.pa.us 2574 : 31934 : edata = create_edata_for_relation(rel);
2575 : 31934 : estate = edata->estate;
2249 andres@anarazel.de 2576 : 31934 : remoteslot = ExecInitExtraTupleSlot(estate,
1977 2577 : 31934 : RelationGetDescr(rel->localrel),
2578 : : &TTSOpsVirtual);
2579 : :
2580 : : /*
2581 : : * Populate updatedCols so that per-column triggers can fire, and so
2582 : : * executor can correctly pass down indexUnchanged hint. This could
2583 : : * include more columns than were actually changed on the publisher
2584 : : * because the logical replication protocol doesn't contain that
2585 : : * information. But it would for example exclude columns that only exist
2586 : : * on the subscriber, since we are not touching those.
2587 : : */
495 alvherre@alvh.no-ip. 2588 : 31934 : target_perminfo = list_nth(estate->es_rteperminfos, 0);
1560 peter@eisentraut.org 2589 [ + + ]: 159319 : for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
2590 : : {
1364 tgl@sss.pgh.pa.us 2591 : 127385 : Form_pg_attribute att = TupleDescAttr(remoteslot->tts_tupleDescriptor, i);
2592 : 127385 : int remoteattnum = rel->attrmap->attnums[i];
2593 : :
2594 [ + + + + ]: 127385 : if (!att->attisdropped && remoteattnum >= 0)
2595 : : {
2596 [ - + ]: 68845 : Assert(remoteattnum < newtup.ncols);
2597 [ + + ]: 68845 : if (newtup.colstatus[remoteattnum] != LOGICALREP_COLUMN_UNCHANGED)
495 alvherre@alvh.no-ip. 2598 : 68842 : target_perminfo->updatedCols =
2599 : 68842 : bms_add_member(target_perminfo->updatedCols,
2600 : : i + 1 - FirstLowInvalidHeapAttributeNumber);
2601 : : }
2602 : : }
2603 : :
2604 : : /* Build the search tuple. */
2642 peter_e@gmx.net 2605 [ - + ]: 31934 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1366 tgl@sss.pgh.pa.us 2606 : 31934 : slot_store_data(remoteslot, rel,
2607 [ + + ]: 31934 : has_oldtup ? &oldtup : &newtup);
2642 peter_e@gmx.net 2608 : 31934 : MemoryContextSwitchTo(oldctx);
2609 : :
2610 : : /* For a partitioned table, apply update to correct partition. */
1469 peter@eisentraut.org 2611 [ + + ]: 31934 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1058 tgl@sss.pgh.pa.us 2612 : 12 : apply_handle_tuple_routing(edata,
2613 : : remoteslot, &newtup, CMD_UPDATE);
2614 : : else
2615 : 31922 : apply_handle_update_internal(edata, edata->targetRelInfo,
2616 : : remoteslot, &newtup, rel->localindexoid);
2617 : :
2618 : 31929 : finish_edata(edata);
2619 : :
2620 : : /* Reset relation for error callback */
961 akapila@postgresql.o 2621 : 31929 : apply_error_callback_arg.rel = NULL;
2622 : :
376 rhaas@postgresql.org 2623 [ + + ]: 31929 : if (!run_as_owner)
2624 : 31927 : RestoreUserContext(&ucxt);
2625 : :
1482 peter@eisentraut.org 2626 : 31929 : logicalrep_rel_close(rel, NoLock);
2627 : :
1039 tgl@sss.pgh.pa.us 2628 : 31929 : end_replication_step();
2629 : : }
2630 : :
2631 : : /*
2632 : : * Workhorse for apply_handle_update()
2633 : : * relinfo is for the relation we're actually updating in
2634 : : * (could be a child partition of edata->targetRelInfo)
2635 : : */
2636 : : static void
1058 2637 : 31922 : apply_handle_update_internal(ApplyExecutionData *edata,
2638 : : ResultRelInfo *relinfo,
2639 : : TupleTableSlot *remoteslot,
2640 : : LogicalRepTupleData *newtup,
2641 : : Oid localindexoid)
2642 : : {
2643 : 31922 : EState *estate = edata->estate;
2644 : 31922 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
1482 peter@eisentraut.org 2645 : 31922 : Relation localrel = relinfo->ri_RelationDesc;
2646 : : EPQState epqstate;
2647 : : TupleTableSlot *localslot;
2648 : : bool found;
2649 : : MemoryContext oldctx;
2650 : :
331 tgl@sss.pgh.pa.us 2651 : 31922 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
1482 peter@eisentraut.org 2652 : 31922 : ExecOpenIndices(relinfo, false);
2653 : :
264 msawada@postgresql.o 2654 : 31922 : found = FindReplTupleInLocalRel(edata, localrel,
2655 : : &relmapentry->remoterel,
2656 : : localindexoid,
2657 : : remoteslot, &localslot);
2642 peter_e@gmx.net 2658 : 31917 : ExecClearTuple(remoteslot);
2659 : :
2660 : : /*
2661 : : * Tuple found.
2662 : : *
2663 : : * Note this will fail if there are other conflicting unique indexes.
2664 : : */
2665 [ + + ]: 31917 : if (found)
2666 : : {
2667 : : /* Process and store remote tuple in the slot */
2668 [ + - ]: 31916 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1366 tgl@sss.pgh.pa.us 2669 : 31916 : slot_modify_data(remoteslot, localslot, relmapentry, newtup);
2642 peter_e@gmx.net 2670 : 31916 : MemoryContextSwitchTo(oldctx);
2671 : :
2672 : 31916 : EvalPlanQualSetSlot(&epqstate, remoteslot);
2673 : :
2674 : : /* Do the actual update. */
828 jdavis@postgresql.or 2675 : 31916 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
1278 heikki.linnakangas@i 2676 : 31916 : ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
2677 : : remoteslot);
2678 : : }
2679 : : else
2680 : : {
2681 : : /*
2682 : : * The tuple to be updated could not be found. Do nothing except for
2683 : : * emitting a log message.
2684 : : *
2685 : : * XXX should this be promoted to ereport(LOG) perhaps?
2686 : : */
2642 peter_e@gmx.net 2687 [ + - ]: 1 : elog(DEBUG1,
2688 : : "logical replication did not find row to be updated "
2689 : : "in replication target relation \"%s\"",
2690 : : RelationGetRelationName(localrel));
2691 : : }
2692 : :
2693 : : /* Cleanup. */
1482 peter@eisentraut.org 2694 : 31917 : ExecCloseIndices(relinfo);
2642 peter_e@gmx.net 2695 : 31917 : EvalPlanQualEnd(&epqstate);
2696 : 31917 : }
2697 : :
2698 : : /*
2699 : : * Handle DELETE message.
2700 : : *
2701 : : * TODO: FDW support
2702 : : */
2703 : : static void
2704 : 81940 : apply_handle_delete(StringInfo s)
2705 : : {
2706 : : LogicalRepRelMapEntry *rel;
2707 : : LogicalRepTupleData oldtup;
2708 : : LogicalRepRelId relid;
2709 : : UserContext ucxt;
2710 : : ApplyExecutionData *edata;
2711 : : EState *estate;
2712 : : TupleTableSlot *remoteslot;
2713 : : MemoryContext oldctx;
2714 : : bool run_as_owner;
2715 : :
2716 : : /*
2717 : : * Quick return if we are skipping data modification changes or handling
2718 : : * streamed transactions.
2719 : : */
754 akapila@postgresql.o 2720 [ + - + + ]: 163880 : if (is_skipping_changes() ||
2721 : 81940 : handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
1319 2722 : 41625 : return;
2723 : :
1039 tgl@sss.pgh.pa.us 2724 : 40315 : begin_replication_step();
2725 : :
2642 peter_e@gmx.net 2726 : 40315 : relid = logicalrep_read_delete(s, &oldtup);
2727 : 40315 : rel = logicalrep_rel_open(relid, RowExclusiveLock);
2579 2728 [ - + ]: 40315 : if (!should_apply_changes_for_rel(rel))
2729 : : {
2730 : : /*
2731 : : * The relation can't become interesting in the middle of the
2732 : : * transaction so it's safe to unlock it.
2733 : : */
2579 peter_e@gmx.net 2734 :UBC 0 : logicalrep_rel_close(rel, RowExclusiveLock);
1039 tgl@sss.pgh.pa.us 2735 : 0 : end_replication_step();
2579 peter_e@gmx.net 2736 : 0 : return;
2737 : : }
2738 : :
2739 : : /* Set relation for error callback */
961 akapila@postgresql.o 2740 :CBC 40315 : apply_error_callback_arg.rel = rel;
2741 : :
2742 : : /* Check if we can do the delete. */
2642 peter_e@gmx.net 2743 : 40315 : check_relation_updatable(rel);
2744 : :
2745 : : /*
2746 : : * Make sure that any user-supplied code runs as the table owner, unless
2747 : : * the user has opted out of that behavior.
2748 : : */
376 rhaas@postgresql.org 2749 : 40315 : run_as_owner = MySubscription->runasowner;
2750 [ + + ]: 40315 : if (!run_as_owner)
2751 : 40313 : SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
2752 : :
2753 : : /* Initialize the executor state. */
1058 tgl@sss.pgh.pa.us 2754 : 40315 : edata = create_edata_for_relation(rel);
2755 : 40315 : estate = edata->estate;
2249 andres@anarazel.de 2756 : 40315 : remoteslot = ExecInitExtraTupleSlot(estate,
1977 2757 : 40315 : RelationGetDescr(rel->localrel),
2758 : : &TTSOpsVirtual);
2759 : :
2760 : : /* Build the search tuple. */
2642 peter_e@gmx.net 2761 [ - + ]: 40315 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1366 tgl@sss.pgh.pa.us 2762 : 40315 : slot_store_data(remoteslot, rel, &oldtup);
2642 peter_e@gmx.net 2763 : 40315 : MemoryContextSwitchTo(oldctx);
2764 : :
2765 : : /* For a partitioned table, apply delete to correct partition. */
1469 peter@eisentraut.org 2766 [ + + ]: 40315 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1058 tgl@sss.pgh.pa.us 2767 : 17 : apply_handle_tuple_routing(edata,
2768 : : remoteslot, NULL, CMD_DELETE);
2769 : : else
2770 : 40298 : apply_handle_delete_internal(edata, edata->targetRelInfo,
2771 : : remoteslot, rel->localindexoid);
2772 : :
2773 : 40315 : finish_edata(edata);
2774 : :
2775 : : /* Reset relation for error callback */
961 akapila@postgresql.o 2776 : 40315 : apply_error_callback_arg.rel = NULL;
2777 : :
376 rhaas@postgresql.org 2778 [ + + ]: 40315 : if (!run_as_owner)
2779 : 40313 : RestoreUserContext(&ucxt);
2780 : :
1482 peter@eisentraut.org 2781 : 40315 : logicalrep_rel_close(rel, NoLock);
2782 : :
1039 tgl@sss.pgh.pa.us 2783 : 40315 : end_replication_step();
2784 : : }
2785 : :
2786 : : /*
2787 : : * Workhorse for apply_handle_delete()
2788 : : * relinfo is for the relation we're actually deleting from
2789 : : * (could be a child partition of edata->targetRelInfo)
2790 : : */
2791 : : static void
1058 2792 : 40316 : apply_handle_delete_internal(ApplyExecutionData *edata,
2793 : : ResultRelInfo *relinfo,
2794 : : TupleTableSlot *remoteslot,
2795 : : Oid localindexoid)
2796 : : {
2797 : 40316 : EState *estate = edata->estate;
1482 peter@eisentraut.org 2798 : 40316 : Relation localrel = relinfo->ri_RelationDesc;
1058 tgl@sss.pgh.pa.us 2799 : 40316 : LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
2800 : : EPQState epqstate;
2801 : : TupleTableSlot *localslot;
2802 : : bool found;
2803 : :
331 2804 : 40316 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
1482 peter@eisentraut.org 2805 : 40316 : ExecOpenIndices(relinfo, false);
2806 : :
264 msawada@postgresql.o 2807 : 40316 : found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
2808 : : remoteslot, &localslot);
2809 : :
2810 : : /* If found delete it. */
2642 peter_e@gmx.net 2811 [ + + ]: 40316 : if (found)
2812 : : {
2813 : 40311 : EvalPlanQualSetSlot(&epqstate, localslot);
2814 : :
2815 : : /* Do the actual delete. */
828 jdavis@postgresql.or 2816 : 40311 : TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_DELETE);
1278 heikki.linnakangas@i 2817 : 40311 : ExecSimpleRelationDelete(relinfo, estate, &epqstate, localslot);
2818 : : }
2819 : : else
2820 : : {
2821 : : /*
2822 : : * The tuple to be deleted could not be found. Do nothing except for
2823 : : * emitting a log message.
2824 : : *
2825 : : * XXX should this be promoted to ereport(LOG) perhaps?
2826 : : */
2133 peter_e@gmx.net 2827 [ + - ]: 5 : elog(DEBUG1,
2828 : : "logical replication did not find row to be deleted "
2829 : : "in replication target relation \"%s\"",
2830 : : RelationGetRelationName(localrel));
2831 : : }
2832 : :
2833 : : /* Cleanup. */
1482 peter@eisentraut.org 2834 : 40316 : ExecCloseIndices(relinfo);
2642 peter_e@gmx.net 2835 : 40316 : EvalPlanQualEnd(&epqstate);
2836 : 40316 : }
2837 : :
2838 : : /*
2839 : : * Try to find a tuple received from the publication side (in 'remoteslot') in
2840 : : * the corresponding local relation using either replica identity index,
2841 : : * primary key, index or if needed, sequential scan.
2842 : : *
2843 : : * Local tuple, if found, is returned in '*localslot'.
2844 : : */
2845 : : static bool
264 msawada@postgresql.o 2846 : 72250 : FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
2847 : : LogicalRepRelation *remoterel,
2848 : : Oid localidxoid,
2849 : : TupleTableSlot *remoteslot,
2850 : : TupleTableSlot **localslot)
2851 : : {
2852 : 72250 : EState *estate = edata->estate;
2853 : : bool found;
2854 : :
2855 : : /*
2856 : : * Regardless of the top-level operation, we're performing a read here, so
2857 : : * check for SELECT privileges.
2858 : : */
827 jdavis@postgresql.or 2859 : 72250 : TargetPrivilegesCheck(localrel, ACL_SELECT);
2860 : :
1474 peter@eisentraut.org 2861 : 72245 : *localslot = table_slot_create(localrel, &estate->es_tupleTable);
2862 : :
396 akapila@postgresql.o 2863 [ + + - + ]: 72245 : Assert(OidIsValid(localidxoid) ||
2864 : : (remoterel->replident == REPLICA_IDENTITY_FULL));
2865 : :
2866 [ + + ]: 72245 : if (OidIsValid(localidxoid))
2867 : : {
2868 : : #ifdef USE_ASSERT_CHECKING
264 msawada@postgresql.o 2869 : 72099 : Relation idxrel = index_open(localidxoid, AccessShareLock);
2870 : :
2871 : : /* Index must be PK, RI, or usable for REPLICA IDENTITY FULL tables */
2872 [ + - - + ]: 72099 : Assert(GetRelationIdentityOrPK(idxrel) == localidxoid ||
2873 : : IsIndexUsableForReplicaIdentityFull(BuildIndexInfo(idxrel),
2874 : : edata->targetRel->attrmap));
2875 : 72099 : index_close(idxrel, AccessShareLock);
2876 : : #endif
2877 : :
396 akapila@postgresql.o 2878 : 72099 : found = RelationFindReplTupleByIndex(localrel, localidxoid,
2879 : : LockTupleExclusive,
2880 : : remoteslot, *localslot);
2881 : : }
2882 : : else
1474 peter@eisentraut.org 2883 : 146 : found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
2884 : : remoteslot, *localslot);
2885 : :
2886 : 72245 : return found;
2887 : : }
2888 : :
2889 : : /*
2890 : : * This handles insert, update, delete on a partitioned table.
2891 : : */
2892 : : static void
1058 tgl@sss.pgh.pa.us 2893 : 73 : apply_handle_tuple_routing(ApplyExecutionData *edata,
2894 : : TupleTableSlot *remoteslot,
2895 : : LogicalRepTupleData *newtup,
2896 : : CmdType operation)
2897 : : {
2898 : 73 : EState *estate = edata->estate;
2899 : 73 : LogicalRepRelMapEntry *relmapentry = edata->targetRel;
2900 : 73 : ResultRelInfo *relinfo = edata->targetRelInfo;
1469 peter@eisentraut.org 2901 : 73 : Relation parentrel = relinfo->ri_RelationDesc;
2902 : : ModifyTableState *mtstate;
2903 : : PartitionTupleRouting *proute;
2904 : : ResultRelInfo *partrelinfo;
2905 : : Relation partrel;
2906 : : TupleTableSlot *remoteslot_part;
2907 : : TupleConversionMap *map;
2908 : : MemoryContext oldctx;
663 akapila@postgresql.o 2909 : 73 : LogicalRepRelMapEntry *part_entry = NULL;
2910 : 73 : AttrMap *attrmap = NULL;
2911 : :
2912 : : /* ModifyTableState is needed for ExecFindPartition(). */
1058 tgl@sss.pgh.pa.us 2913 : 73 : edata->mtstate = mtstate = makeNode(ModifyTableState);
1469 peter@eisentraut.org 2914 : 73 : mtstate->ps.plan = NULL;
2915 : 73 : mtstate->ps.state = estate;
2916 : 73 : mtstate->operation = operation;
2917 : 73 : mtstate->resultRelInfo = relinfo;
2918 : :
2919 : : /* ... as is PartitionTupleRouting. */
1058 tgl@sss.pgh.pa.us 2920 : 73 : edata->proute = proute = ExecSetupPartitionTupleRouting(estate, parentrel);
2921 : :
2922 : : /*
2923 : : * Find the partition to which the "search tuple" belongs.
2924 : : */
1469 peter@eisentraut.org 2925 [ - + ]: 73 : Assert(remoteslot != NULL);
2926 [ + - ]: 73 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2927 : 73 : partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
2928 : : remoteslot, estate);
2929 [ - + ]: 73 : Assert(partrelinfo != NULL);
2930 : 73 : partrel = partrelinfo->ri_RelationDesc;
2931 : :
2932 : : /*
2933 : : * Check for supported relkind. We need this since partitions might be of
2934 : : * unsupported relkinds; and the set of partitions can change, so checking
2935 : : * at CREATE/ALTER SUBSCRIPTION would be insufficient.
2936 : : */
529 tgl@sss.pgh.pa.us 2937 : 73 : CheckSubscriptionRelkind(partrel->rd_rel->relkind,
2938 : 73 : get_namespace_name(RelationGetNamespace(partrel)),
2939 : 73 : RelationGetRelationName(partrel));
2940 : :
2941 : : /*
2942 : : * To perform any of the operations below, the tuple must match the
2943 : : * partition's rowtype. Convert if needed or just copy, using a dedicated
2944 : : * slot to store the tuple in any case.
2945 : : */
1273 heikki.linnakangas@i 2946 : 73 : remoteslot_part = partrelinfo->ri_PartitionTupleSlot;
1469 peter@eisentraut.org 2947 [ + + ]: 73 : if (remoteslot_part == NULL)
2948 : 41 : remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
499 alvherre@alvh.no-ip. 2949 : 73 : map = ExecGetRootToChildMap(partrelinfo, estate);
1469 peter@eisentraut.org 2950 [ + + ]: 73 : if (map != NULL)
2951 : : {
663 akapila@postgresql.o 2952 : 32 : attrmap = map->attrMap;
2953 : 32 : remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
2954 : : remoteslot_part);
2955 : : }
2956 : : else
2957 : : {
1469 peter@eisentraut.org 2958 : 41 : remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
2959 : 41 : slot_getallattrs(remoteslot_part);
2960 : : }
2961 : 73 : MemoryContextSwitchTo(oldctx);
2962 : :
2963 : : /* Check if we can do the update or delete on the leaf partition. */
663 akapila@postgresql.o 2964 [ + + + + ]: 73 : if (operation == CMD_UPDATE || operation == CMD_DELETE)
2965 : : {
2966 : 29 : part_entry = logicalrep_partition_open(relmapentry, partrel,
2967 : : attrmap);
2968 : 29 : check_relation_updatable(part_entry);
2969 : : }
2970 : :
1469 peter@eisentraut.org 2971 [ + + + - ]: 73 : switch (operation)
2972 : : {
2973 : 44 : case CMD_INSERT:
1058 tgl@sss.pgh.pa.us 2974 : 44 : apply_handle_insert_internal(edata, partrelinfo,
2975 : : remoteslot_part);
1469 peter@eisentraut.org 2976 : 44 : break;
2977 : :
2978 : 17 : case CMD_DELETE:
1058 tgl@sss.pgh.pa.us 2979 : 17 : apply_handle_delete_internal(edata, partrelinfo,
2980 : : remoteslot_part,
2981 : : part_entry->localindexoid);
1469 peter@eisentraut.org 2982 : 17 : break;
2983 : :
2984 : 12 : case CMD_UPDATE:
2985 : :
2986 : : /*
2987 : : * For UPDATE, depending on whether or not the updated tuple
2988 : : * satisfies the partition's constraint, perform a simple UPDATE
2989 : : * of the partition or move the updated tuple into a different
2990 : : * suitable partition.
2991 : : */
2992 : : {
2993 : : TupleTableSlot *localslot;
2994 : : ResultRelInfo *partrelinfo_new;
2995 : : Relation partrel_new;
2996 : : bool found;
2997 : :
2998 : : /* Get the matching local tuple from the partition. */
264 msawada@postgresql.o 2999 : 12 : found = FindReplTupleInLocalRel(edata, partrel,
3000 : : &part_entry->remoterel,
3001 : : part_entry->localindexoid,
3002 : : remoteslot_part, &localslot);
1038 tgl@sss.pgh.pa.us 3003 [ + + ]: 12 : if (!found)
3004 : : {
3005 : : /*
3006 : : * The tuple to be updated could not be found. Do nothing
3007 : : * except for emitting a log message.
3008 : : *
3009 : : * XXX should this be promoted to ereport(LOG) perhaps?
3010 : : */
1469 peter@eisentraut.org 3011 [ + - ]: 2 : elog(DEBUG1,
3012 : : "logical replication did not find row to be updated "
3013 : : "in replication target relation's partition \"%s\"",
3014 : : RelationGetRelationName(partrel));
1038 tgl@sss.pgh.pa.us 3015 : 2 : return;
3016 : : }
3017 : :
3018 : : /*
3019 : : * Apply the update to the local tuple, putting the result in
3020 : : * remoteslot_part.
3021 : : */
3022 [ + - ]: 10 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3023 : 10 : slot_modify_data(remoteslot_part, localslot, part_entry,
3024 : : newtup);
3025 : 10 : MemoryContextSwitchTo(oldctx);
3026 : :
3027 : : /*
3028 : : * Does the updated tuple still satisfy the current
3029 : : * partition's constraint?
3030 : : */
1306 3031 [ + - + + ]: 20 : if (!partrel->rd_rel->relispartition ||
1469 peter@eisentraut.org 3032 : 10 : ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
3033 : : false))
3034 : 9 : {
3035 : : /*
3036 : : * Yes, so simply UPDATE the partition. We don't call
3037 : : * apply_handle_update_internal() here, which would
3038 : : * normally do the following work, to avoid repeating some
3039 : : * work already done above to find the local tuple in the
3040 : : * partition.
3041 : : */
3042 : : EPQState epqstate;
3043 : :
331 tgl@sss.pgh.pa.us 3044 : 9 : EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
1469 peter@eisentraut.org 3045 : 9 : ExecOpenIndices(partrelinfo, false);
3046 : :
3047 : 9 : EvalPlanQualSetSlot(&epqstate, remoteslot_part);
828 jdavis@postgresql.or 3048 : 9 : TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
3049 : : ACL_UPDATE);
1278 heikki.linnakangas@i 3050 : 9 : ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
3051 : : localslot, remoteslot_part);
1469 peter@eisentraut.org 3052 : 9 : ExecCloseIndices(partrelinfo);
3053 : 9 : EvalPlanQualEnd(&epqstate);
3054 : : }
3055 : : else
3056 : : {
3057 : : /* Move the tuple into the new partition. */
3058 : :
3059 : : /*
3060 : : * New partition will be found using tuple routing, which
3061 : : * can only occur via the parent table. We might need to
3062 : : * convert the tuple to the parent's rowtype. Note that
3063 : : * this is the tuple found in the partition, not the
3064 : : * original search tuple received by this function.
3065 : : */
3066 [ + - ]: 1 : if (map)
3067 : : {
3068 : : TupleConversionMap *PartitionToRootMap =
331 tgl@sss.pgh.pa.us 3069 : 1 : convert_tuples_by_name(RelationGetDescr(partrel),
3070 : : RelationGetDescr(parentrel));
3071 : :
3072 : : remoteslot =
1469 peter@eisentraut.org 3073 : 1 : execute_attr_map_slot(PartitionToRootMap->attrMap,
3074 : : remoteslot_part, remoteslot);
3075 : : }
3076 : : else
3077 : : {
1469 peter@eisentraut.org 3078 :UBC 0 : remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
3079 : 0 : slot_getallattrs(remoteslot);
3080 : : }
3081 : :
3082 : : /* Find the new partition. */
1469 peter@eisentraut.org 3083 [ + - ]:CBC 1 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
3084 : 1 : partrelinfo_new = ExecFindPartition(mtstate, relinfo,
3085 : : proute, remoteslot,
3086 : : estate);
3087 : 1 : MemoryContextSwitchTo(oldctx);
3088 [ - + ]: 1 : Assert(partrelinfo_new != partrelinfo);
529 tgl@sss.pgh.pa.us 3089 : 1 : partrel_new = partrelinfo_new->ri_RelationDesc;
3090 : :
3091 : : /* Check that new partition also has supported relkind. */
3092 : 1 : CheckSubscriptionRelkind(partrel_new->rd_rel->relkind,
3093 : 1 : get_namespace_name(RelationGetNamespace(partrel_new)),
3094 : 1 : RelationGetRelationName(partrel_new));
3095 : :
3096 : : /* DELETE old tuple found in the old partition. */
1058 3097 : 1 : apply_handle_delete_internal(edata, partrelinfo,
3098 : : localslot,
3099 : : part_entry->localindexoid);
3100 : :
3101 : : /* INSERT new tuple into the new partition. */
3102 : :
3103 : : /*
3104 : : * Convert the replacement tuple to match the destination
3105 : : * partition rowtype.
3106 : : */
1469 peter@eisentraut.org 3107 [ + - ]: 1 : oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
1273 heikki.linnakangas@i 3108 : 1 : remoteslot_part = partrelinfo_new->ri_PartitionTupleSlot;
1469 peter@eisentraut.org 3109 [ + - ]: 1 : if (remoteslot_part == NULL)
529 tgl@sss.pgh.pa.us 3110 : 1 : remoteslot_part = table_slot_create(partrel_new,
3111 : : &estate->es_tupleTable);
499 alvherre@alvh.no-ip. 3112 : 1 : map = ExecGetRootToChildMap(partrelinfo_new, estate);
1469 peter@eisentraut.org 3113 [ - + ]: 1 : if (map != NULL)
3114 : : {
1469 peter@eisentraut.org 3115 :UBC 0 : remoteslot_part = execute_attr_map_slot(map->attrMap,
3116 : : remoteslot,
3117 : : remoteslot_part);
3118 : : }
3119 : : else
3120 : : {
1469 peter@eisentraut.org 3121 :CBC 1 : remoteslot_part = ExecCopySlot(remoteslot_part,
3122 : : remoteslot);
3123 : 1 : slot_getallattrs(remoteslot);
3124 : : }
3125 : 1 : MemoryContextSwitchTo(oldctx);
1058 tgl@sss.pgh.pa.us 3126 : 1 : apply_handle_insert_internal(edata, partrelinfo_new,
3127 : : remoteslot_part);
3128 : : }
3129 : : }
1469 peter@eisentraut.org 3130 : 10 : break;
3131 : :
1469 peter@eisentraut.org 3132 :UBC 0 : default:
3133 [ # # ]: 0 : elog(ERROR, "unrecognized CmdType: %d", (int) operation);
3134 : : break;
3135 : : }
3136 : : }
3137 : :
3138 : : /*
3139 : : * Handle TRUNCATE message.
3140 : : *
3141 : : * TODO: FDW support
3142 : : */
3143 : : static void
2199 peter_e@gmx.net 3144 :CBC 19 : apply_handle_truncate(StringInfo s)
3145 : : {
2180 tgl@sss.pgh.pa.us 3146 : 19 : bool cascade = false;
3147 : 19 : bool restart_seqs = false;
3148 : 19 : List *remote_relids = NIL;
3149 : 19 : List *remote_rels = NIL;
3150 : 19 : List *rels = NIL;
1469 peter@eisentraut.org 3151 : 19 : List *part_rels = NIL;
2180 tgl@sss.pgh.pa.us 3152 : 19 : List *relids = NIL;
3153 : 19 : List *relids_logged = NIL;
3154 : : ListCell *lc;
1059 3155 : 19 : LOCKMODE lockmode = AccessExclusiveLock;
3156 : :
3157 : : /*
3158 : : * Quick return if we are skipping data modification changes or handling
3159 : : * streamed transactions.
3160 : : */
754 akapila@postgresql.o 3161 [ + - - + ]: 38 : if (is_skipping_changes() ||
3162 : 19 : handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
1319 akapila@postgresql.o 3163 :UBC 0 : return;
3164 : :
1039 tgl@sss.pgh.pa.us 3165 :CBC 19 : begin_replication_step();
3166 : :
2199 peter_e@gmx.net 3167 : 19 : remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
3168 : :
3169 [ + - + + : 47 : foreach(lc, remote_relids)
+ + ]
3170 : : {
3171 : 28 : LogicalRepRelId relid = lfirst_oid(lc);
3172 : : LogicalRepRelMapEntry *rel;
3173 : :
1059 akapila@postgresql.o 3174 : 28 : rel = logicalrep_rel_open(relid, lockmode);
2199 peter_e@gmx.net 3175 [ - + ]: 28 : if (!should_apply_changes_for_rel(rel))
3176 : : {
3177 : : /*
3178 : : * The relation can't become interesting in the middle of the
3179 : : * transaction so it's safe to unlock it.
3180 : : */
1059 akapila@postgresql.o 3181 :UBC 0 : logicalrep_rel_close(rel, lockmode);
2199 peter_e@gmx.net 3182 : 0 : continue;
3183 : : }
3184 : :
2199 peter_e@gmx.net 3185 :CBC 28 : remote_rels = lappend(remote_rels, rel);
828 jdavis@postgresql.or 3186 : 28 : TargetPrivilegesCheck(rel->localrel, ACL_TRUNCATE);
2199 peter_e@gmx.net 3187 : 28 : rels = lappend(rels, rel->localrel);
3188 : 28 : relids = lappend_oid(relids, rel->localreloid);
3189 [ - + - - : 28 : if (RelationIsLogicallyLogged(rel->localrel))
- - - - -
- - - -
- ]
2183 peter_e@gmx.net 3190 :LBC (26) : relids_logged = lappend_oid(relids_logged, rel->localreloid);
3191 : :
3192 : : /*
3193 : : * Truncate partitions if we got a message to truncate a partitioned
3194 : : * table.
3195 : : */
1469 peter@eisentraut.org 3196 [ + + ]:CBC 28 : if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
3197 : : {
3198 : : ListCell *child;
3199 : 4 : List *children = find_all_inheritors(rel->localreloid,
3200 : : lockmode,
3201 : : NULL);
3202 : :
3203 [ + - + + : 15 : foreach(child, children)
+ + ]
3204 : : {
3205 : 11 : Oid childrelid = lfirst_oid(child);
3206 : : Relation childrel;
3207 : :
3208 [ + + ]: 11 : if (list_member_oid(relids, childrelid))
3209 : 4 : continue;
3210 : :
3211 : : /* find_all_inheritors already got lock */
3212 : 7 : childrel = table_open(childrelid, NoLock);
3213 : :
3214 : : /*
3215 : : * Ignore temp tables of other backends. See similar code in
3216 : : * ExecuteTruncate().
3217 : : */
3218 [ - + - - ]: 7 : if (RELATION_IS_OTHER_TEMP(childrel))
3219 : : {
1059 akapila@postgresql.o 3220 :UBC 0 : table_close(childrel, lockmode);
1469 peter@eisentraut.org 3221 : 0 : continue;
3222 : : }
3223 : :
828 jdavis@postgresql.or 3224 :CBC 7 : TargetPrivilegesCheck(childrel, ACL_TRUNCATE);
1469 peter@eisentraut.org 3225 : 7 : rels = lappend(rels, childrel);
3226 : 7 : part_rels = lappend(part_rels, childrel);
3227 : 7 : relids = lappend_oid(relids, childrelid);
3228 : : /* Log this relation only if needed for logical decoding */
3229 [ - + - - : 7 : if (RelationIsLogicallyLogged(childrel))
- - - - -
- - - -
- ]
1469 peter@eisentraut.org 3230 :LBC (7) : relids_logged = lappend_oid(relids_logged, childrelid);
3231 : : }
3232 : : }
3233 : : }
3234 : :
3235 : : /*
3236 : : * Even if we used CASCADE on the upstream primary we explicitly default
3237 : : * to replaying changes without further cascading. This might be later
3238 : : * changeable with a user specified option.
3239 : : *
3240 : : * MySubscription->runasowner tells us whether we want to execute
3241 : : * replication actions as the subscription owner; the last argument to
3242 : : * TruncateGuts tells it whether we want to switch to the table owner.
3243 : : * Those are exactly opposite conditions.
3244 : : */
1102 fujii@postgresql.org 3245 :CBC 19 : ExecuteTruncateGuts(rels,
3246 : : relids,
3247 : : relids_logged,
3248 : : DROP_RESTRICT,
3249 : : restart_seqs,
376 rhaas@postgresql.org 3250 : 19 : !MySubscription->runasowner);
2199 peter_e@gmx.net 3251 [ + - + + : 47 : foreach(lc, remote_rels)
+ + ]
3252 : : {
3253 : 28 : LogicalRepRelMapEntry *rel = lfirst(lc);
3254 : :
3255 : 28 : logicalrep_rel_close(rel, NoLock);
3256 : : }
1469 peter@eisentraut.org 3257 [ + + + + : 26 : foreach(lc, part_rels)
+ + ]
3258 : : {
3259 : 7 : Relation rel = lfirst(lc);
3260 : :
3261 : 7 : table_close(rel, NoLock);
3262 : : }
3263 : :
1039 tgl@sss.pgh.pa.us 3264 : 19 : end_replication_step();
3265 : : }
3266 : :
3267 : :
3268 : : /*
3269 : : * Logical replication protocol message dispatcher.
3270 : : */
3271 : : void
2642 peter_e@gmx.net 3272 : 340253 : apply_dispatch(StringInfo s)
3273 : : {
1259 akapila@postgresql.o 3274 : 340253 : LogicalRepMsgType action = pq_getmsgbyte(s);
3275 : : LogicalRepMsgType saved_command;
3276 : :
3277 : : /*
3278 : : * Set the current command being applied. Since this function can be
3279 : : * called recursively when applying spooled changes, save the current
3280 : : * command.
3281 : : */
961 3282 : 340253 : saved_command = apply_error_callback_arg.command;
3283 : 340253 : apply_error_callback_arg.command = action;
3284 : :
2642 peter_e@gmx.net 3285 [ + + + + : 340253 : switch (action)
+ + + + +
- + + + +
+ + + + +
- ]
3286 : : {
1259 akapila@postgresql.o 3287 : 412 : case LOGICAL_REP_MSG_BEGIN:
2642 peter_e@gmx.net 3288 : 412 : apply_handle_begin(s);
961 akapila@postgresql.o 3289 : 412 : break;
3290 : :
1259 3291 : 382 : case LOGICAL_REP_MSG_COMMIT:
2642 peter_e@gmx.net 3292 : 382 : apply_handle_commit(s);
961 akapila@postgresql.o 3293 : 382 : break;
3294 : :
1259 3295 : 188845 : case LOGICAL_REP_MSG_INSERT:
2642 peter_e@gmx.net 3296 : 188845 : apply_handle_insert(s);
961 akapila@postgresql.o 3297 : 188820 : break;
3298 : :
1259 3299 : 66168 : case LOGICAL_REP_MSG_UPDATE:
2642 peter_e@gmx.net 3300 : 66168 : apply_handle_update(s);
961 akapila@postgresql.o 3301 : 66161 : break;
3302 : :
1259 3303 : 81940 : case LOGICAL_REP_MSG_DELETE:
2642 peter_e@gmx.net 3304 : 81940 : apply_handle_delete(s);
961 akapila@postgresql.o 3305 : 81940 : break;
3306 : :
1259 3307 : 19 : case LOGICAL_REP_MSG_TRUNCATE:
2199 peter_e@gmx.net 3308 : 19 : apply_handle_truncate(s);
961 akapila@postgresql.o 3309 : 19 : break;
3310 : :
1259 3311 : 440 : case LOGICAL_REP_MSG_RELATION:
2642 peter_e@gmx.net 3312 : 440 : apply_handle_relation(s);
961 akapila@postgresql.o 3313 : 440 : break;
3314 : :
1259 3315 : 18 : case LOGICAL_REP_MSG_TYPE:
2642 peter_e@gmx.net 3316 : 18 : apply_handle_type(s);
961 akapila@postgresql.o 3317 : 18 : break;
3318 : :
1259 3319 : 8 : case LOGICAL_REP_MSG_ORIGIN:
2642 peter_e@gmx.net 3320 : 8 : apply_handle_origin(s);
961 akapila@postgresql.o 3321 : 8 : break;
3322 : :
1104 akapila@postgresql.o 3323 :UBC 0 : case LOGICAL_REP_MSG_MESSAGE:
3324 : :
3325 : : /*
3326 : : * Logical replication does not use generic logical messages yet.
3327 : : * Although, it could be used by other applications that use this
3328 : : * output plugin.
3329 : : */
961 3330 : 0 : break;
3331 : :
1259 akapila@postgresql.o 3332 :CBC 916 : case LOGICAL_REP_MSG_STREAM_START:
1319 3333 : 916 : apply_handle_stream_start(s);
961 3334 : 915 : break;
3335 : :
969 3336 : 914 : case LOGICAL_REP_MSG_STREAM_STOP:
1319 3337 : 914 : apply_handle_stream_stop(s);
961 3338 : 912 : break;
3339 : :
1259 3340 : 38 : case LOGICAL_REP_MSG_STREAM_ABORT:
1319 3341 : 38 : apply_handle_stream_abort(s);
961 3342 : 38 : break;
3343 : :
1259 3344 : 64 : case LOGICAL_REP_MSG_STREAM_COMMIT:
1319 3345 : 64 : apply_handle_stream_commit(s);
961 3346 : 62 : break;
3347 : :
1005 3348 : 19 : case LOGICAL_REP_MSG_BEGIN_PREPARE:
3349 : 19 : apply_handle_begin_prepare(s);
961 3350 : 19 : break;
3351 : :
1005 3352 : 18 : case LOGICAL_REP_MSG_PREPARE:
3353 : 18 : apply_handle_prepare(s);
961 3354 : 18 : break;
3355 : :
1005 3356 : 23 : case LOGICAL_REP_MSG_COMMIT_PREPARED:
3357 : 23 : apply_handle_commit_prepared(s);
961 3358 : 23 : break;
3359 : :
1005 3360 : 9 : case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
3361 : 9 : apply_handle_rollback_prepared(s);
961 3362 : 9 : break;
3363 : :
984 3364 : 20 : case LOGICAL_REP_MSG_STREAM_PREPARE:
3365 : 20 : apply_handle_stream_prepare(s);
961 3366 : 20 : break;
3367 : :
961 akapila@postgresql.o 3368 :UBC 0 : default:
3369 [ # # ]: 0 : ereport(ERROR,
3370 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
3371 : : errmsg("invalid logical replication message type \"??? (%d)\"", action)));
3372 : : }
3373 : :
3374 : : /* Reset the current command */
961 akapila@postgresql.o 3375 :CBC 340216 : apply_error_callback_arg.command = saved_command;
2642 peter_e@gmx.net 3376 : 340216 : }
3377 : :
3378 : : /*
3379 : : * Figure out which write/flush positions to report to the walsender process.
3380 : : *
3381 : : * We can't simply report back the last LSN the walsender sent us because the
3382 : : * local transaction might not yet be flushed to disk locally. Instead we
3383 : : * build a list that associates local with remote LSNs for every commit. When
3384 : : * reporting back the flush position to the sender we iterate that list and
3385 : : * check which entries on it are already locally flushed. Those we can report
3386 : : * as having been flushed.
3387 : : *
3388 : : * The have_pending_txes is true if there are outstanding transactions that
3389 : : * need to be flushed.
3390 : : */
3391 : : static void
3392 : 75318 : get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
3393 : : bool *have_pending_txes)
3394 : : {
3395 : : dlist_mutable_iter iter;
891 rhaas@postgresql.org 3396 : 75318 : XLogRecPtr local_flush = GetFlushRecPtr(NULL);
3397 : :
2642 peter_e@gmx.net 3398 : 75318 : *write = InvalidXLogRecPtr;
3399 : 75318 : *flush = InvalidXLogRecPtr;
3400 : :
3401 [ + - + + ]: 75781 : dlist_foreach_modify(iter, &lsn_mapping)
3402 : : {
3403 : 20212 : FlushPosition *pos =
331 tgl@sss.pgh.pa.us 3404 : 20212 : dlist_container(FlushPosition, node, iter.cur);
3405 : :
2642 peter_e@gmx.net 3406 : 20212 : *write = pos->remote_end;
3407 : :
3408 [ + + ]: 20212 : if (pos->local_end <= local_flush)
3409 : : {
3410 : 463 : *flush = pos->remote_end;
3411 : 463 : dlist_delete(iter.cur);
3412 : 463 : pfree(pos);
3413 : : }
3414 : : else
3415 : : {
3416 : : /*
3417 : : * Don't want to uselessly iterate over the rest of the list which
3418 : : * could potentially be long. Instead get the last element and
3419 : : * grab the write position from there.
3420 : : */
3421 : 19749 : pos = dlist_tail_element(FlushPosition, node,
3422 : : &lsn_mapping);
3423 : 19749 : *write = pos->remote_end;
3424 : 19749 : *have_pending_txes = true;
3425 : 19749 : return;
3426 : : }
3427 : : }
3428 : :
3429 : 55569 : *have_pending_txes = !dlist_is_empty(&lsn_mapping);
3430 : : }
3431 : :
3432 : : /*
3433 : : * Store current remote/local lsn pair in the tracking list.
3434 : : */
3435 : : void
461 akapila@postgresql.o 3436 : 508 : store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
3437 : : {
3438 : : FlushPosition *flushpos;
3439 : :
3440 : : /*
3441 : : * Skip for parallel apply workers, because the lsn_mapping is maintained
3442 : : * by the leader apply worker.
3443 : : */
3444 [ + + ]: 508 : if (am_parallel_apply_worker())
3445 : 20 : return;
3446 : :
3447 : : /* Need to do this in permanent context */
2532 peter_e@gmx.net 3448 : 488 : MemoryContextSwitchTo(ApplyContext);
3449 : :
3450 : : /* Track commit lsn */
2642 3451 : 488 : flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
461 akapila@postgresql.o 3452 : 488 : flushpos->local_end = local_lsn;
2642 peter_e@gmx.net 3453 : 488 : flushpos->remote_end = remote_lsn;
3454 : :
3455 : 488 : dlist_push_tail(&lsn_mapping, &flushpos->node);
2532 3456 : 488 : MemoryContextSwitchTo(ApplyMessageContext);
3457 : : }
3458 : :
3459 : :
3460 : : /* Update statistics of the worker. */
3461 : : static void
2642 3462 : 191030 : UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
3463 : : {
3464 : 191030 : MyLogicalRepWorker->last_lsn = last_lsn;
3465 : 191030 : MyLogicalRepWorker->last_send_time = send_time;
3466 : 191030 : MyLogicalRepWorker->last_recv_time = GetCurrentTimestamp();
3467 [ + + ]: 191030 : if (reply)
3468 : : {
3469 : 3232 : MyLogicalRepWorker->reply_lsn = last_lsn;
3470 : 3232 : MyLogicalRepWorker->reply_time = send_time;
3471 : : }
3472 : 191030 : }
3473 : :
3474 : : /*
3475 : : * Apply main loop.
3476 : : */
3477 : : static void
2579 3478 : 333 : LogicalRepApplyLoop(XLogRecPtr last_received)
3479 : : {
1640 michael@paquier.xyz 3480 : 333 : TimestampTz last_recv_timestamp = GetCurrentTimestamp();
1318 tgl@sss.pgh.pa.us 3481 : 333 : bool ping_sent = false;
3482 : : TimeLineID tli;
3483 : : ErrorContextCallback errcallback;
3484 : :
3485 : : /*
3486 : : * Init the ApplyMessageContext which we clean up after each replication
3487 : : * protocol message.
3488 : : */
2532 peter_e@gmx.net 3489 : 333 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
3490 : : "ApplyMessageContext",
3491 : : ALLOCSET_DEFAULT_SIZES);
3492 : :
3493 : : /*
3494 : : * This memory context is used for per-stream data when the streaming mode
3495 : : * is enabled. This context is reset on each stream stop.
3496 : : */
1319 akapila@postgresql.o 3497 : 333 : LogicalStreamingContext = AllocSetContextCreate(ApplyContext,
3498 : : "LogicalStreamingContext",
3499 : : ALLOCSET_DEFAULT_SIZES);
3500 : :
3501 : : /* mark as idle, before starting to loop */
2642 peter_e@gmx.net 3502 : 333 : pgstat_report_activity(STATE_IDLE, NULL);
3503 : :
3504 : : /*
3505 : : * Push apply error context callback. Fields will be filled while applying
3506 : : * a change.
3507 : : */
961 akapila@postgresql.o 3508 : 333 : errcallback.callback = apply_error_callback;
3509 : 333 : errcallback.previous = error_context_stack;
3510 : 333 : error_context_stack = &errcallback;
461 3511 : 333 : apply_error_context_stack = error_context_stack;
3512 : :
3513 : : /* This outer loop iterates once per wait. */
3514 : : for (;;)
2642 peter_e@gmx.net 3515 : 71642 : {
3516 : 71975 : pgsocket fd = PGINVALID_SOCKET;
3517 : : int rc;
3518 : : int len;
3519 : 71975 : char *buf = NULL;
3520 : 71975 : bool endofstream = false;
3521 : : long wait_time;
3522 : :
2508 3523 [ - + ]: 71975 : CHECK_FOR_INTERRUPTS();
3524 : :
2532 3525 : 71975 : MemoryContextSwitchTo(ApplyMessageContext);
3526 : :
1068 alvherre@alvh.no-ip. 3527 : 71975 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3528 : :
2642 peter_e@gmx.net 3529 [ + + ]: 71954 : if (len != 0)
3530 : : {
3531 : : /* Loop to process all available data (without blocking). */
3532 : : for (;;)
3533 : : {
3534 [ - + ]: 262134 : CHECK_FOR_INTERRUPTS();
3535 : :
3536 [ + + ]: 262134 : if (len == 0)
3537 : : {
3538 : 71098 : break;
3539 : : }
3540 [ + + ]: 191036 : else if (len < 0)
3541 : : {
3542 [ + - ]: 6 : ereport(LOG,
3543 : : (errmsg("data stream from publisher has ended")));
3544 : 6 : endofstream = true;
3545 : 6 : break;
3546 : : }
3547 : : else
3548 : : {
3549 : : int c;
3550 : : StringInfoData s;
3551 : :
312 akapila@postgresql.o 3552 [ - + ]: 191030 : if (ConfigReloadPending)
3553 : : {
312 akapila@postgresql.o 3554 :UBC 0 : ConfigReloadPending = false;
3555 : 0 : ProcessConfigFile(PGC_SIGHUP);
3556 : : }
3557 : :
3558 : : /* Reset timeout. */
2642 peter_e@gmx.net 3559 :CBC 191030 : last_recv_timestamp = GetCurrentTimestamp();
3560 : 191030 : ping_sent = false;
3561 : :
3562 : : /* Ensure we are reading the data into our memory context. */
2532 3563 : 191030 : MemoryContextSwitchTo(ApplyMessageContext);
3564 : :
171 drowley@postgresql.o 3565 :GNC 191030 : initReadOnlyStringInfo(&s, buf, len);
3566 : :
2642 peter_e@gmx.net 3567 :CBC 191030 : c = pq_getmsgbyte(&s);
3568 : :
3569 [ + + ]: 191030 : if (c == 'w')
3570 : : {
3571 : : XLogRecPtr start_lsn;
3572 : : XLogRecPtr end_lsn;
3573 : : TimestampTz send_time;
3574 : :
3575 : 187798 : start_lsn = pq_getmsgint64(&s);
3576 : 187798 : end_lsn = pq_getmsgint64(&s);
2607 tgl@sss.pgh.pa.us 3577 : 187798 : send_time = pq_getmsgint64(&s);
3578 : :
2642 peter_e@gmx.net 3579 [ + + ]: 187798 : if (last_received < start_lsn)
3580 : 150884 : last_received = start_lsn;
3581 : :
3582 [ - + ]: 187798 : if (last_received < end_lsn)
2642 peter_e@gmx.net 3583 :UBC 0 : last_received = end_lsn;
3584 : :
2642 peter_e@gmx.net 3585 :CBC 187798 : UpdateWorkerStats(last_received, send_time, false);
3586 : :
3587 : 187798 : apply_dispatch(&s);
3588 : : }
3589 [ + - ]: 3232 : else if (c == 'k')
3590 : : {
3591 : : XLogRecPtr end_lsn;
3592 : : TimestampTz timestamp;
3593 : : bool reply_requested;
3594 : :
2579 3595 : 3232 : end_lsn = pq_getmsgint64(&s);
2607 tgl@sss.pgh.pa.us 3596 : 3232 : timestamp = pq_getmsgint64(&s);
2642 peter_e@gmx.net 3597 : 3232 : reply_requested = pq_getmsgbyte(&s);
3598 : :
2579 3599 [ + + ]: 3232 : if (last_received < end_lsn)
3600 : 832 : last_received = end_lsn;
3601 : :
3602 : 3232 : send_feedback(last_received, reply_requested, false);
2642 3603 : 3232 : UpdateWorkerStats(last_received, timestamp, true);
3604 : : }
3605 : : /* other message types are purposefully ignored */
3606 : :
2532 3607 : 190996 : MemoryContextReset(ApplyMessageContext);
3608 : : }
3609 : :
1068 alvherre@alvh.no-ip. 3610 : 190996 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
3611 : : }
3612 : : }
3613 : :
3614 : : /* confirm all writes so far */
2479 tgl@sss.pgh.pa.us 3615 : 71920 : send_feedback(last_received, false, false);
3616 : :
1319 akapila@postgresql.o 3617 [ + + + + ]: 71920 : if (!in_remote_transaction && !in_streamed_transaction)
3618 : : {
3619 : : /*
3620 : : * If we didn't get any transactions for a while there might be
3621 : : * unconsumed invalidation messages in the queue, consume them
3622 : : * now.
3623 : : */
2579 peter_e@gmx.net 3624 : 4133 : AcceptInvalidationMessages();
2507 3625 : 4133 : maybe_reread_subscription();
3626 : :
3627 : : /* Process any table synchronization changes. */
2579 3628 : 4104 : process_syncing_tables(last_received);
3629 : : }
3630 : :
3631 : : /* Cleanup the memory. */
151 nathan@postgresql.or 3632 :GNC 71725 : MemoryContextReset(ApplyMessageContext);
2642 peter_e@gmx.net 3633 :CBC 71725 : MemoryContextSwitchTo(TopMemoryContext);
3634 : :
3635 : : /* Check if we need to exit the streaming loop. */
3636 [ + + ]: 71725 : if (endofstream)
3637 : 6 : break;
3638 : :
3639 : : /*
3640 : : * Wait for more data or latch. If we have unflushed transactions,
3641 : : * wake up after WalWriterDelay to see if they've been flushed yet (in
3642 : : * which case we should send a feedback message). Otherwise, there's
3643 : : * no particular urgency about waking up unless we get data or a
3644 : : * signal.
3645 : : */
2479 tgl@sss.pgh.pa.us 3646 [ + + ]: 71719 : if (!dlist_is_empty(&lsn_mapping))
3647 : 17569 : wait_time = WalWriterDelay;
3648 : : else
3649 : 54150 : wait_time = NAPTIME_PER_CYCLE;
3650 : :
2504 andres@anarazel.de 3651 : 71719 : rc = WaitLatchOrSocket(MyLatch,
3652 : : WL_SOCKET_READABLE | WL_LATCH_SET |
3653 : : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
3654 : : fd, wait_time,
3655 : : WAIT_EVENT_LOGICAL_APPLY_MAIN);
3656 : :
3657 [ + + ]: 71713 : if (rc & WL_LATCH_SET)
3658 : : {
3659 : 463 : ResetLatch(MyLatch);
3660 [ + + ]: 463 : CHECK_FOR_INTERRUPTS();
3661 : : }
3662 : :
1580 rhaas@postgresql.org 3663 [ + + ]: 71642 : if (ConfigReloadPending)
3664 : : {
3665 : 14 : ConfigReloadPending = false;
2561 peter_e@gmx.net 3666 : 14 : ProcessConfigFile(PGC_SIGHUP);
3667 : : }
3668 : :
2642 3669 [ + + ]: 71642 : if (rc & WL_TIMEOUT)
3670 : : {
3671 : : /*
3672 : : * We didn't receive anything new. If we haven't heard anything
3673 : : * from the server for more than wal_receiver_timeout / 2, ping
3674 : : * the server. Also, if it's been longer than
3675 : : * wal_receiver_status_interval since the last update we sent,
3676 : : * send a status update to the primary anyway, to report any
3677 : : * progress in applying WAL.
3678 : : */
3679 : 166 : bool requestReply = false;
3680 : :
3681 : : /*
3682 : : * Check if time since last receive from primary has reached the
3683 : : * configured limit.
3684 : : */
3685 [ + - ]: 166 : if (wal_receiver_timeout > 0)
3686 : : {
3687 : 166 : TimestampTz now = GetCurrentTimestamp();
3688 : : TimestampTz timeout;
3689 : :
3690 : 166 : timeout =
3691 : 166 : TimestampTzPlusMilliseconds(last_recv_timestamp,
3692 : : wal_receiver_timeout);
3693 : :
3694 [ - + ]: 166 : if (now >= timeout)
2642 peter_e@gmx.net 3695 [ # # ]:UBC 0 : ereport(ERROR,
3696 : : (errcode(ERRCODE_CONNECTION_FAILURE),
3697 : : errmsg("terminating logical replication worker due to timeout")));
3698 : :
3699 : : /* Check to see if it's time for a ping. */
2642 peter_e@gmx.net 3700 [ + - ]:CBC 166 : if (!ping_sent)
3701 : : {
3702 : 166 : timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
3703 : : (wal_receiver_timeout / 2));
3704 [ - + ]: 166 : if (now >= timeout)
3705 : : {
2642 peter_e@gmx.net 3706 :UBC 0 : requestReply = true;
3707 : 0 : ping_sent = true;
3708 : : }
3709 : : }
3710 : : }
3711 : :
2642 peter_e@gmx.net 3712 :CBC 166 : send_feedback(last_received, requestReply, requestReply);
3713 : :
3714 : : /*
3715 : : * Force reporting to ensure long idle periods don't lead to
3716 : : * arbitrarily delayed stats. Stats can only be reported outside
3717 : : * of (implicit or explicit) transactions. That shouldn't lead to
3718 : : * stats being delayed for long, because transactions are either
3719 : : * sent as a whole on commit or streamed. Streamed transactions
3720 : : * are spilled to disk and applied on commit.
3721 : : */
703 andres@anarazel.de 3722 [ + - ]: 166 : if (!IsTransactionState())
3723 : 166 : pgstat_report_stat(true);
3724 : : }
3725 : : }
3726 : :
3727 : : /* Pop the error context stack */
961 akapila@postgresql.o 3728 : 6 : error_context_stack = errcallback.previous;
461 3729 : 6 : apply_error_context_stack = error_context_stack;
3730 : :
3731 : : /* All done */
1068 alvherre@alvh.no-ip. 3732 : 6 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
2642 peter_e@gmx.net 3733 :UBC 0 : }
3734 : :
3735 : : /*
3736 : : * Send a Standby Status Update message to server.
3737 : : *
3738 : : * 'recvpos' is the latest LSN we've received data to, force is set if we need
3739 : : * to send a response to avoid timeouts.
3740 : : */
3741 : : static void
2642 peter_e@gmx.net 3742 :CBC 75318 : send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
3743 : : {
3744 : : static StringInfo reply_message = NULL;
3745 : : static TimestampTz send_time = 0;
3746 : :
3747 : : static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
3748 : : static XLogRecPtr last_writepos = InvalidXLogRecPtr;
3749 : : static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
3750 : :
3751 : : XLogRecPtr writepos;
3752 : : XLogRecPtr flushpos;
3753 : : TimestampTz now;
3754 : : bool have_pending_txes;
3755 : :
3756 : : /*
3757 : : * If the user doesn't want status to be reported to the publisher, be
3758 : : * sure to exit before doing anything at all.
3759 : : */
3760 [ + + - + ]: 75318 : if (!force && wal_receiver_status_interval <= 0)
2642 peter_e@gmx.net 3761 :UBC 0 : return;
3762 : :
3763 : : /* It's legal to not pass a recvpos */
2642 peter_e@gmx.net 3764 [ - + ]:CBC 75318 : if (recvpos < last_recvpos)
2642 peter_e@gmx.net 3765 :UBC 0 : recvpos = last_recvpos;
3766 : :
2642 peter_e@gmx.net 3767 :CBC 75318 : get_flush_position(&writepos, &flushpos, &have_pending_txes);
3768 : :
3769 : : /*
3770 : : * No outstanding transactions to flush, we can report the latest received
3771 : : * position. This is important for synchronous replication.
3772 : : */
3773 [ + + ]: 75318 : if (!have_pending_txes)
3774 : 55569 : flushpos = writepos = recvpos;
3775 : :
3776 [ - + ]: 75318 : if (writepos < last_writepos)
2642 peter_e@gmx.net 3777 :UBC 0 : writepos = last_writepos;
3778 : :
2642 peter_e@gmx.net 3779 [ + + ]:CBC 75318 : if (flushpos < last_flushpos)
3780 : 19719 : flushpos = last_flushpos;
3781 : :
3782 : 75318 : now = GetCurrentTimestamp();
3783 : :
3784 : : /* if we've already reported everything we're good */
3785 [ + + ]: 75318 : if (!force &&
3786 [ + + ]: 73602 : writepos == last_writepos &&
3787 [ + + ]: 28974 : flushpos == last_flushpos &&
3788 [ + + ]: 28853 : !TimestampDifferenceExceeds(send_time, now,
3789 : : wal_receiver_status_interval * 1000))
3790 : 28771 : return;
3791 : 46547 : send_time = now;
3792 : :
3793 [ + + ]: 46547 : if (!reply_message)
3794 : : {
2524 bruce@momjian.us 3795 : 333 : MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
3796 : :
2642 peter_e@gmx.net 3797 : 333 : reply_message = makeStringInfo();
3798 : 333 : MemoryContextSwitchTo(oldctx);
3799 : : }
3800 : : else
3801 : 46214 : resetStringInfo(reply_message);
3802 : :
3803 : 46547 : pq_sendbyte(reply_message, 'r');
2489 tgl@sss.pgh.pa.us 3804 : 46547 : pq_sendint64(reply_message, recvpos); /* write */
3805 : 46547 : pq_sendint64(reply_message, flushpos); /* flush */
3806 : 46547 : pq_sendint64(reply_message, writepos); /* apply */
2524 bruce@momjian.us 3807 : 46547 : pq_sendint64(reply_message, now); /* sendTime */
2642 peter_e@gmx.net 3808 : 46547 : pq_sendbyte(reply_message, requestReply); /* replyRequested */
3809 : :
3810 [ - + ]: 46547 : elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3811 : : force,
3812 : : LSN_FORMAT_ARGS(recvpos),
3813 : : LSN_FORMAT_ARGS(writepos),
3814 : : LSN_FORMAT_ARGS(flushpos));
3815 : :
1068 alvherre@alvh.no-ip. 3816 : 46547 : walrcv_send(LogRepWorkerWalRcvConn,
3817 : : reply_message->data, reply_message->len);
3818 : :
2642 peter_e@gmx.net 3819 [ + + ]: 46547 : if (recvpos > last_recvpos)
3820 : 44629 : last_recvpos = recvpos;
3821 [ + + ]: 46547 : if (writepos > last_writepos)
3822 : 44629 : last_writepos = writepos;
3823 [ + + ]: 46547 : if (flushpos > last_flushpos)
3824 : 44400 : last_flushpos = flushpos;
3825 : : }
3826 : :
3827 : : /*
3828 : : * Exit routine for apply workers due to subscription parameter changes.
3829 : : */
3830 : : static void
461 akapila@postgresql.o 3831 : 32 : apply_worker_exit(void)
3832 : : {
3833 [ - + ]: 32 : if (am_parallel_apply_worker())
3834 : : {
3835 : : /*
3836 : : * Don't stop the parallel apply worker as the leader will detect the
3837 : : * subscription parameter change and restart logical replication later
3838 : : * anyway. This also prevents the leader from reporting errors when
3839 : : * trying to communicate with a stopped parallel apply worker, which
3840 : : * would accidentally disable subscriptions if disable_on_error was
3841 : : * set.
3842 : : */
461 akapila@postgresql.o 3843 :UBC 0 : return;
3844 : : }
3845 : :
3846 : : /*
3847 : : * Reset the last-start time for this apply worker so that the launcher
3848 : : * will restart it without waiting for wal_retrieve_retry_interval if the
3849 : : * subscription is still active, and so that we won't leak that hash table
3850 : : * entry if it isn't.
3851 : : */
254 akapila@postgresql.o 3852 [ + - ]:GNC 32 : if (am_leader_apply_worker())
448 tgl@sss.pgh.pa.us 3853 :CBC 32 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
3854 : :
461 akapila@postgresql.o 3855 : 32 : proc_exit(0);
3856 : : }
3857 : :
3858 : : /*
3859 : : * Reread subscription info if needed. Most changes will be exit.
3860 : : */
3861 : : void
2507 peter_e@gmx.net 3862 : 5122 : maybe_reread_subscription(void)
3863 : : {
3864 : : MemoryContext oldctx;
3865 : : Subscription *newsub;
2524 bruce@momjian.us 3866 : 5122 : bool started_tx = false;
3867 : :
3868 : : /* When cache state is valid there is nothing to do here. */
2507 peter_e@gmx.net 3869 [ + + ]: 5122 : if (MySubscriptionValid)
3870 : 5056 : return;
3871 : :
3872 : : /* This function might be called inside or outside of transaction. */
2579 3873 [ + + ]: 66 : if (!IsTransactionState())
3874 : : {
3875 : 60 : StartTransactionCommand();
3876 : 60 : started_tx = true;
3877 : : }
3878 : :
3879 : : /* Ensure allocations in permanent context. */
2532 3880 : 66 : oldctx = MemoryContextSwitchTo(ApplyContext);
3881 : :
2642 3882 : 66 : newsub = GetSubscription(MyLogicalRepWorker->subid, true);
3883 : :
3884 : : /*
3885 : : * Exit if the subscription was removed. This normally should not happen
3886 : : * as the worker gets killed during DROP SUBSCRIPTION.
3887 : : */
2638 3888 [ - + ]: 66 : if (!newsub)
3889 : : {
2642 peter_e@gmx.net 3890 [ # # ]:UBC 0 : ereport(LOG,
3891 : : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
3892 : : MySubscription->name)));
3893 : :
3894 : : /* Ensure we remove no-longer-useful entry for worker's start time */
254 akapila@postgresql.o 3895 [ # # ]:UNC 0 : if (am_leader_apply_worker())
448 tgl@sss.pgh.pa.us 3896 :UBC 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
3897 : :
2642 peter_e@gmx.net 3898 : 0 : proc_exit(0);
3899 : : }
3900 : :
3901 : : /* Exit if the subscription was disabled. */
2532 peter_e@gmx.net 3902 [ + + ]:CBC 66 : if (!newsub->enabled)
3903 : : {
3904 [ + - ]: 6 : ereport(LOG,
3905 : : (errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
3906 : : MySubscription->name)));
3907 : :
461 akapila@postgresql.o 3908 : 6 : apply_worker_exit();
3909 : : }
3910 : :
3911 : : /* !slotname should never happen when enabled is true. */
2532 peter_e@gmx.net 3912 [ - + ]: 60 : Assert(newsub->slotname);
3913 : :
3914 : : /* two-phase should not be altered */
1005 akapila@postgresql.o 3915 [ - + ]: 60 : Assert(newsub->twophasestate == MySubscription->twophasestate);
3916 : :
3917 : : /*
3918 : : * Exit if any parameter that affects the remote connection was changed.
3919 : : * The launcher will start a new worker but note that the parallel apply
3920 : : * worker won't restart if the streaming option's value is changed from
3921 : : * 'parallel' to any other value or the server decides not to stream the
3922 : : * in-progress transaction.
3923 : : */
1366 tgl@sss.pgh.pa.us 3924 [ + + ]: 60 : if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
3925 [ + + ]: 58 : strcmp(newsub->name, MySubscription->name) != 0 ||
3926 [ + - ]: 57 : strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
3927 [ + + ]: 57 : newsub->binary != MySubscription->binary ||
1319 akapila@postgresql.o 3928 [ + + ]: 51 : newsub->stream != MySubscription->stream ||
360 3929 [ + - ]: 46 : newsub->passwordrequired != MySubscription->passwordrequired ||
633 3930 [ + - ]: 46 : strcmp(newsub->origin, MySubscription->origin) != 0 ||
828 jdavis@postgresql.or 3931 [ + + ]: 46 : newsub->owner != MySubscription->owner ||
1366 tgl@sss.pgh.pa.us 3932 [ + + ]: 45 : !equal(newsub->publications, MySubscription->publications))
3933 : : {
461 akapila@postgresql.o 3934 [ - + ]: 22 : if (am_parallel_apply_worker())
461 akapila@postgresql.o 3935 [ # # ]:UBC 0 : ereport(LOG,
3936 : : (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
3937 : : MySubscription->name)));
3938 : : else
461 akapila@postgresql.o 3939 [ + - ]:CBC 22 : ereport(LOG,
3940 : : (errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
3941 : : MySubscription->name)));
3942 : :
3943 : 22 : apply_worker_exit();
3944 : : }
3945 : :
3946 : : /*
3947 : : * Exit if the subscription owner's superuser privileges have been
3948 : : * revoked.
3949 : : */
180 akapila@postgresql.o 3950 [ + + + + ]:GNC 38 : if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
3951 : : {
3952 [ - + ]: 4 : if (am_parallel_apply_worker())
180 akapila@postgresql.o 3953 [ # # ]:UNC 0 : ereport(LOG,
3954 : : errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
3955 : : MySubscription->name));
3956 : : else
180 akapila@postgresql.o 3957 [ + - ]:GNC 4 : ereport(LOG,
3958 : : errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
3959 : : MySubscription->name));
3960 : :
3961 : 4 : apply_worker_exit();
3962 : : }
3963 : :
3964 : : /* Check for other changes that should never happen too. */
2568 peter_e@gmx.net 3965 [ - + ]:CBC 34 : if (newsub->dbid != MySubscription->dbid)
3966 : : {
2642 peter_e@gmx.net 3967 [ # # ]:UBC 0 : elog(ERROR, "subscription %u changed unexpectedly",
3968 : : MyLogicalRepWorker->subid);
3969 : : }
3970 : :
3971 : : /* Clean old subscription info and switch to new one. */
2642 peter_e@gmx.net 3972 :CBC 34 : FreeSubscription(MySubscription);
3973 : 34 : MySubscription = newsub;
3974 : :
3975 : 34 : MemoryContextSwitchTo(oldctx);
3976 : :
3977 : : /* Change synchronous commit according to the user's wishes */
2557 3978 : 34 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
3979 : : PGC_BACKEND, PGC_S_OVERRIDE);
3980 : :
2579 3981 [ + + ]: 34 : if (started_tx)
3982 : 31 : CommitTransactionCommand();
3983 : :
2642 3984 : 34 : MySubscriptionValid = true;
3985 : : }
3986 : :
3987 : : /*
3988 : : * Callback from subscription syscache invalidation.
3989 : : */
3990 : : static void
3991 : 68 : subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
3992 : : {
3993 : 68 : MySubscriptionValid = false;
3994 : 68 : }
3995 : :
3996 : : /*
3997 : : * subxact_info_write
3998 : : * Store information about subxacts for a toplevel transaction.
3999 : : *
4000 : : * For each subxact we store offset of it's first change in the main file.
4001 : : * The file is always over-written as a whole.
4002 : : *
4003 : : * XXX We should only store subxacts that were not aborted yet.
4004 : : */
4005 : : static void
1319 akapila@postgresql.o 4006 : 392 : subxact_info_write(Oid subid, TransactionId xid)
4007 : : {
4008 : : char path[MAXPGPATH];
4009 : : Size len;
4010 : : BufFile *fd;
4011 : :
4012 [ - + ]: 392 : Assert(TransactionIdIsValid(xid));
4013 : :
4014 : : /* construct the subxact filename */
955 4015 : 392 : subxact_filename(path, subid, xid);
4016 : :
4017 : : /* Delete the subxacts file, if exists. */
1319 4018 [ + + ]: 392 : if (subxact_data.nsubxacts == 0)
4019 : : {
955 4020 : 310 : cleanup_subxact_info();
4021 : 310 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
4022 : :
1319 4023 : 310 : return;
4024 : : }
4025 : :
4026 : : /*
4027 : : * Create the subxact file if it not already created, otherwise open the
4028 : : * existing file.
4029 : : */
955 4030 : 82 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
4031 : : true);
4032 [ + + ]: 82 : if (fd == NULL)
4033 : 8 : fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
4034 : :
1319 4035 : 82 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4036 : :
4037 : : /* Write the subxact count and subxact info */
4038 : 82 : BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4039 : 82 : BufFileWrite(fd, subxact_data.subxacts, len);
4040 : :
4041 : 82 : BufFileClose(fd);
4042 : :
4043 : : /* free the memory allocated for subxact info */
4044 : 82 : cleanup_subxact_info();
4045 : : }
4046 : :
4047 : : /*
4048 : : * subxact_info_read
4049 : : * Restore information about subxacts of a streamed transaction.
4050 : : *
4051 : : * Read information about subxacts into the structure subxact_data that can be
4052 : : * used later.
4053 : : */
4054 : : static void
4055 : 360 : subxact_info_read(Oid subid, TransactionId xid)
4056 : : {
4057 : : char path[MAXPGPATH];
4058 : : Size len;
4059 : : BufFile *fd;
4060 : : MemoryContext oldctx;
4061 : :
4062 [ - + ]: 360 : Assert(!subxact_data.subxacts);
4063 [ - + ]: 360 : Assert(subxact_data.nsubxacts == 0);
4064 [ - + ]: 360 : Assert(subxact_data.nsubxacts_max == 0);
4065 : :
4066 : : /*
4067 : : * If the subxact file doesn't exist that means we don't have any subxact
4068 : : * info.
4069 : : */
4070 : 360 : subxact_filename(path, subid, xid);
955 4071 : 360 : fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
4072 : : true);
4073 [ + + ]: 360 : if (fd == NULL)
4074 : 281 : return;
4075 : :
4076 : : /* read number of subxact items */
454 peter@eisentraut.org 4077 : 79 : BufFileReadExact(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
4078 : :
1319 akapila@postgresql.o 4079 : 79 : len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
4080 : :
4081 : : /* we keep the maximum as a power of 2 */
4082 : 79 : subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
4083 : :
4084 : : /*
4085 : : * Allocate subxact information in the logical streaming context. We need
4086 : : * this information during the complete stream so that we can add the sub
4087 : : * transaction info to this. On stream stop we will flush this information
4088 : : * to the subxact file and reset the logical streaming context.
4089 : : */
4090 : 79 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4091 : 79 : subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
4092 : : sizeof(SubXactInfo));
4093 : 79 : MemoryContextSwitchTo(oldctx);
4094 : :
454 peter@eisentraut.org 4095 [ + - ]: 79 : if (len > 0)
4096 : 79 : BufFileReadExact(fd, subxact_data.subxacts, len);
4097 : :
1319 akapila@postgresql.o 4098 : 79 : BufFileClose(fd);
4099 : : }
4100 : :
4101 : : /*
4102 : : * subxact_info_add
4103 : : * Add information about a subxact (offset in the main file).
4104 : : */
4105 : : static void
4106 : 102538 : subxact_info_add(TransactionId xid)
4107 : : {
4108 : 102538 : SubXactInfo *subxacts = subxact_data.subxacts;
4109 : : int64 i;
4110 : :
4111 : : /* We must have a valid top level stream xid and a stream fd. */
4112 [ - + ]: 102538 : Assert(TransactionIdIsValid(stream_xid));
4113 [ - + ]: 102538 : Assert(stream_fd != NULL);
4114 : :
4115 : : /*
4116 : : * If the XID matches the toplevel transaction, we don't want to add it.
4117 : : */
4118 [ + + ]: 102538 : if (stream_xid == xid)
4119 : 92413 : return;
4120 : :
4121 : : /*
4122 : : * In most cases we're checking the same subxact as we've already seen in
4123 : : * the last call, so make sure to ignore it (this change comes later).
4124 : : */
4125 [ + + ]: 10125 : if (subxact_data.subxact_last == xid)
4126 : 10049 : return;
4127 : :
4128 : : /* OK, remember we're processing this XID. */
4129 : 76 : subxact_data.subxact_last = xid;
4130 : :
4131 : : /*
4132 : : * Check if the transaction is already present in the array of subxact. We
4133 : : * intentionally scan the array from the tail, because we're likely adding
4134 : : * a change for the most recent subtransactions.
4135 : : *
4136 : : * XXX Can we rely on the subxact XIDs arriving in sorted order? That
4137 : : * would allow us to use binary search here.
4138 : : */
4139 [ + + ]: 95 : for (i = subxact_data.nsubxacts; i > 0; i--)
4140 : : {
4141 : : /* found, so we're done */
4142 [ + + ]: 76 : if (subxacts[i - 1].xid == xid)
4143 : 57 : return;
4144 : : }
4145 : :
4146 : : /* This is a new subxact, so we need to add it to the array. */
4147 [ + + ]: 19 : if (subxact_data.nsubxacts == 0)
4148 : : {
4149 : : MemoryContext oldctx;
4150 : :
4151 : 8 : subxact_data.nsubxacts_max = 128;
4152 : :
4153 : : /*
4154 : : * Allocate this memory for subxacts in per-stream context, see
4155 : : * subxact_info_read.
4156 : : */
4157 : 8 : oldctx = MemoryContextSwitchTo(LogicalStreamingContext);
4158 : 8 : subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4159 : 8 : MemoryContextSwitchTo(oldctx);
4160 : : }
4161 [ + + ]: 11 : else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
4162 : : {
4163 : 10 : subxact_data.nsubxacts_max *= 2;
4164 : 10 : subxacts = repalloc(subxacts,
4165 : 10 : subxact_data.nsubxacts_max * sizeof(SubXactInfo));
4166 : : }
4167 : :
4168 : 19 : subxacts[subxact_data.nsubxacts].xid = xid;
4169 : :
4170 : : /*
4171 : : * Get the current offset of the stream file and store it as offset of
4172 : : * this subxact.
4173 : : */
4174 : 19 : BufFileTell(stream_fd,
4175 : 19 : &subxacts[subxact_data.nsubxacts].fileno,
4176 : 19 : &subxacts[subxact_data.nsubxacts].offset);
4177 : :
4178 : 19 : subxact_data.nsubxacts++;
4179 : 19 : subxact_data.subxacts = subxacts;
4180 : : }
4181 : :
4182 : : /* format filename for file containing the info about subxacts */
4183 : : static inline void
4184 : 787 : subxact_filename(char *path, Oid subid, TransactionId xid)
4185 : : {
4186 : 787 : snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
4187 : 787 : }
4188 : :
4189 : : /* format filename for file containing serialized changes */
4190 : : static inline void
4191 : 467 : changes_filename(char *path, Oid subid, TransactionId xid)
4192 : : {
4193 : 467 : snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
4194 : 467 : }
4195 : :
4196 : : /*
4197 : : * stream_cleanup_files
4198 : : * Cleanup files for a subscription / toplevel transaction.
4199 : : *
4200 : : * Remove files with serialized changes and subxact info for a particular
4201 : : * toplevel transaction. Each subscription has a separate set of files
4202 : : * for any toplevel transaction.
4203 : : */
4204 : : void
4205 : 35 : stream_cleanup_files(Oid subid, TransactionId xid)
4206 : : {
4207 : : char path[MAXPGPATH];
4208 : :
4209 : : /* Delete the changes file. */
4210 : 35 : changes_filename(path, subid, xid);
955 4211 : 35 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
4212 : :
4213 : : /* Delete the subxact file, if it exists. */
4214 : 35 : subxact_filename(path, subid, xid);
4215 : 35 : BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
1319 4216 : 35 : }
4217 : :
4218 : : /*
4219 : : * stream_open_file
4220 : : * Open a file that we'll use to serialize changes for a toplevel
4221 : : * transaction.
4222 : : *
4223 : : * Open a file for streamed changes from a toplevel transaction identified
4224 : : * by stream_xid (global variable). If it's the first chunk of streamed
4225 : : * changes for this transaction, create the buffile, otherwise open the
4226 : : * previously created file.
4227 : : */
4228 : : static void
4229 : 384 : stream_open_file(Oid subid, TransactionId xid, bool first_segment)
4230 : : {
4231 : : char path[MAXPGPATH];
4232 : : MemoryContext oldcxt;
4233 : :
4234 [ - + ]: 384 : Assert(OidIsValid(subid));
4235 [ - + ]: 384 : Assert(TransactionIdIsValid(xid));
4236 [ - + ]: 384 : Assert(stream_fd == NULL);
4237 : :
4238 : :
4239 : 384 : changes_filename(path, subid, xid);
4240 [ - + ]: 384 : elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
4241 : :
4242 : : /*
4243 : : * Create/open the buffiles under the logical streaming context so that we
4244 : : * have those files until stream stop.
4245 : : */
4246 : 384 : oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
4247 : :
4248 : : /*
4249 : : * If this is the first streamed segment, create the changes file.
4250 : : * Otherwise, just open the file for writing, in append mode.
4251 : : */
4252 [ + + ]: 384 : if (first_segment)
955 4253 : 37 : stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
4254 : : path);
4255 : : else
4256 : : {
4257 : : /*
4258 : : * Open the file and seek to the end of the file because we always
4259 : : * append the changes file.
4260 : : */
4261 : 347 : stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
4262 : : path, O_RDWR, false);
1319 4263 : 347 : BufFileSeek(stream_fd, 0, 0, SEEK_END);
4264 : : }
4265 : :
4266 : 383 : MemoryContextSwitchTo(oldcxt);
4267 : 383 : }
4268 : :
4269 : : /*
4270 : : * stream_close_file
4271 : : * Close the currently open file with streamed changes.
4272 : : */
4273 : : static void
4274 : 417 : stream_close_file(void)
4275 : : {
4276 [ - + ]: 417 : Assert(stream_fd != NULL);
4277 : :
4278 : 417 : BufFileClose(stream_fd);
4279 : :
4280 : 417 : stream_fd = NULL;
4281 : 417 : }
4282 : :
4283 : : /*
4284 : : * stream_write_change
4285 : : * Serialize a change to a file for the current toplevel transaction.
4286 : : *
4287 : : * The change is serialized in a simple format, with length (not including
4288 : : * the length), action code (identifying the message type) and message
4289 : : * contents (without the subxact TransactionId value).
4290 : : */
4291 : : static void
4292 : 107579 : stream_write_change(char action, StringInfo s)
4293 : : {
4294 : : int len;
4295 : :
4296 [ - + ]: 107579 : Assert(stream_fd != NULL);
4297 : :
4298 : : /* total on-disk size, including the action type character */
4299 : 107579 : len = (s->len - s->cursor) + sizeof(char);
4300 : :
4301 : : /* first write the size */
4302 : 107579 : BufFileWrite(stream_fd, &len, sizeof(len));
4303 : :
4304 : : /* then the action */
4305 : 107579 : BufFileWrite(stream_fd, &action, sizeof(action));
4306 : :
4307 : : /* and finally the remaining part of the buffer (after the XID) */
4308 : 107579 : len = (s->len - s->cursor);
4309 : :
4310 : 107579 : BufFileWrite(stream_fd, &s->data[s->cursor], len);
4311 : 107579 : }
4312 : :
4313 : : /*
4314 : : * stream_open_and_write_change
4315 : : * Serialize a message to a file for the given transaction.
4316 : : *
4317 : : * This function is similar to stream_write_change except that it will open the
4318 : : * target file if not already before writing the message and close the file at
4319 : : * the end.
4320 : : */
4321 : : static void
461 4322 : 5 : stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
4323 : : {
4324 [ - + ]: 5 : Assert(!in_streamed_transaction);
4325 : :
4326 [ + - ]: 5 : if (!stream_fd)
4327 : 5 : stream_start_internal(xid, false);
4328 : :
4329 : 5 : stream_write_change(action, s);
4330 : 5 : stream_stop_internal(xid);
4331 : 5 : }
4332 : :
4333 : : /*
4334 : : * Sets streaming options including replication slot name and origin start
4335 : : * position. Workers need these options for logical replication.
4336 : : */
4337 : : void
255 akapila@postgresql.o 4338 :GNC 333 : set_stream_options(WalRcvStreamOptions *options,
4339 : : char *slotname,
4340 : : XLogRecPtr *origin_startpos)
4341 : : {
4342 : : int server_version;
4343 : :
4344 : 333 : options->logical = true;
4345 : 333 : options->startpoint = *origin_startpos;
4346 : 333 : options->slotname = slotname;
4347 : :
4348 : 333 : server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
4349 : 333 : options->proto.logical.proto_version =
4350 [ - + - - : 333 : server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
- - ]
4351 : : server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
4352 : : server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
4353 : : LOGICALREP_PROTO_VERSION_NUM;
4354 : :
4355 : 333 : options->proto.logical.publication_names = MySubscription->publications;
4356 : 333 : options->proto.logical.binary = MySubscription->binary;
4357 : :
4358 : : /*
4359 : : * Assign the appropriate option value for streaming option according to
4360 : : * the 'streaming' mode and the publisher's ability to support that mode.
4361 : : */
4362 [ + - ]: 333 : if (server_version >= 160000 &&
4363 [ + + ]: 333 : MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
4364 : : {
4365 : 8 : options->proto.logical.streaming_str = "parallel";
4366 : 8 : MyLogicalRepWorker->parallel_apply = true;
4367 : : }
4368 [ + - ]: 325 : else if (server_version >= 140000 &&
4369 [ + + ]: 325 : MySubscription->stream != LOGICALREP_STREAM_OFF)
4370 : : {
4371 : 27 : options->proto.logical.streaming_str = "on";
4372 : 27 : MyLogicalRepWorker->parallel_apply = false;
4373 : : }
4374 : : else
4375 : : {
4376 : 298 : options->proto.logical.streaming_str = NULL;
4377 : 298 : MyLogicalRepWorker->parallel_apply = false;
4378 : : }
4379 : :
4380 : 333 : options->proto.logical.twophase = false;
4381 : 333 : options->proto.logical.origin = pstrdup(MySubscription->origin);
4382 : 333 : }
4383 : :
4384 : : /*
4385 : : * Cleanup the memory for subxacts and reset the related variables.
4386 : : */
4387 : : static inline void
1319 akapila@postgresql.o 4388 :CBC 396 : cleanup_subxact_info()
4389 : : {
4390 [ + + ]: 396 : if (subxact_data.subxacts)
4391 : 87 : pfree(subxact_data.subxacts);
4392 : :
4393 : 396 : subxact_data.subxacts = NULL;
4394 : 396 : subxact_data.subxact_last = InvalidTransactionId;
4395 : 396 : subxact_data.nsubxacts = 0;
4396 : 396 : subxact_data.nsubxacts_max = 0;
4397 : 396 : }
4398 : :
4399 : : /*
4400 : : * Form the prepared transaction GID for two_phase transactions.
4401 : : *
4402 : : * Return the GID in the supplied buffer.
4403 : : */
4404 : : static void
1005 4405 : 64 : TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
4406 : : {
4407 [ - + ]: 64 : Assert(subid != InvalidRepOriginId);
4408 : :
4409 [ - + ]: 64 : if (!TransactionIdIsValid(xid))
1005 akapila@postgresql.o 4410 [ # # ]:UBC 0 : ereport(ERROR,
4411 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
4412 : : errmsg_internal("invalid two-phase transaction ID")));
4413 : :
1005 akapila@postgresql.o 4414 :CBC 64 : snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
4415 : 64 : }
4416 : :
4417 : : /*
4418 : : * Common function to run the apply loop with error handling. Disable the
4419 : : * subscription, if necessary.
4420 : : *
4421 : : * Note that we don't handle FATAL errors which are probably because
4422 : : * of system resource error and are not repeatable.
4423 : : */
4424 : : void
255 4425 : 333 : start_apply(XLogRecPtr origin_startpos)
4426 : : {
762 4427 [ + + ]: 333 : PG_TRY();
4428 : : {
255 4429 : 333 : LogicalRepApplyLoop(origin_startpos);
4430 : : }
762 4431 : 57 : PG_CATCH();
4432 : : {
4433 [ + + ]: 57 : if (MySubscription->disableonerr)
4434 : 3 : DisableSubscriptionAndExit();
4435 : : else
4436 : : {
4437 : : /*
4438 : : * Report the worker failed while applying changes. Abort the
4439 : : * current transaction so that the stats message is sent in an
4440 : : * idle state.
4441 : : */
4442 : 54 : AbortOutOfAnyTransaction();
255 4443 : 54 : pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
4444 : :
762 4445 : 54 : PG_RE_THROW();
4446 : : }
4447 : : }
762 akapila@postgresql.o 4448 [ # # ]:UBC 0 : PG_END_TRY();
4449 : 0 : }
4450 : :
4451 : : /*
4452 : : * Runs the leader apply worker.
4453 : : *
4454 : : * It sets up replication origin, streaming options and then starts streaming.
4455 : : */
4456 : : static void
255 akapila@postgresql.o 4457 :GNC 266 : run_apply_worker()
4458 : : {
4459 : : char originname[NAMEDATALEN];
4460 : 266 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
4461 : 266 : char *slotname = NULL;
4462 : : WalRcvStreamOptions options;
4463 : : RepOriginId originid;
4464 : : TimeLineID startpointTLI;
4465 : : char *err;
4466 : : bool must_use_password;
4467 : :
4468 : 266 : slotname = MySubscription->slotname;
4469 : :
4470 : : /*
4471 : : * This shouldn't happen if the subscription is enabled, but guard against
4472 : : * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
4473 : : * slot is NULL.)
4474 : : */
4475 [ - + ]: 266 : if (!slotname)
255 akapila@postgresql.o 4476 [ # # ]:UNC 0 : ereport(ERROR,
4477 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4478 : : errmsg("subscription has no replication slot set")));
4479 : :
4480 : : /* Setup replication origin tracking. */
255 akapila@postgresql.o 4481 :GNC 266 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
4482 : : originname, sizeof(originname));
4483 : 266 : StartTransactionCommand();
4484 : 266 : originid = replorigin_by_name(originname, true);
4485 [ - + ]: 266 : if (!OidIsValid(originid))
255 akapila@postgresql.o 4486 :UNC 0 : originid = replorigin_create(originname);
255 akapila@postgresql.o 4487 :GNC 266 : replorigin_session_setup(originid, 0);
4488 : 266 : replorigin_session_origin = originid;
4489 : 266 : origin_startpos = replorigin_session_get_progress(false);
180 4490 : 266 : CommitTransactionCommand();
4491 : :
4492 : : /* Is the use of a password mandatory? */
255 4493 [ + + ]: 509 : must_use_password = MySubscription->passwordrequired &&
180 4494 [ + + ]: 243 : !MySubscription->ownersuperuser;
4495 : :
255 4496 : 266 : LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
4497 : : true, must_use_password,
4498 : : MySubscription->name, &err);
4499 : :
4500 [ + + ]: 251 : if (LogRepWorkerWalRcvConn == NULL)
4501 [ + - ]: 78 : ereport(ERROR,
4502 : : (errcode(ERRCODE_CONNECTION_FAILURE),
4503 : : errmsg("could not connect to the publisher: %s", err)));
4504 : :
4505 : : /*
4506 : : * We don't really use the output identify_system for anything but it does
4507 : : * some initializations on the upstream so let's still call it.
4508 : : */
4509 : 173 : (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
4510 : :
4511 : 173 : set_apply_error_context_origin(originname);
4512 : :
4513 : 173 : set_stream_options(&options, slotname, &origin_startpos);
4514 : :
4515 : : /*
4516 : : * Even when the two_phase mode is requested by the user, it remains as
4517 : : * the tri-state PENDING until all tablesyncs have reached READY state.
4518 : : * Only then, can it become ENABLED.
4519 : : *
4520 : : * Note: If the subscription has no tables then leave the state as
4521 : : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
4522 : : * work.
4523 : : */
4524 [ + + + + ]: 186 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
4525 : 13 : AllTablesyncsReady())
4526 : : {
4527 : : /* Start streaming with two_phase enabled */
4528 : 6 : options.proto.logical.twophase = true;
4529 : 6 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4530 : :
4531 : 6 : StartTransactionCommand();
4532 : 6 : UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
4533 : 6 : MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
4534 : 6 : CommitTransactionCommand();
4535 : : }
4536 : : else
4537 : : {
4538 : 167 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
4539 : : }
4540 : :
4541 [ + + + + : 173 : ereport(DEBUG1,
+ - + - ]
4542 : : (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
4543 : : MySubscription->name,
4544 : : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
4545 : : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
4546 : : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
4547 : : "?")));
4548 : :
4549 : : /* Run the main loop. */
4550 : 173 : start_apply(origin_startpos);
762 akapila@postgresql.o 4551 :UNC 0 : }
4552 : :
4553 : : /*
4554 : : * Common initialization for leader apply worker, parallel apply worker and
4555 : : * tablesync worker.
4556 : : *
4557 : : * Initialize the database connection, in-memory subscription and necessary
4558 : : * config options.
4559 : : */
4560 : : void
255 akapila@postgresql.o 4561 :GNC 451 : InitializeLogRepWorker(void)
4562 : : {
4563 : : MemoryContext oldctx;
4564 : :
4565 : : /* Run as replica session replication role. */
2642 peter_e@gmx.net 4566 :CBC 451 : SetConfigOption("session_replication_role", "replica",
4567 : : PGC_SUSET, PGC_S_OVERRIDE);
4568 : :
4569 : : /* Connect to our database. */
4570 : 451 : BackgroundWorkerInitializeConnectionByOid(MyLogicalRepWorker->dbid,
2201 magnus@hagander.net 4571 : 451 : MyLogicalRepWorker->userid,
4572 : : 0);
4573 : :
4574 : : /*
4575 : : * Set always-secure search path, so malicious users can't redirect user
4576 : : * code (e.g. pg_index.indexprs).
4577 : : */
1343 noah@leadboat.com 4578 : 446 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
4579 : :
4580 : : /* Load the subscription into persistent memory context. */
2532 peter_e@gmx.net 4581 : 446 : ApplyContext = AllocSetContextCreate(TopMemoryContext,
4582 : : "ApplyContext",
4583 : : ALLOCSET_DEFAULT_SIZES);
2642 4584 : 446 : StartTransactionCommand();
2532 4585 : 446 : oldctx = MemoryContextSwitchTo(ApplyContext);
4586 : :
2200 4587 : 446 : MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
4588 [ - + ]: 446 : if (!MySubscription)
4589 : : {
2200 peter_e@gmx.net 4590 [ # # ]:UBC 0 : ereport(LOG,
4591 : : (errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
4592 : : MyLogicalRepWorker->subid)));
4593 : :
4594 : : /* Ensure we remove no-longer-useful entry for worker's start time */
254 akapila@postgresql.o 4595 [ # # ]:UNC 0 : if (am_leader_apply_worker())
448 tgl@sss.pgh.pa.us 4596 :UBC 0 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4597 : :
2200 peter_e@gmx.net 4598 : 0 : proc_exit(0);
4599 : : }
4600 : :
2642 peter_e@gmx.net 4601 :CBC 446 : MySubscriptionValid = true;
4602 : 446 : MemoryContextSwitchTo(oldctx);
4603 : :
4604 [ - + ]: 446 : if (!MySubscription->enabled)
4605 : : {
2642 peter_e@gmx.net 4606 [ # # ]:UBC 0 : ereport(LOG,
4607 : : (errmsg("logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4608 : : MySubscription->name)));
4609 : :
461 akapila@postgresql.o 4610 : 0 : apply_worker_exit();
4611 : : }
4612 : :
4613 : : /* Setup synchronous commit according to the user's wishes */
2200 peter_e@gmx.net 4614 :CBC 446 : SetConfigOption("synchronous_commit", MySubscription->synccommit,
4615 : : PGC_BACKEND, PGC_S_OVERRIDE);
4616 : :
4617 : : /*
4618 : : * Keep us informed about subscription or role changes. Note that the
4619 : : * role's superuser privilege can be revoked.
4620 : : */
2642 4621 : 446 : CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
4622 : : subscription_change_cb,
4623 : : (Datum) 0);
4624 : :
180 akapila@postgresql.o 4625 :GNC 446 : CacheRegisterSyscacheCallback(AUTHOID,
4626 : : subscription_change_cb,
4627 : : (Datum) 0);
4628 : :
2579 peter_e@gmx.net 4629 [ + + ]:CBC 446 : if (am_tablesync_worker())
2517 4630 [ + - ]: 169 : ereport(LOG,
4631 : : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4632 : : MySubscription->name,
4633 : : get_rel_name(MyLogicalRepWorker->relid))));
4634 : : else
4635 [ + - ]: 277 : ereport(LOG,
4636 : : (errmsg("logical replication apply worker for subscription \"%s\" has started",
4637 : : MySubscription->name)));
4638 : :
2642 4639 : 446 : CommitTransactionCommand();
461 akapila@postgresql.o 4640 : 446 : }
4641 : :
4642 : : /* Common function to setup the leader apply or tablesync worker. */
4643 : : void
255 akapila@postgresql.o 4644 :GNC 441 : SetupApplyOrSyncWorker(int worker_slot)
4645 : : {
4646 : : /* Attach to slot */
461 akapila@postgresql.o 4647 :CBC 441 : logicalrep_worker_attach(worker_slot);
4648 : :
255 akapila@postgresql.o 4649 [ + + - + ]:GNC 441 : Assert(am_tablesync_worker() || am_leader_apply_worker());
4650 : :
4651 : : /* Setup signal handling */
461 akapila@postgresql.o 4652 :CBC 441 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
4653 : 441 : pqsignal(SIGTERM, die);
4654 : 441 : BackgroundWorkerUnblockSignals();
4655 : :
4656 : : /*
4657 : : * We don't currently need any ResourceOwner in a walreceiver process, but
4658 : : * if we did, we could call CreateAuxProcessResourceOwner here.
4659 : : */
4660 : :
4661 : : /* Initialise stats to a sanish value */
4662 : 441 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
4663 : 441 : MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
4664 : :
4665 : : /* Load the libpq-specific functions */
4666 : 441 : load_file("libpqwalreceiver", false);
4667 : :
255 akapila@postgresql.o 4668 :GNC 440 : InitializeLogRepWorker();
4669 : :
4670 : : /* Connect to the origin and start the replication. */
2642 peter_e@gmx.net 4671 [ + + ]:CBC 435 : elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
4672 : : MySubscription->conninfo);
4673 : :
4674 : : /*
4675 : : * Setup callback for syscache so that we know when something changes in
4676 : : * the subscription relation state.
4677 : : */
2579 4678 : 435 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
4679 : : invalidate_syncing_table_states,
4680 : : (Datum) 0);
255 akapila@postgresql.o 4681 :GNC 435 : }
4682 : :
4683 : : /* Logical Replication Apply worker entry point */
4684 : : void
4685 : 272 : ApplyWorkerMain(Datum main_arg)
4686 : : {
4687 : 272 : int worker_slot = DatumGetInt32(main_arg);
4688 : :
4689 : 272 : InitializingApplyWorker = true;
4690 : :
4691 : 272 : SetupApplyOrSyncWorker(worker_slot);
4692 : :
4693 : 266 : InitializingApplyWorker = false;
4694 : :
4695 : 266 : run_apply_worker();
4696 : :
762 akapila@postgresql.o 4697 :UBC 0 : proc_exit(0);
4698 : : }
4699 : :
4700 : : /*
4701 : : * After error recovery, disable the subscription in a new transaction
4702 : : * and exit cleanly.
4703 : : */
4704 : : void
762 akapila@postgresql.o 4705 :CBC 4 : DisableSubscriptionAndExit(void)
4706 : : {
4707 : : /*
4708 : : * Emit the error message, and recover from the error state to an idle
4709 : : * state
4710 : : */
4711 : 4 : HOLD_INTERRUPTS();
4712 : :
4713 : 4 : EmitErrorReport();
4714 : 4 : AbortOutOfAnyTransaction();
4715 : 4 : FlushErrorState();
4716 : :
4717 [ - + ]: 4 : RESUME_INTERRUPTS();
4718 : :
4719 : : /* Report the worker failed during either table synchronization or apply */
4720 : 4 : pgstat_report_subscription_error(MyLogicalRepWorker->subid,
4721 : 4 : !am_tablesync_worker());
4722 : :
4723 : : /* Disable the subscription */
4724 : 4 : StartTransactionCommand();
4725 : 4 : DisableSubscription(MySubscription->oid);
4726 : 4 : CommitTransactionCommand();
4727 : :
4728 : : /* Ensure we remove no-longer-useful entry for worker's start time */
254 akapila@postgresql.o 4729 [ + + ]:GNC 4 : if (am_leader_apply_worker())
448 tgl@sss.pgh.pa.us 4730 :CBC 3 : ApplyLauncherForgetWorkerStartTime(MyLogicalRepWorker->subid);
4731 : :
4732 : : /* Notify the subscription has been disabled and exit */
762 akapila@postgresql.o 4733 [ + - ]: 4 : ereport(LOG,
4734 : : errmsg("subscription \"%s\" has been disabled because of an error",
4735 : : MySubscription->name));
4736 : :
2642 peter_e@gmx.net 4737 : 4 : proc_exit(0);
4738 : : }
4739 : :
4740 : : /*
4741 : : * Is current process a logical replication worker?
4742 : : */
4743 : : bool
2508 4744 : 1786 : IsLogicalWorker(void)
4745 : : {
4746 : 1786 : return MyLogicalRepWorker != NULL;
4747 : : }
4748 : :
4749 : : /*
4750 : : * Is current process a logical replication parallel apply worker?
4751 : : */
4752 : : bool
461 akapila@postgresql.o 4753 : 1335 : IsLogicalParallelApplyWorker(void)
4754 : : {
4755 [ + + + - ]: 1335 : return IsLogicalWorker() && am_parallel_apply_worker();
4756 : : }
4757 : :
4758 : : /*
4759 : : * Start skipping changes of the transaction if the given LSN matches the
4760 : : * LSN specified by subscription's skiplsn.
4761 : : */
4762 : : static void
754 4763 : 462 : maybe_start_skipping_changes(XLogRecPtr finish_lsn)
4764 : : {
4765 [ - + ]: 462 : Assert(!is_skipping_changes());
4766 [ - + ]: 462 : Assert(!in_remote_transaction);
4767 [ - + ]: 462 : Assert(!in_streamed_transaction);
4768 : :
4769 : : /*
4770 : : * Quick return if it's not requested to skip this transaction. This
4771 : : * function is called for every remote transaction and we assume that
4772 : : * skipping the transaction is not used often.
4773 : : */
4774 [ + + - + : 462 : if (likely(XLogRecPtrIsInvalid(MySubscription->skiplsn) ||
+ + ]
4775 : : MySubscription->skiplsn != finish_lsn))
4776 : 459 : return;
4777 : :
4778 : : /* Start skipping all changes of this transaction */
4779 : 3 : skip_xact_finish_lsn = finish_lsn;
4780 : :
4781 [ + - ]: 3 : ereport(LOG,
4782 : : errmsg("logical replication starts skipping transaction at LSN %X/%X",
4783 : : LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
4784 : : }
4785 : :
4786 : : /*
4787 : : * Stop skipping changes by resetting skip_xact_finish_lsn if enabled.
4788 : : */
4789 : : static void
4790 : 40 : stop_skipping_changes(void)
4791 : : {
4792 [ + + ]: 40 : if (!is_skipping_changes())
4793 : 37 : return;
4794 : :
4795 [ + - ]: 3 : ereport(LOG,
4796 : : (errmsg("logical replication completed skipping transaction at LSN %X/%X",
4797 : : LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
4798 : :
4799 : : /* Stop skipping changes */
4800 : 3 : skip_xact_finish_lsn = InvalidXLogRecPtr;
4801 : : }
4802 : :
4803 : : /*
4804 : : * Clear subskiplsn of pg_subscription catalog.
4805 : : *
4806 : : * finish_lsn is the transaction's finish LSN that is used to check if the
4807 : : * subskiplsn matches it. If not matched, we raise a warning when clearing the
4808 : : * subskiplsn in order to inform users for cases e.g., where the user mistakenly
4809 : : * specified the wrong subskiplsn.
4810 : : */
4811 : : static void
4812 : 494 : clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
4813 : : {
4814 : : Relation rel;
4815 : : Form_pg_subscription subform;
4816 : : HeapTuple tup;
4817 : 494 : XLogRecPtr myskiplsn = MySubscription->skiplsn;
4818 : 494 : bool started_tx = false;
4819 : :
461 4820 [ + + - + ]: 494 : if (likely(XLogRecPtrIsInvalid(myskiplsn)) || am_parallel_apply_worker())
754 4821 : 491 : return;
4822 : :
4823 [ + + ]: 3 : if (!IsTransactionState())
4824 : : {
4825 : 1 : StartTransactionCommand();
4826 : 1 : started_tx = true;
4827 : : }
4828 : :
4829 : : /*
4830 : : * Protect subskiplsn of pg_subscription from being concurrently updated
4831 : : * while clearing it.
4832 : : */
4833 : 3 : LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
4834 : : AccessShareLock);
4835 : :
4836 : 3 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
4837 : :
4838 : : /* Fetch the existing tuple. */
4839 : 3 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
4840 : : ObjectIdGetDatum(MySubscription->oid));
4841 : :
4842 [ - + ]: 3 : if (!HeapTupleIsValid(tup))
754 akapila@postgresql.o 4843 [ # # ]:UBC 0 : elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name);
4844 : :
754 akapila@postgresql.o 4845 :CBC 3 : subform = (Form_pg_subscription) GETSTRUCT(tup);
4846 : :
4847 : : /*
4848 : : * Clear the subskiplsn. If the user has already changed subskiplsn before
4849 : : * clearing it we don't update the catalog and the replication origin
4850 : : * state won't get advanced. So in the worst case, if the server crashes
4851 : : * before sending an acknowledgment of the flush position the transaction
4852 : : * will be sent again and the user needs to set subskiplsn again. We can
4853 : : * reduce the possibility by logging a replication origin WAL record to
4854 : : * advance the origin LSN instead but there is no way to advance the
4855 : : * origin timestamp and it doesn't seem to be worth doing anything about
4856 : : * it since it's a very rare case.
4857 : : */
4858 [ + - ]: 3 : if (subform->subskiplsn == myskiplsn)
4859 : : {
4860 : : bool nulls[Natts_pg_subscription];
4861 : : bool replaces[Natts_pg_subscription];
4862 : : Datum values[Natts_pg_subscription];
4863 : :
4864 : 3 : memset(values, 0, sizeof(values));
4865 : 3 : memset(nulls, false, sizeof(nulls));
4866 : 3 : memset(replaces, false, sizeof(replaces));
4867 : :
4868 : : /* reset subskiplsn */
4869 : 3 : values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
4870 : 3 : replaces[Anum_pg_subscription_subskiplsn - 1] = true;
4871 : :
4872 : 3 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
4873 : : replaces);
4874 : 3 : CatalogTupleUpdate(rel, &tup->t_self, tup);
4875 : :
4876 [ - + ]: 3 : if (myskiplsn != finish_lsn)
754 akapila@postgresql.o 4877 [ # # ]:UBC 0 : ereport(WARNING,
4878 : : errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
4879 : : errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
4880 : : LSN_FORMAT_ARGS(finish_lsn),
4881 : : LSN_FORMAT_ARGS(myskiplsn)));
4882 : : }
4883 : :
754 akapila@postgresql.o 4884 :CBC 3 : heap_freetuple(tup);
4885 : 3 : table_close(rel, NoLock);
4886 : :
4887 [ + + ]: 3 : if (started_tx)
4888 : 1 : CommitTransactionCommand();
4889 : : }
4890 : :
4891 : : /* Error callback to give more context info about the change being applied */
4892 : : void
961 4893 : 680 : apply_error_callback(void *arg)
4894 : : {
4895 : 680 : ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
4896 : :
4897 [ + + ]: 680 : if (apply_error_callback_arg.command == 0)
4898 : 304 : return;
4899 : :
768 4900 [ - + ]: 376 : Assert(errarg->origin_name);
4901 : :
769 4902 [ + + ]: 376 : if (errarg->rel == NULL)
4903 : : {
4904 [ - + ]: 347 : if (!TransactionIdIsValid(errarg->remote_xid))
568 peter@eisentraut.org 4905 :UBC 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\"",
4906 : : errarg->origin_name,
4907 : : logicalrep_message_type(errarg->command));
768 akapila@postgresql.o 4908 [ + + ]:CBC 347 : else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
568 peter@eisentraut.org 4909 : 287 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
4910 : : errarg->origin_name,
4911 : : logicalrep_message_type(errarg->command),
4912 : : errarg->remote_xid);
4913 : : else
4914 : 120 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
4915 : : errarg->origin_name,
4916 : : logicalrep_message_type(errarg->command),
4917 : : errarg->remote_xid,
768 akapila@postgresql.o 4918 : 60 : LSN_FORMAT_ARGS(errarg->finish_lsn));
4919 : : }
4920 : : else
4921 : : {
461 4922 [ + - ]: 29 : if (errarg->remote_attnum < 0)
4923 : : {
4924 [ - + ]: 29 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
461 akapila@postgresql.o 4925 :UBC 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
4926 : : errarg->origin_name,
4927 : : logicalrep_message_type(errarg->command),
4928 : 0 : errarg->rel->remoterel.nspname,
4929 : 0 : errarg->rel->remoterel.relname,
4930 : : errarg->remote_xid);
4931 : : else
461 akapila@postgresql.o 4932 :CBC 58 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
4933 : : errarg->origin_name,
4934 : : logicalrep_message_type(errarg->command),
4935 : 29 : errarg->rel->remoterel.nspname,
4936 : 29 : errarg->rel->remoterel.relname,
4937 : : errarg->remote_xid,
4938 : 29 : LSN_FORMAT_ARGS(errarg->finish_lsn));
4939 : : }
4940 : : else
4941 : : {
461 akapila@postgresql.o 4942 [ # # ]:UBC 0 : if (XLogRecPtrIsInvalid(errarg->finish_lsn))
4943 : 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
4944 : : errarg->origin_name,
4945 : : logicalrep_message_type(errarg->command),
4946 : 0 : errarg->rel->remoterel.nspname,
4947 : 0 : errarg->rel->remoterel.relname,
4948 : 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
4949 : : errarg->remote_xid);
4950 : : else
4951 : 0 : errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
4952 : : errarg->origin_name,
4953 : : logicalrep_message_type(errarg->command),
4954 : 0 : errarg->rel->remoterel.nspname,
4955 : 0 : errarg->rel->remoterel.relname,
4956 : 0 : errarg->rel->remoterel.attnames[errarg->remote_attnum],
4957 : : errarg->remote_xid,
4958 : 0 : LSN_FORMAT_ARGS(errarg->finish_lsn));
4959 : : }
4960 : : }
4961 : : }
4962 : :
4963 : : /* Set transaction information of apply error callback */
4964 : : static inline void
768 akapila@postgresql.o 4965 :CBC 2965 : set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
4966 : : {
961 4967 : 2965 : apply_error_callback_arg.remote_xid = xid;
768 4968 : 2965 : apply_error_callback_arg.finish_lsn = lsn;
961 4969 : 2965 : }
4970 : :
4971 : : /* Reset all information of apply error callback */
4972 : : static inline void
4973 : 1464 : reset_apply_error_context_info(void)
4974 : : {
4975 : 1464 : apply_error_callback_arg.command = 0;
4976 : 1464 : apply_error_callback_arg.rel = NULL;
4977 : 1464 : apply_error_callback_arg.remote_attnum = -1;
768 4978 : 1464 : set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
961 4979 : 1464 : }
4980 : :
4981 : : /*
4982 : : * Request wakeup of the workers for the given subscription OID
4983 : : * at commit of the current transaction.
4984 : : *
4985 : : * This is used to ensure that the workers process assorted changes
4986 : : * as soon as possible.
4987 : : */
4988 : : void
464 tgl@sss.pgh.pa.us 4989 : 179 : LogicalRepWorkersWakeupAtCommit(Oid subid)
4990 : : {
4991 : : MemoryContext oldcxt;
4992 : :
4993 : 179 : oldcxt = MemoryContextSwitchTo(TopTransactionContext);
4994 : 179 : on_commit_wakeup_workers_subids =
4995 : 179 : list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
4996 : 179 : MemoryContextSwitchTo(oldcxt);
4997 : 179 : }
4998 : :
4999 : : /*
5000 : : * Wake up the workers of any subscriptions that were changed in this xact.
5001 : : */
5002 : : void
5003 : 432906 : AtEOXact_LogicalRepWorkers(bool isCommit)
5004 : : {
5005 [ + + + + ]: 432906 : if (isCommit && on_commit_wakeup_workers_subids != NIL)
5006 : : {
5007 : : ListCell *lc;
5008 : :
5009 : 174 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
5010 [ + - + + : 348 : foreach(lc, on_commit_wakeup_workers_subids)
+ + ]
5011 : : {
5012 : 174 : Oid subid = lfirst_oid(lc);
5013 : : List *workers;
5014 : : ListCell *lc2;
5015 : :
5016 : 174 : workers = logicalrep_workers_find(subid, true);
5017 [ + + + + : 227 : foreach(lc2, workers)
+ + ]
5018 : : {
5019 : 53 : LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
5020 : :
5021 : 53 : logicalrep_worker_wakeup_ptr(worker);
5022 : : }
5023 : : }
5024 : 174 : LWLockRelease(LogicalRepWorkerLock);
5025 : : }
5026 : :
5027 : : /* The List storage will be reclaimed automatically in xact cleanup. */
5028 : 432906 : on_commit_wakeup_workers_subids = NIL;
5029 : 432906 : }
5030 : :
5031 : : /*
5032 : : * Allocate the origin name in long-lived context for error context message.
5033 : : */
5034 : : void
461 akapila@postgresql.o 5035 : 344 : set_apply_error_context_origin(char *originname)
5036 : : {
5037 : 344 : apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
5038 : : originname);
5039 : 344 : }
5040 : :
5041 : : /*
5042 : : * Return the action to be taken for the given transaction. See
5043 : : * TransApplyAction for information on each of the actions.
5044 : : *
5045 : : * *winfo is assigned to the destination parallel worker info when the leader
5046 : : * apply worker has to pass all the transaction's changes to the parallel
5047 : : * apply worker.
5048 : : */
5049 : : static TransApplyAction
5050 : 329380 : get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
5051 : : {
5052 : 329380 : *winfo = NULL;
5053 : :
5054 [ + + ]: 329380 : if (am_parallel_apply_worker())
5055 : : {
5056 : 68996 : return TRANS_PARALLEL_APPLY;
5057 : : }
5058 : :
5059 : : /*
5060 : : * If we are processing this transaction using a parallel apply worker
5061 : : * then either we send the changes to the parallel worker or if the worker
5062 : : * is busy then serialize the changes to the file which will later be
5063 : : * processed by the parallel worker.
5064 : : */
5065 : 260384 : *winfo = pa_find_worker(xid);
5066 : :
453 5067 [ + + + + ]: 260384 : if (*winfo && (*winfo)->serialize_changes)
5068 : : {
5069 : 5037 : return TRANS_LEADER_PARTIAL_SERIALIZE;
5070 : : }
5071 [ + + ]: 255347 : else if (*winfo)
5072 : : {
5073 : 68980 : return TRANS_LEADER_SEND_TO_PARALLEL;
5074 : : }
5075 : :
5076 : : /*
5077 : : * If there is no parallel worker involved to process this transaction
5078 : : * then we either directly apply the change or serialize it to a file
5079 : : * which will later be applied when the transaction finish message is
5080 : : * processed.
5081 : : */
5082 [ + + ]: 186367 : else if (in_streamed_transaction)
5083 : : {
5084 : 103265 : return TRANS_LEADER_SERIALIZE;
5085 : : }
5086 : : else
5087 : : {
5088 : 83102 : return TRANS_LEADER_APPLY;
5089 : : }
5090 : : }
|