Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * logical.c
3 : : * PostgreSQL logical decoding coordination
4 : : *
5 : : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/logical.c
9 : : *
10 : : * NOTES
11 : : * This file coordinates interaction between the various modules that
12 : : * together provide logical decoding, primarily by providing so
13 : : * called LogicalDecodingContexts. The goal is to encapsulate most of the
14 : : * internal complexity for consumers of logical decoding, so they can
15 : : * create and consume a changestream with a low amount of code. Builtin
16 : : * consumers are the walsender and SQL SRF interface, but it's possible to
17 : : * add further ones without changing core code, e.g. to consume changes in
18 : : * a bgworker.
19 : : *
20 : : * The idea is that a consumer provides three callbacks, one to read WAL,
21 : : * one to prepare a data write, and a final one for actually writing since
22 : : * their implementation depends on the type of consumer. Check
23 : : * logicalfuncs.c for an example implementation of a fairly simple consumer
24 : : * and an implementation of a WAL reading callback that's suitable for
25 : : * simple consumers.
26 : : *-------------------------------------------------------------------------
27 : : */
28 : :
29 : : #include "postgres.h"
30 : :
31 : : #include "access/xact.h"
32 : : #include "access/xlogutils.h"
33 : : #include "fmgr.h"
34 : : #include "miscadmin.h"
35 : : #include "pgstat.h"
36 : : #include "replication/decode.h"
37 : : #include "replication/logical.h"
38 : : #include "replication/reorderbuffer.h"
39 : : #include "replication/slotsync.h"
40 : : #include "replication/snapbuild.h"
41 : : #include "storage/proc.h"
42 : : #include "storage/procarray.h"
43 : : #include "utils/builtins.h"
44 : : #include "utils/inval.h"
45 : : #include "utils/memutils.h"
46 : :
47 : : /* data for errcontext callback */
48 : : typedef struct LogicalErrorCallbackState
49 : : {
50 : : LogicalDecodingContext *ctx;
51 : : const char *callback_name;
52 : : XLogRecPtr report_location;
53 : : } LogicalErrorCallbackState;
54 : :
55 : : /* wrappers around output plugin callbacks */
56 : : static void output_plugin_error_callback(void *arg);
57 : : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
58 : : bool is_init);
59 : : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
60 : : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
61 : : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
62 : : XLogRecPtr commit_lsn);
63 : : static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
64 : : static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
65 : : XLogRecPtr prepare_lsn);
66 : : static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
67 : : XLogRecPtr commit_lsn);
68 : : static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
69 : : XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
70 : : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
71 : : Relation relation, ReorderBufferChange *change);
72 : : static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
73 : : int nrelations, Relation relations[], ReorderBufferChange *change);
74 : : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
75 : : XLogRecPtr message_lsn, bool transactional,
76 : : const char *prefix, Size message_size, const char *message);
77 : :
78 : : /* streaming callbacks */
79 : : static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
80 : : XLogRecPtr first_lsn);
81 : : static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
82 : : XLogRecPtr last_lsn);
83 : : static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
84 : : XLogRecPtr abort_lsn);
85 : : static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
86 : : XLogRecPtr prepare_lsn);
87 : : static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
88 : : XLogRecPtr commit_lsn);
89 : : static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
90 : : Relation relation, ReorderBufferChange *change);
91 : : static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
92 : : XLogRecPtr message_lsn, bool transactional,
93 : : const char *prefix, Size message_size, const char *message);
94 : : static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
95 : : int nrelations, Relation relations[], ReorderBufferChange *change);
96 : :
97 : : /* callback to update txn's progress */
98 : : static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
99 : : ReorderBufferTXN *txn,
100 : : XLogRecPtr lsn);
101 : :
102 : : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
103 : :
104 : : /*
105 : : * Make sure the current settings & environment are capable of doing logical
106 : : * decoding.
107 : : */
108 : : void
3695 rhaas@postgresql.org 109 :CBC 1353 : CheckLogicalDecodingRequirements(void)
110 : : {
111 : 1353 : CheckSlotRequirements();
112 : :
113 : : /*
114 : : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
115 : : * needs the same check.
116 : : */
117 : :
118 [ - + ]: 1353 : if (wal_level < WAL_LEVEL_LOGICAL)
3695 rhaas@postgresql.org 119 [ # # ]:UBC 0 : ereport(ERROR,
120 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
121 : : errmsg("logical decoding requires wal_level >= logical")));
122 : :
3695 rhaas@postgresql.org 123 [ + + ]:CBC 1353 : if (MyDatabaseId == InvalidOid)
124 [ + - ]: 1 : ereport(ERROR,
125 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
126 : : errmsg("logical decoding requires a database connection")));
127 : :
128 [ + + ]: 1352 : if (RecoveryInProgress())
129 : : {
130 : : /*
131 : : * This check may have race conditions, but whenever
132 : : * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
133 : : * verify that there are no existing logical replication slots. And to
134 : : * avoid races around creating a new slot,
135 : : * CheckLogicalDecodingRequirements() is called once before creating
136 : : * the slot, and once when logical decoding is initially starting up.
137 : : */
372 andres@anarazel.de 138 [ + + ]: 68 : if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
139 [ + - ]: 1 : ereport(ERROR,
140 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
141 : : errmsg("logical decoding on standby requires wal_level >= logical on the primary")));
142 : : }
3695 rhaas@postgresql.org 143 : 1351 : }
144 : :
145 : : /*
146 : : * Helper function for CreateInitDecodingContext() and
147 : : * CreateDecodingContext() performing common tasks.
148 : : */
149 : : static LogicalDecodingContext *
150 : 952 : StartupDecodingContext(List *output_plugin_options,
151 : : XLogRecPtr start_lsn,
152 : : TransactionId xmin_horizon,
153 : : bool need_full_snapshot,
154 : : bool fast_forward,
155 : : XLogReaderRoutine *xl_routine,
156 : : LogicalOutputPluginWriterPrepareWrite prepare_write,
157 : : LogicalOutputPluginWriterWrite do_write,
158 : : LogicalOutputPluginWriterUpdateProgress update_progress)
159 : : {
160 : : ReplicationSlot *slot;
161 : : MemoryContext context,
162 : : old_context;
163 : : LogicalDecodingContext *ctx;
164 : :
165 : : /* shorter lines... */
166 : 952 : slot = MyReplicationSlot;
167 : :
168 : 952 : context = AllocSetContextCreate(CurrentMemoryContext,
169 : : "Logical decoding context",
170 : : ALLOCSET_DEFAULT_SIZES);
171 : 952 : old_context = MemoryContextSwitchTo(context);
172 : 952 : ctx = palloc0(sizeof(LogicalDecodingContext));
173 : :
174 : 952 : ctx->context = context;
175 : :
176 : : /*
177 : : * (re-)load output plugins, so we detect a bad (removed) output plugin
178 : : * now.
179 : : */
2279 simon@2ndQuadrant.co 180 [ + + ]: 952 : if (!fast_forward)
181 : 934 : LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
182 : :
183 : : /*
184 : : * Now that the slot's xmin has been set, we can announce ourselves as a
185 : : * logical decoding backend which doesn't need to be checked individually
186 : : * when computing the xmin horizon because the xmin is enforced via
187 : : * replication slots.
188 : : *
189 : : * We can only do so if we're outside of a transaction (i.e. the case when
190 : : * streaming changes via walsender), otherwise an already setup
191 : : * snapshot/xid would end up being ignored. That's not a particularly
192 : : * bothersome restriction since the SQL interface can't be used for
193 : : * streaming anyway.
194 : : */
3421 andres@anarazel.de 195 [ + + ]: 951 : if (!IsTransactionOrTransactionBlock())
196 : : {
1235 alvherre@alvh.no-ip. 197 : 463 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1245 198 : 463 : MyProc->statusFlags |= PROC_IN_LOGICAL_DECODING;
199 : 463 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
3421 andres@anarazel.de 200 : 463 : LWLockRelease(ProcArrayLock);
201 : : }
202 : :
3695 rhaas@postgresql.org 203 : 951 : ctx->slot = slot;
204 : :
1070 tmunro@postgresql.or 205 : 951 : ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
3299 fujii@postgresql.org 206 [ - + ]: 951 : if (!ctx->reader)
3299 fujii@postgresql.org 207 [ # # ]:UBC 0 : ereport(ERROR,
208 : : (errcode(ERRCODE_OUT_OF_MEMORY),
209 : : errmsg("out of memory"),
210 : : errdetail("Failed while allocating a WAL reading processor.")));
211 : :
3695 rhaas@postgresql.org 212 :CBC 951 : ctx->reorder = ReorderBufferAllocate();
213 : 951 : ctx->snapshot_builder =
2544 andres@anarazel.de 214 : 951 : AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
215 : : need_full_snapshot, slot->data.two_phase_at);
216 : :
3695 rhaas@postgresql.org 217 : 951 : ctx->reorder->private_data = ctx;
218 : :
219 : : /* wrap output plugin callbacks, so we can add error context information */
220 : 951 : ctx->reorder->begin = begin_cb_wrapper;
221 : 951 : ctx->reorder->apply_change = change_cb_wrapper;
2199 peter_e@gmx.net 222 : 951 : ctx->reorder->apply_truncate = truncate_cb_wrapper;
3695 rhaas@postgresql.org 223 : 951 : ctx->reorder->commit = commit_cb_wrapper;
2930 simon@2ndQuadrant.co 224 : 951 : ctx->reorder->message = message_cb_wrapper;
225 : :
226 : : /*
227 : : * To support streaming, we require start/stop/abort/commit/change
228 : : * callbacks. The message and truncate callbacks are optional, similar to
229 : : * regular output plugins. We however enable streaming when at least one
230 : : * of the methods is enabled so that we can easily identify missing
231 : : * methods.
232 : : *
233 : : * We decide it here, but only check it later in the wrappers.
234 : : */
1356 akapila@postgresql.o 235 : 1920 : ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
236 [ + - ]: 18 : (ctx->callbacks.stream_stop_cb != NULL) ||
237 [ + - ]: 18 : (ctx->callbacks.stream_abort_cb != NULL) ||
238 [ + - ]: 18 : (ctx->callbacks.stream_commit_cb != NULL) ||
239 [ + - ]: 18 : (ctx->callbacks.stream_change_cb != NULL) ||
240 [ + + + - ]: 987 : (ctx->callbacks.stream_message_cb != NULL) ||
241 [ - + ]: 18 : (ctx->callbacks.stream_truncate_cb != NULL);
242 : :
243 : : /*
244 : : * streaming callbacks
245 : : *
246 : : * stream_message and stream_truncate callbacks are optional, so we do not
247 : : * fail with ERROR when missing, but the wrappers simply do nothing. We
248 : : * must set the ReorderBuffer callbacks to something, otherwise the calls
249 : : * from there will crash (we don't want to move the checks there).
250 : : */
251 : 951 : ctx->reorder->stream_start = stream_start_cb_wrapper;
252 : 951 : ctx->reorder->stream_stop = stream_stop_cb_wrapper;
253 : 951 : ctx->reorder->stream_abort = stream_abort_cb_wrapper;
1201 254 : 951 : ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
1356 255 : 951 : ctx->reorder->stream_commit = stream_commit_cb_wrapper;
256 : 951 : ctx->reorder->stream_change = stream_change_cb_wrapper;
257 : 951 : ctx->reorder->stream_message = stream_message_cb_wrapper;
258 : 951 : ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
259 : :
260 : :
261 : : /*
262 : : * To support two-phase logical decoding, we require
263 : : * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
264 : : * filter_prepare callback is optional. We however enable two-phase
265 : : * logical decoding when at least one of the methods is enabled so that we
266 : : * can easily identify missing methods.
267 : : *
268 : : * We decide it here, but only check it later in the wrappers.
269 : : */
1201 270 : 1920 : ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
271 [ + - ]: 18 : (ctx->callbacks.prepare_cb != NULL) ||
272 [ + - ]: 18 : (ctx->callbacks.commit_prepared_cb != NULL) ||
273 [ + - ]: 18 : (ctx->callbacks.rollback_prepared_cb != NULL) ||
274 [ + + + - ]: 987 : (ctx->callbacks.stream_prepare_cb != NULL) ||
275 [ - + ]: 18 : (ctx->callbacks.filter_prepare_cb != NULL);
276 : :
277 : : /*
278 : : * Callback to support decoding at prepare time.
279 : : */
280 : 951 : ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
281 : 951 : ctx->reorder->prepare = prepare_cb_wrapper;
282 : 951 : ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
283 : 951 : ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
284 : :
285 : : /*
286 : : * Callback to support updating progress during sending data of a
287 : : * transaction (and its subtransactions) to the output plugin.
288 : : */
431 289 : 951 : ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
290 : :
3695 rhaas@postgresql.org 291 : 951 : ctx->out = makeStringInfo();
292 : 951 : ctx->prepare_write = prepare_write;
293 : 951 : ctx->write = do_write;
2529 simon@2ndQuadrant.co 294 : 951 : ctx->update_progress = update_progress;
295 : :
3695 rhaas@postgresql.org 296 : 951 : ctx->output_plugin_options = output_plugin_options;
297 : :
2279 simon@2ndQuadrant.co 298 : 951 : ctx->fast_forward = fast_forward;
299 : :
3695 rhaas@postgresql.org 300 : 951 : MemoryContextSwitchTo(old_context);
301 : :
302 : 951 : return ctx;
303 : : }
304 : :
305 : : /*
306 : : * Create a new decoding context, for a new logical slot.
307 : : *
308 : : * plugin -- contains the name of the output plugin
309 : : * output_plugin_options -- contains options passed to the output plugin
310 : : * need_full_snapshot -- if true, must obtain a snapshot able to read all
311 : : * tables; if false, one that can read only catalogs is acceptable.
312 : : * restart_lsn -- if given as invalid, it's this routine's responsibility to
313 : : * mark WAL as reserved by setting a convenient restart_lsn for the slot.
314 : : * Otherwise, we set for decoding to start from the given LSN without
315 : : * marking WAL reserved beforehand. In that scenario, it's up to the
316 : : * caller to guarantee that WAL remains available.
317 : : * xl_routine -- XLogReaderRoutine for underlying XLogReader
318 : : * prepare_write, do_write, update_progress --
319 : : * callbacks that perform the use-case dependent, actual, work.
320 : : *
321 : : * Needs to be called while in a memory context that's at least as long lived
322 : : * as the decoding context because further memory contexts will be created
323 : : * inside it.
324 : : *
325 : : * Returns an initialized decoding context after calling the output plugin's
326 : : * startup function.
327 : : */
328 : : LogicalDecodingContext *
1345 peter@eisentraut.org 329 : 398 : CreateInitDecodingContext(const char *plugin,
330 : : List *output_plugin_options,
331 : : bool need_full_snapshot,
332 : : XLogRecPtr restart_lsn,
333 : : XLogReaderRoutine *xl_routine,
334 : : LogicalOutputPluginWriterPrepareWrite prepare_write,
335 : : LogicalOutputPluginWriterWrite do_write,
336 : : LogicalOutputPluginWriterUpdateProgress update_progress)
337 : : {
3631 bruce@momjian.us 338 : 398 : TransactionId xmin_horizon = InvalidTransactionId;
339 : : ReplicationSlot *slot;
340 : : NameData plugin_name;
341 : : LogicalDecodingContext *ctx;
342 : : MemoryContext old_context;
343 : :
344 : : /*
345 : : * On a standby, this check is also required while creating the slot.
346 : : * Check the comments in the function.
347 : : */
372 andres@anarazel.de 348 : 398 : CheckLogicalDecodingRequirements();
349 : :
350 : : /* shorter lines... */
3695 rhaas@postgresql.org 351 : 398 : slot = MyReplicationSlot;
352 : :
353 : : /* first some sanity checks that are unlikely to be violated */
354 [ - + ]: 398 : if (slot == NULL)
3252 heikki.linnakangas@i 355 [ # # ]:UBC 0 : elog(ERROR, "cannot perform logical decoding without an acquired slot");
356 : :
3695 rhaas@postgresql.org 357 [ - + ]:CBC 398 : if (plugin == NULL)
3695 rhaas@postgresql.org 358 [ # # ]:UBC 0 : elog(ERROR, "cannot initialize logical decoding without a specified plugin");
359 : :
360 : : /* Make sure the passed slot is suitable. These are user facing errors. */
3169 andres@anarazel.de 361 [ - + ]:CBC 398 : if (SlotIsPhysical(slot))
3695 rhaas@postgresql.org 362 [ # # ]:UBC 0 : ereport(ERROR,
363 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
364 : : errmsg("cannot use physical replication slot for logical decoding")));
365 : :
3695 rhaas@postgresql.org 366 [ - + ]:CBC 398 : if (slot->data.database != MyDatabaseId)
3695 rhaas@postgresql.org 367 [ # # ]:UBC 0 : ereport(ERROR,
368 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
369 : : errmsg("replication slot \"%s\" was not created in this database",
370 : : NameStr(slot->data.name))));
371 : :
3695 rhaas@postgresql.org 372 [ + + + + ]:CBC 680 : if (IsTransactionState() &&
373 : 282 : GetTopTransactionIdIfAny() != InvalidTransactionId)
374 [ + - ]: 2 : ereport(ERROR,
375 : : (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
376 : : errmsg("cannot create logical replication slot in transaction that has performed writes")));
377 : :
378 : : /*
379 : : * Register output plugin name with slot. We need the mutex to avoid
380 : : * concurrent reading of a partially copied string. But we don't want any
381 : : * complicated code while holding a spinlock, so do namestrcpy() outside.
382 : : */
1343 peter@eisentraut.org 383 : 396 : namestrcpy(&plugin_name, plugin);
3695 rhaas@postgresql.org 384 [ - + ]: 396 : SpinLockAcquire(&slot->mutex);
1343 peter@eisentraut.org 385 : 396 : slot->data.plugin = plugin_name;
3695 rhaas@postgresql.org 386 : 396 : SpinLockRelease(&slot->mutex);
387 : :
1836 alvherre@alvh.no-ip. 388 [ + + ]: 396 : if (XLogRecPtrIsInvalid(restart_lsn))
389 : 390 : ReplicationSlotReserveWal();
390 : : else
391 : : {
392 [ - + ]: 6 : SpinLockAcquire(&slot->mutex);
393 : 6 : slot->data.restart_lsn = restart_lsn;
394 : 6 : SpinLockRelease(&slot->mutex);
395 : : }
396 : :
397 : : /* ----
398 : : * This is a bit tricky: We need to determine a safe xmin horizon to start
399 : : * decoding from, to avoid starting from a running xacts record referring
400 : : * to xids whose rows have been vacuumed or pruned
401 : : * already. GetOldestSafeDecodingTransactionId() returns such a value, but
402 : : * without further interlock its return value might immediately be out of
403 : : * date.
404 : : *
405 : : * So we have to acquire the ProcArrayLock to prevent computation of new
406 : : * xmin horizons by other backends, get the safe decoding xid, and inform
407 : : * the slot machinery about the new limit. Once that's done the
408 : : * ProcArrayLock can be released as the slot machinery now is
409 : : * protecting against vacuum.
410 : : *
411 : : * Note that, temporarily, the data, not just the catalog, xmin has to be
412 : : * reserved if a data snapshot is to be exported. Otherwise the initial
413 : : * data snapshot created here is not guaranteed to be valid. After that
414 : : * the data xmin doesn't need to be managed anymore and the global xmin
415 : : * should be recomputed. As we are fine with losing the pegged data xmin
416 : : * after crash - no chance a snapshot would get exported anymore - we can
417 : : * get away with just setting the slot's
418 : : * effective_xmin. ReplicationSlotRelease will reset it again.
419 : : *
420 : : * ----
421 : : */
3695 rhaas@postgresql.org 422 : 396 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
423 : :
2444 andres@anarazel.de 424 : 396 : xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
425 : :
2144 michael@paquier.xyz 426 [ - + ]: 396 : SpinLockAcquire(&slot->mutex);
2548 andres@anarazel.de 427 : 396 : slot->effective_catalog_xmin = xmin_horizon;
428 : 396 : slot->data.catalog_xmin = xmin_horizon;
429 [ + + ]: 396 : if (need_full_snapshot)
430 : 169 : slot->effective_xmin = xmin_horizon;
2144 michael@paquier.xyz 431 : 396 : SpinLockRelease(&slot->mutex);
432 : :
3695 rhaas@postgresql.org 433 : 396 : ReplicationSlotsComputeRequiredXmin(true);
434 : :
435 : 396 : LWLockRelease(ProcArrayLock);
436 : :
437 : 396 : ReplicationSlotMarkDirty();
438 : 396 : ReplicationSlotSave();
439 : :
1836 alvherre@alvh.no-ip. 440 : 396 : ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
441 : : need_full_snapshot, false,
442 : : xl_routine, prepare_write, do_write,
443 : : update_progress);
444 : :
445 : : /* call output plugin initialization callback */
3695 rhaas@postgresql.org 446 : 395 : old_context = MemoryContextSwitchTo(ctx->context);
447 [ + - ]: 395 : if (ctx->callbacks.startup_cb != NULL)
448 : 395 : startup_cb_wrapper(ctx, &ctx->options, true);
449 : 395 : MemoryContextSwitchTo(old_context);
450 : :
451 : : /*
452 : : * We allow decoding of prepared transactions when the two_phase is
453 : : * enabled at the time of slot creation, or when the two_phase option is
454 : : * given at the streaming start, provided the plugin supports all the
455 : : * callbacks for two-phase.
456 : : */
1005 akapila@postgresql.o 457 : 395 : ctx->twophase &= slot->data.two_phase;
458 : :
2216 peter_e@gmx.net 459 : 395 : ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
460 : :
3695 rhaas@postgresql.org 461 : 395 : return ctx;
462 : : }
463 : :
464 : : /*
465 : : * Create a new decoding context, for a logical slot that has previously been
466 : : * used already.
467 : : *
468 : : * start_lsn
469 : : * The LSN at which to start decoding. If InvalidXLogRecPtr, restart
470 : : * from the slot's confirmed_flush; otherwise, start from the specified
471 : : * location (but move it forwards to confirmed_flush if it's older than
472 : : * that, see below).
473 : : *
474 : : * output_plugin_options
475 : : * options passed to the output plugin.
476 : : *
477 : : * fast_forward
478 : : * bypass the generation of logical changes.
479 : : *
480 : : * xl_routine
481 : : * XLogReaderRoutine used by underlying xlogreader
482 : : *
483 : : * prepare_write, do_write, update_progress
484 : : * callbacks that have to be filled to perform the use-case dependent,
485 : : * actual work.
486 : : *
487 : : * Needs to be called while in a memory context that's at least as long lived
488 : : * as the decoding context because further memory contexts will be created
489 : : * inside it.
490 : : *
491 : : * Returns an initialized decoding context after calling the output plugin's
492 : : * startup function.
493 : : */
494 : : LogicalDecodingContext *
495 : 566 : CreateDecodingContext(XLogRecPtr start_lsn,
496 : : List *output_plugin_options,
497 : : bool fast_forward,
498 : : XLogReaderRoutine *xl_routine,
499 : : LogicalOutputPluginWriterPrepareWrite prepare_write,
500 : : LogicalOutputPluginWriterWrite do_write,
501 : : LogicalOutputPluginWriterUpdateProgress update_progress)
502 : : {
503 : : LogicalDecodingContext *ctx;
504 : : ReplicationSlot *slot;
505 : : MemoryContext old_context;
506 : :
507 : : /* shorter lines... */
508 : 566 : slot = MyReplicationSlot;
509 : :
510 : : /* first some sanity checks that are unlikely to be violated */
511 [ - + ]: 566 : if (slot == NULL)
3252 heikki.linnakangas@i 512 [ # # ]:UBC 0 : elog(ERROR, "cannot perform logical decoding without an acquired slot");
513 : :
514 : : /* make sure the passed slot is suitable, these are user facing errors */
3169 andres@anarazel.de 515 [ + + ]:CBC 566 : if (SlotIsPhysical(slot))
3695 rhaas@postgresql.org 516 [ + - ]: 1 : ereport(ERROR,
517 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
518 : : errmsg("cannot use physical replication slot for logical decoding")));
519 : :
520 : : /*
521 : : * We need to access the system tables during decoding to build the
522 : : * logical changes unless we are in fast_forward mode where no changes are
523 : : * generated.
524 : : */
11 akapila@postgresql.o 525 [ + + + - ]:GNC 565 : if (slot->data.database != MyDatabaseId && !fast_forward)
3695 rhaas@postgresql.org 526 [ + - ]:CBC 3 : ereport(ERROR,
527 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
528 : : errmsg("replication slot \"%s\" was not created in this database",
529 : : NameStr(slot->data.name))));
530 : :
531 : : /*
532 : : * The slots being synced from the primary can't be used for decoding as
533 : : * they are used after failover. However, we do allow advancing the LSNs
534 : : * during the synchronization of slots. See update_local_synced_slot.
535 : : */
11 akapila@postgresql.o 536 [ + + + + :GNC 562 : if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
+ + ]
60 537 [ + - ]: 1 : ereport(ERROR,
538 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
539 : : errmsg("cannot use replication slot \"%s\" for logical decoding",
540 : : NameStr(slot->data.name)),
541 : : errdetail("This slot is being synchronized from the primary server."),
542 : : errhint("Specify another replication slot."));
543 : :
544 : : /*
545 : : * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
546 : : * "cannot get changes" wording in this errmsg because that'd be
547 : : * confusingly ambiguous about no changes being available when called from
548 : : * pg_logical_slot_get_changes_guts().
549 : : */
373 andres@anarazel.de 550 [ - + ]:CBC 561 : if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
373 andres@anarazel.de 551 [ # # ]:UBC 0 : ereport(ERROR,
552 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
553 : : errmsg("can no longer get changes from replication slot \"%s\"",
554 : : NameStr(MyReplicationSlot->data.name)),
555 : : errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
556 : :
373 andres@anarazel.de 557 [ + + ]:CBC 561 : if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
558 [ + - ]: 5 : ereport(ERROR,
559 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
560 : : errmsg("can no longer get changes from replication slot \"%s\"",
561 : : NameStr(MyReplicationSlot->data.name)),
562 : : errdetail("This slot has been invalidated because it was conflicting with recovery.")));
563 : :
564 [ - + ]: 556 : Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
565 [ - + ]: 556 : Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
566 : :
3695 rhaas@postgresql.org 567 [ + + ]: 556 : if (start_lsn == InvalidXLogRecPtr)
568 : : {
569 : : /* continue from last position */
570 : 337 : start_lsn = slot->data.confirmed_flush;
571 : : }
572 [ + + ]: 219 : else if (start_lsn < slot->data.confirmed_flush)
573 : : {
574 : : /*
575 : : * It might seem like we should error out in this case, but it's
576 : : * pretty common for a client to acknowledge a LSN it doesn't have to
577 : : * do anything for, and thus didn't store persistently, because the
578 : : * xlog records didn't result in anything relevant for logical
579 : : * decoding. Clients have to be able to do that to support synchronous
580 : : * replication.
581 : : *
582 : : * Starting at a different LSN than requested might not catch certain
583 : : * kinds of client errors; so the client may wish to check that
584 : : * confirmed_flush_lsn matches its expectations.
585 : : */
989 jdavis@postgresql.or 586 [ + - ]: 16 : elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
587 : : LSN_FORMAT_ARGS(start_lsn),
588 : : LSN_FORMAT_ARGS(slot->data.confirmed_flush));
589 : :
3175 andres@anarazel.de 590 : 16 : start_lsn = slot->data.confirmed_flush;
591 : : }
592 : :
3695 rhaas@postgresql.org 593 : 556 : ctx = StartupDecodingContext(output_plugin_options,
594 : : start_lsn, InvalidTransactionId, false,
595 : : fast_forward, xl_routine, prepare_write,
596 : : do_write, update_progress);
597 : :
598 : : /* call output plugin initialization callback */
599 : 556 : old_context = MemoryContextSwitchTo(ctx->context);
600 [ + + ]: 556 : if (ctx->callbacks.startup_cb != NULL)
3667 601 : 538 : startup_cb_wrapper(ctx, &ctx->options, false);
3695 602 : 553 : MemoryContextSwitchTo(old_context);
603 : :
604 : : /*
605 : : * We allow decoding of prepared transactions when the two_phase is
606 : : * enabled at the time of slot creation, or when the two_phase option is
607 : : * given at the streaming start, provided the plugin supports all the
608 : : * callbacks for two-phase.
609 : : */
1005 akapila@postgresql.o 610 [ + + + + ]: 553 : ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
611 : :
612 : : /* Mark slot to allow two_phase decoding if not already marked */
613 [ + + + + ]: 553 : if (ctx->twophase && !slot->data.two_phase)
614 : : {
458 michael@paquier.xyz 615 [ - + ]: 5 : SpinLockAcquire(&slot->mutex);
1005 akapila@postgresql.o 616 : 5 : slot->data.two_phase = true;
617 : 5 : slot->data.two_phase_at = start_lsn;
458 michael@paquier.xyz 618 : 5 : SpinLockRelease(&slot->mutex);
1005 akapila@postgresql.o 619 : 5 : ReplicationSlotMarkDirty();
620 : 5 : ReplicationSlotSave();
621 : 5 : SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
622 : : }
623 : :
2216 peter_e@gmx.net 624 : 553 : ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
625 : :
3695 rhaas@postgresql.org 626 [ + - ]: 553 : ereport(LOG,
627 : : (errmsg("starting logical decoding for slot \"%s\"",
628 : : NameStr(slot->data.name)),
629 : : errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
630 : : LSN_FORMAT_ARGS(slot->data.confirmed_flush),
631 : : LSN_FORMAT_ARGS(slot->data.restart_lsn))));
632 : :
633 : 553 : return ctx;
634 : : }
635 : :
636 : : /*
637 : : * Returns true if a consistent initial decoding snapshot has been built.
638 : : */
639 : : bool
640 : 434 : DecodingContextReady(LogicalDecodingContext *ctx)
641 : : {
642 : 434 : return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
643 : : }
644 : :
645 : : /*
646 : : * Read from the decoding slot, until it is ready to start extracting changes.
647 : : */
648 : : void
649 : 389 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
650 : : {
2144 michael@paquier.xyz 651 : 389 : ReplicationSlot *slot = ctx->slot;
652 : :
653 : : /* Initialize from where to start reading WAL. */
1540 heikki.linnakangas@i 654 : 389 : XLogBeginRead(ctx->reader, slot->data.restart_lsn);
655 : :
3695 rhaas@postgresql.org 656 [ + + ]: 389 : elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
657 : : LSN_FORMAT_ARGS(slot->data.restart_lsn));
658 : :
659 : : /* Wait for a consistent starting point */
660 : : for (;;)
661 : 43 : {
662 : : XLogRecord *record;
663 : 432 : char *err = NULL;
664 : :
665 : : /* the read_page callback waits for new WAL */
1070 tmunro@postgresql.or 666 : 432 : record = XLogReadRecord(ctx->reader, &err);
3695 rhaas@postgresql.org 667 [ - + ]: 432 : if (err)
886 michael@paquier.xyz 668 [ # # ]:UBC 0 : elog(ERROR, "could not find logical decoding starting point: %s", err);
3433 heikki.linnakangas@i 669 [ - + ]:CBC 432 : if (!record)
886 michael@paquier.xyz 670 [ # # ]:UBC 0 : elog(ERROR, "could not find logical decoding starting point");
671 : :
3433 heikki.linnakangas@i 672 :CBC 432 : LogicalDecodingProcessRecord(ctx, ctx->reader);
673 : :
674 : : /* only continue till we found a consistent spot */
3695 rhaas@postgresql.org 675 [ + + ]: 430 : if (DecodingContextReady(ctx))
676 : 387 : break;
677 : :
3577 andres@anarazel.de 678 [ - + ]: 43 : CHECK_FOR_INTERRUPTS();
679 : : }
680 : :
2144 michael@paquier.xyz 681 [ - + ]: 387 : SpinLockAcquire(&slot->mutex);
682 : 387 : slot->data.confirmed_flush = ctx->reader->EndRecPtr;
1005 akapila@postgresql.o 683 [ + + ]: 387 : if (slot->data.two_phase)
684 : 6 : slot->data.two_phase_at = ctx->reader->EndRecPtr;
2144 michael@paquier.xyz 685 : 387 : SpinLockRelease(&slot->mutex);
3695 rhaas@postgresql.org 686 : 387 : }
687 : :
688 : : /*
689 : : * Free a previously allocated decoding context, invoking the shutdown
690 : : * callback if necessary.
691 : : */
692 : : void
693 : 764 : FreeDecodingContext(LogicalDecodingContext *ctx)
694 : : {
695 [ + + ]: 764 : if (ctx->callbacks.shutdown_cb != NULL)
696 : 746 : shutdown_cb_wrapper(ctx);
697 : :
698 : 764 : ReorderBufferFree(ctx->reorder);
699 : 764 : FreeSnapshotBuilder(ctx->snapshot_builder);
700 : 764 : XLogReaderFree(ctx->reader);
701 : 764 : MemoryContextDelete(ctx->context);
702 : 764 : }
703 : :
704 : : /*
705 : : * Prepare a write using the context's output routine.
706 : : */
707 : : void
708 : 339229 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
709 : : {
710 [ - + ]: 339229 : if (!ctx->accept_writes)
3695 rhaas@postgresql.org 711 [ # # ]:UBC 0 : elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
712 : :
3695 rhaas@postgresql.org 713 :CBC 339229 : ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
714 : 339229 : ctx->prepared_write = true;
715 : 339229 : }
716 : :
717 : : /*
718 : : * Perform a write using the context's output routine.
719 : : */
720 : : void
721 : 339229 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
722 : : {
723 [ - + ]: 339229 : if (!ctx->prepared_write)
3695 rhaas@postgresql.org 724 [ # # ]:UBC 0 : elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
725 : :
3695 rhaas@postgresql.org 726 :CBC 339229 : ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
727 : 339223 : ctx->prepared_write = false;
728 : 339223 : }
729 : :
730 : : /*
731 : : * Update progress tracking (if supported).
732 : : */
733 : : void
746 akapila@postgresql.o 734 : 4048 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
735 : : bool skipped_xact)
736 : : {
2529 simon@2ndQuadrant.co 737 [ + + ]: 4048 : if (!ctx->update_progress)
738 : 1581 : return;
739 : :
746 akapila@postgresql.o 740 : 2467 : ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
741 : : skipped_xact);
742 : : }
743 : :
744 : : /*
745 : : * Load the output plugin, lookup its output plugin init function, and check
746 : : * that it provides the required callbacks.
747 : : */
748 : : static void
1345 peter@eisentraut.org 749 : 934 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
750 : : {
751 : : LogicalOutputPluginInit plugin_init;
752 : :
3695 rhaas@postgresql.org 753 : 933 : plugin_init = (LogicalOutputPluginInit)
754 : 934 : load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
755 : :
756 [ - + ]: 933 : if (plugin_init == NULL)
3695 rhaas@postgresql.org 757 [ # # ]:UBC 0 : elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
758 : :
759 : : /* ask the output plugin to fill the callback struct */
3695 rhaas@postgresql.org 760 :CBC 933 : plugin_init(callbacks);
761 : :
762 [ - + ]: 933 : if (callbacks->begin_cb == NULL)
3695 rhaas@postgresql.org 763 [ # # ]:UBC 0 : elog(ERROR, "output plugins have to register a begin callback");
3695 rhaas@postgresql.org 764 [ - + ]:CBC 933 : if (callbacks->change_cb == NULL)
3695 rhaas@postgresql.org 765 [ # # ]:UBC 0 : elog(ERROR, "output plugins have to register a change callback");
3695 rhaas@postgresql.org 766 [ - + ]:CBC 933 : if (callbacks->commit_cb == NULL)
3695 rhaas@postgresql.org 767 [ # # ]:UBC 0 : elog(ERROR, "output plugins have to register a commit callback");
3695 rhaas@postgresql.org 768 :CBC 933 : }
769 : :
770 : : static void
771 : 13 : output_plugin_error_callback(void *arg)
772 : : {
773 : 13 : LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
774 : :
775 : : /* not all callbacks have an associated LSN */
776 [ + + ]: 13 : if (state->report_location != InvalidXLogRecPtr)
777 : 10 : errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
778 : 10 : NameStr(state->ctx->slot->data.name),
779 : 10 : NameStr(state->ctx->slot->data.plugin),
780 : : state->callback_name,
1146 peter@eisentraut.org 781 : 10 : LSN_FORMAT_ARGS(state->report_location));
782 : : else
3695 rhaas@postgresql.org 783 : 3 : errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
784 : 3 : NameStr(state->ctx->slot->data.name),
785 : 3 : NameStr(state->ctx->slot->data.plugin),
786 : : state->callback_name);
787 : 13 : }
788 : :
789 : : static void
790 : 933 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
791 : : {
792 : : LogicalErrorCallbackState state;
793 : : ErrorContextCallback errcallback;
794 : :
2279 simon@2ndQuadrant.co 795 [ - + ]: 933 : Assert(!ctx->fast_forward);
796 : :
797 : : /* Push callback + info on the error context stack */
3695 rhaas@postgresql.org 798 : 933 : state.ctx = ctx;
799 : 933 : state.callback_name = "startup";
800 : 933 : state.report_location = InvalidXLogRecPtr;
801 : 933 : errcallback.callback = output_plugin_error_callback;
802 : 933 : errcallback.arg = (void *) &state;
803 : 933 : errcallback.previous = error_context_stack;
804 : 933 : error_context_stack = &errcallback;
805 : :
806 : : /* set output state */
807 : 933 : ctx->accept_writes = false;
704 akapila@postgresql.o 808 : 933 : ctx->end_xact = false;
809 : :
810 : : /* do the actual work: call callback */
3695 rhaas@postgresql.org 811 : 933 : ctx->callbacks.startup_cb(ctx, opt, is_init);
812 : :
813 : : /* Pop the error context stack */
814 : 930 : error_context_stack = errcallback.previous;
815 : 930 : }
816 : :
817 : : static void
818 : 746 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
819 : : {
820 : : LogicalErrorCallbackState state;
821 : : ErrorContextCallback errcallback;
822 : :
2279 simon@2ndQuadrant.co 823 [ - + ]: 746 : Assert(!ctx->fast_forward);
824 : :
825 : : /* Push callback + info on the error context stack */
3695 rhaas@postgresql.org 826 : 746 : state.ctx = ctx;
827 : 746 : state.callback_name = "shutdown";
828 : 746 : state.report_location = InvalidXLogRecPtr;
829 : 746 : errcallback.callback = output_plugin_error_callback;
830 : 746 : errcallback.arg = (void *) &state;
831 : 746 : errcallback.previous = error_context_stack;
832 : 746 : error_context_stack = &errcallback;
833 : :
834 : : /* set output state */
835 : 746 : ctx->accept_writes = false;
704 akapila@postgresql.o 836 : 746 : ctx->end_xact = false;
837 : :
838 : : /* do the actual work: call callback */
3695 rhaas@postgresql.org 839 : 746 : ctx->callbacks.shutdown_cb(ctx);
840 : :
841 : : /* Pop the error context stack */
842 : 746 : error_context_stack = errcallback.previous;
843 : 746 : }
844 : :
845 : :
846 : : /*
847 : : * Callbacks for ReorderBuffer which add in some more information and then call
848 : : * output_plugin.h plugins.
849 : : */
850 : : static void
851 : 1192 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
852 : : {
853 : 1192 : LogicalDecodingContext *ctx = cache->private_data;
854 : : LogicalErrorCallbackState state;
855 : : ErrorContextCallback errcallback;
856 : :
2279 simon@2ndQuadrant.co 857 [ - + ]: 1192 : Assert(!ctx->fast_forward);
858 : :
859 : : /* Push callback + info on the error context stack */
3695 rhaas@postgresql.org 860 : 1192 : state.ctx = ctx;
861 : 1192 : state.callback_name = "begin";
862 : 1192 : state.report_location = txn->first_lsn;
863 : 1192 : errcallback.callback = output_plugin_error_callback;
864 : 1192 : errcallback.arg = (void *) &state;
865 : 1192 : errcallback.previous = error_context_stack;
866 : 1192 : error_context_stack = &errcallback;
867 : :
868 : : /* set output state */
869 : 1192 : ctx->accept_writes = true;
870 : 1192 : ctx->write_xid = txn->xid;
871 : 1192 : ctx->write_location = txn->first_lsn;
704 akapila@postgresql.o 872 : 1192 : ctx->end_xact = false;
873 : :
874 : : /* do the actual work: call callback */
3695 rhaas@postgresql.org 875 : 1192 : ctx->callbacks.begin_cb(ctx, txn);
876 : :
877 : : /* Pop the error context stack */
878 : 1192 : error_context_stack = errcallback.previous;
879 : 1192 : }
880 : :
881 : : static void
882 : 1189 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
883 : : XLogRecPtr commit_lsn)
884 : : {
885 : 1189 : LogicalDecodingContext *ctx = cache->private_data;
886 : : LogicalErrorCallbackState state;
887 : : ErrorContextCallback errcallback;
888 : :
2279 simon@2ndQuadrant.co 889 [ - + ]: 1189 : Assert(!ctx->fast_forward);
890 : :
891 : : /* Push callback + info on the error context stack */
3695 rhaas@postgresql.org 892 : 1189 : state.ctx = ctx;
893 : 1189 : state.callback_name = "commit";
2489 tgl@sss.pgh.pa.us 894 : 1189 : state.report_location = txn->final_lsn; /* beginning of commit record */
3695 rhaas@postgresql.org 895 : 1189 : errcallback.callback = output_plugin_error_callback;
896 : 1189 : errcallback.arg = (void *) &state;
897 : 1189 : errcallback.previous = error_context_stack;
898 : 1189 : error_context_stack = &errcallback;
899 : :
900 : : /* set output state */
901 : 1189 : ctx->accept_writes = true;
902 : 1189 : ctx->write_xid = txn->xid;
903 : 1189 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
704 akapila@postgresql.o 904 : 1189 : ctx->end_xact = true;
905 : :
906 : : /* do the actual work: call callback */
3695 rhaas@postgresql.org 907 : 1189 : ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
908 : :
909 : : /* Pop the error context stack */
910 : 1183 : error_context_stack = errcallback.previous;
911 : 1183 : }
912 : :
913 : : /*
914 : : * The functionality of begin_prepare is quite similar to begin with the
915 : : * exception that this will have gid (global transaction id) information which
916 : : * can be used by plugin. Now, we thought about extending the existing begin
917 : : * but that would break the replication protocol and additionally this looks
918 : : * cleaner.
919 : : */
920 : : static void
1201 akapila@postgresql.o 921 : 22 : begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
922 : : {
923 : 22 : LogicalDecodingContext *ctx = cache->private_data;
924 : : LogicalErrorCallbackState state;
925 : : ErrorContextCallback errcallback;
926 : :
927 [ - + ]: 22 : Assert(!ctx->fast_forward);
928 : :
929 : : /* We're only supposed to call this when two-phase commits are supported */
930 [ - + ]: 22 : Assert(ctx->twophase);
931 : :
932 : : /* Push callback + info on the error context stack */
933 : 22 : state.ctx = ctx;
934 : 22 : state.callback_name = "begin_prepare";
935 : 22 : state.report_location = txn->first_lsn;
936 : 22 : errcallback.callback = output_plugin_error_callback;
937 : 22 : errcallback.arg = (void *) &state;
938 : 22 : errcallback.previous = error_context_stack;
939 : 22 : error_context_stack = &errcallback;
940 : :
941 : : /* set output state */
942 : 22 : ctx->accept_writes = true;
943 : 22 : ctx->write_xid = txn->xid;
944 : 22 : ctx->write_location = txn->first_lsn;
704 945 : 22 : ctx->end_xact = false;
946 : :
947 : : /*
948 : : * If the plugin supports two-phase commits then begin prepare callback is
949 : : * mandatory
950 : : */
1201 951 [ - + ]: 22 : if (ctx->callbacks.begin_prepare_cb == NULL)
1201 akapila@postgresql.o 952 [ # # ]:UBC 0 : ereport(ERROR,
953 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
954 : : errmsg("logical replication at prepare time requires a %s callback",
955 : : "begin_prepare_cb")));
956 : :
957 : : /* do the actual work: call callback */
1201 akapila@postgresql.o 958 :CBC 22 : ctx->callbacks.begin_prepare_cb(ctx, txn);
959 : :
960 : : /* Pop the error context stack */
961 : 22 : error_context_stack = errcallback.previous;
962 : 22 : }
963 : :
964 : : static void
965 : 22 : prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
966 : : XLogRecPtr prepare_lsn)
967 : : {
968 : 22 : LogicalDecodingContext *ctx = cache->private_data;
969 : : LogicalErrorCallbackState state;
970 : : ErrorContextCallback errcallback;
971 : :
972 [ - + ]: 22 : Assert(!ctx->fast_forward);
973 : :
974 : : /* We're only supposed to call this when two-phase commits are supported */
975 [ - + ]: 22 : Assert(ctx->twophase);
976 : :
977 : : /* Push callback + info on the error context stack */
978 : 22 : state.ctx = ctx;
979 : 22 : state.callback_name = "prepare";
980 : 22 : state.report_location = txn->final_lsn; /* beginning of prepare record */
981 : 22 : errcallback.callback = output_plugin_error_callback;
982 : 22 : errcallback.arg = (void *) &state;
983 : 22 : errcallback.previous = error_context_stack;
984 : 22 : error_context_stack = &errcallback;
985 : :
986 : : /* set output state */
987 : 22 : ctx->accept_writes = true;
988 : 22 : ctx->write_xid = txn->xid;
989 : 22 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
704 990 : 22 : ctx->end_xact = true;
991 : :
992 : : /*
993 : : * If the plugin supports two-phase commits then prepare callback is
994 : : * mandatory
995 : : */
1201 996 [ - + ]: 22 : if (ctx->callbacks.prepare_cb == NULL)
1201 akapila@postgresql.o 997 [ # # ]:UBC 0 : ereport(ERROR,
998 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
999 : : errmsg("logical replication at prepare time requires a %s callback",
1000 : : "prepare_cb")));
1001 : :
1002 : : /* do the actual work: call callback */
1201 akapila@postgresql.o 1003 :CBC 22 : ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
1004 : :
1005 : : /* Pop the error context stack */
1006 : 22 : error_context_stack = errcallback.previous;
1007 : 22 : }
1008 : :
1009 : : static void
1010 : 29 : commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1011 : : XLogRecPtr commit_lsn)
1012 : : {
1013 : 29 : LogicalDecodingContext *ctx = cache->private_data;
1014 : : LogicalErrorCallbackState state;
1015 : : ErrorContextCallback errcallback;
1016 : :
1017 [ - + ]: 29 : Assert(!ctx->fast_forward);
1018 : :
1019 : : /* We're only supposed to call this when two-phase commits are supported */
1020 [ - + ]: 29 : Assert(ctx->twophase);
1021 : :
1022 : : /* Push callback + info on the error context stack */
1023 : 29 : state.ctx = ctx;
1024 : 29 : state.callback_name = "commit_prepared";
1025 : 29 : state.report_location = txn->final_lsn; /* beginning of commit record */
1026 : 29 : errcallback.callback = output_plugin_error_callback;
1027 : 29 : errcallback.arg = (void *) &state;
1028 : 29 : errcallback.previous = error_context_stack;
1029 : 29 : error_context_stack = &errcallback;
1030 : :
1031 : : /* set output state */
1032 : 29 : ctx->accept_writes = true;
1033 : 29 : ctx->write_xid = txn->xid;
1034 : 29 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
704 1035 : 29 : ctx->end_xact = true;
1036 : :
1037 : : /*
1038 : : * If the plugin support two-phase commits then commit prepared callback
1039 : : * is mandatory
1040 : : */
1201 1041 [ - + ]: 29 : if (ctx->callbacks.commit_prepared_cb == NULL)
1201 akapila@postgresql.o 1042 [ # # ]:UBC 0 : ereport(ERROR,
1043 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1044 : : errmsg("logical replication at prepare time requires a %s callback",
1045 : : "commit_prepared_cb")));
1046 : :
1047 : : /* do the actual work: call callback */
1201 akapila@postgresql.o 1048 :CBC 29 : ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
1049 : :
1050 : : /* Pop the error context stack */
1051 : 29 : error_context_stack = errcallback.previous;
1052 : 29 : }
1053 : :
1054 : : static void
1055 : 9 : rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1056 : : XLogRecPtr prepare_end_lsn,
1057 : : TimestampTz prepare_time)
1058 : : {
1059 : 9 : LogicalDecodingContext *ctx = cache->private_data;
1060 : : LogicalErrorCallbackState state;
1061 : : ErrorContextCallback errcallback;
1062 : :
1063 [ - + ]: 9 : Assert(!ctx->fast_forward);
1064 : :
1065 : : /* We're only supposed to call this when two-phase commits are supported */
1066 [ - + ]: 9 : Assert(ctx->twophase);
1067 : :
1068 : : /* Push callback + info on the error context stack */
1069 : 9 : state.ctx = ctx;
1070 : 9 : state.callback_name = "rollback_prepared";
1071 : 9 : state.report_location = txn->final_lsn; /* beginning of commit record */
1072 : 9 : errcallback.callback = output_plugin_error_callback;
1073 : 9 : errcallback.arg = (void *) &state;
1074 : 9 : errcallback.previous = error_context_stack;
1075 : 9 : error_context_stack = &errcallback;
1076 : :
1077 : : /* set output state */
1078 : 9 : ctx->accept_writes = true;
1079 : 9 : ctx->write_xid = txn->xid;
1080 : 9 : ctx->write_location = txn->end_lsn; /* points to the end of the record */
704 1081 : 9 : ctx->end_xact = true;
1082 : :
1083 : : /*
1084 : : * If the plugin support two-phase commits then rollback prepared callback
1085 : : * is mandatory
1086 : : */
1201 1087 [ - + ]: 9 : if (ctx->callbacks.rollback_prepared_cb == NULL)
1201 akapila@postgresql.o 1088 [ # # ]:UBC 0 : ereport(ERROR,
1089 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1090 : : errmsg("logical replication at prepare time requires a %s callback",
1091 : : "rollback_prepared_cb")));
1092 : :
1093 : : /* do the actual work: call callback */
1201 akapila@postgresql.o 1094 :CBC 9 : ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
1095 : : prepare_time);
1096 : :
1097 : : /* Pop the error context stack */
1098 : 9 : error_context_stack = errcallback.previous;
1099 : 9 : }
1100 : :
1101 : : static void
3695 rhaas@postgresql.org 1102 : 167066 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1103 : : Relation relation, ReorderBufferChange *change)
1104 : : {
1105 : 167066 : LogicalDecodingContext *ctx = cache->private_data;
1106 : : LogicalErrorCallbackState state;
1107 : : ErrorContextCallback errcallback;
1108 : :
2279 simon@2ndQuadrant.co 1109 [ - + ]: 167066 : Assert(!ctx->fast_forward);
1110 : :
1111 : : /* Push callback + info on the error context stack */
3695 rhaas@postgresql.org 1112 : 167066 : state.ctx = ctx;
1113 : 167066 : state.callback_name = "change";
1114 : 167066 : state.report_location = change->lsn;
1115 : 167066 : errcallback.callback = output_plugin_error_callback;
1116 : 167066 : errcallback.arg = (void *) &state;
1117 : 167066 : errcallback.previous = error_context_stack;
1118 : 167066 : error_context_stack = &errcallback;
1119 : :
1120 : : /* set output state */
1121 : 167066 : ctx->accept_writes = true;
1122 : 167066 : ctx->write_xid = txn->xid;
1123 : :
1124 : : /*
1125 : : * Report this change's lsn so replies from clients can give an up-to-date
1126 : : * answer. This won't ever be enough (and shouldn't be!) to confirm
1127 : : * receipt of this transaction, but it might allow another transaction's
1128 : : * commit to be confirmed with one message.
1129 : : */
1130 : 167066 : ctx->write_location = change->lsn;
1131 : :
704 akapila@postgresql.o 1132 : 167066 : ctx->end_xact = false;
1133 : :
3695 rhaas@postgresql.org 1134 : 167066 : ctx->callbacks.change_cb(ctx, txn, relation, change);
1135 : :
1136 : : /* Pop the error context stack */
1137 : 167063 : error_context_stack = errcallback.previous;
1138 : 167063 : }
1139 : :
1140 : : static void
2199 peter_e@gmx.net 1141 : 42 : truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1142 : : int nrelations, Relation relations[], ReorderBufferChange *change)
1143 : : {
1144 : 42 : LogicalDecodingContext *ctx = cache->private_data;
1145 : : LogicalErrorCallbackState state;
1146 : : ErrorContextCallback errcallback;
1147 : :
1148 [ - + ]: 42 : Assert(!ctx->fast_forward);
1149 : :
1150 [ - + ]: 42 : if (!ctx->callbacks.truncate_cb)
2199 peter_e@gmx.net 1151 :UBC 0 : return;
1152 : :
1153 : : /* Push callback + info on the error context stack */
2199 peter_e@gmx.net 1154 :CBC 42 : state.ctx = ctx;
1155 : 42 : state.callback_name = "truncate";
1156 : 42 : state.report_location = change->lsn;
1157 : 42 : errcallback.callback = output_plugin_error_callback;
1158 : 42 : errcallback.arg = (void *) &state;
1159 : 42 : errcallback.previous = error_context_stack;
1160 : 42 : error_context_stack = &errcallback;
1161 : :
1162 : : /* set output state */
1163 : 42 : ctx->accept_writes = true;
1164 : 42 : ctx->write_xid = txn->xid;
1165 : :
1166 : : /*
1167 : : * Report this change's lsn so replies from clients can give an up-to-date
1168 : : * answer. This won't ever be enough (and shouldn't be!) to confirm
1169 : : * receipt of this transaction, but it might allow another transaction's
1170 : : * commit to be confirmed with one message.
1171 : : */
1172 : 42 : ctx->write_location = change->lsn;
1173 : :
704 akapila@postgresql.o 1174 : 42 : ctx->end_xact = false;
1175 : :
2199 peter_e@gmx.net 1176 : 42 : ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
1177 : :
1178 : : /* Pop the error context stack */
1179 : 42 : error_context_stack = errcallback.previous;
1180 : : }
1181 : :
1182 : : bool
1111 akapila@postgresql.o 1183 : 115 : filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
1184 : : const char *gid)
1185 : : {
1186 : : LogicalErrorCallbackState state;
1187 : : ErrorContextCallback errcallback;
1188 : : bool ret;
1189 : :
1201 1190 [ - + ]: 115 : Assert(!ctx->fast_forward);
1191 : :
1192 : : /* Push callback + info on the error context stack */
1193 : 115 : state.ctx = ctx;
1194 : 115 : state.callback_name = "filter_prepare";
1195 : 115 : state.report_location = InvalidXLogRecPtr;
1196 : 115 : errcallback.callback = output_plugin_error_callback;
1197 : 115 : errcallback.arg = (void *) &state;
1198 : 115 : errcallback.previous = error_context_stack;
1199 : 115 : error_context_stack = &errcallback;
1200 : :
1201 : : /* set output state */
1202 : 115 : ctx->accept_writes = false;
704 1203 : 115 : ctx->end_xact = false;
1204 : :
1205 : : /* do the actual work: call callback */
1111 1206 : 115 : ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
1207 : :
1208 : : /* Pop the error context stack */
1201 1209 : 115 : error_context_stack = errcallback.previous;
1210 : :
1211 : 115 : return ret;
1212 : : }
1213 : :
1214 : : bool
3273 andres@anarazel.de 1215 : 1507610 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
1216 : : {
1217 : : LogicalErrorCallbackState state;
1218 : : ErrorContextCallback errcallback;
1219 : : bool ret;
1220 : :
2279 simon@2ndQuadrant.co 1221 [ - + ]: 1507610 : Assert(!ctx->fast_forward);
1222 : :
1223 : : /* Push callback + info on the error context stack */
3273 andres@anarazel.de 1224 : 1507610 : state.ctx = ctx;
3040 rhaas@postgresql.org 1225 : 1507610 : state.callback_name = "filter_by_origin";
3273 andres@anarazel.de 1226 : 1507610 : state.report_location = InvalidXLogRecPtr;
1227 : 1507610 : errcallback.callback = output_plugin_error_callback;
1228 : 1507610 : errcallback.arg = (void *) &state;
1229 : 1507610 : errcallback.previous = error_context_stack;
1230 : 1507610 : error_context_stack = &errcallback;
1231 : :
1232 : : /* set output state */
1233 : 1507610 : ctx->accept_writes = false;
704 akapila@postgresql.o 1234 : 1507610 : ctx->end_xact = false;
1235 : :
1236 : : /* do the actual work: call callback */
3273 andres@anarazel.de 1237 : 1507610 : ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
1238 : :
1239 : : /* Pop the error context stack */
1240 : 1507610 : error_context_stack = errcallback.previous;
1241 : :
1242 : 1507610 : return ret;
1243 : : }
1244 : :
1245 : : static void
2930 simon@2ndQuadrant.co 1246 : 16 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1247 : : XLogRecPtr message_lsn, bool transactional,
1248 : : const char *prefix, Size message_size, const char *message)
1249 : : {
1250 : 16 : LogicalDecodingContext *ctx = cache->private_data;
1251 : : LogicalErrorCallbackState state;
1252 : : ErrorContextCallback errcallback;
1253 : :
2279 1254 [ - + ]: 16 : Assert(!ctx->fast_forward);
1255 : :
2930 1256 [ - + ]: 16 : if (ctx->callbacks.message_cb == NULL)
2930 simon@2ndQuadrant.co 1257 :UBC 0 : return;
1258 : :
1259 : : /* Push callback + info on the error context stack */
2930 simon@2ndQuadrant.co 1260 :CBC 16 : state.ctx = ctx;
1261 : 16 : state.callback_name = "message";
1262 : 16 : state.report_location = message_lsn;
1263 : 16 : errcallback.callback = output_plugin_error_callback;
1264 : 16 : errcallback.arg = (void *) &state;
1265 : 16 : errcallback.previous = error_context_stack;
1266 : 16 : error_context_stack = &errcallback;
1267 : :
1268 : : /* set output state */
1269 : 16 : ctx->accept_writes = true;
1270 [ + + ]: 16 : ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
1271 : 16 : ctx->write_location = message_lsn;
704 akapila@postgresql.o 1272 : 16 : ctx->end_xact = false;
1273 : :
1274 : : /* do the actual work: call callback */
2930 simon@2ndQuadrant.co 1275 : 16 : ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
1276 : : message_size, message);
1277 : :
1278 : : /* Pop the error context stack */
1279 : 16 : error_context_stack = errcallback.previous;
1280 : : }
1281 : :
1282 : : static void
1356 akapila@postgresql.o 1283 : 663 : stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1284 : : XLogRecPtr first_lsn)
1285 : : {
1286 : 663 : LogicalDecodingContext *ctx = cache->private_data;
1287 : : LogicalErrorCallbackState state;
1288 : : ErrorContextCallback errcallback;
1289 : :
1290 [ - + ]: 663 : Assert(!ctx->fast_forward);
1291 : :
1292 : : /* We're only supposed to call this when streaming is supported. */
1293 [ - + ]: 663 : Assert(ctx->streaming);
1294 : :
1295 : : /* Push callback + info on the error context stack */
1296 : 663 : state.ctx = ctx;
1297 : 663 : state.callback_name = "stream_start";
1298 : 663 : state.report_location = first_lsn;
1299 : 663 : errcallback.callback = output_plugin_error_callback;
1300 : 663 : errcallback.arg = (void *) &state;
1301 : 663 : errcallback.previous = error_context_stack;
1302 : 663 : error_context_stack = &errcallback;
1303 : :
1304 : : /* set output state */
1305 : 663 : ctx->accept_writes = true;
1306 : 663 : ctx->write_xid = txn->xid;
1307 : :
1308 : : /*
1309 : : * Report this message's lsn so replies from clients can give an
1310 : : * up-to-date answer. This won't ever be enough (and shouldn't be!) to
1311 : : * confirm receipt of this transaction, but it might allow another
1312 : : * transaction's commit to be confirmed with one message.
1313 : : */
1314 : 663 : ctx->write_location = first_lsn;
1315 : :
704 1316 : 663 : ctx->end_xact = false;
1317 : :
1318 : : /* in streaming mode, stream_start_cb is required */
1356 1319 [ - + ]: 663 : if (ctx->callbacks.stream_start_cb == NULL)
1356 akapila@postgresql.o 1320 [ # # ]:UBC 0 : ereport(ERROR,
1321 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1322 : : errmsg("logical streaming requires a %s callback",
1323 : : "stream_start_cb")));
1324 : :
1356 akapila@postgresql.o 1325 :CBC 663 : ctx->callbacks.stream_start_cb(ctx, txn);
1326 : :
1327 : : /* Pop the error context stack */
1328 : 663 : error_context_stack = errcallback.previous;
1329 : 663 : }
1330 : :
1331 : : static void
1332 : 663 : stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1333 : : XLogRecPtr last_lsn)
1334 : : {
1335 : 663 : LogicalDecodingContext *ctx = cache->private_data;
1336 : : LogicalErrorCallbackState state;
1337 : : ErrorContextCallback errcallback;
1338 : :
1339 [ - + ]: 663 : Assert(!ctx->fast_forward);
1340 : :
1341 : : /* We're only supposed to call this when streaming is supported. */
1342 [ - + ]: 663 : Assert(ctx->streaming);
1343 : :
1344 : : /* Push callback + info on the error context stack */
1345 : 663 : state.ctx = ctx;
1346 : 663 : state.callback_name = "stream_stop";
1347 : 663 : state.report_location = last_lsn;
1348 : 663 : errcallback.callback = output_plugin_error_callback;
1349 : 663 : errcallback.arg = (void *) &state;
1350 : 663 : errcallback.previous = error_context_stack;
1351 : 663 : error_context_stack = &errcallback;
1352 : :
1353 : : /* set output state */
1354 : 663 : ctx->accept_writes = true;
1355 : 663 : ctx->write_xid = txn->xid;
1356 : :
1357 : : /*
1358 : : * Report this message's lsn so replies from clients can give an
1359 : : * up-to-date answer. This won't ever be enough (and shouldn't be!) to
1360 : : * confirm receipt of this transaction, but it might allow another
1361 : : * transaction's commit to be confirmed with one message.
1362 : : */
1363 : 663 : ctx->write_location = last_lsn;
1364 : :
704 1365 : 663 : ctx->end_xact = false;
1366 : :
1367 : : /* in streaming mode, stream_stop_cb is required */
1356 1368 [ - + ]: 663 : if (ctx->callbacks.stream_stop_cb == NULL)
1356 akapila@postgresql.o 1369 [ # # ]:UBC 0 : ereport(ERROR,
1370 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1371 : : errmsg("logical streaming requires a %s callback",
1372 : : "stream_stop_cb")));
1373 : :
1356 akapila@postgresql.o 1374 :CBC 663 : ctx->callbacks.stream_stop_cb(ctx, txn);
1375 : :
1376 : : /* Pop the error context stack */
1377 : 663 : error_context_stack = errcallback.previous;
1378 : 663 : }
1379 : :
1380 : : static void
1381 : 29 : stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1382 : : XLogRecPtr abort_lsn)
1383 : : {
1384 : 29 : LogicalDecodingContext *ctx = cache->private_data;
1385 : : LogicalErrorCallbackState state;
1386 : : ErrorContextCallback errcallback;
1387 : :
1388 [ - + ]: 29 : Assert(!ctx->fast_forward);
1389 : :
1390 : : /* We're only supposed to call this when streaming is supported. */
1391 [ - + ]: 29 : Assert(ctx->streaming);
1392 : :
1393 : : /* Push callback + info on the error context stack */
1394 : 29 : state.ctx = ctx;
1395 : 29 : state.callback_name = "stream_abort";
1396 : 29 : state.report_location = abort_lsn;
1397 : 29 : errcallback.callback = output_plugin_error_callback;
1398 : 29 : errcallback.arg = (void *) &state;
1399 : 29 : errcallback.previous = error_context_stack;
1400 : 29 : error_context_stack = &errcallback;
1401 : :
1402 : : /* set output state */
1403 : 29 : ctx->accept_writes = true;
1404 : 29 : ctx->write_xid = txn->xid;
1405 : 29 : ctx->write_location = abort_lsn;
704 1406 : 29 : ctx->end_xact = true;
1407 : :
1408 : : /* in streaming mode, stream_abort_cb is required */
1356 1409 [ - + ]: 29 : if (ctx->callbacks.stream_abort_cb == NULL)
1356 akapila@postgresql.o 1410 [ # # ]:UBC 0 : ereport(ERROR,
1411 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1412 : : errmsg("logical streaming requires a %s callback",
1413 : : "stream_abort_cb")));
1414 : :
1356 akapila@postgresql.o 1415 :CBC 29 : ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
1416 : :
1417 : : /* Pop the error context stack */
1418 : 29 : error_context_stack = errcallback.previous;
1419 : 29 : }
1420 : :
1421 : : static void
1201 1422 : 16 : stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1423 : : XLogRecPtr prepare_lsn)
1424 : : {
1425 : 16 : LogicalDecodingContext *ctx = cache->private_data;
1426 : : LogicalErrorCallbackState state;
1427 : : ErrorContextCallback errcallback;
1428 : :
1429 [ - + ]: 16 : Assert(!ctx->fast_forward);
1430 : :
1431 : : /*
1432 : : * We're only supposed to call this when streaming and two-phase commits
1433 : : * are supported.
1434 : : */
1435 [ - + ]: 16 : Assert(ctx->streaming);
1436 [ - + ]: 16 : Assert(ctx->twophase);
1437 : :
1438 : : /* Push callback + info on the error context stack */
1439 : 16 : state.ctx = ctx;
1440 : 16 : state.callback_name = "stream_prepare";
1441 : 16 : state.report_location = txn->final_lsn;
1442 : 16 : errcallback.callback = output_plugin_error_callback;
1443 : 16 : errcallback.arg = (void *) &state;
1444 : 16 : errcallback.previous = error_context_stack;
1445 : 16 : error_context_stack = &errcallback;
1446 : :
1447 : : /* set output state */
1448 : 16 : ctx->accept_writes = true;
1449 : 16 : ctx->write_xid = txn->xid;
1450 : 16 : ctx->write_location = txn->end_lsn;
704 1451 : 16 : ctx->end_xact = true;
1452 : :
1453 : : /* in streaming mode with two-phase commits, stream_prepare_cb is required */
1201 1454 [ - + ]: 16 : if (ctx->callbacks.stream_prepare_cb == NULL)
1201 akapila@postgresql.o 1455 [ # # ]:UBC 0 : ereport(ERROR,
1456 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1457 : : errmsg("logical streaming at prepare time requires a %s callback",
1458 : : "stream_prepare_cb")));
1459 : :
1201 akapila@postgresql.o 1460 :CBC 16 : ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
1461 : :
1462 : : /* Pop the error context stack */
1463 : 16 : error_context_stack = errcallback.previous;
1464 : 16 : }
1465 : :
1466 : : static void
1356 1467 : 50 : stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1468 : : XLogRecPtr commit_lsn)
1469 : : {
1470 : 50 : LogicalDecodingContext *ctx = cache->private_data;
1471 : : LogicalErrorCallbackState state;
1472 : : ErrorContextCallback errcallback;
1473 : :
1474 [ - + ]: 50 : Assert(!ctx->fast_forward);
1475 : :
1476 : : /* We're only supposed to call this when streaming is supported. */
1477 [ - + ]: 50 : Assert(ctx->streaming);
1478 : :
1479 : : /* Push callback + info on the error context stack */
1480 : 50 : state.ctx = ctx;
1481 : 50 : state.callback_name = "stream_commit";
1482 : 50 : state.report_location = txn->final_lsn;
1483 : 50 : errcallback.callback = output_plugin_error_callback;
1484 : 50 : errcallback.arg = (void *) &state;
1485 : 50 : errcallback.previous = error_context_stack;
1486 : 50 : error_context_stack = &errcallback;
1487 : :
1488 : : /* set output state */
1489 : 50 : ctx->accept_writes = true;
1490 : 50 : ctx->write_xid = txn->xid;
1491 : 50 : ctx->write_location = txn->end_lsn;
704 1492 : 50 : ctx->end_xact = true;
1493 : :
1494 : : /* in streaming mode, stream_commit_cb is required */
1356 1495 [ - + ]: 50 : if (ctx->callbacks.stream_commit_cb == NULL)
1356 akapila@postgresql.o 1496 [ # # ]:UBC 0 : ereport(ERROR,
1497 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1498 : : errmsg("logical streaming requires a %s callback",
1499 : : "stream_commit_cb")));
1500 : :
1356 akapila@postgresql.o 1501 :CBC 50 : ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
1502 : :
1503 : : /* Pop the error context stack */
1504 : 50 : error_context_stack = errcallback.previous;
1505 : 50 : }
1506 : :
1507 : : static void
1508 : 176010 : stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1509 : : Relation relation, ReorderBufferChange *change)
1510 : : {
1511 : 176010 : LogicalDecodingContext *ctx = cache->private_data;
1512 : : LogicalErrorCallbackState state;
1513 : : ErrorContextCallback errcallback;
1514 : :
1515 [ - + ]: 176010 : Assert(!ctx->fast_forward);
1516 : :
1517 : : /* We're only supposed to call this when streaming is supported. */
1518 [ - + ]: 176010 : Assert(ctx->streaming);
1519 : :
1520 : : /* Push callback + info on the error context stack */
1521 : 176010 : state.ctx = ctx;
1522 : 176010 : state.callback_name = "stream_change";
1523 : 176010 : state.report_location = change->lsn;
1524 : 176010 : errcallback.callback = output_plugin_error_callback;
1525 : 176010 : errcallback.arg = (void *) &state;
1526 : 176010 : errcallback.previous = error_context_stack;
1527 : 176010 : error_context_stack = &errcallback;
1528 : :
1529 : : /* set output state */
1530 : 176010 : ctx->accept_writes = true;
1531 : 176010 : ctx->write_xid = txn->xid;
1532 : :
1533 : : /*
1534 : : * Report this change's lsn so replies from clients can give an up-to-date
1535 : : * answer. This won't ever be enough (and shouldn't be!) to confirm
1536 : : * receipt of this transaction, but it might allow another transaction's
1537 : : * commit to be confirmed with one message.
1538 : : */
1539 : 176010 : ctx->write_location = change->lsn;
1540 : :
704 1541 : 176010 : ctx->end_xact = false;
1542 : :
1543 : : /* in streaming mode, stream_change_cb is required */
1356 1544 [ - + ]: 176010 : if (ctx->callbacks.stream_change_cb == NULL)
1356 akapila@postgresql.o 1545 [ # # ]:UBC 0 : ereport(ERROR,
1546 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1547 : : errmsg("logical streaming requires a %s callback",
1548 : : "stream_change_cb")));
1549 : :
1356 akapila@postgresql.o 1550 :CBC 176010 : ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
1551 : :
1552 : : /* Pop the error context stack */
1553 : 176010 : error_context_stack = errcallback.previous;
1554 : 176010 : }
1555 : :
1556 : : static void
1557 : 3 : stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1558 : : XLogRecPtr message_lsn, bool transactional,
1559 : : const char *prefix, Size message_size, const char *message)
1560 : : {
1561 : 3 : LogicalDecodingContext *ctx = cache->private_data;
1562 : : LogicalErrorCallbackState state;
1563 : : ErrorContextCallback errcallback;
1564 : :
1565 [ - + ]: 3 : Assert(!ctx->fast_forward);
1566 : :
1567 : : /* We're only supposed to call this when streaming is supported. */
1568 [ - + ]: 3 : Assert(ctx->streaming);
1569 : :
1570 : : /* this callback is optional */
1571 [ - + ]: 3 : if (ctx->callbacks.stream_message_cb == NULL)
1356 akapila@postgresql.o 1572 :UBC 0 : return;
1573 : :
1574 : : /* Push callback + info on the error context stack */
1356 akapila@postgresql.o 1575 :CBC 3 : state.ctx = ctx;
1576 : 3 : state.callback_name = "stream_message";
1577 : 3 : state.report_location = message_lsn;
1578 : 3 : errcallback.callback = output_plugin_error_callback;
1579 : 3 : errcallback.arg = (void *) &state;
1580 : 3 : errcallback.previous = error_context_stack;
1581 : 3 : error_context_stack = &errcallback;
1582 : :
1583 : : /* set output state */
1584 : 3 : ctx->accept_writes = true;
1585 [ + - ]: 3 : ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
1586 : 3 : ctx->write_location = message_lsn;
704 1587 : 3 : ctx->end_xact = false;
1588 : :
1589 : : /* do the actual work: call callback */
1356 1590 : 3 : ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
1591 : : message_size, message);
1592 : :
1593 : : /* Pop the error context stack */
1594 : 3 : error_context_stack = errcallback.previous;
1595 : : }
1596 : :
1597 : : static void
1356 akapila@postgresql.o 1598 :UBC 0 : stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1599 : : int nrelations, Relation relations[],
1600 : : ReorderBufferChange *change)
1601 : : {
1602 : 0 : LogicalDecodingContext *ctx = cache->private_data;
1603 : : LogicalErrorCallbackState state;
1604 : : ErrorContextCallback errcallback;
1605 : :
1606 [ # # ]: 0 : Assert(!ctx->fast_forward);
1607 : :
1608 : : /* We're only supposed to call this when streaming is supported. */
1609 [ # # ]: 0 : Assert(ctx->streaming);
1610 : :
1611 : : /* this callback is optional */
1612 [ # # ]: 0 : if (!ctx->callbacks.stream_truncate_cb)
1613 : 0 : return;
1614 : :
1615 : : /* Push callback + info on the error context stack */
1616 : 0 : state.ctx = ctx;
1617 : 0 : state.callback_name = "stream_truncate";
1618 : 0 : state.report_location = change->lsn;
1619 : 0 : errcallback.callback = output_plugin_error_callback;
1620 : 0 : errcallback.arg = (void *) &state;
1621 : 0 : errcallback.previous = error_context_stack;
1622 : 0 : error_context_stack = &errcallback;
1623 : :
1624 : : /* set output state */
1625 : 0 : ctx->accept_writes = true;
1626 : 0 : ctx->write_xid = txn->xid;
1627 : :
1628 : : /*
1629 : : * Report this change's lsn so replies from clients can give an up-to-date
1630 : : * answer. This won't ever be enough (and shouldn't be!) to confirm
1631 : : * receipt of this transaction, but it might allow another transaction's
1632 : : * commit to be confirmed with one message.
1633 : : */
1634 : 0 : ctx->write_location = change->lsn;
1635 : :
704 1636 : 0 : ctx->end_xact = false;
1637 : :
1356 1638 : 0 : ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
1639 : :
1640 : : /* Pop the error context stack */
1641 : 0 : error_context_stack = errcallback.previous;
1642 : : }
1643 : :
1644 : : static void
431 akapila@postgresql.o 1645 :CBC 3187 : update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
1646 : : XLogRecPtr lsn)
1647 : : {
1648 : 3187 : LogicalDecodingContext *ctx = cache->private_data;
1649 : : LogicalErrorCallbackState state;
1650 : : ErrorContextCallback errcallback;
1651 : :
1652 [ - + ]: 3187 : Assert(!ctx->fast_forward);
1653 : :
1654 : : /* Push callback + info on the error context stack */
1655 : 3187 : state.ctx = ctx;
1656 : 3187 : state.callback_name = "update_progress_txn";
1657 : 3187 : state.report_location = lsn;
1658 : 3187 : errcallback.callback = output_plugin_error_callback;
1659 : 3187 : errcallback.arg = (void *) &state;
1660 : 3187 : errcallback.previous = error_context_stack;
1661 : 3187 : error_context_stack = &errcallback;
1662 : :
1663 : : /* set output state */
1664 : 3187 : ctx->accept_writes = false;
1665 : 3187 : ctx->write_xid = txn->xid;
1666 : :
1667 : : /*
1668 : : * Report this change's lsn so replies from clients can give an up-to-date
1669 : : * answer. This won't ever be enough (and shouldn't be!) to confirm
1670 : : * receipt of this transaction, but it might allow another transaction's
1671 : : * commit to be confirmed with one message.
1672 : : */
1673 : 3187 : ctx->write_location = lsn;
1674 : :
1675 : 3187 : ctx->end_xact = false;
1676 : :
1677 : 3187 : OutputPluginUpdateProgress(ctx, false);
1678 : :
1679 : : /* Pop the error context stack */
1680 : 3187 : error_context_stack = errcallback.previous;
1681 : 3187 : }
1682 : :
1683 : : /*
1684 : : * Set the required catalog xmin horizon for historic snapshots in the current
1685 : : * replication slot.
1686 : : *
1687 : : * Note that in the most cases, we won't be able to immediately use the xmin
1688 : : * to increase the xmin horizon: we need to wait till the client has confirmed
1689 : : * receiving current_lsn with LogicalConfirmReceivedLocation().
1690 : : */
1691 : : void
3695 rhaas@postgresql.org 1692 : 409 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
1693 : : {
3631 bruce@momjian.us 1694 : 409 : bool updated_xmin = false;
1695 : : ReplicationSlot *slot;
950 akapila@postgresql.o 1696 : 409 : bool got_new_xmin = false;
1697 : :
3695 rhaas@postgresql.org 1698 : 409 : slot = MyReplicationSlot;
1699 : :
1700 [ - + ]: 409 : Assert(slot != NULL);
1701 : :
1702 [ - + ]: 409 : SpinLockAcquire(&slot->mutex);
1703 : :
1704 : : /*
1705 : : * don't overwrite if we already have a newer xmin. This can happen if we
1706 : : * restart decoding in a slot.
1707 : : */
1708 [ + + ]: 409 : if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
1709 : : {
1710 : : }
1711 : :
1712 : : /*
1713 : : * If the client has already confirmed up to this lsn, we directly can
1714 : : * mark this as accepted. This can happen if we restart decoding in a
1715 : : * slot.
1716 : : */
1717 [ + + ]: 104 : else if (current_lsn <= slot->data.confirmed_flush)
1718 : : {
1719 : 54 : slot->candidate_catalog_xmin = xmin;
1720 : 54 : slot->candidate_xmin_lsn = current_lsn;
1721 : :
1722 : : /* our candidate can directly be used */
1723 : 54 : updated_xmin = true;
1724 : : }
1725 : :
1726 : : /*
1727 : : * Only increase if the previous values have been applied, otherwise we
1728 : : * might never end up updating if the receiver acks too slowly.
1729 : : */
1730 [ + + ]: 50 : else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
1731 : : {
1732 : 25 : slot->candidate_catalog_xmin = xmin;
1733 : 25 : slot->candidate_xmin_lsn = current_lsn;
1734 : :
1735 : : /*
1736 : : * Log new xmin at an appropriate log level after releasing the
1737 : : * spinlock.
1738 : : */
950 akapila@postgresql.o 1739 : 25 : got_new_xmin = true;
1740 : : }
3695 rhaas@postgresql.org 1741 : 409 : SpinLockRelease(&slot->mutex);
1742 : :
950 akapila@postgresql.o 1743 [ + + ]: 409 : if (got_new_xmin)
1744 [ + + ]: 25 : elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
1745 : : LSN_FORMAT_ARGS(current_lsn));
1746 : :
1747 : : /* candidate already valid with the current flush position, apply */
3695 rhaas@postgresql.org 1748 [ + + ]: 409 : if (updated_xmin)
1749 : 54 : LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
1750 : 409 : }
1751 : :
1752 : : /*
1753 : : * Mark the minimal LSN (restart_lsn) we need to read to replay all
1754 : : * transactions that have not yet committed at current_lsn.
1755 : : *
1756 : : * Just like LogicalIncreaseXminForSlot this only takes effect when the
1757 : : * client has confirmed to have received current_lsn.
1758 : : */
1759 : : void
1760 : 368 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
1761 : : {
3631 bruce@momjian.us 1762 : 368 : bool updated_lsn = false;
1763 : : ReplicationSlot *slot;
1764 : :
3695 rhaas@postgresql.org 1765 : 368 : slot = MyReplicationSlot;
1766 : :
1767 [ - + ]: 368 : Assert(slot != NULL);
1768 [ - + ]: 368 : Assert(restart_lsn != InvalidXLogRecPtr);
1769 [ - + ]: 368 : Assert(current_lsn != InvalidXLogRecPtr);
1770 : :
1771 [ - + ]: 368 : SpinLockAcquire(&slot->mutex);
1772 : :
1773 : : /* don't overwrite if have a newer restart lsn */
1774 [ + + ]: 368 : if (restart_lsn <= slot->data.restart_lsn)
1775 : : {
1776 : : }
1777 : :
1778 : : /*
1779 : : * We might have already flushed far enough to directly accept this lsn,
1780 : : * in this case there is no need to check for existing candidate LSNs
1781 : : */
1782 [ + + ]: 357 : else if (current_lsn <= slot->data.confirmed_flush)
1783 : : {
1784 : 270 : slot->candidate_restart_valid = current_lsn;
1785 : 270 : slot->candidate_restart_lsn = restart_lsn;
1786 : :
1787 : : /* our candidate can directly be used */
1788 : 270 : updated_lsn = true;
1789 : : }
1790 : :
1791 : : /*
1792 : : * Only increase if the previous values have been applied, otherwise we
1793 : : * might never end up updating if the receiver acks too slowly. A missed
1794 : : * value here will just cause some extra effort after reconnecting.
1795 : : */
1796 [ + + ]: 368 : if (slot->candidate_restart_valid == InvalidXLogRecPtr)
1797 : : {
1798 : 57 : slot->candidate_restart_valid = current_lsn;
1799 : 57 : slot->candidate_restart_lsn = restart_lsn;
1410 michael@paquier.xyz 1800 : 57 : SpinLockRelease(&slot->mutex);
1801 : :
3695 rhaas@postgresql.org 1802 [ + + ]: 57 : elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
1803 : : LSN_FORMAT_ARGS(restart_lsn),
1804 : : LSN_FORMAT_ARGS(current_lsn));
1805 : : }
1806 : : else
1807 : : {
1808 : : XLogRecPtr candidate_restart_lsn;
1809 : : XLogRecPtr candidate_restart_valid;
1810 : : XLogRecPtr confirmed_flush;
1811 : :
1410 michael@paquier.xyz 1812 : 311 : candidate_restart_lsn = slot->candidate_restart_lsn;
1813 : 311 : candidate_restart_valid = slot->candidate_restart_valid;
1814 : 311 : confirmed_flush = slot->data.confirmed_flush;
1815 : 311 : SpinLockRelease(&slot->mutex);
1816 : :
3695 rhaas@postgresql.org 1817 [ + + ]: 311 : elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
1818 : : LSN_FORMAT_ARGS(restart_lsn),
1819 : : LSN_FORMAT_ARGS(current_lsn),
1820 : : LSN_FORMAT_ARGS(candidate_restart_lsn),
1821 : : LSN_FORMAT_ARGS(candidate_restart_valid),
1822 : : LSN_FORMAT_ARGS(confirmed_flush));
1823 : : }
1824 : :
1825 : : /* candidates are already valid with the current flush position, apply */
1826 [ + + ]: 368 : if (updated_lsn)
1827 : 270 : LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
1828 : 368 : }
1829 : :
1830 : : /*
1831 : : * Handle a consumer's confirmation having received all changes up to lsn.
1832 : : */
1833 : : void
1834 : 46969 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
1835 : : {
1836 [ - + ]: 46969 : Assert(lsn != InvalidXLogRecPtr);
1837 : :
1838 : : /* Do an unlocked check for candidate_lsn first. */
1839 [ + + ]: 46969 : if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
1840 [ + + ]: 46886 : MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
1841 : 387 : {
1842 : 387 : bool updated_xmin = false;
1843 : 387 : bool updated_restart = false;
1844 : :
3113 1845 [ - + ]: 387 : SpinLockAcquire(&MyReplicationSlot->mutex);
1846 : :
1847 : 387 : MyReplicationSlot->data.confirmed_flush = lsn;
1848 : :
1849 : : /* if we're past the location required for bumping xmin, do so */
1850 [ + + ]: 387 : if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
1851 [ + + ]: 83 : MyReplicationSlot->candidate_xmin_lsn <= lsn)
1852 : : {
1853 : : /*
1854 : : * We have to write the changed xmin to disk *before* we change
1855 : : * the in-memory value, otherwise after a crash we wouldn't know
1856 : : * that some catalog tuples might have been removed already.
1857 : : *
1858 : : * Ensure that by first writing to ->xmin and only update
1859 : : * ->effective_xmin once the new state is synced to disk. After a
1860 : : * crash ->effective_xmin is set to ->xmin.
1861 : : */
1862 [ + - ]: 71 : if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
1863 [ + - ]: 71 : MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
1864 : : {
1865 : 71 : MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
1866 : 71 : MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
1867 : 71 : MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
3695 1868 : 71 : updated_xmin = true;
1869 : : }
1870 : : }
1871 : :
3113 1872 [ + + ]: 387 : if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
1873 [ + + ]: 333 : MyReplicationSlot->candidate_restart_valid <= lsn)
1874 : : {
1875 [ - + ]: 320 : Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
1876 : :
1877 : 320 : MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
1878 : 320 : MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
1879 : 320 : MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
3695 1880 : 320 : updated_restart = true;
1881 : : }
1882 : :
3113 1883 : 387 : SpinLockRelease(&MyReplicationSlot->mutex);
1884 : :
1885 : : /* first write new xmin to disk, so we know what's up after a crash */
3695 1886 [ + + + + ]: 387 : if (updated_xmin || updated_restart)
1887 : : {
1888 : 374 : ReplicationSlotMarkDirty();
1889 : 374 : ReplicationSlotSave();
1890 [ - + ]: 374 : elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
1891 : : }
1892 : :
1893 : : /*
1894 : : * Now the new xmin is safely on disk, we can let the global value
1895 : : * advance. We do not take ProcArrayLock or similar since we only
1896 : : * advance xmin here and there's not much harm done by a concurrent
1897 : : * computation missing that.
1898 : : */
1899 [ + + ]: 387 : if (updated_xmin)
1900 : : {
3113 1901 [ - + ]: 71 : SpinLockAcquire(&MyReplicationSlot->mutex);
1902 : 71 : MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
1903 : 71 : SpinLockRelease(&MyReplicationSlot->mutex);
1904 : :
3695 1905 : 71 : ReplicationSlotsComputeRequiredXmin(false);
1906 : 71 : ReplicationSlotsComputeRequiredLSN();
1907 : : }
1908 : : }
1909 : : else
1910 : : {
3113 1911 [ - + ]: 46582 : SpinLockAcquire(&MyReplicationSlot->mutex);
1912 : 46582 : MyReplicationSlot->data.confirmed_flush = lsn;
1913 : 46582 : SpinLockRelease(&MyReplicationSlot->mutex);
1914 : : }
3695 1915 : 46969 : }
1916 : :
1917 : : /*
1918 : : * Clear logical streaming state during (sub)transaction abort.
1919 : : */
1920 : : void
1345 akapila@postgresql.o 1921 : 27381 : ResetLogicalStreamingState(void)
1922 : : {
1923 : 27381 : CheckXidAlive = InvalidTransactionId;
1924 : 27381 : bsysscan = false;
1925 : 27381 : }
1926 : :
1927 : : /*
1928 : : * Report stats for a slot.
1929 : : */
1930 : : void
1284 1931 : 5385 : UpdateDecodingStats(LogicalDecodingContext *ctx)
1932 : : {
1933 : 5385 : ReorderBuffer *rb = ctx->reorder;
1934 : : PgStat_StatReplSlotEntry repSlotStat;
1935 : :
1936 : : /* Nothing to do if we don't have any replication stats to be sent. */
1094 1937 [ + + + + : 5385 : if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
+ + ]
1284 1938 : 246 : return;
1939 : :
1094 1940 [ + + ]: 5139 : elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
1941 : : rb,
1942 : : (long long) rb->spillTxns,
1943 : : (long long) rb->spillCount,
1944 : : (long long) rb->spillBytes,
1945 : : (long long) rb->streamTxns,
1946 : : (long long) rb->streamCount,
1947 : : (long long) rb->streamBytes,
1948 : : (long long) rb->totalTxns,
1949 : : (long long) rb->totalBytes);
1950 : :
1096 1951 : 5139 : repSlotStat.spill_txns = rb->spillTxns;
1952 : 5139 : repSlotStat.spill_count = rb->spillCount;
1953 : 5139 : repSlotStat.spill_bytes = rb->spillBytes;
1954 : 5139 : repSlotStat.stream_txns = rb->streamTxns;
1955 : 5139 : repSlotStat.stream_count = rb->streamCount;
1956 : 5139 : repSlotStat.stream_bytes = rb->streamBytes;
1094 1957 : 5139 : repSlotStat.total_txns = rb->totalTxns;
1958 : 5139 : repSlotStat.total_bytes = rb->totalBytes;
1959 : :
739 andres@anarazel.de 1960 : 5139 : pgstat_report_replslot(ctx->slot, &repSlotStat);
1961 : :
1284 akapila@postgresql.o 1962 : 5139 : rb->spillTxns = 0;
1963 : 5139 : rb->spillCount = 0;
1964 : 5139 : rb->spillBytes = 0;
1263 1965 : 5139 : rb->streamTxns = 0;
1966 : 5139 : rb->streamCount = 0;
1967 : 5139 : rb->streamBytes = 0;
1094 1968 : 5139 : rb->totalTxns = 0;
1969 : 5139 : rb->totalBytes = 0;
1970 : : }
1971 : :
1972 : : /*
1973 : : * Read up to the end of WAL starting from the decoding slot's restart_lsn.
1974 : : * Return true if any meaningful/decodable WAL records are encountered,
1975 : : * otherwise false.
1976 : : */
1977 : : bool
171 akapila@postgresql.o 1978 :GNC 5 : LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
1979 : : {
1980 : 5 : bool has_pending_wal = false;
1981 : :
1982 [ - + ]: 5 : Assert(MyReplicationSlot);
1983 : :
1984 [ + - ]: 5 : PG_TRY();
1985 : : {
1986 : : LogicalDecodingContext *ctx;
1987 : :
1988 : : /*
1989 : : * Create our decoding context in fast_forward mode, passing start_lsn
1990 : : * as InvalidXLogRecPtr, so that we start processing from the slot's
1991 : : * confirmed_flush.
1992 : : */
1993 : 10 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
1994 : : NIL,
1995 : : true, /* fast_forward */
1996 : 5 : XL_ROUTINE(.page_read = read_local_xlog_page,
1997 : : .segment_open = wal_segment_open,
1998 : : .segment_close = wal_segment_close),
1999 : : NULL, NULL, NULL);
2000 : :
2001 : : /*
2002 : : * Start reading at the slot's restart_lsn, which we know points to a
2003 : : * valid record.
2004 : : */
2005 : 5 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
2006 : :
2007 : : /* Invalidate non-timetravel entries */
2008 : 5 : InvalidateSystemCaches();
2009 : :
2010 : : /* Loop until the end of WAL or some changes are processed */
2011 [ + + + + ]: 150 : while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
2012 : : {
2013 : : XLogRecord *record;
2014 : 145 : char *errm = NULL;
2015 : :
2016 : 145 : record = XLogReadRecord(ctx->reader, &errm);
2017 : :
2018 [ - + ]: 145 : if (errm)
171 akapila@postgresql.o 2019 [ # # ]:UNC 0 : elog(ERROR, "could not find record for logical decoding: %s", errm);
2020 : :
171 akapila@postgresql.o 2021 [ + - ]:GNC 145 : if (record != NULL)
2022 : 145 : LogicalDecodingProcessRecord(ctx, ctx->reader);
2023 : :
2024 : 145 : has_pending_wal = ctx->processing_required;
2025 : :
2026 [ - + ]: 145 : CHECK_FOR_INTERRUPTS();
2027 : : }
2028 : :
2029 : : /* Clean up */
2030 : 5 : FreeDecodingContext(ctx);
2031 : 5 : InvalidateSystemCaches();
2032 : : }
171 akapila@postgresql.o 2033 :UNC 0 : PG_CATCH();
2034 : : {
2035 : : /* clear all timetravel entries */
2036 : 0 : InvalidateSystemCaches();
2037 : :
2038 : 0 : PG_RE_THROW();
2039 : : }
171 akapila@postgresql.o 2040 [ - + ]:GNC 5 : PG_END_TRY();
2041 : :
2042 : 5 : return has_pending_wal;
2043 : : }
2044 : :
2045 : : /*
2046 : : * Helper function for advancing our logical replication slot forward.
2047 : : *
2048 : : * The slot's restart_lsn is used as start point for reading records, while
2049 : : * confirmed_flush is used as base point for the decoding context.
2050 : : *
2051 : : * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
2052 : : * because we need to digest WAL to advance restart_lsn allowing to recycle
2053 : : * WAL and removal of old catalog tuples. As decoding is done in fast_forward
2054 : : * mode, no changes are generated anyway.
2055 : : *
2056 : : * *found_consistent_snapshot will be true if the initial decoding snapshot has
2057 : : * been built; Otherwise, it will be false.
2058 : : */
2059 : : XLogRecPtr
11 2060 : 13 : LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
2061 : : bool *found_consistent_snapshot)
2062 : : {
2063 : : LogicalDecodingContext *ctx;
2064 : 13 : ResourceOwner old_resowner = CurrentResourceOwner;
2065 : : XLogRecPtr retlsn;
2066 : :
2067 [ - + ]: 13 : Assert(moveto != InvalidXLogRecPtr);
2068 : :
2069 [ + + ]: 13 : if (found_consistent_snapshot)
2070 : 4 : *found_consistent_snapshot = false;
2071 : :
2072 [ + - ]: 13 : PG_TRY();
2073 : : {
2074 : : /*
2075 : : * Create our decoding context in fast_forward mode, passing start_lsn
2076 : : * as InvalidXLogRecPtr, so that we start processing from my slot's
2077 : : * confirmed_flush.
2078 : : */
2079 : 26 : ctx = CreateDecodingContext(InvalidXLogRecPtr,
2080 : : NIL,
2081 : : true, /* fast_forward */
2082 : 13 : XL_ROUTINE(.page_read = read_local_xlog_page,
2083 : : .segment_open = wal_segment_open,
2084 : : .segment_close = wal_segment_close),
2085 : : NULL, NULL, NULL);
2086 : :
2087 : : /*
2088 : : * Wait for specified streaming replication standby servers (if any)
2089 : : * to confirm receipt of WAL up to moveto lsn.
2090 : : */
2091 : 13 : WaitForStandbyConfirmation(moveto);
2092 : :
2093 : : /*
2094 : : * Start reading at the slot's restart_lsn, which we know to point to
2095 : : * a valid record.
2096 : : */
2097 : 13 : XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
2098 : :
2099 : : /* invalidate non-timetravel entries */
2100 : 13 : InvalidateSystemCaches();
2101 : :
2102 : : /* Decode records until we reach the requested target */
2103 [ + + ]: 1692 : while (ctx->reader->EndRecPtr < moveto)
2104 : : {
2105 : 1679 : char *errm = NULL;
2106 : : XLogRecord *record;
2107 : :
2108 : : /*
2109 : : * Read records. No changes are generated in fast_forward mode,
2110 : : * but snapbuilder/slot statuses are updated properly.
2111 : : */
2112 : 1679 : record = XLogReadRecord(ctx->reader, &errm);
2113 [ - + ]: 1679 : if (errm)
11 akapila@postgresql.o 2114 [ # # ]:UNC 0 : elog(ERROR, "could not find record while advancing replication slot: %s",
2115 : : errm);
2116 : :
2117 : : /*
2118 : : * Process the record. Storage-level changes are ignored in
2119 : : * fast_forward mode, but other modules (such as snapbuilder)
2120 : : * might still have critical updates to do.
2121 : : */
11 akapila@postgresql.o 2122 [ + - ]:GNC 1679 : if (record)
2123 : 1679 : LogicalDecodingProcessRecord(ctx, ctx->reader);
2124 : :
2125 [ - + ]: 1679 : CHECK_FOR_INTERRUPTS();
2126 : : }
2127 : :
2128 [ + + + - ]: 13 : if (found_consistent_snapshot && DecodingContextReady(ctx))
2129 : 4 : *found_consistent_snapshot = true;
2130 : :
2131 : : /*
2132 : : * Logical decoding could have clobbered CurrentResourceOwner during
2133 : : * transaction management, so restore the executor's value. (This is
2134 : : * a kluge, but it's not worth cleaning up right now.)
2135 : : */
2136 : 13 : CurrentResourceOwner = old_resowner;
2137 : :
2138 [ + - ]: 13 : if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
2139 : : {
2140 : 13 : LogicalConfirmReceivedLocation(moveto);
2141 : :
2142 : : /*
2143 : : * If only the confirmed_flush LSN has changed the slot won't get
2144 : : * marked as dirty by the above. Callers on the walsender
2145 : : * interface are expected to keep track of their own progress and
2146 : : * don't need it written out. But SQL-interface users cannot
2147 : : * specify their own start positions and it's harder for them to
2148 : : * keep track of their progress, so we should make more of an
2149 : : * effort to save it for them.
2150 : : *
2151 : : * Dirty the slot so it is written out at the next checkpoint. The
2152 : : * LSN position advanced to may still be lost on a crash but this
2153 : : * makes the data consistent after a clean shutdown.
2154 : : */
2155 : 13 : ReplicationSlotMarkDirty();
2156 : : }
2157 : :
2158 : 13 : retlsn = MyReplicationSlot->data.confirmed_flush;
2159 : :
2160 : : /* free context, call shutdown callback */
2161 : 13 : FreeDecodingContext(ctx);
2162 : :
2163 : 13 : InvalidateSystemCaches();
2164 : : }
11 akapila@postgresql.o 2165 :UNC 0 : PG_CATCH();
2166 : : {
2167 : : /* clear all timetravel entries */
2168 : 0 : InvalidateSystemCaches();
2169 : :
2170 : 0 : PG_RE_THROW();
2171 : : }
11 akapila@postgresql.o 2172 [ - + ]:GNC 13 : PG_END_TRY();
2173 : :
2174 : 13 : return retlsn;
2175 : : }
|