Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * snapbuild.c
4 : *
5 : * Infrastructure for building historic catalog snapshots based on contents
6 : * of the WAL, for the purpose of decoding heapam.c style values in the
7 : * WAL.
8 : *
9 : * NOTES:
10 : *
11 : * We build snapshots which can *only* be used to read catalog contents and we
12 : * do so by reading and interpreting the WAL stream. The aim is to build a
13 : * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 : * at the time the XLogRecord was generated.
15 : *
16 : * To build the snapshots we reuse the infrastructure built for Hot
17 : * Standby. The in-memory snapshots we build look different than HS' because
18 : * we have different needs. To successfully decode data from the WAL we only
19 : * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 : * tables since the data we decode is wholly contained in the WAL
21 : * records. Also, our snapshots need to be different in comparison to normal
22 : * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 : * pg_subtrans for information about committed transactions because they might
24 : * commit in the future from the POV of the WAL entry we're currently
25 : * decoding. This definition has the advantage that we only need to prevent
26 : * removal of catalog rows, while normal tables' rows can still be
27 : * removed. This is achieved by using the replication slot mechanism.
28 : *
29 : * As the percentage of transactions modifying the catalog normally is fairly
30 : * small in comparison to ones only manipulating user data, we keep track of
31 : * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 : * track of all running transactions like it's done in a normal snapshot. Note
33 : * that we're generally only looking at transactions that have acquired an
34 : * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35 : * that we consider committed, everything else is considered aborted/in
36 : * progress. That also allows us not to care about subtransactions before they
37 : * have committed which means this module, in contrast to HS, doesn't have to
38 : * care about suboverflowed subtransactions and similar.
39 : *
40 : * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 : * transactions we need Snapshots that see intermediate versions of the
42 : * catalog in a transaction. During normal operation this is achieved by using
43 : * CommandIds/cmin/cmax. The problem with that however is that for space
44 : * efficiency reasons only one value of that is stored
45 : * (cf. combocid.c). Since combo CIDs are only available in memory we log
46 : * additional information which allows us to get the original (cmin, cmax)
47 : * pair during visibility checks. Check the comment in reorderbuffer.c above
48 : * ResolveCminCmaxDuringDecoding() for details.
49 : *
50 : * To facilitate all this we need our own visibility routine, as the normal
51 : * ones are optimized for different use cases.
52 : *
53 : * To replace the normal catalog snapshots with decoding ones use the
54 : * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
55 : *
56 : *
57 : *
58 : * The snapbuild machinery starts up in several stages, as illustrated
59 : * by the following graph describing the SnapBuild->state transitions:
60 : *
61 : * +-------------------------+
62 : * +----| START |-------------+
63 : * | +-------------------------+ |
64 : * | | |
65 : * | | |
66 : * | running_xacts #1 |
67 : * | | |
68 : * | | |
69 : * | v |
70 : * | +-------------------------+ v
71 : * | | BUILDING_SNAPSHOT |------------>|
72 : * | +-------------------------+ |
73 : * | | |
74 : * | | |
75 : * | running_xacts #2, xacts from #1 finished |
76 : * | | |
77 : * | | |
78 : * | v |
79 : * | +-------------------------+ v
80 : * | | FULL_SNAPSHOT |------------>|
81 : * | +-------------------------+ |
82 : * | | |
83 : * running_xacts | saved snapshot
84 : * with zero xacts | at running_xacts's lsn
85 : * | | |
86 : * | running_xacts with xacts from #2 finished |
87 : * | | |
88 : * | v |
89 : * | +-------------------------+ |
90 : * +--->|SNAPBUILD_CONSISTENT |<------------+
91 : * +-------------------------+
92 : *
93 : * Initially the machinery is in the START stage. When an xl_running_xacts
94 : * record is read that is sufficiently new (above the safe xmin horizon),
95 : * there's a state transition. If there were no running xacts when the
96 : * xl_running_xacts record was generated, we'll directly go into CONSISTENT
97 : * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Once the xacts
98 : * running at that first record have all finished, a later xl_running_xacts
99 : * record moves us to FULL_SNAPSHOT. Having a full snapshot means that all
100 : * transactions that start henceforth can be decoded in their entirety, but
101 : * transactions that started previously can't. In FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously running transactions have committed or aborted.
102 : *
103 : * Only transactions that commit after CONSISTENT state has been reached will
104 : * be replayed, even though they might have started while still in
105 : * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
106 : * changes have been exported, but all the following ones will be. That point
107 : * is a convenient point to initialize replication from, which is why we
108 : * export a snapshot at that point, which *can* be used to read normal data.
109 : *
110 : * Copyright (c) 2012-2023, PostgreSQL Global Development Group
111 : *
112 : * IDENTIFICATION
113 : * src/backend/replication/logical/snapbuild.c
114 : *
115 : *-------------------------------------------------------------------------
116 : */
117 :
118 : #include "postgres.h"
119 :
120 : #include <sys/stat.h>
121 : #include <unistd.h>
122 :
123 : #include "access/heapam_xlog.h"
124 : #include "access/transam.h"
125 : #include "access/xact.h"
126 : #include "common/file_utils.h"
127 : #include "miscadmin.h"
128 : #include "pgstat.h"
129 : #include "replication/logical.h"
130 : #include "replication/reorderbuffer.h"
131 : #include "replication/snapbuild.h"
132 : #include "storage/block.h" /* debugging output */
133 : #include "storage/fd.h"
134 : #include "storage/lmgr.h"
135 : #include "storage/proc.h"
136 : #include "storage/procarray.h"
137 : #include "storage/standby.h"
138 : #include "utils/builtins.h"
139 : #include "utils/memutils.h"
140 : #include "utils/snapmgr.h"
141 : #include "utils/snapshot.h"
142 :
/*
 * Working state of the snapshot building machinery.
 *
 * Only a forward declaration is exposed through the header; the contents are
 * private to this file, so they can be changed here without touching callers.
 *
 * NOTE(review): if the serialization code (SnapBuildSerialize /
 * SnapBuildRestore) writes this struct out as raw bytes, member order and
 * sizes are effectively part of that format -- confirm before rearranging.
 */
struct SnapBuild
{
	/* how far along we are in building our first full snapshot */
	SnapBuildState state;

	/* private memory context used to allocate memory for this module */
	MemoryContext context;

	/* all transactions < this have committed/aborted */
	TransactionId xmin;

	/* all transactions >= this are uncommitted */
	TransactionId xmax;

	/*
	 * Don't replay commits from an LSN < this LSN. This can be set externally
	 * but it will also be advanced (never retreat) from within snapbuild.c.
	 */
	XLogRecPtr	start_decoding_at;

	/*
	 * LSN at which two-phase decoding was enabled, or LSN at which we found a
	 * consistent point at the time of slot creation.
	 *
	 * Prepared transactions that were skipped -- either because two-phase
	 * decoding was not enabled yet or because they are not covered by the
	 * initial snapshot -- must lie before this point; they need to be sent
	 * later, together with their COMMIT PREPARED.
	 */
	XLogRecPtr	two_phase_at;

	/*
	 * Don't start decoding WAL until the "xl_running_xacts" information
	 * indicates there are no running xids with an xid smaller than this.
	 */
	TransactionId initial_xmin_horizon;

	/* true if we are building a full snapshot, false for a catalog-only one */
	bool		building_full_snapshot;

	/*
	 * Snapshot that's valid to see the catalog state seen at this moment.
	 * NULL while no snapshot is prebuilt; when set, the builder itself holds
	 * one reference to it.
	 */
	Snapshot	snapshot;

	/*
	 * LSN of the last location we are sure a snapshot has been serialized to.
	 */
	XLogRecPtr	last_serialized_snapshot;

	/*
	 * The reorderbuffer we need to update with usable snapshots et al.
	 */
	ReorderBuffer *reorder;

	/*
	 * TransactionId at which the next phase of initial snapshot building will
	 * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or
	 * when no next phase is necessary (SNAPBUILD_CONSISTENT). Before
	 * CONSISTENT, changes of xids preceding this are not decoded.
	 */
	TransactionId next_phase_at;

	/*
	 * Array of transactions which could have catalog changes that committed
	 * between xmin and xmax.
	 */
	struct
	{
		/* number of committed transactions stored in xip */
		size_t		xcnt;

		/* allocated capacity of xip, counted in TransactionIds */
		size_t		xcnt_space;

		/*
		 * Until we reach a CONSISTENT state, we record commits of all
		 * transactions, not just the catalog changing ones. Record when that
		 * changes so we know we cannot export a snapshot safely anymore.
		 */
		bool		includes_all_transactions;

		/*
		 * Array of committed transactions that have modified the catalog.
		 *
		 * As this array is frequently modified we do *not* keep it in
		 * xidComparator order. Instead we sort the array when building &
		 * distributing a snapshot.
		 *
		 * TODO: It's unclear whether that reasoning has much merit. Every
		 * time we add something here after becoming consistent will also
		 * require distributing a snapshot. Storing them sorted would
		 * potentially also make it easier to purge (but more complicated wrt
		 * wraparound?). Should be improved if sorting while building the
		 * snapshot shows up in profiles.
		 */
		TransactionId *xip;
	}			committed;

	/*
	 * Array of transactions and subtransactions that had modified catalogs
	 * and were running when the snapshot was serialized.
	 *
	 * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
	 * if the transaction has changed the catalog. But it could happen that
	 * the logical decoding decodes only the commit record of the transaction
	 * after restoring the previously serialized snapshot, in which case we
	 * would miss adding the xid to the snapshot and end up looking at the
	 * catalogs with the wrong snapshot.
	 *
	 * To avoid that problem, we serialize the transactions that had modified
	 * the catalogs and are still running at the time of snapshot
	 * serialization. We fill this array while restoring the snapshot and then
	 * consult it while decoding a commit to tell whether the xact has
	 * modified the catalog. We discard this array once all the xids in it
	 * have become old enough not to matter. See SnapBuildPurgeOlderTxn for
	 * details.
	 */
	struct
	{
		/* number of transactions in xip */
		size_t		xcnt;

		/* This array must be sorted in xidComparator order */
		TransactionId *xip;
	}			catchange;
};
273 :
274 : /*
275 : * Starting a transaction -- which we need to do while exporting a snapshot --
276 : * removes knowledge about the previously used resowner, so we save it here.
277 : */
278 : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
279 : static bool ExportInProgress = false;
280 :
281 : /* ->committed and ->catchange manipulation */
282 : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
283 :
284 : /* snapshot building/manipulation/distribution functions */
285 : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
286 :
287 : static void SnapBuildFreeSnapshot(Snapshot snap);
288 :
289 : static void SnapBuildSnapIncRefcount(Snapshot snap);
290 :
291 : static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
292 :
293 : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
294 : uint32 xinfo);
295 :
296 : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
297 : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
298 : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
299 :
300 : /* serialization functions */
301 : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
302 : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
303 : static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);
304 :
305 : /*
306 : * Allocate a new snapshot builder.
307 : *
308 : * xmin_horizon is the xid >= which we can be sure no catalog rows have been
309 : * removed, start_lsn is the LSN >= we want to replay commits.
310 : */
311 : SnapBuild *
3324 rhaas 312 GIC 821 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
313 : TransactionId xmin_horizon,
2173 andres 314 ECB : XLogRecPtr start_lsn,
315 : bool need_full_snapshot,
316 : XLogRecPtr two_phase_at)
317 : {
318 : MemoryContext context;
319 : MemoryContext oldcontext;
320 : SnapBuild *builder;
321 :
322 : /* allocate memory in own context, to have better accountability */
3324 rhaas 323 GIC 821 : context = AllocSetContextCreate(CurrentMemoryContext,
324 : "snapshot builder context",
2416 tgl 325 ECB : ALLOCSET_DEFAULT_SIZES);
3324 rhaas 326 GIC 821 : oldcontext = MemoryContextSwitchTo(context);
327 :
3324 rhaas 328 CBC 821 : builder = palloc0(sizeof(SnapBuild));
329 :
330 821 : builder->state = SNAPBUILD_START;
3324 rhaas 331 GIC 821 : builder->context = context;
3324 rhaas 332 CBC 821 : builder->reorder = reorder;
3324 rhaas 333 ECB : /* Other struct members initialized by zeroing via palloc0 above */
334 :
3324 rhaas 335 GIC 821 : builder->committed.xcnt = 0;
2118 tgl 336 821 : builder->committed.xcnt_space = 128; /* arbitrary number */
3324 rhaas 337 CBC 821 : builder->committed.xip =
338 821 : palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
339 821 : builder->committed.includes_all_transactions = true;
3257 rhaas 340 ECB :
241 akapila 341 GNC 821 : builder->catchange.xcnt = 0;
342 821 : builder->catchange.xip = NULL;
343 :
3324 rhaas 344 CBC 821 : builder->initial_xmin_horizon = xmin_horizon;
3230 andres 345 GIC 821 : builder->start_decoding_at = start_lsn;
2173 andres 346 CBC 821 : builder->building_full_snapshot = need_full_snapshot;
634 akapila 347 821 : builder->two_phase_at = two_phase_at;
348 :
3324 rhaas 349 821 : MemoryContextSwitchTo(oldcontext);
3324 rhaas 350 ECB :
3324 rhaas 351 CBC 821 : return builder;
352 : }
3324 rhaas 353 ECB :
354 : /*
355 : * Free a snapshot builder.
356 : */
357 : void
3324 rhaas 358 GIC 679 : FreeSnapshotBuilder(SnapBuild *builder)
359 : {
3324 rhaas 360 CBC 679 : MemoryContext context = builder->context;
361 :
3324 rhaas 362 ECB : /* free snapshot explicitly, that contains some error checking */
3324 rhaas 363 GIC 679 : if (builder->snapshot != NULL)
364 : {
3324 rhaas 365 CBC 184 : SnapBuildSnapDecRefcount(builder->snapshot);
3324 rhaas 366 GIC 184 : builder->snapshot = NULL;
3324 rhaas 367 ECB : }
368 :
369 : /* other resources are deallocated via memory context reset */
3324 rhaas 370 GIC 679 : MemoryContextDelete(context);
371 679 : }
372 :
373 : /*
374 : * Free an unreferenced snapshot that has previously been built by us.
3324 rhaas 375 ECB : */
376 : static void
3324 rhaas 377 GIC 1148 : SnapBuildFreeSnapshot(Snapshot snap)
3324 rhaas 378 ECB : {
379 : /* make sure we don't get passed an external snapshot */
1539 andres 380 GIC 1148 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
3324 rhaas 381 ECB :
382 : /* make sure nobody modified our snapshot */
3324 rhaas 383 CBC 1148 : Assert(snap->curcid == FirstCommandId);
384 1148 : Assert(!snap->suboverflowed);
3324 rhaas 385 GIC 1148 : Assert(!snap->takenDuringRecovery);
2915 heikki.linnakangas 386 1148 : Assert(snap->regd_count == 0);
3324 rhaas 387 ECB :
3324 rhaas 388 EUB : /* slightly more likely, so it's checked even without c-asserts */
3324 rhaas 389 GIC 1148 : if (snap->copied)
3324 rhaas 390 LBC 0 : elog(ERROR, "cannot free a copied snapshot");
3324 rhaas 391 EUB :
3324 rhaas 392 GIC 1148 : if (snap->active_count)
3324 rhaas 393 LBC 0 : elog(ERROR, "cannot free an active snapshot");
3324 rhaas 394 ECB :
3324 rhaas 395 GIC 1148 : pfree(snap);
396 1148 : }
397 :
398 : /*
399 : * In which state of snapshot building are we?
3324 rhaas 400 ECB : */
401 : SnapBuildState
3324 rhaas 402 CBC 2409461 : SnapBuildCurrentState(SnapBuild *builder)
403 : {
3324 rhaas 404 GIC 2409461 : return builder->state;
405 : }
406 :
407 : /*
408 : * Return the LSN at which the two-phase decoding was first enabled.
769 akapila 409 ECB : */
410 : XLogRecPtr
634 akapila 411 CBC 29 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
412 : {
634 akapila 413 GIC 29 : return builder->two_phase_at;
414 : }
415 :
416 : /*
417 : * Set the LSN at which two-phase decoding is enabled.
634 akapila 418 ECB : */
419 : void
634 akapila 420 CBC 4 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
634 akapila 421 ECB : {
634 akapila 422 GIC 4 : builder->two_phase_at = ptr;
769 423 4 : }
424 :
425 : /*
426 : * Should the contents of transaction ending at 'ptr' be decoded?
3324 rhaas 427 ECB : */
428 : bool
3324 rhaas 429 CBC 497915 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
430 : {
3230 andres 431 GIC 497915 : return ptr < builder->start_decoding_at;
432 : }
433 :
434 : /*
435 : * Increase refcount of a snapshot.
436 : *
437 : * This is used when handing out a snapshot to some external resource or when
438 : * adding a Snapshot as builder->snapshot.
3324 rhaas 439 ECB : */
440 : static void
3324 rhaas 441 CBC 4907 : SnapBuildSnapIncRefcount(Snapshot snap)
3324 rhaas 442 ECB : {
3324 rhaas 443 GIC 4907 : snap->active_count++;
444 4907 : }
445 :
446 : /*
447 : * Decrease refcount of a snapshot and free if the refcount reaches zero.
448 : *
449 : * Externally visible, so that external resources that have been handed an
450 : * IncRef'ed Snapshot can adjust its refcount easily.
3324 rhaas 451 ECB : */
452 : void
3324 rhaas 453 GIC 4742 : SnapBuildSnapDecRefcount(Snapshot snap)
3324 rhaas 454 ECB : {
455 : /* make sure we don't get passed an external snapshot */
1539 andres 456 GIC 4742 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
3324 rhaas 457 ECB :
458 : /* make sure nobody modified our snapshot */
3324 rhaas 459 CBC 4742 : Assert(snap->curcid == FirstCommandId);
3324 rhaas 460 GIC 4742 : Assert(!snap->suboverflowed);
3324 rhaas 461 CBC 4742 : Assert(!snap->takenDuringRecovery);
462 :
2915 heikki.linnakangas 463 4742 : Assert(snap->regd_count == 0);
464 :
2915 heikki.linnakangas 465 GIC 4742 : Assert(snap->active_count > 0);
3324 rhaas 466 ECB :
3286 heikki.linnakangas 467 EUB : /* slightly more likely, so it's checked even without casserts */
3324 rhaas 468 GIC 4742 : if (snap->copied)
3324 rhaas 469 LBC 0 : elog(ERROR, "cannot free a copied snapshot");
3324 rhaas 470 ECB :
3324 rhaas 471 CBC 4742 : snap->active_count--;
2915 heikki.linnakangas 472 4742 : if (snap->active_count == 0)
3324 rhaas 473 GIC 1148 : SnapBuildFreeSnapshot(snap);
474 4742 : }
475 :
476 : /*
477 : * Build a new snapshot, based on currently committed catalog-modifying
478 : * transactions.
479 : *
480 : * In-progress transactions with catalog access are *not* allowed to modify
481 : * these snapshots; they have to copy them and fill in appropriate ->curcid
482 : * and ->subxip/subxcnt values.
3324 rhaas 483 ECB : */
484 : static Snapshot
2125 andres 485 GIC 1453 : SnapBuildBuildSnapshot(SnapBuild *builder)
486 : {
487 : Snapshot snapshot;
3324 rhaas 488 ECB : Size ssize;
489 :
3324 rhaas 490 CBC 1453 : Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
3324 rhaas 491 ECB :
3324 rhaas 492 CBC 1453 : ssize = sizeof(SnapshotData)
3324 rhaas 493 GIC 1453 : + sizeof(TransactionId) * builder->committed.xcnt
3324 rhaas 494 CBC 1453 : + sizeof(TransactionId) * 1 /* toplevel xid */ ;
495 :
496 1453 : snapshot = MemoryContextAllocZero(builder->context, ssize);
497 :
1539 andres 498 GIC 1453 : snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
499 :
500 : /*
501 : * We misuse the original meaning of SnapshotData's xip and subxip fields
502 : * to make the more fitting for our needs.
503 : *
504 : * In the 'xip' array we store transactions that have to be treated as
505 : * committed. Since we will only ever look at tuples from transactions
506 : * that have modified the catalog it's more efficient to store those few
507 : * that exist between xmin and xmax (frequently there are none).
508 : *
509 : * Snapshots that are used in transactions that have modified the catalog
510 : * also use the 'subxip' array to store their toplevel xid and all the
511 : * subtransaction xids so we can recognize when we need to treat rows as
512 : * visible that are not in xip but still need to be visible. Subxip only
513 : * gets filled when the transaction is copied into the context of a
514 : * catalog modifying transaction since we otherwise share a snapshot
515 : * between transactions. As long as a txn hasn't modified the catalog it
516 : * doesn't need to treat any uncommitted rows as visible, so there is no
517 : * need for those xids.
518 : *
3324 rhaas 519 ECB : * Both arrays are qsort'ed so that we can use bsearch() on them.
520 : */
3324 rhaas 521 GIC 1453 : Assert(TransactionIdIsNormal(builder->xmin));
3324 rhaas 522 CBC 1453 : Assert(TransactionIdIsNormal(builder->xmax));
3324 rhaas 523 ECB :
3324 rhaas 524 GIC 1453 : snapshot->xmin = builder->xmin;
525 1453 : snapshot->xmax = builder->xmax;
3324 rhaas 526 ECB :
527 : /* store all transactions to be treated as committed by this snapshot */
3324 rhaas 528 CBC 1453 : snapshot->xip =
529 1453 : (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
530 1453 : snapshot->xcnt = builder->committed.xcnt;
531 1453 : memcpy(snapshot->xip,
3324 rhaas 532 GIC 1453 : builder->committed.xip,
533 1453 : builder->committed.xcnt * sizeof(TransactionId));
3324 rhaas 534 ECB :
535 : /* sort so we can bsearch() */
3324 rhaas 536 GIC 1453 : qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
537 :
538 : /*
539 : * Initially, subxip is empty, i.e. it's a snapshot to be used by
540 : * transactions that don't modify the catalog. Will be filled by
3324 rhaas 541 ECB : * ReorderBufferCopySnap() if necessary.
542 : */
3324 rhaas 543 GIC 1453 : snapshot->subxcnt = 0;
3324 rhaas 544 CBC 1453 : snapshot->subxip = NULL;
3324 rhaas 545 ECB :
3324 rhaas 546 CBC 1453 : snapshot->suboverflowed = false;
547 1453 : snapshot->takenDuringRecovery = false;
548 1453 : snapshot->copied = false;
549 1453 : snapshot->curcid = FirstCommandId;
550 1453 : snapshot->active_count = 0;
2915 heikki.linnakangas 551 GIC 1453 : snapshot->regd_count = 0;
965 andres 552 CBC 1453 : snapshot->snapXactCompletionCount = 0;
553 :
3324 rhaas 554 GIC 1453 : return snapshot;
555 : }
556 :
557 : /*
558 : * Build the initial slot snapshot and convert it to a normal snapshot that
559 : * is understood by HeapTupleSatisfiesMVCC.
560 : *
561 : * The snapshot will be usable directly in current transaction or exported
562 : * for loading in different transaction.
3324 rhaas 563 ECB : */
564 : Snapshot
2205 tgl 565 GIC 155 : SnapBuildInitialSnapshot(SnapBuild *builder)
566 : {
567 : Snapshot snap;
568 : TransactionId xid;
569 : TransactionId safeXid;
3324 rhaas 570 ECB : TransactionId *newxip;
3324 rhaas 571 GIC 155 : int newxcnt = 0;
3324 rhaas 572 ECB :
2205 tgl 573 GIC 155 : Assert(XactIsoLevel == XACT_REPEATABLE_READ);
139 akapila 574 GNC 155 : Assert(builder->building_full_snapshot);
575 :
576 : /* don't allow older snapshots */
577 155 : InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
578 155 : if (HaveRegisteredOrActiveSnapshot())
139 akapila 579 UNC 0 : elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
139 akapila 580 GNC 155 : Assert(!HistoricSnapshotActive());
581 :
3324 rhaas 582 CBC 155 : if (builder->state != SNAPBUILD_CONSISTENT)
2208 peter_e 583 LBC 0 : elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
3324 rhaas 584 EUB :
3324 rhaas 585 CBC 155 : if (!builder->committed.includes_all_transactions)
2208 peter_e 586 UIC 0 : elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
3324 rhaas 587 ECB :
3324 rhaas 588 EUB : /* so we don't overwrite the existing value */
969 andres 589 GIC 155 : if (TransactionIdIsValid(MyProc->xmin))
969 andres 590 LBC 0 : elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
3324 rhaas 591 EUB :
2125 andres 592 GIC 155 : snap = SnapBuildBuildSnapshot(builder);
593 :
3324 rhaas 594 ECB : /*
3324 rhaas 595 EUB : * We know that snap->xmin is alive, enforced by the logical xmin
596 : * mechanism. Due to that we can do this without locks, we're only
3324 rhaas 597 ECB : * changing our own value.
598 : *
599 : * Building an initial snapshot is expensive and an unenforced xmin
600 : * horizon would have bad consequences, therefore always double-check that
601 : * the horizon is enforced.
602 : */
139 akapila 603 GNC 155 : LWLockAcquire(ProcArrayLock, LW_SHARED);
604 155 : safeXid = GetOldestSafeDecodingTransactionId(false);
605 155 : LWLockRelease(ProcArrayLock);
606 :
607 155 : if (TransactionIdFollows(safeXid, snap->xmin))
139 akapila 608 UNC 0 : elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
609 : safeXid, snap->xmin);
2177 andres 610 ECB :
969 andres 611 GIC 155 : MyProc->xmin = snap->xmin;
3324 rhaas 612 ECB :
3324 rhaas 613 EUB : /* allocate in transaction context */
614 : newxip = (TransactionId *)
3324 rhaas 615 GIC 155 : palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
3324 rhaas 616 ECB :
617 : /*
618 : * snapbuild.c builds transactions in an "inverted" manner, which means it
619 : * stores committed transactions in ->xip, not ones in progress. Build a
620 : * classical snapshot by marking all non-committed transactions as
621 : * in-progress. This can be expensive.
622 : */
3324 rhaas 623 GIC 155 : for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
624 : {
625 : void *test;
626 :
627 : /*
3324 rhaas 628 ECB : * Check whether transaction committed using the decoding snapshot
629 : * meaning of ->xip.
630 : */
3324 rhaas 631 UIC 0 : test = bsearch(&xid, snap->xip, snap->xcnt,
632 : sizeof(TransactionId), xidComparator);
633 :
634 0 : if (test == NULL)
635 : {
3324 rhaas 636 UBC 0 : if (newxcnt >= GetMaxSnapshotXidCount())
2208 peter_e 637 UIC 0 : ereport(ERROR,
638 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
2153 bruce 639 EUB : errmsg("initial slot snapshot too large")));
640 :
3324 rhaas 641 UBC 0 : newxip[newxcnt++] = xid;
3324 rhaas 642 EUB : }
643 :
3324 rhaas 644 UIC 0 : TransactionIdAdvance(xid);
645 : }
3324 rhaas 646 EUB :
647 : /* adjust remaining snapshot fields as needed */
1509 michael 648 GIC 155 : snap->snapshot_type = SNAPSHOT_MVCC;
3324 rhaas 649 GBC 155 : snap->xcnt = newxcnt;
3324 rhaas 650 GIC 155 : snap->xip = newxip;
651 :
2208 peter_e 652 155 : return snap;
2208 peter_e 653 ECB : }
654 :
655 : /*
656 : * Export a snapshot so it can be set in another session with SET TRANSACTION
657 : * SNAPSHOT.
658 : *
659 : * For that we need to start a transaction in the current backend as the
660 : * importing side checks whether the source transaction is still open to make
661 : * sure the xmin horizon hasn't advanced since then.
662 : */
663 : const char *
2208 peter_e 664 UIC 0 : SnapBuildExportSnapshot(SnapBuild *builder)
665 : {
666 : Snapshot snap;
667 : char *snapname;
668 :
2208 peter_e 669 UBC 0 : if (IsTransactionOrTransactionBlock())
2208 peter_e 670 UIC 0 : elog(ERROR, "cannot export a snapshot from within a transaction");
671 :
672 0 : if (SavedResourceOwnerDuringExport)
673 0 : elog(ERROR, "can only export one snapshot at a time");
2208 peter_e 674 EUB :
2208 peter_e 675 UBC 0 : SavedResourceOwnerDuringExport = CurrentResourceOwner;
2208 peter_e 676 UIC 0 : ExportInProgress = true;
2208 peter_e 677 EUB :
2208 peter_e 678 UBC 0 : StartTransactionCommand();
679 :
2208 peter_e 680 EUB : /* There doesn't seem to a nice API to set these */
2208 peter_e 681 UBC 0 : XactIsoLevel = XACT_REPEATABLE_READ;
2208 peter_e 682 UIC 0 : XactReadOnly = true;
2208 peter_e 683 EUB :
2205 tgl 684 UIC 0 : snap = SnapBuildInitialSnapshot(builder);
685 :
3324 rhaas 686 EUB : /*
2208 peter_e 687 : * now that we've built a plain snapshot, make it active and use the
688 : * normal mechanisms for exporting it
3324 rhaas 689 : */
3324 rhaas 690 UIC 0 : snapname = ExportSnapshot(snap);
691 :
692 0 : ereport(LOG,
693 : (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
694 : "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
3071 peter_e 695 EUB : snap->xcnt,
696 : snapname, snap->xcnt)));
3324 rhaas 697 UBC 0 : return snapname;
698 : }
699 :
700 : /*
701 : * Ensure there is a snapshot and if not build one for current transaction.
2559 simon 702 EUB : */
703 : Snapshot
195 akapila 704 GNC 6 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
705 : {
2559 simon 706 GIC 6 : Assert(builder->state == SNAPBUILD_CONSISTENT);
707 :
708 : /* only build a new snapshot if we don't have a prebuilt one */
2559 simon 709 CBC 6 : if (builder->snapshot == NULL)
710 : {
2125 andres 711 LBC 0 : builder->snapshot = SnapBuildBuildSnapshot(builder);
712 : /* increase refcount for the snapshot builder */
2559 simon 713 UIC 0 : SnapBuildSnapIncRefcount(builder->snapshot);
2559 simon 714 ECB : }
715 :
2559 simon 716 GBC 6 : return builder->snapshot;
717 : }
2559 simon 718 EUB :
719 : /*
720 : * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
3324 rhaas 721 ECB : * any. Aborts the previously started transaction and resets the resource
722 : * owner back to its original value.
723 : */
724 : void
2794 andres 725 GIC 3912 : SnapBuildClearExportedSnapshot(void)
726 : {
727 : ResourceOwner tmpResOwner;
728 :
729 : /* nothing exported, that is the usual case */
3324 rhaas 730 CBC 3912 : if (!ExportInProgress)
3324 rhaas 731 GIC 3912 : return;
732 :
3324 rhaas 733 UIC 0 : if (!IsTransactionState())
734 0 : elog(ERROR, "clearing exported snapshot in wrong transaction state");
3324 rhaas 735 ECB :
538 michael 736 : /*
737 : * AbortCurrentTransaction() takes care of resetting the snapshot state,
538 michael 738 EUB : * so remember SavedResourceOwnerDuringExport.
739 : */
538 michael 740 UIC 0 : tmpResOwner = SavedResourceOwnerDuringExport;
741 :
742 : /* make sure nothing could have ever happened */
3324 rhaas 743 0 : AbortCurrentTransaction();
744 :
538 michael 745 UBC 0 : CurrentResourceOwner = tmpResOwner;
746 : }
747 :
538 michael 748 EUB : /*
749 : * Clear snapshot export state during transaction abort.
750 : */
751 : void
538 michael 752 GIC 20125 : SnapBuildResetExportedSnapshotState(void)
753 : {
3324 rhaas 754 20125 : SavedResourceOwnerDuringExport = NULL;
755 20125 : ExportInProgress = false;
756 20125 : }
3324 rhaas 757 ECB :
758 : /*
759 : * Handle the effects of a single heap change, appropriate to the current state
760 : * of the snapshot builder and returns whether changes made at (xid, lsn) can
761 : * be decoded.
762 : */
763 : bool
3324 rhaas 764 GIC 1694388 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
765 : {
766 : /*
767 : * We can't handle data in transactions if we haven't built a snapshot
768 : * yet, so don't store them.
3324 rhaas 769 ECB : */
3324 rhaas 770 GIC 1694388 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
3324 rhaas 771 UIC 0 : return false;
772 :
773 : /*
774 : * No point in keeping track of changes in transactions that we don't have
3324 rhaas 775 ECB : * enough information about to decode. This means that they started before
3324 rhaas 776 EUB : * we got into the SNAPBUILD_FULL_SNAPSHOT state.
777 : */
3324 rhaas 778 GIC 1694399 : if (builder->state < SNAPBUILD_CONSISTENT &&
783 andres 779 11 : TransactionIdPrecedes(xid, builder->next_phase_at))
3324 rhaas 780 4 : return false;
781 :
782 : /*
3324 rhaas 783 ECB : * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
784 : * be needed to decode the change we're currently processing.
785 : */
2591 andres 786 GIC 1694384 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
787 : {
788 : /* only build a new snapshot if we don't have a prebuilt one */
3324 rhaas 789 2598 : if (builder->snapshot == NULL)
790 : {
2125 andres 791 CBC 297 : builder->snapshot = SnapBuildBuildSnapshot(builder);
792 : /* increase refcount for the snapshot builder */
3324 rhaas 793 GIC 297 : SnapBuildSnapIncRefcount(builder->snapshot);
3324 rhaas 794 ECB : }
795 :
796 : /*
797 : * Increase refcount for the transaction we're handing the snapshot
798 : * out to.
799 : */
3324 rhaas 800 GIC 2598 : SnapBuildSnapIncRefcount(builder->snapshot);
801 2598 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
802 : builder->snapshot);
803 : }
804 :
3324 rhaas 805 CBC 1694384 : return true;
3324 rhaas 806 ECB : }
807 :
808 : /*
809 : * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
2881 heikki.linnakangas 810 : * This implies that a transaction has done some form of write to system
811 : * catalogs.
812 : */
813 : void
3324 rhaas 814 GIC 22808 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
815 : XLogRecPtr lsn, xl_heap_new_cid *xlrec)
816 : {
817 : CommandId cid;
818 :
3324 rhaas 819 ECB : /*
820 : * we only log new_cid's if a catalog tuple was modified, so mark the
821 : * transaction as containing catalog modifications
822 : */
3260 bruce 823 GIC 22808 : ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
824 :
3324 rhaas 825 22808 : ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
826 : xlrec->target_locator, xlrec->target_tid,
827 : xlrec->cmin, xlrec->cmax,
3324 rhaas 828 ECB : xlrec->combocid);
829 :
830 : /* figure out new command id */
3324 rhaas 831 GIC 22808 : if (xlrec->cmin != InvalidCommandId &&
832 19055 : xlrec->cmax != InvalidCommandId)
833 3030 : cid = Max(xlrec->cmin, xlrec->cmax);
834 19778 : else if (xlrec->cmax != InvalidCommandId)
835 3753 : cid = xlrec->cmax;
3324 rhaas 836 CBC 16025 : else if (xlrec->cmin != InvalidCommandId)
837 16025 : cid = xlrec->cmin;
3324 rhaas 838 ECB : else
839 : {
3260 bruce 840 LBC 0 : cid = InvalidCommandId; /* silence compiler */
3324 rhaas 841 0 : elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
3324 rhaas 842 ECB : }
843 :
3324 rhaas 844 GIC 22808 : ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
3324 rhaas 845 GBC 22808 : }
3324 rhaas 846 EUB :
847 : /*
848 : * Add a new Snapshot to all transactions we're decoding that currently are
3324 rhaas 849 ECB : * in-progress so they can see new catalog contents made by the transaction
850 : * that just committed. This is necessary because those in-progress
851 : * transactions will use the new catalog's contents from here on (at the very
852 : * least everything they do needs to be compatible with newer catalog
853 : * contents).
854 : */
855 : static void
3324 rhaas 856 GIC 995 : SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
857 : {
858 : dlist_iter txn_i;
859 : ReorderBufferTXN *txn;
860 :
3324 rhaas 861 ECB : /*
862 : * Iterate through all toplevel transactions. This can include
863 : * subtransactions which we just don't yet know to be that, but that's
864 : * fine, they will just get an unnecessary snapshot queued.
865 : */
3324 rhaas 866 GIC 2031 : dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
867 : {
868 1036 : txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
869 :
870 1036 : Assert(TransactionIdIsValid(txn->xid));
3324 rhaas 871 ECB :
872 : /*
873 : * If we don't have a base snapshot yet, there are no changes in this
874 : * transaction which in turn implies we don't yet need a snapshot at
3310 fujii 875 : * all. We'll add a snapshot when the first change gets queued.
876 : *
877 : * NB: This works correctly even for subtransactions because
878 : * ReorderBufferAssignChild() takes care to transfer the base snapshot
879 : * to the top-level transaction, and while iterating the changequeue
880 : * we'll get the change from the subtxn.
881 : */
3324 rhaas 882 GIC 1036 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
883 12 : continue;
884 :
885 : /*
886 : * We don't need to add snapshot to prepared transactions as they
825 akapila 887 ECB : * should not see the new catalog contents.
888 : */
825 akapila 889 GIC 1024 : if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
890 22 : continue;
891 :
3324 rhaas 892 1002 : elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
893 : txn->xid, LSN_FORMAT_ARGS(lsn));
3324 rhaas 894 ECB :
895 : /*
896 : * increase the snapshot's refcount for the transaction we are handing
897 : * it out to
898 : */
3324 rhaas 899 GIC 1002 : SnapBuildSnapIncRefcount(builder->snapshot);
900 1002 : ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
901 : builder->snapshot);
902 : }
903 995 : }
3324 rhaas 904 ECB :
905 : /*
906 : * Keep track of a new catalog changing transaction that has committed.
907 : */
908 : static void
3324 rhaas 909 GIC 1004 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
910 : {
911 1004 : Assert(TransactionIdIsValid(xid));
912 :
913 1004 : if (builder->committed.xcnt == builder->committed.xcnt_space)
3324 rhaas 914 ECB : {
3324 rhaas 915 UIC 0 : builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
3324 rhaas 916 ECB :
3324 rhaas 917 UIC 0 : elog(DEBUG1, "increasing space for committed transactions to %u",
3324 rhaas 918 ECB : (uint32) builder->committed.xcnt_space);
919 :
3324 rhaas 920 UBC 0 : builder->committed.xip = repalloc(builder->committed.xip,
2118 tgl 921 UIC 0 : builder->committed.xcnt_space * sizeof(TransactionId));
3324 rhaas 922 EUB : }
923 :
924 : /*
925 : * TODO: It might make sense to keep the array sorted here instead of
926 : * doing it every time we build a new snapshot. On the other hand this
927 : * gets called repeatedly when a transaction with subtransactions commits.
928 : */
3324 rhaas 929 GIC 1004 : builder->committed.xip[builder->committed.xcnt++] = xid;
930 1004 : }
931 :
932 : /*
933 : * Remove knowledge about transactions we treat as committed or containing catalog
934 : * changes that are smaller than ->xmin. Those won't ever get checked via
935 : * the ->committed or ->catchange array, respectively. The committed xids will
936 : * get checked via the clog machinery.
937 : *
938 : * We can ideally remove the transaction from catchange array once it is
939 : * finished (committed/aborted) but that could be costly as we need to maintain
940 : * the xids order in the array.
941 : */
942 : static void
241 akapila 943 274 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
944 : {
945 : int off;
946 : TransactionId *workspace;
3324 rhaas 947 274 : int surviving_xids = 0;
3324 rhaas 948 ECB :
949 : /* not ready yet */
3324 rhaas 950 GIC 274 : if (!TransactionIdIsNormal(builder->xmin))
3324 rhaas 951 UIC 0 : return;
3324 rhaas 952 ECB :
953 : /* TODO: Neater algorithm than just copying and iterating? */
954 : workspace =
3324 rhaas 955 CBC 274 : MemoryContextAlloc(builder->context,
3324 rhaas 956 GBC 274 : builder->committed.xcnt * sizeof(TransactionId));
957 :
958 : /* copy xids that still are interesting to workspace */
3324 rhaas 959 GIC 481 : for (off = 0; off < builder->committed.xcnt; off++)
3324 rhaas 960 ECB : {
3324 rhaas 961 CBC 207 : if (NormalTransactionIdPrecedes(builder->committed.xip[off],
962 : builder->xmin))
963 : ; /* remove */
3324 rhaas 964 ECB : else
3324 rhaas 965 UIC 0 : workspace[surviving_xids++] = builder->committed.xip[off];
3324 rhaas 966 ECB : }
967 :
968 : /* copy workspace back to persistent state */
3324 rhaas 969 GIC 274 : memcpy(builder->committed.xip, workspace,
3324 rhaas 970 EUB : surviving_xids * sizeof(TransactionId));
971 :
3324 rhaas 972 GIC 274 : elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
973 : (uint32) builder->committed.xcnt, (uint32) surviving_xids,
3324 rhaas 974 ECB : builder->xmin, builder->xmax);
3324 rhaas 975 GIC 274 : builder->committed.xcnt = surviving_xids;
976 :
3324 rhaas 977 CBC 274 : pfree(workspace);
978 :
241 akapila 979 ECB : /*
980 : * Purge xids in ->catchange as well. The purged array must also be sorted
981 : * in xidComparator order.
982 : */
223 akapila 983 GNC 274 : if (builder->catchange.xcnt > 0)
984 : {
985 : /*
986 : * Since catchange.xip is sorted, we find the lower bound of xids that
987 : * are still interesting.
988 : */
989 7 : for (off = 0; off < builder->catchange.xcnt; off++)
990 : {
991 5 : if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
992 : builder->xmin))
993 1 : break;
994 : }
995 :
996 3 : surviving_xids = builder->catchange.xcnt - off;
997 :
998 3 : if (surviving_xids > 0)
999 : {
1000 1 : memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
1001 : surviving_xids * sizeof(TransactionId));
1002 : }
223 akapila 1003 ECB : else
1004 : {
223 akapila 1005 GNC 2 : pfree(builder->catchange.xip);
1006 2 : builder->catchange.xip = NULL;
1007 : }
1008 :
1009 3 : elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
1010 : (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
1011 : builder->xmin, builder->xmax);
1012 3 : builder->catchange.xcnt = surviving_xids;
223 akapila 1013 ECB : }
1014 : }
1015 :
/*
 * Handle everything that needs to be done when a transaction commits.
 *
 * 'xid' is the committing top-level transaction, 'subxacts'/'nsubxacts' are
 * its committed subtransactions, and 'xinfo' holds the commit record's xinfo
 * flags (passed on to SnapBuildXidHasCatalogChanges()).
 *
 * Depending on the builder state this either only pushes start_decoding_at
 * past 'lsn', or it:
 *	- records the (sub)transactions that must be treated as committed by
 *	  historic snapshots (catalog-modifying ones, or all of them while
 *	  building a full snapshot before reaching consistency),
 *	- advances builder->xmax past the highest such xid, and
 *	- if any of them modified the catalog and we have at least a full
 *	  snapshot, builds a new historic snapshot and distributes it to all
 *	  in-progress transactions.
 */
void
SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
				   int nsubxacts, TransactionId *subxacts, uint32 xinfo)
{
	int			nxact;

	/* a new historic snapshot must be built and distributed */
	bool		needs_snapshot = false;
	/* xid (and possibly subxids) must be treated as committed by snapshots */
	bool		needs_timetravel = false;
	/* at least one subtransaction modified the catalog */
	bool		sub_needs_timetravel = false;

	/* highest xid added to ->committed by this commit */
	TransactionId xmax = xid;

	/*
	 * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
	 * will they be part of a snapshot.  So we don't need to record anything.
	 */
	if (builder->state == SNAPBUILD_START ||
		(builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
		 TransactionIdPrecedes(xid, builder->next_phase_at)))
	{
		/* ensure that only commits after this are getting replayed */
		if (builder->start_decoding_at <= lsn)
			builder->start_decoding_at = lsn + 1;
		return;
	}

	if (builder->state < SNAPBUILD_CONSISTENT)
	{
		/* ensure that only commits after this are getting replayed */
		if (builder->start_decoding_at <= lsn)
			builder->start_decoding_at = lsn + 1;

		/*
		 * If building an exportable snapshot, force xid to be tracked, even
		 * if the transaction didn't modify the catalog.
		 */
		if (builder->building_full_snapshot)
		{
			needs_timetravel = true;
		}
	}

	for (nxact = 0; nxact < nsubxacts; nxact++)
	{
		TransactionId subxid = subxacts[nxact];

		/*
		 * Add the subtransaction to the committed array if it modified the
		 * catalog; that array does not distinguish subtransactions from
		 * toplevel transactions.
		 */
		if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
		{
			sub_needs_timetravel = true;
			needs_snapshot = true;

			elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
				 xid, subxid);

			SnapBuildAddCommittedTxn(builder, subxid);

			if (NormalTransactionIdFollows(subxid, xmax))
				xmax = subxid;
		}

		/*
		 * If we're forcing timetravel we also need visibility information
		 * about subtransaction, so keep track of subtransaction's state, even
		 * if not catalog modifying.  Don't need to distribute a snapshot in
		 * that case.
		 */
		else if (needs_timetravel)
		{
			SnapBuildAddCommittedTxn(builder, subxid);
			if (NormalTransactionIdFollows(subxid, xmax))
				xmax = subxid;
		}
	}

	/* if top-level modified catalog, it'll need a snapshot */
	if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
	{
		elog(DEBUG2, "found top level transaction %u, with catalog changes",
			 xid);
		needs_snapshot = true;
		needs_timetravel = true;
		SnapBuildAddCommittedTxn(builder, xid);
	}
	else if (sub_needs_timetravel)
	{
		/* track toplevel txn as well, subxact alone isn't meaningful */
		elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
			 xid);
		needs_timetravel = true;
		SnapBuildAddCommittedTxn(builder, xid);
	}
	else if (needs_timetravel)
	{
		/* tracked only because we are building a full snapshot */
		elog(DEBUG2, "forced transaction %u to do timetravel", xid);

		SnapBuildAddCommittedTxn(builder, xid);
	}

	if (!needs_timetravel)
	{
		/* record that we cannot export a general snapshot anymore */
		builder->committed.includes_all_transactions = false;
	}

	/* every path that sets needs_snapshot also sets needs_timetravel */
	Assert(!needs_snapshot || needs_timetravel);

	/*
	 * Adjust xmax of the snapshot builder, we only do that for committed,
	 * catalog modifying, transactions, everything else isn't interesting for
	 * us since we'll never look at the respective rows.
	 */
	if (needs_timetravel &&
		(!TransactionIdIsValid(builder->xmax) ||
		 TransactionIdFollowsOrEquals(xmax, builder->xmax)))
	{
		/* builder->xmax is exclusive, so it goes one past the highest xid */
		builder->xmax = xmax;
		TransactionIdAdvance(builder->xmax);
	}

	/* if there's any reason to build a historic snapshot, do so now */
	if (needs_snapshot)
	{
		/*
		 * If we haven't built a complete snapshot yet there's no need to hand
		 * it out, it wouldn't (and couldn't) be used anyway.
		 */
		if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
			return;

		/*
		 * Decrease the snapshot builder's refcount of the old snapshot, note
		 * that it still will be used if it has been handed out to the
		 * reorderbuffer earlier.
		 */
		if (builder->snapshot)
			SnapBuildSnapDecRefcount(builder->snapshot);

		builder->snapshot = SnapBuildBuildSnapshot(builder);

		/* we might need to execute invalidations, add snapshot */
		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
		{
			SnapBuildSnapIncRefcount(builder->snapshot);
			ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
										 builder->snapshot);
		}

		/* refcount of the snapshot builder for the new snapshot */
		SnapBuildSnapIncRefcount(builder->snapshot);

		/* add a new catalog snapshot to all currently running transactions */
		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
	}
}
1177 :
1178 : /*
1179 : * Check the reorder buffer and the snapshot to see if the given transaction has
1180 : * modified catalogs.
1181 : */
1182 : static inline bool
241 akapila 1183 GNC 3790 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
1184 : uint32 xinfo)
1185 : {
1186 3790 : if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1187 999 : return true;
1188 :
1189 : /*
1190 : * The transactions that have changed catalogs must have invalidation
1191 : * info.
1192 : */
1193 2791 : if (!(xinfo & XACT_XINFO_HAS_INVALS))
1194 2783 : return false;
1195 :
1196 : /* Check the catchange XID array */
1197 12 : return ((builder->catchange.xcnt > 0) &&
1198 4 : (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1199 : sizeof(TransactionId), xidComparator) != NULL));
1200 : }
1201 :
1202 : /* -----------------------------------
1203 : * Snapshot building functions dealing with xlog records
3324 rhaas 1204 ECB : * -----------------------------------
1205 : */
1206 :
1207 : /*
3287 heikki.linnakangas 1208 : * Process a running xacts record, and use its information to first build a
1209 : * historic snapshot and later to release resources that aren't needed
1210 : * anymore.
1211 : */
1212 : void
3324 rhaas 1213 GIC 1026 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
3324 rhaas 1214 ECB : {
1215 : ReorderBufferTXN *txn;
1216 : TransactionId xmin;
1217 :
1218 : /*
1219 : * If we're not consistent yet, inspect the record to see whether it
1220 : * allows to get closer to being consistent. If we are consistent, dump
1221 : * our snapshot so others or we, after a restart, can use it.
1222 : */
3324 rhaas 1223 GIC 1026 : if (builder->state < SNAPBUILD_CONSISTENT)
1224 : {
1225 : /* returns false if there's no point in performing cleanup just yet */
1226 776 : if (!SnapBuildFindSnapshot(builder, lsn, running))
1227 750 : return;
1228 : }
1229 : else
1230 250 : SnapBuildSerialize(builder, lsn);
1231 :
1232 : /*
1233 : * Update range of interesting xids based on the running xacts
3324 rhaas 1234 ECB : * information. We don't increase ->xmax using it, because once we are in
1235 : * a consistent state we can do that ourselves and much more efficiently
1236 : * so, because we only need to do it for catalog transactions since we
1237 : * only ever look at those.
1238 : *
1239 : * NB: We only increase xmax when a catalog modifying transaction commits
1240 : * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1241 : * xmin, which looks odd but is correct and actually more efficient, since
1242 : * we hit fast paths in heapam_visibility.c.
1243 : */
3324 rhaas 1244 CBC 274 : builder->xmin = running->oldestRunningXid;
1245 :
1246 : /* Remove transactions we don't need to keep track off anymore */
241 akapila 1247 274 : SnapBuildPurgeOlderTxn(builder);
3324 rhaas 1248 ECB :
1249 : /*
1250 : * Advance the xmin limit for the current replication slot, to allow
1748 alvherre 1251 : * vacuum to clean up the tuples this slot has been protecting.
1252 : *
1253 : * The reorderbuffer might have an xmin among the currently running
1254 : * snapshots; use it if so. If not, we need only consider the snapshots
1255 : * we'll produce later, which can't be less than the oldest running xid in
1256 : * the record we're reading now.
1257 : */
1748 alvherre 1258 GIC 274 : xmin = ReorderBufferGetOldestXmin(builder->reorder);
1259 274 : if (xmin == InvalidTransactionId)
1260 242 : xmin = running->oldestRunningXid;
1261 274 : elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1262 : builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1263 274 : LogicalIncreaseXminForSlot(lsn, xmin);
1264 :
3324 rhaas 1265 ECB : /*
1266 : * Also tell the slot where we can restart decoding from. We don't want to
1267 : * do that after every commit because changing that implies an fsync of
1268 : * the logical slot's state file, so we only do it every time we see a
1269 : * running xacts record.
1270 : *
1271 : * Do so by looking for the oldest in progress transaction (determined by
1272 : * the first LSN of any of its relevant records). Every transaction
1273 : * remembers the last location we stored the snapshot to disk before its
1274 : * beginning. That point is where we can restart from.
1275 : */
1276 :
1277 : /*
1278 : * Can't know about a serialized snapshot's location if we're not
1279 : * consistent.
1280 : */
3324 rhaas 1281 CBC 274 : if (builder->state < SNAPBUILD_CONSISTENT)
1282 19 : return;
1283 :
1284 255 : txn = ReorderBufferGetOldestTXN(builder->reorder);
1285 :
1286 : /*
1287 : * oldest ongoing txn might have started when we didn't yet serialize
1288 : * anything because we hadn't reached a consistent state yet.
1289 : */
3324 rhaas 1290 GIC 255 : if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1291 13 : LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1292 :
1293 : /*
1294 : * No in-progress transaction, can reuse the last serialized snapshot if
1295 : * we have one.
1296 : */
1297 242 : else if (txn == NULL &&
2118 tgl 1298 223 : builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
3324 rhaas 1299 221 : builder->last_serialized_snapshot != InvalidXLogRecPtr)
1300 221 : LogicalIncreaseRestartDecodingForSlot(lsn,
1301 : builder->last_serialized_snapshot);
3324 rhaas 1302 ECB : }
1303 :
1304 :
/*
 * Build the start of a snapshot that's capable of decoding the catalog.
 *
 * Helper function for SnapBuildProcessRunningXacts() while we're not yet
 * consistent.  Depending on the xl_running_xacts record at 'lsn' it may move
 * the builder through START -> BUILDING_SNAPSHOT -> FULL_SNAPSHOT ->
 * CONSISTENT, jump straight to CONSISTENT, or restore a serialized snapshot.
 *
 * Returns true if there is a point in performing internal maintenance/cleanup
 * using the xl_running_xacts record, false if the caller should skip it
 * (we just became consistent via a) or b) below).
 */
static bool
SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
	/* ---
	 * Build catalog decoding snapshot incrementally using information about
	 * the currently running transactions. There are several ways to do that:
	 *
	 * a) There were no running transactions when the xl_running_xacts record
	 *	  was inserted, jump to CONSISTENT immediately. We might find such a
	 *	  state while waiting on c)'s sub-states.
	 *
	 * b) This (in a previous run) or another decoding slot serialized a
	 *	  snapshot to disk that we can use. Can't use this method for the
	 *	  initial snapshot when slot is being created and needs full snapshot
	 *	  for export or direct use, as that snapshot will only contain catalog
	 *	  modifying transactions.
	 *
	 * c) First incrementally build a snapshot for catalog tuples
	 *	  (BUILDING_SNAPSHOT), that requires all, already in-progress,
	 *	  transactions to finish. Every transaction starting after that
	 *	  (FULL_SNAPSHOT state), has enough information to be decoded. But
	 *	  for older running transactions no viable snapshot exists yet, so
	 *	  CONSISTENT will only be reached once all of those have finished.
	 * ---
	 */

	/*
	 * xl_running_xacts record is older than what we can use, we might not have
	 * all necessary catalog rows anymore.  Wait for transactions older than
	 * the initial xmin horizon and let the caller go on with cleanup.
	 */
	if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
		NormalTransactionIdPrecedes(running->oldestRunningXid,
									builder->initial_xmin_horizon))
	{
		ereport(DEBUG1,
				(errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
								 LSN_FORMAT_ARGS(lsn)),
				 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
									builder->initial_xmin_horizon, running->oldestRunningXid)));


		SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);

		return true;
	}

	/*
	 * a) No transactions were running, we can jump to consistent.
	 *
	 * This is not affected by races around xl_running_xacts, because we can
	 * miss transaction commits, but currently not transactions starting.
	 *
	 * NB: We might have already started to incrementally assemble a snapshot,
	 * so we need to be careful to deal with that.
	 */
	if (running->oldestRunningXid == running->nextXid)
	{
		if (builder->start_decoding_at == InvalidXLogRecPtr ||
			builder->start_decoding_at <= lsn)
			/* can decode everything after this */
			builder->start_decoding_at = lsn + 1;

		/* As no transactions were running xmin/xmax can be trivially set. */
		builder->xmin = running->nextXid;	/* < are finished */
		builder->xmax = running->nextXid;	/* >= are running */

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		builder->state = SNAPBUILD_CONSISTENT;
		builder->next_phase_at = InvalidTransactionId;

		ereport(LOG,
				(errmsg("logical decoding found consistent point at %X/%X",
						LSN_FORMAT_ARGS(lsn)),
				 errdetail("There are no running transactions.")));

		return false;
	}
	/* b) valid on disk state and not building full snapshot */
	else if (!builder->building_full_snapshot &&
			 SnapBuildRestore(builder, lsn))
	{
		/* there won't be any state to cleanup */
		return false;
	}

	/*
	 * c) transition from START to BUILDING_SNAPSHOT.
	 *
	 * In START state, and a xl_running_xacts record with running xacts is
	 * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
	 * record xl_running_xacts->nextXid.  Once all running xacts have finished
	 * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
	 * might look that we could use xl_running_xacts's ->xids information to
	 * get there quicker, but that is problematic because transactions marked
	 * as running, might already have inserted their commit record - it's
	 * infeasible to change that with locking.
	 */
	else if (builder->state == SNAPBUILD_START)
	{
		builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
		builder->next_phase_at = running->nextXid;

		/*
		 * Start with an xmin/xmax that's correct for future, when all the
		 * currently running transactions have finished. We'll update both
		 * while waiting for the pending transactions to finish.
		 */
		builder->xmin = running->nextXid;	/* < are finished */
		builder->xmax = running->nextXid;	/* >= are running */

		/* so we can safely use the faster comparisons */
		Assert(TransactionIdIsNormal(builder->xmin));
		Assert(TransactionIdIsNormal(builder->xmax));

		ereport(LOG,
				(errmsg("logical decoding found initial starting point at %X/%X",
						LSN_FORMAT_ARGS(lsn)),
				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
						   running->xcnt, running->nextXid)));

		SnapBuildWaitSnapshot(running, running->nextXid);
	}

	/*
	 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
	 *
	 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
	 * is >= than nextXid from when we switched to BUILDING_SNAPSHOT.  This
	 * means all transactions starting afterwards have enough information to
	 * be decoded.  Switch to FULL_SNAPSHOT.
	 */
	else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
			 TransactionIdPrecedesOrEquals(builder->next_phase_at,
										   running->oldestRunningXid))
	{
		builder->state = SNAPBUILD_FULL_SNAPSHOT;
		builder->next_phase_at = running->nextXid;

		ereport(LOG,
				(errmsg("logical decoding found initial consistent point at %X/%X",
						LSN_FORMAT_ARGS(lsn)),
				 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
						   running->xcnt, running->nextXid)));

		SnapBuildWaitSnapshot(running, running->nextXid);
	}

	/*
	 * c) transition from FULL_SNAPSHOT to CONSISTENT.
	 *
	 * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
	 * >= than nextXid from when we switched to FULL_SNAPSHOT.  This means all
	 * transactions that are currently in progress have a catalog snapshot,
	 * and all their changes have been collected.  Switch to CONSISTENT.
	 */
	else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
			 TransactionIdPrecedesOrEquals(builder->next_phase_at,
										   running->oldestRunningXid))
	{
		builder->state = SNAPBUILD_CONSISTENT;
		builder->next_phase_at = InvalidTransactionId;

		ereport(LOG,
				(errmsg("logical decoding found consistent point at %X/%X",
						LSN_FORMAT_ARGS(lsn)),
				 errdetail("There are no old transactions anymore.")));
	}

	/*
	 * We already started to track running xacts and need to wait for all
	 * in-progress ones to finish.  We fall through to the normal processing of
	 * records so incremental cleanup can be performed.
	 */
	return true;
}
1492 :
1493 : /* ---
1494 : * Iterate through xids in record, wait for all older than the cutoff to
1495 : * finish. Then, if possible, log a new xl_running_xacts record.
2157 andres 1496 ECB : *
1497 : * This isn't required for the correctness of decoding, but to:
1498 : * a) allow isolationtester to notice that we're currently waiting for
1499 : * something.
1500 : * b) log a new xl_running_xacts record where it'd be helpful, without having
1501 : * to wait for bgwriter or checkpointer.
1502 : * ---
1503 : */
1504 : static void
2157 andres 1505 GIC 21 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1506 : {
1507 : int off;
1508 :
1509 40 : for (off = 0; off < running->xcnt; off++)
1510 : {
2157 andres 1511 CBC 21 : TransactionId xid = running->xids[off];
1512 :
1513 : /*
1514 : * Upper layers should prevent that we ever need to wait on ourselves.
2153 bruce 1515 ECB : * Check anyway, since failing to do so would either result in an
1516 : * endless wait or an Assert() failure.
2157 andres 1517 : */
2157 andres 1518 GIC 21 : if (TransactionIdIsCurrentTransactionId(xid))
2157 andres 1519 UIC 0 : elog(ERROR, "waiting for ourselves");
1520 :
2157 andres 1521 GIC 21 : if (TransactionIdFollows(xid, cutoff))
2157 andres 1522 UIC 0 : continue;
1523 :
2157 andres 1524 CBC 21 : XactLockTableWait(xid, NULL, NULL, XLTW_None);
2157 andres 1525 EUB : }
1526 :
2157 andres 1527 ECB : /*
2157 andres 1528 EUB : * All transactions we needed to finish finished - try to ensure there is
1529 : * another xl_running_xacts record in a timely manner, without having to
697 tgl 1530 ECB : * wait for bgwriter or checkpointer to log one. During recovery we can't
1531 : * enforce that, so we'll have to wait.
1532 : */
2157 andres 1533 GIC 19 : if (!RecoveryInProgress())
1534 : {
1535 19 : LogStandbySnapshot();
1536 : }
1537 19 : }
1538 :
/* -----------------------------------
 * Snapshot serialization support
 * -----------------------------------
 */

/*
 * On-disk representation of a serialized SnapBuild.
 *
 * One file is written per LSN (pg_logical/snapshots/<hi>-<lo>.snap) with the
 * following layout:
 *
 * struct SnapBuildOnDisk;
 * TransactionId * committed.xcnt; (*not xcnt_space*)
 * TransactionId * catchange.xcnt;
 *
 * That is, the fixed-size struct below, followed by builder.committed.xcnt
 * committed xids and then builder.catchange.xcnt catalog-modifying xids.
 * The SnapBuild struct is copied verbatim (memcpy on write, raw read on
 * restore), so any change to its layout changes the file format.  The
 * 'version' field is compared against SNAPBUILD_VERSION on restore and a
 * mismatch raises an ERROR.
 */
typedef struct SnapBuildOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* always SNAPBUILD_MAGIC; checked on restore */
	pg_crc32c	checksum;		/* CRC-32C of all data from 'version' onward,
								 * including the trailing xid arrays */

	/* data covered by checksum */

	/* version, in case we want to support pg_upgrade */
	uint32		version;

	/*
	 * Set by SnapBuildSerialize() to the total number of bytes written, i.e.
	 * sizeof(SnapBuildOnDisk) plus the trailing xid arrays.
	 *
	 * NOTE(review): the historical comment here said "excluding the constant
	 * sized part", which does not match what SnapBuildSerialize() stores.
	 * SnapBuildRestore() does not validate this field (it is only
	 * checksummed).
	 */
	uint32		length;

	/*
	 * version dependent part; its pointer members (context, snapshot,
	 * reorder, committed.xip, catchange.xip) are always stored as NULL
	 */
	SnapBuild	builder;

	/* variable amount of TransactionIds follows */
} SnapBuildOnDisk;

/* bytes preceding 'builder': the version-independent header */
#define SnapBuildOnDiskConstantSize \
	offsetof(SnapBuildOnDisk, builder)
/* bytes preceding 'version': the header fields excluded from the checksum */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(SnapBuildOnDisk, version)

/* identifies a snapbuild state file */
#define SNAPBUILD_MAGIC 0x51A1E001
/* on-disk format version; must match exactly for a file to be restored */
#define SNAPBUILD_VERSION 5
1580 :
1581 : /*
1582 : * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1583 : *
1584 : * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1585 : * a record that's a potential location for a serialized snapshot.
1586 : */
1587 : void
3324 rhaas 1588 GIC 23 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1589 : {
1590 23 : if (builder->state < SNAPBUILD_CONSISTENT)
3324 rhaas 1591 UIC 0 : SnapBuildRestore(builder, lsn);
1592 : else
3324 rhaas 1593 GIC 23 : SnapBuildSerialize(builder, lsn);
1594 23 : }
3324 rhaas 1595 ECB :
/*
 * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
 * been done by another decoding process.
 *
 * Does nothing at all (not even updating the reorder buffer restart point)
 * unless the builder has reached SNAPBUILD_CONSISTENT.
 *
 * If a file for 'lsn' already exists, it and its directory are fsync'd again
 * and 'lsn' is recorded as builder->last_serialized_snapshot.
 *
 * Otherwise the state is laid out as described by SnapBuildOnDisk, written to
 * a temporary file whose name includes our pid, fsync'd, renamed into place
 * and fsync'd again before 'lsn' is recorded.
 *
 * In both of those cases ReorderBufferSetRestartPoint() is then called with
 * builder->last_serialized_snapshot.  Filesystem failures raise ERROR.
 */
static void
SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
{
	Size		needed_length;
	SnapBuildOnDisk *ondisk = NULL;
	TransactionId *catchange_xip = NULL;
	MemoryContext old_ctx;
	size_t		catchange_xcnt;
	char	   *ondisk_c;
	int			fd;
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			ret;
	struct stat stat_buf;
	Size		sz;

	Assert(lsn != InvalidXLogRecPtr);
	Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
		   builder->last_serialized_snapshot <= lsn);

	/*
	 * no point in serializing if we cannot continue to work immediately after
	 * restoring the snapshot
	 */
	if (builder->state < SNAPBUILD_CONSISTENT)
		return;

	/* consistent snapshots have no next phase */
	Assert(builder->next_phase_at == InvalidTransactionId);

	/*
	 * We identify snapshots by the LSN they are valid for. We don't need to
	 * include timelines in the name as each LSN maps to exactly one timeline
	 * unless the user used pg_resetwal or similar. If a user did so, there's
	 * no hope continuing to decode anyway.
	 */
	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
			LSN_FORMAT_ARGS(lsn));

	/*
	 * first check whether some other backend already has written the snapshot
	 * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
	 * as a valid state. Everything else is an unexpected error.
	 */
	ret = stat(path, &stat_buf);

	if (ret != 0 && errno != ENOENT)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not stat file \"%s\": %m", path)));

	else if (ret == 0)
	{
		/*
		 * somebody else has already serialized to this point, don't overwrite
		 * but remember location, so we don't need to read old data again.
		 *
		 * To be sure it has been synced to disk after the rename() from the
		 * tempfile filename to the real filename, we just repeat the fsync.
		 * That ought to be cheap because in most scenarios it should already
		 * be safely on disk.
		 */
		fsync_fname(path, false);
		fsync_fname("pg_logical/snapshots", true);

		builder->last_serialized_snapshot = lsn;
		/* nothing was allocated on this path; 'out' only frees non-NULLs */
		goto out;
	}

	/*
	 * there is an obvious race condition here between the time we stat(2) the
	 * file and us writing the file. But we rename the file into place
	 * atomically and all files created need to contain the same data anyway,
	 * so this is perfectly fine, although a bit of a resource waste. Locking
	 * seems like pointless complication.
	 */
	elog(DEBUG1, "serializing snapshot to %s", path);

	/* to make sure only we will write to this tempfile, include pid */
	sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%d.tmp",
			LSN_FORMAT_ARGS(lsn), MyProcPid);

	/*
	 * Unlink temporary file if it already exists, needs to have been before a
	 * crash/error since we won't enter this function twice from within a
	 * single decoding slot/backend and the temporary file contains the pid of
	 * the current process.
	 */
	if (unlink(tmppath) != 0 && errno != ENOENT)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", tmppath)));

	/*
	 * Allocate the serialization buffers in builder->context; both are
	 * pfree'd at 'out' below.
	 */
	old_ctx = MemoryContextSwitchTo(builder->context);

	/* Get the catalog modifying transactions that are yet not committed */
	catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
	catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);

	/* header + SnapBuild, then committed xids, then catalog-change xids */
	needed_length = sizeof(SnapBuildOnDisk) +
		sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);

	ondisk_c = palloc0(needed_length);
	ondisk = (SnapBuildOnDisk *) ondisk_c;
	ondisk->magic = SNAPBUILD_MAGIC;
	ondisk->version = SNAPBUILD_VERSION;
	/* total bytes written, header included */
	ondisk->length = needed_length;
	/* checksum covers everything from 'version' onward */
	INIT_CRC32C(ondisk->checksum);
	COMP_CRC32C(ondisk->checksum,
				((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
	/* ondisk_c now points at where the xid arrays go */
	ondisk_c += sizeof(SnapBuildOnDisk);

	memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
	/* NULL-ify memory-only data */
	ondisk->builder.context = NULL;
	ondisk->builder.snapshot = NULL;
	ondisk->builder.reorder = NULL;
	ondisk->builder.committed.xip = NULL;
	ondisk->builder.catchange.xip = NULL;
	/* update catchange only on disk data */
	ondisk->builder.catchange.xcnt = catchange_xcnt;

	COMP_CRC32C(ondisk->checksum,
				&ondisk->builder,
				sizeof(SnapBuild));

	/* copy committed xacts */
	if (builder->committed.xcnt > 0)
	{
		sz = sizeof(TransactionId) * builder->committed.xcnt;
		memcpy(ondisk_c, builder->committed.xip, sz);
		COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
		ondisk_c += sz;
	}

	/* copy catalog modifying xacts */
	if (catchange_xcnt > 0)
	{
		sz = sizeof(TransactionId) * catchange_xcnt;
		memcpy(ondisk_c, catchange_xip, sz);
		COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
		ondisk_c += sz;
	}

	FIN_CRC32C(ondisk->checksum);

	/* we have valid data now, open tempfile and write it there */
	fd = OpenTransientFile(tmppath,
						   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", tmppath)));

	/* cleared so a short write that leaves errno untouched is detectable */
	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
	if ((write(fd, ondisk, needed_length)) != needed_length)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", tmppath)));
	}
	pgstat_report_wait_end();

	/*
	 * fsync the file before renaming so that even if we crash after this we
	 * have either a fully valid file or nothing.
	 *
	 * It's safe to just ERROR on fsync() here because we'll retry the whole
	 * operation including the writes.
	 *
	 * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
	 * some noticeable overhead since it's performed synchronously during
	 * decoding?
	 */
	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);
		errno = save_errno;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m", tmppath)));
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", tmppath)));

	fsync_fname("pg_logical/snapshots", true);

	/*
	 * We may overwrite the work from some other backend, but that's ok, our
	 * snapshot is valid as well, we'll just have done some superfluous work.
	 */
	if (rename(tmppath, path) != 0)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
	}

	/* make sure we persist */
	fsync_fname(path, false);
	fsync_fname("pg_logical/snapshots", true);

	/*
	 * Now there's no way we can lose the dumped state anymore, remember this
	 * as a serialization point.
	 */
	builder->last_serialized_snapshot = lsn;

	MemoryContextSwitchTo(old_ctx);

out:
	/* reached both after writing the file and when it already existed */
	ReorderBufferSetRestartPoint(builder->reorder,
								 builder->last_serialized_snapshot);
	/* be tidy */
	if (ondisk)
		pfree(ondisk);
	if (catchange_xip)
		pfree(catchange_xip);
}
3324 rhaas 1835 ECB :
/*
 * Restore a snapshot into 'builder' if previously one has been stored at the
 * location indicated by 'lsn'. Returns true if successful, false otherwise.
 *
 * false is returned in these cases:
 * - the builder is already SNAPBUILD_CONSISTENT;
 * - no file exists for 'lsn';
 * - the stored snapshot is unusable, because it is not consistent or its xmin
 *   precedes builder->initial_xmin_horizon.
 *
 * ERROR is raised for:
 * - other open/close failures;
 * - a bad magic number or version;
 * - a failed or short read;
 * - a checksum mismatch.
 *
 * On success the builder takes ownership of the restored xid arrays (which
 * are allocated in builder->context) and gets a freshly built snapshot, and
 * the reorder buffer restart point is set to 'lsn'.
 */
static bool
SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
{
	SnapBuildOnDisk ondisk;
	int			fd;
	char		path[MAXPGPATH];
	Size		sz;
	pg_crc32c	checksum;

	/* no point in loading a snapshot if we're already there */
	if (builder->state == SNAPBUILD_CONSISTENT)
		return false;

	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
			LSN_FORMAT_ARGS(lsn));

	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);

	/* a missing file just means nothing was serialized at this LSN */
	if (fd < 0 && errno == ENOENT)
		return false;
	else if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/* ----
	 * Make sure the snapshot had been stored safely to disk, that's normally
	 * cheap.
	 * Note that we do not need PANIC here, nobody will be able to use the
	 * slot without fsyncing, and saving it won't succeed without an fsync()
	 * either...
	 * ----
	 */
	fsync_fname(path, false);
	fsync_fname("pg_logical/snapshots", true);


	/* read statically sized portion of snapshot */
	SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);

	if (ondisk.magic != SNAPBUILD_MAGIC)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
						path, ondisk.magic, SNAPBUILD_MAGIC)));

	if (ondisk.version != SNAPBUILD_VERSION)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
						path, ondisk.version, SNAPBUILD_VERSION)));

	/* recompute the checksum over the same ranges SnapBuildSerialize() used */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);

	/* read SnapBuild */
	SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
	COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));

	/*
	 * The xip pointers just read are NULL, because SnapBuildSerialize()
	 * stores them that way.  They are only replaced below when the
	 * corresponding xcnt is non-zero; the cleanup at snapshot_not_interesting
	 * relies on that.
	 */

	/* restore committed xacts information */
	if (ondisk.builder.committed.xcnt > 0)
	{
		sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
		ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
		SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
		COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
	}

	/* restore catalog modifying xacts information */
	if (ondisk.builder.catchange.xcnt > 0)
	{
		sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
		ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
		SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
		COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
	}

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	FIN_CRC32C(checksum);

	/* verify checksum of what we've read */
	if (!EQ_CRC32C(checksum, ondisk.checksum))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
						path, checksum, ondisk.checksum)));

	/*
	 * ok, we now have a sensible snapshot here, figure out if it has more
	 * information than we have.
	 */

	/*
	 * We are only interested in consistent snapshots for now, comparing
	 * whether one incomplete snapshot is more "advanced" seems to be
	 * unnecessarily complex.
	 */
	if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
		goto snapshot_not_interesting;

	/*
	 * Don't use a snapshot that requires an xmin that we cannot guarantee to
	 * be available.
	 */
	if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
		goto snapshot_not_interesting;

	/* consistent snapshots have no next phase */
	Assert(ondisk.builder.next_phase_at == InvalidTransactionId);

	/* ok, we think the snapshot is sensible, copy over everything important */
	builder->xmin = ondisk.builder.xmin;
	builder->xmax = ondisk.builder.xmax;
	builder->state = ondisk.builder.state;

	builder->committed.xcnt = ondisk.builder.committed.xcnt;
	/* We only allocated/stored xcnt, not xcnt_space xids ! */
	/* don't overwrite preallocated xip, if we don't have anything here */
	if (builder->committed.xcnt > 0)
	{
		pfree(builder->committed.xip);
		builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
		builder->committed.xip = ondisk.builder.committed.xip;
	}
	/* ownership (if any) moved to builder; keep cleanup from freeing it */
	ondisk.builder.committed.xip = NULL;

	/* set catalog modifying transactions */
	if (builder->catchange.xip)
		pfree(builder->catchange.xip);
	builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
	builder->catchange.xip = ondisk.builder.catchange.xip;
	ondisk.builder.catchange.xip = NULL;

	/* our snapshot is not interesting anymore, build a new one */
	if (builder->snapshot != NULL)
	{
		SnapBuildSnapDecRefcount(builder->snapshot);
	}
	builder->snapshot = SnapBuildBuildSnapshot(builder);
	SnapBuildSnapIncRefcount(builder->snapshot);

	ReorderBufferSetRestartPoint(builder->reorder, lsn);

	Assert(builder->state == SNAPBUILD_CONSISTENT);

	ereport(LOG,
			(errmsg("logical decoding found consistent point at %X/%X",
					LSN_FORMAT_ARGS(lsn)),
			 errdetail("Logical decoding will begin using saved snapshot.")));
	return true;

snapshot_not_interesting:
	/* release the xid arrays read above; the builder is left untouched */
	if (ondisk.builder.committed.xip != NULL)
		pfree(ondisk.builder.committed.xip);
	if (ondisk.builder.catchange.xip != NULL)
		pfree(ondisk.builder.catchange.xip);
	return false;
}
3324 rhaas 2004 ECB :
2005 : /*
2006 : * Read the contents of the serialized snapshot to 'dest'.
2007 : */
2008 : static void
241 akapila 2009 GNC 17 : SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
2010 : {
2011 : int readBytes;
2012 :
2013 17 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
2014 17 : readBytes = read(fd, dest, size);
2015 17 : pgstat_report_wait_end();
2016 17 : if (readBytes != size)
2017 : {
241 akapila 2018 UNC 0 : int save_errno = errno;
2019 :
2020 0 : CloseTransientFile(fd);
2021 :
2022 0 : if (readBytes < 0)
2023 : {
2024 0 : errno = save_errno;
2025 0 : ereport(ERROR,
2026 : (errcode_for_file_access(),
2027 : errmsg("could not read file \"%s\": %m", path)));
2028 : }
2029 : else
2030 0 : ereport(ERROR,
2031 : (errcode(ERRCODE_DATA_CORRUPTED),
2032 : errmsg("could not read file \"%s\": read %d of %zu",
2033 : path, readBytes, sizeof(SnapBuild))));
2034 : }
241 akapila 2035 GNC 17 : }
2036 :
3324 rhaas 2037 ECB : /*
2038 : * Remove all serialized snapshots that are not required anymore because no
2039 : * slot can need them. This doesn't actually have to run during a checkpoint,
2040 : * but it's a convenient point to schedule this.
3324 rhaas 2041 EUB : *
2042 : * NB: We run this during checkpoints even if logical decoding is disabled so
2043 : * we cleanup old slots at some point after it got disabled.
2044 : */
2045 : void
3324 rhaas 2046 GIC 2363 : CheckPointSnapBuild(void)
3324 rhaas 2047 EUB : {
2048 : XLogRecPtr cutoff;
2049 : XLogRecPtr redo;
2050 : DIR *snap_dir;
2051 : struct dirent *snap_de;
2052 : char path[MAXPGPATH + 21];
2053 :
2054 : /*
2055 : * We start off with a minimum of the last redo pointer. No new
2056 : * replication slot will start before that, so that's a safe upper bound
2057 : * for removal.
3324 rhaas 2058 ECB : */
3324 rhaas 2059 GIC 2363 : redo = GetRedoRecPtr();
2060 :
2061 : /* now check for the restart ptrs from existing slots */
2062 2363 : cutoff = ReplicationSlotsComputeLogicalRestartLSN();
2063 :
2064 : /* don't start earlier than the restart lsn */
2065 2363 : if (redo < cutoff)
3324 rhaas 2066 UIC 0 : cutoff = redo;
2067 :
3203 andres 2068 GIC 2363 : snap_dir = AllocateDir("pg_logical/snapshots");
3203 andres 2069 CBC 7279 : while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
2070 : {
2071 : uint32 hi;
2072 : uint32 lo;
2073 : XLogRecPtr lsn;
2074 : PGFileType de_type;
2075 :
3324 rhaas 2076 GIC 4916 : if (strcmp(snap_de->d_name, ".") == 0 ||
2077 2553 : strcmp(snap_de->d_name, "..") == 0)
2078 4726 : continue;
2079 :
2189 peter_e 2080 190 : snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
219 michael 2081 GNC 190 : de_type = get_dirent_type(path, snap_de, false, DEBUG1);
2082 :
2083 190 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
2084 : {
3324 rhaas 2085 UIC 0 : elog(DEBUG1, "only regular files expected: %s", path);
3324 rhaas 2086 LBC 0 : continue;
2087 : }
2088 :
3324 rhaas 2089 ECB : /*
3324 rhaas 2090 EUB : * temporary filenames from SnapBuildSerialize() include the LSN and
2091 : * everything but are postfixed by .$pid.tmp. We can just remove them
3260 bruce 2092 ECB : * the same as other files because there can be none that are
2093 : * currently being written that are older than cutoff.
2094 : *
2095 : * We just log a message if a file doesn't fit the pattern, it's
2096 : * probably some editors lock/state file or similar...
2097 : */
3324 rhaas 2098 GIC 190 : if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2099 : {
3324 rhaas 2100 LBC 0 : ereport(LOG,
3138 peter_e 2101 ECB : (errmsg("could not parse file name \"%s\"", path)));
3324 rhaas 2102 LBC 0 : continue;
2103 : }
3324 rhaas 2104 ECB :
3324 rhaas 2105 CBC 190 : lsn = ((uint64) hi) << 32 | lo;
2106 :
3324 rhaas 2107 ECB : /* check whether we still need it */
3324 rhaas 2108 GIC 190 : if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
3324 rhaas 2109 EUB : {
3324 rhaas 2110 GBC 147 : elog(DEBUG1, "removing snapbuild snapshot %s", path);
2111 :
2112 : /*
2113 : * It's not particularly harmful, though strange, if we can't
2114 : * remove the file here. Don't prevent the checkpoint from
2115 : * completing, that'd be a cure worse than the disease.
2116 : */
3324 rhaas 2117 GIC 147 : if (unlink(path) < 0)
2118 : {
3324 rhaas 2119 UIC 0 : ereport(LOG,
2120 : (errcode_for_file_access(),
2121 : errmsg("could not remove file \"%s\": %m",
3324 rhaas 2122 ECB : path)));
3324 rhaas 2123 UIC 0 : continue;
3324 rhaas 2124 EUB : }
2125 : }
2126 : }
3324 rhaas 2127 GIC 2363 : FreeDir(snap_dir);
2128 2363 : }
|