Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * snapbuild.c
4 : : *
5 : : * Infrastructure for building historic catalog snapshots based on contents
6 : : * of the WAL, for the purpose of decoding heapam.c style values in the
7 : : * WAL.
8 : : *
9 : : * NOTES:
10 : : *
11 : : * We build snapshots which can *only* be used to read catalog contents and we
12 : : * do so by reading and interpreting the WAL stream. The aim is to build a
13 : : * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 : : * at the time the XLogRecord was generated.
15 : : *
16 : : * To build the snapshots we reuse the infrastructure built for Hot
17 : : * Standby. The in-memory snapshots we build look different than HS' because
18 : : * we have different needs. To successfully decode data from the WAL we only
19 : : * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 : : * tables since the data we decode is wholly contained in the WAL
21 : : * records. Also, our snapshots need to be different in comparison to normal
22 : : * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 : : * pg_subtrans for information about committed transactions because they might
24 : : * commit in the future from the POV of the WAL entry we're currently
25 : : * decoding. This definition has the advantage that we only need to prevent
26 : : * removal of catalog rows, while normal table's rows can still be
27 : : * removed. This is achieved by using the replication slot mechanism.
28 : : *
29 : : * As the percentage of transactions modifying the catalog normally is fairly
30 : : * small in comparisons to ones only manipulating user data, we keep track of
31 : : * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 : : * track of all running transactions like it's done in a normal snapshot. Note
33 : : * that we're generally only looking at transactions that have acquired an
34 : : * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35 : : * that we consider committed, everything else is considered aborted/in
36 : : * progress. That also allows us not to care about subtransactions before they
37 : : * have committed which means this module, in contrast to HS, doesn't have to
38 : : * care about suboverflowed subtransactions and similar.
39 : : *
40 : : * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 : : * transactions we need Snapshots that see intermediate versions of the
42 : : * catalog in a transaction. During normal operation this is achieved by using
43 : : * CommandIds/cmin/cmax. The problem with that however is that for space
44 : : * efficiency reasons, the cmin and cmax are not included in WAL records. We
45 : : * cannot read the cmin/cmax from the tuple itself, either, because it is
46 : : * reset on crash recovery. Even if we could, we could not decode combocids
47 : : * which are only tracked in the original backend's memory. To work around
48 : : * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
49 : : * catalog row is modified, which includes the cmin and cmax of the
50 : : * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
51 : : * reorder buffer, and use them at visibility checks instead of the cmin/cmax
52 : : * on the tuple itself. Check the reorderbuffer.c's comment above
53 : : * ResolveCminCmaxDuringDecoding() for details.
54 : : *
55 : : * To facilitate all this we need our own visibility routine, as the normal
56 : : * ones are optimized for different usecases.
57 : : *
58 : : * To replace the normal catalog snapshots with decoding ones use the
59 : : * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
60 : : *
61 : : *
62 : : *
63 : : * The snapbuild machinery is starting up in several stages, as illustrated
64 : : * by the following graph describing the SnapBuild->state transitions:
65 : : *
66 : : * +-------------------------+
67 : : * +----| START |-------------+
68 : : * | +-------------------------+ |
69 : : * | | |
70 : : * | | |
71 : : * | running_xacts #1 |
72 : : * | | |
73 : : * | | |
74 : : * | v |
75 : : * | +-------------------------+ v
76 : : * | | BUILDING_SNAPSHOT |------------>|
77 : : * | +-------------------------+ |
78 : : * | | |
79 : : * | | |
80 : : * | running_xacts #2, xacts from #1 finished |
81 : : * | | |
82 : : * | | |
83 : : * | v |
84 : : * | +-------------------------+ v
85 : : * | | FULL_SNAPSHOT |------------>|
86 : : * | +-------------------------+ |
87 : : * | | |
88 : : * running_xacts | saved snapshot
89 : : * with zero xacts | at running_xacts's lsn
90 : : * | | |
91 : : * | running_xacts with xacts from #2 finished |
92 : : * | | |
93 : : * | v |
94 : : * | +-------------------------+ |
95 : : * +--->|SNAPBUILD_CONSISTENT |<------------+
96 : : * +-------------------------+
97 : : *
98 : : * Initially the machinery is in the START stage. When an xl_running_xacts
99 : : * record is read that is sufficiently new (above the safe xmin horizon),
100 : : * there's a state transition. If there were no running xacts when the
101 : : * xl_running_xacts record was generated, we'll directly go into CONSISTENT
102 : : * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
103 : : * snapshot means that all transactions that start henceforth can be decoded
104 : : * in their entirety, but transactions that started previously can't. In
105 : : * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
106 : : * running transactions have committed or aborted.
107 : : *
108 : : * Only transactions that commit after CONSISTENT state has been reached will
109 : : * be replayed, even though they might have started while still in
110 : : * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
111 : : * changes has been exported, but all the following ones will be. That point
112 : : * is a convenient point to initialize replication from, which is why we
113 : : * export a snapshot at that point, which *can* be used to read normal data.
114 : : *
115 : : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
116 : : *
117 : : * IDENTIFICATION
118 : : * src/backend/replication/logical/snapbuild.c
119 : : *
120 : : *-------------------------------------------------------------------------
121 : : */
122 : :
123 : : #include "postgres.h"
124 : :
125 : : #include <sys/stat.h>
126 : : #include <unistd.h>
127 : :
128 : : #include "access/heapam_xlog.h"
129 : : #include "access/transam.h"
130 : : #include "access/xact.h"
131 : : #include "common/file_utils.h"
132 : : #include "miscadmin.h"
133 : : #include "pgstat.h"
134 : : #include "replication/logical.h"
135 : : #include "replication/reorderbuffer.h"
136 : : #include "replication/snapbuild.h"
137 : : #include "storage/fd.h"
138 : : #include "storage/lmgr.h"
139 : : #include "storage/proc.h"
140 : : #include "storage/procarray.h"
141 : : #include "storage/standby.h"
142 : : #include "utils/builtins.h"
143 : : #include "utils/memutils.h"
144 : : #include "utils/snapmgr.h"
145 : : #include "utils/snapshot.h"
146 : :
147 : : /*
148 : : * This struct contains the current state of the snapshot building
149 : : * machinery. Besides a forward declaration in the header, it is not exposed
150 : : * to the public, so we can easily change its contents.
151 : : */
152 : : struct SnapBuild
153 : : {
154 : : /* how far are we along building our first full snapshot */
155 : : SnapBuildState state;
156 : :
157 : : /* private memory context used to allocate memory for this module. */
158 : : MemoryContext context;
159 : :
160 : : /* all transactions < than this have committed/aborted */
161 : : TransactionId xmin;
162 : :
163 : : /* all transactions >= than this are uncommitted */
164 : : TransactionId xmax;
165 : :
166 : : /*
167 : : * Don't replay commits from an LSN < this LSN. This can be set externally
168 : : * but it will also be advanced (never retreat) from within snapbuild.c.
169 : : */
170 : : XLogRecPtr start_decoding_at;
171 : :
172 : : /*
173 : : * LSN at which two-phase decoding was enabled or LSN at which we found a
174 : : * consistent point at the time of slot creation.
175 : : *
176 : : * The prepared transactions, that were skipped because previously
177 : : * two-phase was not enabled or are not covered by initial snapshot, need
178 : : * to be sent later along with commit prepared and they must be before
179 : : * this point.
180 : : */
181 : : XLogRecPtr two_phase_at;
182 : :
183 : : /*
184 : : * Don't start decoding WAL until the "xl_running_xacts" information
185 : : * indicates there are no running xids with an xid smaller than this.
186 : : */
187 : : TransactionId initial_xmin_horizon;
188 : :
189 : : /* Indicates if we are building full snapshot or just catalog one. */
190 : : bool building_full_snapshot;
191 : :
192 : : /*
193 : : * Snapshot that's valid to see the catalog state seen at this moment.
194 : : */
195 : : Snapshot snapshot;
196 : :
197 : : /*
198 : : * LSN of the last location we are sure a snapshot has been serialized to.
199 : : */
200 : : XLogRecPtr last_serialized_snapshot;
201 : :
202 : : /*
203 : : * The reorderbuffer we need to update with usable snapshots et al.
204 : : */
205 : : ReorderBuffer *reorder;
206 : :
207 : : /*
208 : : * TransactionId at which the next phase of initial snapshot building will
209 : : * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or
210 : : * when no next phase necessary (SNAPBUILD_CONSISTENT).
211 : : */
212 : : TransactionId next_phase_at;
213 : :
214 : : /*
215 : : * Array of transactions which could have catalog changes that committed
216 : : * between xmin and xmax.
217 : : */
218 : : struct
219 : : {
220 : : /* number of committed transactions */
221 : : size_t xcnt;
222 : :
223 : : /* available space for committed transactions */
224 : : size_t xcnt_space;
225 : :
226 : : /*
227 : : * Until we reach a CONSISTENT state, we record commits of all
228 : : * transactions, not just the catalog changing ones. Record when that
229 : : * changes so we know we cannot export a snapshot safely anymore.
230 : : */
231 : : bool includes_all_transactions;
232 : :
233 : : /*
234 : : * Array of committed transactions that have modified the catalog.
235 : : *
236 : : * As this array is frequently modified we do *not* keep it in
237 : : * xidComparator order. Instead we sort the array when building &
238 : : * distributing a snapshot.
239 : : *
240 : : * TODO: It's unclear whether that reasoning has much merit. Every
241 : : * time we add something here after becoming consistent will also
242 : : * require distributing a snapshot. Storing them sorted would
243 : : * potentially also make it easier to purge (but more complicated wrt
244 : : * wraparound?). Should be improved if sorting while building the
245 : : * snapshot shows up in profiles.
246 : : */
247 : : TransactionId *xip;
248 : : } committed;
249 : :
250 : : /*
251 : : * Array of transactions and subtransactions that had modified catalogs
252 : : * and were running when the snapshot was serialized.
253 : : *
254 : : * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
255 : : * if the transaction has changed the catalog. But it could happen that
256 : : * the logical decoding decodes only the commit record of the transaction
257 : : * after restoring the previously serialized snapshot in which case we
258 : : * will miss adding the xid to the snapshot and end up looking at the
259 : : * catalogs with the wrong snapshot.
260 : : *
261 : : * Now to avoid the above problem, we serialize the transactions that had
262 : : * modified the catalogs and are still running at the time of snapshot
263 : : * serialization. We fill this array while restoring the snapshot and then
264 : : * refer it while decoding commit to ensure if the xact has modified the
265 : : * catalog. We discard this array when all the xids in the list become old
266 : : * enough to matter. See SnapBuildPurgeOlderTxn for details.
267 : : */
268 : : struct
269 : : {
270 : : /* number of transactions */
271 : : size_t xcnt;
272 : :
273 : : /* This array must be sorted in xidComparator order */
274 : : TransactionId *xip;
275 : : } catchange;
276 : : };
277 : :
278 : : /*
279 : : * Starting a transaction -- which we need to do while exporting a snapshot --
280 : : * removes knowledge about the previously used resowner, so we save it here.
281 : : */
282 : : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
283 : : static bool ExportInProgress = false;
284 : :
285 : : /* ->committed and ->catchange manipulation */
286 : : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
287 : :
288 : : /* snapshot building/manipulation/distribution functions */
289 : : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
290 : :
291 : : static void SnapBuildFreeSnapshot(Snapshot snap);
292 : :
293 : : static void SnapBuildSnapIncRefcount(Snapshot snap);
294 : :
295 : : static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
296 : :
297 : : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
298 : : uint32 xinfo);
299 : :
300 : : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
301 : : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
302 : : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
303 : :
304 : : /* serialization functions */
305 : : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
306 : : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
307 : : static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);
308 : :
309 : : /*
310 : : * Allocate a new snapshot builder.
311 : : *
312 : : * xmin_horizon is the xid >= which we can be sure no catalog rows have been
313 : : * removed, start_lsn is the LSN >= we want to replay commits.
314 : : */
315 : : SnapBuild *
3695 rhaas@postgresql.org 316 :CBC 951 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
317 : : TransactionId xmin_horizon,
318 : : XLogRecPtr start_lsn,
319 : : bool need_full_snapshot,
320 : : XLogRecPtr two_phase_at)
321 : : {
322 : : MemoryContext context;
323 : : MemoryContext oldcontext;
324 : : SnapBuild *builder;
325 : :
326 : : /* allocate memory in own context, to have better accountability */
327 : 951 : context = AllocSetContextCreate(CurrentMemoryContext,
328 : : "snapshot builder context",
329 : : ALLOCSET_DEFAULT_SIZES);
330 : 951 : oldcontext = MemoryContextSwitchTo(context);
331 : :
332 : 951 : builder = palloc0(sizeof(SnapBuild));
333 : :
334 : 951 : builder->state = SNAPBUILD_START;
335 : 951 : builder->context = context;
336 : 951 : builder->reorder = reorder;
337 : : /* Other struct members initialized by zeroing via palloc0 above */
338 : :
339 : 951 : builder->committed.xcnt = 0;
2489 tgl@sss.pgh.pa.us 340 : 951 : builder->committed.xcnt_space = 128; /* arbitrary number */
3695 rhaas@postgresql.org 341 : 951 : builder->committed.xip =
342 : 951 : palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
343 : 951 : builder->committed.includes_all_transactions = true;
344 : :
612 akapila@postgresql.o 345 : 951 : builder->catchange.xcnt = 0;
346 : 951 : builder->catchange.xip = NULL;
347 : :
3695 rhaas@postgresql.org 348 : 951 : builder->initial_xmin_horizon = xmin_horizon;
3601 andres@anarazel.de 349 : 951 : builder->start_decoding_at = start_lsn;
2544 350 : 951 : builder->building_full_snapshot = need_full_snapshot;
1005 akapila@postgresql.o 351 : 951 : builder->two_phase_at = two_phase_at;
352 : :
3695 rhaas@postgresql.org 353 : 951 : MemoryContextSwitchTo(oldcontext);
354 : :
355 : 951 : return builder;
356 : : }
357 : :
358 : : /*
359 : : * Free a snapshot builder.
360 : : */
361 : : void
362 : 764 : FreeSnapshotBuilder(SnapBuild *builder)
363 : : {
364 : 764 : MemoryContext context = builder->context;
365 : :
366 : : /* free snapshot explicitly, that contains some error checking */
367 [ + + ]: 764 : if (builder->snapshot != NULL)
368 : : {
369 : 193 : SnapBuildSnapDecRefcount(builder->snapshot);
370 : 193 : builder->snapshot = NULL;
371 : : }
372 : :
373 : : /* other resources are deallocated via memory context reset */
374 : 764 : MemoryContextDelete(context);
375 : 764 : }
376 : :
377 : : /*
378 : : * Free an unreferenced snapshot that has previously been built by us.
379 : : */
380 : : static void
381 : 1234 : SnapBuildFreeSnapshot(Snapshot snap)
382 : : {
383 : : /* make sure we don't get passed an external snapshot */
1910 andres@anarazel.de 384 [ - + ]: 1234 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
385 : :
386 : : /* make sure nobody modified our snapshot */
3695 rhaas@postgresql.org 387 [ - + ]: 1234 : Assert(snap->curcid == FirstCommandId);
388 [ - + ]: 1234 : Assert(!snap->suboverflowed);
389 [ - + ]: 1234 : Assert(!snap->takenDuringRecovery);
3286 heikki.linnakangas@i 390 [ - + ]: 1234 : Assert(snap->regd_count == 0);
391 : :
392 : : /* slightly more likely, so it's checked even without c-asserts */
3695 rhaas@postgresql.org 393 [ - + ]: 1234 : if (snap->copied)
3695 rhaas@postgresql.org 394 [ # # ]:UBC 0 : elog(ERROR, "cannot free a copied snapshot");
395 : :
3695 rhaas@postgresql.org 396 [ - + ]:CBC 1234 : if (snap->active_count)
3695 rhaas@postgresql.org 397 [ # # ]:UBC 0 : elog(ERROR, "cannot free an active snapshot");
398 : :
3695 rhaas@postgresql.org 399 :CBC 1234 : pfree(snap);
400 : 1234 : }
401 : :
402 : : /*
403 : : * In which state of snapshot building are we?
404 : : */
405 : : SnapBuildState
406 : 2012918 : SnapBuildCurrentState(SnapBuild *builder)
407 : : {
408 : 2012918 : return builder->state;
409 : : }
410 : :
411 : : /*
412 : : * Return the LSN at which the two-phase decoding was first enabled.
413 : : */
414 : : XLogRecPtr
1005 akapila@postgresql.o 415 : 29 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
416 : : {
417 : 29 : return builder->two_phase_at;
418 : : }
419 : :
420 : : /*
421 : : * Set the LSN at which two-phase decoding is enabled.
422 : : */
423 : : void
424 : 5 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
425 : : {
426 : 5 : builder->two_phase_at = ptr;
1140 427 : 5 : }
428 : :
429 : : /*
430 : : * Should the contents of transaction ending at 'ptr' be decoded?
431 : : */
432 : : bool
3695 rhaas@postgresql.org 433 : 311120 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
434 : : {
3601 andres@anarazel.de 435 : 311120 : return ptr < builder->start_decoding_at;
436 : : }
437 : :
438 : : /*
439 : : * Increase refcount of a snapshot.
440 : : *
441 : : * This is used when handing out a snapshot to some external resource or when
442 : : * adding a Snapshot as builder->snapshot.
443 : : */
444 : : static void
3695 rhaas@postgresql.org 445 : 5385 : SnapBuildSnapIncRefcount(Snapshot snap)
446 : : {
447 : 5385 : snap->active_count++;
448 : 5385 : }
449 : :
450 : : /*
451 : : * Decrease refcount of a snapshot and free if the refcount reaches zero.
452 : : *
453 : : * Externally visible, so that external resources that have been handed an
454 : : * IncRef'ed Snapshot can adjust its refcount easily.
455 : : */
456 : : void
457 : 5181 : SnapBuildSnapDecRefcount(Snapshot snap)
458 : : {
459 : : /* make sure we don't get passed an external snapshot */
1910 andres@anarazel.de 460 [ - + ]: 5181 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
461 : :
462 : : /* make sure nobody modified our snapshot */
3695 rhaas@postgresql.org 463 [ - + ]: 5181 : Assert(snap->curcid == FirstCommandId);
464 [ - + ]: 5181 : Assert(!snap->suboverflowed);
465 [ - + ]: 5181 : Assert(!snap->takenDuringRecovery);
466 : :
3286 heikki.linnakangas@i 467 [ - + ]: 5181 : Assert(snap->regd_count == 0);
468 : :
469 [ - + ]: 5181 : Assert(snap->active_count > 0);
470 : :
471 : : /* slightly more likely, so it's checked even without casserts */
3695 rhaas@postgresql.org 472 [ - + ]: 5181 : if (snap->copied)
3695 rhaas@postgresql.org 473 [ # # ]:UBC 0 : elog(ERROR, "cannot free a copied snapshot");
474 : :
3695 rhaas@postgresql.org 475 :CBC 5181 : snap->active_count--;
3286 heikki.linnakangas@i 476 [ + + ]: 5181 : if (snap->active_count == 0)
3695 rhaas@postgresql.org 477 : 1234 : SnapBuildFreeSnapshot(snap);
478 : 5181 : }
479 : :
480 : : /*
481 : : * Build a new snapshot, based on currently committed catalog-modifying
482 : : * transactions.
483 : : *
484 : : * In-progress transactions with catalog access are *not* allowed to modify
485 : : * these snapshots; they have to copy them and fill in appropriate ->curcid
486 : : * and ->subxip/subxcnt values.
487 : : */
488 : : static Snapshot
2496 andres@anarazel.de 489 : 1593 : SnapBuildBuildSnapshot(SnapBuild *builder)
490 : : {
491 : : Snapshot snapshot;
492 : : Size ssize;
493 : :
3695 rhaas@postgresql.org 494 [ - + ]: 1593 : Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
495 : :
496 : 1593 : ssize = sizeof(SnapshotData)
497 : 1593 : + sizeof(TransactionId) * builder->committed.xcnt
498 : 1593 : + sizeof(TransactionId) * 1 /* toplevel xid */ ;
499 : :
500 : 1593 : snapshot = MemoryContextAllocZero(builder->context, ssize);
501 : :
1910 andres@anarazel.de 502 : 1593 : snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
503 : :
504 : : /*
505 : : * We misuse the original meaning of SnapshotData's xip and subxip fields
506 : : * to make the more fitting for our needs.
507 : : *
508 : : * In the 'xip' array we store transactions that have to be treated as
509 : : * committed. Since we will only ever look at tuples from transactions
510 : : * that have modified the catalog it's more efficient to store those few
511 : : * that exist between xmin and xmax (frequently there are none).
512 : : *
513 : : * Snapshots that are used in transactions that have modified the catalog
514 : : * also use the 'subxip' array to store their toplevel xid and all the
515 : : * subtransaction xids so we can recognize when we need to treat rows as
516 : : * visible that are not in xip but still need to be visible. Subxip only
517 : : * gets filled when the transaction is copied into the context of a
518 : : * catalog modifying transaction since we otherwise share a snapshot
519 : : * between transactions. As long as a txn hasn't modified the catalog it
520 : : * doesn't need to treat any uncommitted rows as visible, so there is no
521 : : * need for those xids.
522 : : *
523 : : * Both arrays are qsort'ed so that we can use bsearch() on them.
524 : : */
3695 rhaas@postgresql.org 525 [ - + ]: 1593 : Assert(TransactionIdIsNormal(builder->xmin));
526 [ - + ]: 1593 : Assert(TransactionIdIsNormal(builder->xmax));
527 : :
528 : 1593 : snapshot->xmin = builder->xmin;
529 : 1593 : snapshot->xmax = builder->xmax;
530 : :
531 : : /* store all transactions to be treated as committed by this snapshot */
532 : 1593 : snapshot->xip =
533 : 1593 : (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
534 : 1593 : snapshot->xcnt = builder->committed.xcnt;
535 : 1593 : memcpy(snapshot->xip,
536 : 1593 : builder->committed.xip,
537 : 1593 : builder->committed.xcnt * sizeof(TransactionId));
538 : :
539 : : /* sort so we can bsearch() */
540 : 1593 : qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
541 : :
542 : : /*
543 : : * Initially, subxip is empty, i.e. it's a snapshot to be used by
544 : : * transactions that don't modify the catalog. Will be filled by
545 : : * ReorderBufferCopySnap() if necessary.
546 : : */
547 : 1593 : snapshot->subxcnt = 0;
548 : 1593 : snapshot->subxip = NULL;
549 : :
550 : 1593 : snapshot->suboverflowed = false;
551 : 1593 : snapshot->takenDuringRecovery = false;
552 : 1593 : snapshot->copied = false;
553 : 1593 : snapshot->curcid = FirstCommandId;
554 : 1593 : snapshot->active_count = 0;
3286 heikki.linnakangas@i 555 : 1593 : snapshot->regd_count = 0;
1336 andres@anarazel.de 556 : 1593 : snapshot->snapXactCompletionCount = 0;
557 : :
3695 rhaas@postgresql.org 558 : 1593 : return snapshot;
559 : : }
560 : :
561 : : /*
562 : : * Build the initial slot snapshot and convert it to a normal snapshot that
563 : : * is understood by HeapTupleSatisfiesMVCC.
564 : : *
565 : : * The snapshot will be usable directly in current transaction or exported
566 : : * for loading in different transaction.
567 : : */
568 : : Snapshot
2576 tgl@sss.pgh.pa.us 569 : 169 : SnapBuildInitialSnapshot(SnapBuild *builder)
570 : : {
571 : : Snapshot snap;
572 : : TransactionId xid;
573 : : TransactionId safeXid;
574 : : TransactionId *newxip;
3695 rhaas@postgresql.org 575 : 169 : int newxcnt = 0;
576 : :
2576 tgl@sss.pgh.pa.us 577 [ - + ]: 169 : Assert(XactIsoLevel == XACT_REPEATABLE_READ);
510 akapila@postgresql.o 578 [ - + ]: 169 : Assert(builder->building_full_snapshot);
579 : :
580 : : /* don't allow older snapshots */
331 tgl@sss.pgh.pa.us 581 : 169 : InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
510 akapila@postgresql.o 582 [ - + ]: 169 : if (HaveRegisteredOrActiveSnapshot())
510 akapila@postgresql.o 583 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
510 akapila@postgresql.o 584 [ - + ]:CBC 169 : Assert(!HistoricSnapshotActive());
585 : :
3695 rhaas@postgresql.org 586 [ - + ]: 169 : if (builder->state != SNAPBUILD_CONSISTENT)
2579 peter_e@gmx.net 587 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
588 : :
3695 rhaas@postgresql.org 589 [ - + ]:CBC 169 : if (!builder->committed.includes_all_transactions)
2579 peter_e@gmx.net 590 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
591 : :
592 : : /* so we don't overwrite the existing value */
1340 andres@anarazel.de 593 [ - + ]:CBC 169 : if (TransactionIdIsValid(MyProc->xmin))
1340 andres@anarazel.de 594 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
595 : :
2496 andres@anarazel.de 596 :CBC 169 : snap = SnapBuildBuildSnapshot(builder);
597 : :
598 : : /*
599 : : * We know that snap->xmin is alive, enforced by the logical xmin
600 : : * mechanism. Due to that we can do this without locks, we're only
601 : : * changing our own value.
602 : : *
603 : : * Building an initial snapshot is expensive and an unenforced xmin
604 : : * horizon would have bad consequences, therefore always double-check that
605 : : * the horizon is enforced.
606 : : */
510 akapila@postgresql.o 607 : 169 : LWLockAcquire(ProcArrayLock, LW_SHARED);
608 : 169 : safeXid = GetOldestSafeDecodingTransactionId(false);
609 : 169 : LWLockRelease(ProcArrayLock);
610 : :
611 [ - + ]: 169 : if (TransactionIdFollows(safeXid, snap->xmin))
510 akapila@postgresql.o 612 [ # # ]:UBC 0 : elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
613 : : safeXid, snap->xmin);
614 : :
1340 andres@anarazel.de 615 :CBC 169 : MyProc->xmin = snap->xmin;
616 : :
617 : : /* allocate in transaction context */
618 : : newxip = (TransactionId *)
3695 rhaas@postgresql.org 619 : 169 : palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
620 : :
621 : : /*
622 : : * snapbuild.c builds transactions in an "inverted" manner, which means it
623 : : * stores committed transactions in ->xip, not ones in progress. Build a
624 : : * classical snapshot by marking all non-committed transactions as
625 : : * in-progress. This can be expensive.
626 : : */
627 [ + - - + : 169 : for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
- + ]
628 : : {
629 : : void *test;
630 : :
631 : : /*
632 : : * Check whether transaction committed using the decoding snapshot
633 : : * meaning of ->xip.
634 : : */
3695 rhaas@postgresql.org 635 :UBC 0 : test = bsearch(&xid, snap->xip, snap->xcnt,
636 : : sizeof(TransactionId), xidComparator);
637 : :
638 [ # # ]: 0 : if (test == NULL)
639 : : {
640 [ # # ]: 0 : if (newxcnt >= GetMaxSnapshotXidCount())
2579 peter_e@gmx.net 641 [ # # ]: 0 : ereport(ERROR,
642 : : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
643 : : errmsg("initial slot snapshot too large")));
644 : :
3695 rhaas@postgresql.org 645 : 0 : newxip[newxcnt++] = xid;
646 : : }
647 : :
648 [ # # ]: 0 : TransactionIdAdvance(xid);
649 : : }
650 : :
651 : : /* adjust remaining snapshot fields as needed */
1880 michael@paquier.xyz 652 :CBC 169 : snap->snapshot_type = SNAPSHOT_MVCC;
3695 rhaas@postgresql.org 653 : 169 : snap->xcnt = newxcnt;
654 : 169 : snap->xip = newxip;
655 : :
2579 peter_e@gmx.net 656 : 169 : return snap;
657 : : }
658 : :
659 : : /*
660 : : * Export a snapshot so it can be set in another session with SET TRANSACTION
661 : : * SNAPSHOT.
662 : : *
663 : : * For that we need to start a transaction in the current backend as the
664 : : * importing side checks whether the source transaction is still open to make
665 : : * sure the xmin horizon hasn't advanced since then.
666 : : */
667 : : const char *
2579 peter_e@gmx.net 668 :UBC 0 : SnapBuildExportSnapshot(SnapBuild *builder)
669 : : {
670 : : Snapshot snap;
671 : : char *snapname;
672 : :
673 [ # # ]: 0 : if (IsTransactionOrTransactionBlock())
674 [ # # ]: 0 : elog(ERROR, "cannot export a snapshot from within a transaction");
675 : :
676 [ # # ]: 0 : if (SavedResourceOwnerDuringExport)
677 [ # # ]: 0 : elog(ERROR, "can only export one snapshot at a time");
678 : :
679 : 0 : SavedResourceOwnerDuringExport = CurrentResourceOwner;
680 : 0 : ExportInProgress = true;
681 : :
682 : 0 : StartTransactionCommand();
683 : :
684 : : /* There doesn't seem to a nice API to set these */
685 : 0 : XactIsoLevel = XACT_REPEATABLE_READ;
686 : 0 : XactReadOnly = true;
687 : :
2576 tgl@sss.pgh.pa.us 688 : 0 : snap = SnapBuildInitialSnapshot(builder);
689 : :
690 : : /*
691 : : * now that we've built a plain snapshot, make it active and use the
692 : : * normal mechanisms for exporting it
693 : : */
3695 rhaas@postgresql.org 694 : 0 : snapname = ExportSnapshot(snap);
695 : :
696 [ # # ]: 0 : ereport(LOG,
697 : : (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
698 : : "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
699 : : snap->xcnt,
700 : : snapname, snap->xcnt)));
701 : 0 : return snapname;
702 : : }
703 : :
704 : : /*
705 : : * Ensure there is a snapshot and if not build one for current transaction.
706 : : */
707 : : Snapshot
566 akapila@postgresql.o 708 :CBC 8 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
709 : : {
2930 simon@2ndQuadrant.co 710 [ - + ]: 8 : Assert(builder->state == SNAPBUILD_CONSISTENT);
711 : :
712 : : /* only build a new snapshot if we don't have a prebuilt one */
713 [ + + ]: 8 : if (builder->snapshot == NULL)
714 : : {
2496 andres@anarazel.de 715 :GBC 1 : builder->snapshot = SnapBuildBuildSnapshot(builder);
716 : : /* increase refcount for the snapshot builder */
2930 simon@2ndQuadrant.co 717 : 1 : SnapBuildSnapIncRefcount(builder->snapshot);
718 : : }
719 : :
2930 simon@2ndQuadrant.co 720 :CBC 8 : return builder->snapshot;
721 : : }
722 : :
723 : : /*
724 : : * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
725 : : * any. Aborts the previously started transaction and resets the resource
726 : : * owner back to its original value.
727 : : */
728 : : void
3165 andres@anarazel.de 729 : 4618 : SnapBuildClearExportedSnapshot(void)
730 : : {
731 : : ResourceOwner tmpResOwner;
732 : :
733 : : /* nothing exported, that is the usual case */
3695 rhaas@postgresql.org 734 [ + - ]: 4618 : if (!ExportInProgress)
735 : 4618 : return;
736 : :
3695 rhaas@postgresql.org 737 [ # # ]:UBC 0 : if (!IsTransactionState())
738 [ # # ]: 0 : elog(ERROR, "clearing exported snapshot in wrong transaction state");
739 : :
740 : : /*
741 : : * AbortCurrentTransaction() takes care of resetting the snapshot state,
742 : : * so remember SavedResourceOwnerDuringExport.
743 : : */
909 michael@paquier.xyz 744 : 0 : tmpResOwner = SavedResourceOwnerDuringExport;
745 : :
746 : : /* make sure nothing could have ever happened */
3695 rhaas@postgresql.org 747 : 0 : AbortCurrentTransaction();
748 : :
909 michael@paquier.xyz 749 : 0 : CurrentResourceOwner = tmpResOwner;
750 : : }
751 : :
752 : : /*
753 : : * Clear snapshot export state during transaction abort.
754 : : */
755 : : void
909 michael@paquier.xyz 756 :CBC 22815 : SnapBuildResetExportedSnapshotState(void)
757 : : {
3695 rhaas@postgresql.org 758 : 22815 : SavedResourceOwnerDuringExport = NULL;
759 : 22815 : ExportInProgress = false;
760 : 22815 : }
761 : :
762 : : /*
763 : : * Handle the effects of a single heap change, appropriate to the current state
764 : : * of the snapshot builder and returns whether changes made at (xid, lsn) can
765 : : * be decoded.
766 : : */
767 : : bool
768 : 1516276 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
769 : : {
770 : : /*
771 : : * We can't handle data in transactions if we haven't built a snapshot
772 : : * yet, so don't store them.
773 : : */
774 [ - + ]: 1516276 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
3695 rhaas@postgresql.org 775 :UBC 0 : return false;
776 : :
777 : : /*
778 : : * No point in keeping track of changes in transactions that we don't have
779 : : * enough information about to decode. This means that they started before
780 : : * we got into the SNAPBUILD_FULL_SNAPSHOT state.
781 : : */
3695 rhaas@postgresql.org 782 [ + + - + ]:CBC 1516279 : if (builder->state < SNAPBUILD_CONSISTENT &&
1154 andres@anarazel.de 783 : 3 : TransactionIdPrecedes(xid, builder->next_phase_at))
3695 rhaas@postgresql.org 784 :UBC 0 : return false;
785 : :
786 : : /*
787 : : * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
788 : : * be needed to decode the change we're currently processing.
789 : : */
2962 andres@anarazel.de 790 [ + + ]:CBC 1516276 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
791 : : {
792 : : /* only build a new snapshot if we don't have a prebuilt one */
3695 rhaas@postgresql.org 793 [ + + ]: 2846 : if (builder->snapshot == NULL)
794 : : {
2496 andres@anarazel.de 795 : 334 : builder->snapshot = SnapBuildBuildSnapshot(builder);
796 : : /* increase refcount for the snapshot builder */
3695 rhaas@postgresql.org 797 : 334 : SnapBuildSnapIncRefcount(builder->snapshot);
798 : : }
799 : :
800 : : /*
801 : : * Increase refcount for the transaction we're handing the snapshot
802 : : * out to.
803 : : */
804 : 2846 : SnapBuildSnapIncRefcount(builder->snapshot);
805 : 2846 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
806 : : builder->snapshot);
807 : : }
808 : :
809 : 1516276 : return true;
810 : : }
811 : :
812 : : /*
813 : : * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
814 : : * This implies that a transaction has done some form of write to system
815 : : * catalogs.
816 : : */
817 : : void
818 : 22525 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
819 : : XLogRecPtr lsn, xl_heap_new_cid *xlrec)
820 : : {
821 : : CommandId cid;
822 : :
823 : : /*
824 : : * we only log new_cid's if a catalog tuple was modified, so mark the
825 : : * transaction as containing catalog modifications
826 : : */
3631 bruce@momjian.us 827 : 22525 : ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
828 : :
3695 rhaas@postgresql.org 829 : 22525 : ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
830 : : xlrec->target_locator, xlrec->target_tid,
831 : : xlrec->cmin, xlrec->cmax,
832 : : xlrec->combocid);
833 : :
834 : : /* figure out new command id */
835 [ + + ]: 22525 : if (xlrec->cmin != InvalidCommandId &&
836 [ + + ]: 19203 : xlrec->cmax != InvalidCommandId)
837 : 3042 : cid = Max(xlrec->cmin, xlrec->cmax);
838 [ + + ]: 19483 : else if (xlrec->cmax != InvalidCommandId)
839 : 3322 : cid = xlrec->cmax;
840 [ + - ]: 16161 : else if (xlrec->cmin != InvalidCommandId)
841 : 16161 : cid = xlrec->cmin;
842 : : else
843 : : {
3631 bruce@momjian.us 844 :UBC 0 : cid = InvalidCommandId; /* silence compiler */
3695 rhaas@postgresql.org 845 [ # # ]: 0 : elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
846 : : }
847 : :
3695 rhaas@postgresql.org 848 :CBC 22525 : ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
849 : 22525 : }
850 : :
851 : : /*
852 : : * Add a new Snapshot to all transactions we're decoding that currently are
853 : : * in-progress so they can see new catalog contents made by the transaction
854 : : * that just committed. This is necessary because those in-progress
855 : : * transactions will use the new catalog's contents from here on (at the very
856 : : * least everything they do needs to be compatible with newer catalog
857 : : * contents).
858 : : */
859 : : static void
860 : 1082 : SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
861 : : {
862 : : dlist_iter txn_i;
863 : : ReorderBufferTXN *txn;
864 : :
865 : : /*
866 : : * Iterate through all toplevel transactions. This can include
867 : : * subtransactions which we just don't yet know to be that, but that's
868 : : * fine, they will just get an unnecessary snapshot queued.
869 : : */
870 [ + - + + ]: 2201 : dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
871 : : {
872 : 1119 : txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
873 : :
874 [ - + ]: 1119 : Assert(TransactionIdIsValid(txn->xid));
875 : :
876 : : /*
877 : : * If we don't have a base snapshot yet, there are no changes in this
878 : : * transaction which in turn implies we don't yet need a snapshot at
879 : : * all. We'll add a snapshot when the first change gets queued.
880 : : *
881 : : * NB: This works correctly even for subtransactions because
882 : : * ReorderBufferAssignChild() takes care to transfer the base snapshot
883 : : * to the top-level transaction, and while iterating the changequeue
884 : : * we'll get the change from the subtxn.
885 : : */
886 [ + + ]: 1119 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
887 : 2 : continue;
888 : :
889 : : /*
890 : : * We don't need to add snapshot to prepared transactions as they
891 : : * should not see the new catalog contents.
892 : : */
1196 akapila@postgresql.o 893 [ + - + + ]: 1117 : if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
894 : 22 : continue;
895 : :
3695 rhaas@postgresql.org 896 [ + + ]: 1095 : elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
897 : : txn->xid, LSN_FORMAT_ARGS(lsn));
898 : :
899 : : /*
900 : : * increase the snapshot's refcount for the transaction we are handing
901 : : * it out to
902 : : */
903 : 1095 : SnapBuildSnapIncRefcount(builder->snapshot);
904 : 1095 : ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
905 : : builder->snapshot);
906 : : }
907 : 1082 : }
908 : :
909 : : /*
910 : : * Keep track of a new catalog changing transaction that has committed.
911 : : */
912 : : static void
913 : 1091 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
914 : : {
915 [ - + ]: 1091 : Assert(TransactionIdIsValid(xid));
916 : :
917 [ - + ]: 1091 : if (builder->committed.xcnt == builder->committed.xcnt_space)
918 : : {
3695 rhaas@postgresql.org 919 :LBC (1) : builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
920 : :
921 [ # # ]: (1) : elog(DEBUG1, "increasing space for committed transactions to %u",
922 : : (uint32) builder->committed.xcnt_space);
923 : :
924 : (1) : builder->committed.xip = repalloc(builder->committed.xip,
2489 tgl@sss.pgh.pa.us 925 : (1) : builder->committed.xcnt_space * sizeof(TransactionId));
926 : : }
927 : :
928 : : /*
929 : : * TODO: It might make sense to keep the array sorted here instead of
930 : : * doing it every time we build a new snapshot. On the other hand this
931 : : * gets called repeatedly when a transaction with subtransactions commits.
932 : : */
3695 rhaas@postgresql.org 933 :CBC 1091 : builder->committed.xip[builder->committed.xcnt++] = xid;
934 : 1091 : }
935 : :
936 : : /*
937 : : * Remove knowledge about transactions we treat as committed or containing catalog
938 : : * changes that are smaller than ->xmin. Those won't ever get checked via
939 : : * the ->committed or ->catchange array, respectively. The committed xids will
940 : : * get checked via the clog machinery.
941 : : *
942 : : * We can ideally remove the transaction from catchange array once it is
943 : : * finished (committed/aborted) but that could be costly as we need to maintain
944 : : * the xids order in the array.
945 : : */
946 : : static void
612 akapila@postgresql.o 947 : 409 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
948 : : {
949 : : int off;
950 : : TransactionId *workspace;
3695 rhaas@postgresql.org 951 : 409 : int surviving_xids = 0;
952 : :
953 : : /* not ready yet */
954 [ - + ]: 409 : if (!TransactionIdIsNormal(builder->xmin))
3695 rhaas@postgresql.org 955 :UBC 0 : return;
956 : :
957 : : /* TODO: Neater algorithm than just copying and iterating? */
958 : : workspace =
3695 rhaas@postgresql.org 959 :CBC 409 : MemoryContextAlloc(builder->context,
960 : 409 : builder->committed.xcnt * sizeof(TransactionId));
961 : :
962 : : /* copy xids that still are interesting to workspace */
963 [ + + ]: 636 : for (off = 0; off < builder->committed.xcnt; off++)
964 : : {
965 [ + - - + : 227 : if (NormalTransactionIdPrecedes(builder->committed.xip[off],
- + ]
966 : : builder->xmin))
967 : : ; /* remove */
968 : : else
3695 rhaas@postgresql.org 969 :LBC (1) : workspace[surviving_xids++] = builder->committed.xip[off];
970 : : }
971 : :
972 : : /* copy workspace back to persistent state */
3695 rhaas@postgresql.org 973 :CBC 409 : memcpy(builder->committed.xip, workspace,
974 : : surviving_xids * sizeof(TransactionId));
975 : :
976 [ - + ]: 409 : elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
977 : : (uint32) builder->committed.xcnt, (uint32) surviving_xids,
978 : : builder->xmin, builder->xmax);
979 : 409 : builder->committed.xcnt = surviving_xids;
980 : :
981 : 409 : pfree(workspace);
982 : :
983 : : /*
984 : : * Purge xids in ->catchange as well. The purged array must also be sorted
985 : : * in xidComparator order.
986 : : */
594 akapila@postgresql.o 987 [ + + ]: 409 : if (builder->catchange.xcnt > 0)
988 : : {
989 : : /*
990 : : * Since catchange.xip is sorted, we find the lower bound of xids that
991 : : * are still interesting.
992 : : */
993 [ + + ]: 9 : for (off = 0; off < builder->catchange.xcnt; off++)
994 : : {
995 [ + + ]: 6 : if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
996 : : builder->xmin))
997 : 1 : break;
998 : : }
999 : :
1000 : 4 : surviving_xids = builder->catchange.xcnt - off;
1001 : :
1002 [ + + ]: 4 : if (surviving_xids > 0)
1003 : : {
1004 : 1 : memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
1005 : : surviving_xids * sizeof(TransactionId));
1006 : : }
1007 : : else
1008 : : {
1009 : 3 : pfree(builder->catchange.xip);
1010 : 3 : builder->catchange.xip = NULL;
1011 : : }
1012 : :
1013 [ - + ]: 4 : elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
1014 : : (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
1015 : : builder->xmin, builder->xmax);
1016 : 4 : builder->catchange.xcnt = surviving_xids;
1017 : : }
1018 : : }
1019 : :
1020 : : /*
1021 : : * Handle everything that needs to be done when a transaction commits
1022 : : */
1023 : : void
3695 rhaas@postgresql.org 1024 : 2811 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
1025 : : int nsubxacts, TransactionId *subxacts, uint32 xinfo)
1026 : : {
1027 : : int nxact;
1028 : :
2528 andres@anarazel.de 1029 : 2811 : bool needs_snapshot = false;
1030 : 2811 : bool needs_timetravel = false;
3695 rhaas@postgresql.org 1031 : 2811 : bool sub_needs_timetravel = false;
1032 : :
1033 : 2811 : TransactionId xmax = xid;
1034 : :
1035 : : /*
1036 : : * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
1037 : : * will they be part of a snapshot. So we don't need to record anything.
1038 : : */
2528 andres@anarazel.de 1039 [ + - ]: 2811 : if (builder->state == SNAPBUILD_START ||
1040 [ - + - - ]: 2811 : (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1154 andres@anarazel.de 1041 :UBC 0 : TransactionIdPrecedes(xid, builder->next_phase_at)))
1042 : : {
1043 : : /* ensure that only commits after this are getting replayed */
2528 1044 [ # # ]: 0 : if (builder->start_decoding_at <= lsn)
1045 : 0 : builder->start_decoding_at = lsn + 1;
1046 : 0 : return;
1047 : : }
1048 : :
3695 rhaas@postgresql.org 1049 [ + + ]:CBC 2811 : if (builder->state < SNAPBUILD_CONSISTENT)
1050 : : {
1051 : : /* ensure that only commits after this are getting replayed */
3601 andres@anarazel.de 1052 [ + + ]: 5 : if (builder->start_decoding_at <= lsn)
1053 : 2 : builder->start_decoding_at = lsn + 1;
1054 : :
1055 : : /*
1056 : : * If building an exportable snapshot, force xid to be tracked, even
1057 : : * if the transaction didn't modify the catalog.
1058 : : */
2528 1059 [ - + ]: 5 : if (builder->building_full_snapshot)
1060 : : {
2528 andres@anarazel.de 1061 :UBC 0 : needs_timetravel = true;
1062 : : }
1063 : : }
1064 : :
3695 rhaas@postgresql.org 1065 [ + + ]:CBC 4059 : for (nxact = 0; nxact < nsubxacts; nxact++)
1066 : : {
1067 : 1248 : TransactionId subxid = subxacts[nxact];
1068 : :
1069 : : /*
1070 : : * Add subtransaction to base snapshot if catalog modifying, we don't
1071 : : * distinguish to toplevel transactions there.
1072 : : */
612 akapila@postgresql.o 1073 [ + + ]: 1248 : if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
1074 : : {
2528 andres@anarazel.de 1075 : 9 : sub_needs_timetravel = true;
1076 : 9 : needs_snapshot = true;
1077 : :
1078 [ - + ]: 9 : elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
1079 : : xid, subxid);
1080 : :
3695 rhaas@postgresql.org 1081 : 9 : SnapBuildAddCommittedTxn(builder, subxid);
1082 : :
1083 [ + - - + : 9 : if (NormalTransactionIdFollows(subxid, xmax))
+ - ]
1084 : 9 : xmax = subxid;
1085 : : }
1086 : :
1087 : : /*
1088 : : * If we're forcing timetravel we also need visibility information
1089 : : * about subtransaction, so keep track of subtransaction's state, even
1090 : : * if not catalog modifying. Don't need to distribute a snapshot in
1091 : : * that case.
1092 : : */
2528 andres@anarazel.de 1093 [ - + ]: 1239 : else if (needs_timetravel)
1094 : : {
3695 rhaas@postgresql.org 1095 :UBC 0 : SnapBuildAddCommittedTxn(builder, subxid);
1096 [ # # # # : 0 : if (NormalTransactionIdFollows(subxid, xmax))
# # ]
1097 : 0 : xmax = subxid;
1098 : : }
1099 : : }
1100 : :
1101 : : /* if top-level modified catalog, it'll need a snapshot */
612 akapila@postgresql.o 1102 [ + + ]:CBC 2811 : if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
1103 : : {
2528 andres@anarazel.de 1104 [ + + ]: 1081 : elog(DEBUG2, "found top level transaction %u, with catalog changes",
1105 : : xid);
1106 : 1081 : needs_snapshot = true;
1107 : 1081 : needs_timetravel = true;
3695 rhaas@postgresql.org 1108 : 1081 : SnapBuildAddCommittedTxn(builder, xid);
1109 : : }
2528 andres@anarazel.de 1110 [ + + ]: 1730 : else if (sub_needs_timetravel)
1111 : : {
1112 : : /* track toplevel txn as well, subxact alone isn't meaningful */
542 akapila@postgresql.o 1113 [ - + ]: 1 : elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
1114 : : xid);
1115 : 1 : needs_timetravel = true;
3695 rhaas@postgresql.org 1116 : 1 : SnapBuildAddCommittedTxn(builder, xid);
1117 : : }
2528 andres@anarazel.de 1118 [ - + ]: 1729 : else if (needs_timetravel)
1119 : : {
2528 andres@anarazel.de 1120 [ # # ]:UBC 0 : elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1121 : :
3695 rhaas@postgresql.org 1122 : 0 : SnapBuildAddCommittedTxn(builder, xid);
1123 : : }
1124 : :
2528 andres@anarazel.de 1125 [ + + ]:CBC 2811 : if (!needs_timetravel)
1126 : : {
1127 : : /* record that we cannot export a general snapshot anymore */
1128 : 1729 : builder->committed.includes_all_transactions = false;
1129 : : }
1130 : :
1131 [ + + - + ]: 2811 : Assert(!needs_snapshot || needs_timetravel);
1132 : :
1133 : : /*
1134 : : * Adjust xmax of the snapshot builder, we only do that for committed,
1135 : : * catalog modifying, transactions, everything else isn't interesting for
1136 : : * us since we'll never look at the respective rows.
1137 : : */
1138 [ + + ]: 2811 : if (needs_timetravel &&
1139 [ + - + + ]: 2164 : (!TransactionIdIsValid(builder->xmax) ||
1140 : 1082 : TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1141 : : {
1142 : 1079 : builder->xmax = xmax;
1143 [ - + ]: 1079 : TransactionIdAdvance(builder->xmax);
1144 : : }
1145 : :
1146 : : /* if there's any reason to build a historic snapshot, do so now */
1147 [ + + ]: 2811 : if (needs_snapshot)
1148 : : {
1149 : : /*
1150 : : * If we haven't built a complete snapshot yet there's no need to hand
1151 : : * it out, it wouldn't (and couldn't) be used anyway.
1152 : : */
3695 rhaas@postgresql.org 1153 [ - + ]: 1082 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
3695 rhaas@postgresql.org 1154 :UBC 0 : return;
1155 : :
1156 : : /*
1157 : : * Decrease the snapshot builder's refcount of the old snapshot, note
1158 : : * that it still will be used if it has been handed out to the
1159 : : * reorderbuffer earlier.
1160 : : */
3695 rhaas@postgresql.org 1161 [ + + ]:CBC 1082 : if (builder->snapshot)
1162 : 1076 : SnapBuildSnapDecRefcount(builder->snapshot);
1163 : :
2496 andres@anarazel.de 1164 : 1082 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1165 : :
1166 : : /* we might need to execute invalidations, add snapshot */
3695 rhaas@postgresql.org 1167 [ + + ]: 1082 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1168 : : {
1169 : 20 : SnapBuildSnapIncRefcount(builder->snapshot);
1170 : 20 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1171 : : builder->snapshot);
1172 : : }
1173 : :
1174 : : /* refcount of the snapshot builder for the new snapshot */
1175 : 1082 : SnapBuildSnapIncRefcount(builder->snapshot);
1176 : :
1177 : : /* add a new catalog snapshot to all currently running transactions */
1178 : 1082 : SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
1179 : : }
1180 : : }
1181 : :
1182 : : /*
1183 : : * Check the reorder buffer and the snapshot to see if the given transaction has
1184 : : * modified catalogs.
1185 : : */
1186 : : static inline bool
612 akapila@postgresql.o 1187 : 4059 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
1188 : : uint32 xinfo)
1189 : : {
1190 [ + + ]: 4059 : if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1191 : 1086 : return true;
1192 : :
1193 : : /*
1194 : : * The transactions that have changed catalogs must have invalidation
1195 : : * info.
1196 : : */
1197 [ + + ]: 2973 : if (!(xinfo & XACT_XINFO_HAS_INVALS))
1198 : 2965 : return false;
1199 : :
1200 : : /* Check the catchange XID array */
1201 [ + + + - ]: 12 : return ((builder->catchange.xcnt > 0) &&
1202 : 4 : (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1203 : : sizeof(TransactionId), xidComparator) != NULL));
1204 : : }
1205 : :
1206 : : /* -----------------------------------
1207 : : * Snapshot building functions dealing with xlog records
1208 : : * -----------------------------------
1209 : : */
1210 : :
1211 : : /*
1212 : : * Process a running xacts record, and use its information to first build a
1213 : : * historic snapshot and later to release resources that aren't needed
1214 : : * anymore.
1215 : : */
1216 : : void
3695 rhaas@postgresql.org 1217 : 1296 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1218 : : {
1219 : : ReorderBufferTXN *txn;
1220 : : TransactionId xmin;
1221 : :
1222 : : /*
1223 : : * If we're not consistent yet, inspect the record to see whether it
1224 : : * allows to get closer to being consistent. If we are consistent, dump
1225 : : * our snapshot so others or we, after a restart, can use it.
1226 : : */
1227 [ + + ]: 1296 : if (builder->state < SNAPBUILD_CONSISTENT)
1228 : : {
1229 : : /* returns false if there's no point in performing cleanup just yet */
1230 [ + + ]: 909 : if (!SnapBuildFindSnapshot(builder, lsn, running))
1231 : 885 : return;
1232 : : }
1233 : : else
1234 : 387 : SnapBuildSerialize(builder, lsn);
1235 : :
1236 : : /*
1237 : : * Update range of interesting xids based on the running xacts
1238 : : * information. We don't increase ->xmax using it, because once we are in
1239 : : * a consistent state we can do that ourselves and much more efficiently
1240 : : * so, because we only need to do it for catalog transactions since we
1241 : : * only ever look at those.
1242 : : *
1243 : : * NB: We only increase xmax when a catalog modifying transaction commits
1244 : : * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1245 : : * xmin, which looks odd but is correct and actually more efficient, since
1246 : : * we hit fast paths in heapam_visibility.c.
1247 : : */
1248 : 409 : builder->xmin = running->oldestRunningXid;
1249 : :
1250 : : /* Remove transactions we don't need to keep track off anymore */
612 akapila@postgresql.o 1251 : 409 : SnapBuildPurgeOlderTxn(builder);
1252 : :
1253 : : /*
1254 : : * Advance the xmin limit for the current replication slot, to allow
1255 : : * vacuum to clean up the tuples this slot has been protecting.
1256 : : *
1257 : : * The reorderbuffer might have an xmin among the currently running
1258 : : * snapshots; use it if so. If not, we need only consider the snapshots
1259 : : * we'll produce later, which can't be less than the oldest running xid in
1260 : : * the record we're reading now.
1261 : : */
2119 alvherre@alvh.no-ip. 1262 : 409 : xmin = ReorderBufferGetOldestXmin(builder->reorder);
1263 [ + + ]: 409 : if (xmin == InvalidTransactionId)
1264 : 373 : xmin = running->oldestRunningXid;
1265 [ - + ]: 409 : elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1266 : : builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1267 : 409 : LogicalIncreaseXminForSlot(lsn, xmin);
1268 : :
1269 : : /*
1270 : : * Also tell the slot where we can restart decoding from. We don't want to
1271 : : * do that after every commit because changing that implies an fsync of
1272 : : * the logical slot's state file, so we only do it every time we see a
1273 : : * running xacts record.
1274 : : *
1275 : : * Do so by looking for the oldest in progress transaction (determined by
1276 : : * the first LSN of any of its relevant records). Every transaction
1277 : : * remembers the last location we stored the snapshot to disk before its
1278 : : * beginning. That point is where we can restart from.
1279 : : */
1280 : :
1281 : : /*
1282 : : * Can't know about a serialized snapshot's location if we're not
1283 : : * consistent.
1284 : : */
3695 rhaas@postgresql.org 1285 [ + + ]: 409 : if (builder->state < SNAPBUILD_CONSISTENT)
1286 : 17 : return;
1287 : :
1288 : 392 : txn = ReorderBufferGetOldestTXN(builder->reorder);
1289 : :
1290 : : /*
1291 : : * oldest ongoing txn might have started when we didn't yet serialize
1292 : : * anything because we hadn't reached a consistent state yet.
1293 : : */
1294 [ + + + + ]: 392 : if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1295 : 24 : LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1296 : :
1297 : : /*
1298 : : * No in-progress transaction, can reuse the last serialized snapshot if
1299 : : * we have one.
1300 : : */
1301 [ + + ]: 368 : else if (txn == NULL &&
2489 tgl@sss.pgh.pa.us 1302 [ + + ]: 346 : builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
3695 rhaas@postgresql.org 1303 [ + - ]: 344 : builder->last_serialized_snapshot != InvalidXLogRecPtr)
1304 : 344 : LogicalIncreaseRestartDecodingForSlot(lsn,
1305 : : builder->last_serialized_snapshot);
1306 : : }
1307 : :
1308 : :
1309 : : /*
1310 : : * Build the start of a snapshot that's capable of decoding the catalog.
1311 : : *
1312 : : * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1313 : : * consistent.
1314 : : *
1315 : : * Returns true if there is a point in performing internal maintenance/cleanup
1316 : : * using the xl_running_xacts record.
1317 : : */
1318 : : static bool
1319 : 909 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1320 : : {
1321 : : /* ---
1322 : : * Build catalog decoding snapshot incrementally using information about
1323 : : * the currently running transactions. There are several ways to do that:
1324 : : *
1325 : : * a) There were no running transactions when the xl_running_xacts record
1326 : : * was inserted, jump to CONSISTENT immediately. We might find such a
1327 : : * state while waiting on c)'s sub-states.
1328 : : *
1329 : : * b) This (in a previous run) or another decoding slot serialized a
1330 : : * snapshot to disk that we can use. Can't use this method for the
1331 : : * initial snapshot when slot is being created and needs full snapshot
1332 : : * for export or direct use, as that snapshot will only contain catalog
1333 : : * modifying transactions.
1334 : : *
1335 : : * c) First incrementally build a snapshot for catalog tuples
1336 : : * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1337 : : * transactions to finish. Every transaction starting after that
1338 : : * (FULL_SNAPSHOT state), has enough information to be decoded. But
1339 : : * for older running transactions no viable snapshot exists yet, so
1340 : : * CONSISTENT will only be reached once all of those have finished.
1341 : : * ---
1342 : : */
1343 : :
1344 : : /*
1345 : : * xl_running_xacts record is older than what we can use, we might not
1346 : : * have all necessary catalog rows anymore.
1347 : : */
1348 [ + + ]: 909 : if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1349 [ + - - + : 397 : NormalTransactionIdPrecedes(running->oldestRunningXid,
- + ]
1350 : : builder->initial_xmin_horizon))
1351 : : {
3695 rhaas@postgresql.org 1352 [ # # ]:UBC 0 : ereport(DEBUG1,
1353 : : (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1354 : : LSN_FORMAT_ARGS(lsn)),
1355 : : errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1356 : : builder->initial_xmin_horizon, running->oldestRunningXid)));
1357 : :
1358 : :
2528 andres@anarazel.de 1359 : 0 : SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1360 : :
3695 rhaas@postgresql.org 1361 : 0 : return true;
1362 : : }
1363 : :
1364 : : /*
1365 : : * a) No transaction were running, we can jump to consistent.
1366 : : *
1367 : : * This is not affected by races around xl_running_xacts, because we can
1368 : : * miss transaction commits, but currently not transactions starting.
1369 : : *
1370 : : * NB: We might have already started to incrementally assemble a snapshot,
1371 : : * so we need to be careful to deal with that.
1372 : : */
2528 andres@anarazel.de 1373 [ + + ]:CBC 909 : if (running->oldestRunningXid == running->nextXid)
1374 : : {
3601 1375 [ + + ]: 878 : if (builder->start_decoding_at == InvalidXLogRecPtr ||
1376 [ + + ]: 489 : builder->start_decoding_at <= lsn)
1377 : : /* can decode everything after this */
1378 : 390 : builder->start_decoding_at = lsn + 1;
1379 : :
1380 : : /* As no transactions were running xmin/xmax can be trivially set. */
2489 tgl@sss.pgh.pa.us 1381 : 878 : builder->xmin = running->nextXid; /* < are finished */
1382 : 878 : builder->xmax = running->nextXid; /* >= are running */
1383 : :
1384 : : /* so we can safely use the faster comparisons */
3695 rhaas@postgresql.org 1385 [ - + ]: 878 : Assert(TransactionIdIsNormal(builder->xmin));
1386 [ - + ]: 878 : Assert(TransactionIdIsNormal(builder->xmax));
1387 : :
1388 : 878 : builder->state = SNAPBUILD_CONSISTENT;
1154 andres@anarazel.de 1389 : 878 : builder->next_phase_at = InvalidTransactionId;
1390 : :
3695 rhaas@postgresql.org 1391 [ + - ]: 878 : ereport(LOG,
1392 : : (errmsg("logical decoding found consistent point at %X/%X",
1393 : : LSN_FORMAT_ARGS(lsn)),
1394 : : errdetail("There are no running transactions.")));
1395 : :
1396 : 878 : return false;
1397 : : }
1398 : : /* b) valid on disk state and not building full snapshot */
2544 andres@anarazel.de 1399 [ + + + + ]: 61 : else if (!builder->building_full_snapshot &&
1400 : 30 : SnapBuildRestore(builder, lsn))
1401 : : {
1402 : : /* there won't be any state to cleanup */
3695 rhaas@postgresql.org 1403 : 7 : return false;
1404 : : }
1405 : :
1406 : : /*
1407 : : * c) transition from START to BUILDING_SNAPSHOT.
1408 : : *
1409 : : * In START state, and a xl_running_xacts record with running xacts is
1410 : : * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1411 : : * record xl_running_xacts->nextXid. Once all running xacts have finished
1412 : : * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1413 : : * might look that we could use xl_running_xacts's ->xids information to
1414 : : * get there quicker, but that is problematic because transactions marked
1415 : : * as running, might already have inserted their commit record - it's
1416 : : * infeasible to change that with locking.
1417 : : */
2528 andres@anarazel.de 1418 [ + + ]: 24 : else if (builder->state == SNAPBUILD_START)
1419 : : {
1420 : 14 : builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1154 1421 : 14 : builder->next_phase_at = running->nextXid;
1422 : :
1423 : : /*
1424 : : * Start with an xmin/xmax that's correct for future, when all the
1425 : : * currently running transactions have finished. We'll update both
1426 : : * while waiting for the pending transactions to finish.
1427 : : */
2489 tgl@sss.pgh.pa.us 1428 : 14 : builder->xmin = running->nextXid; /* < are finished */
1429 : 14 : builder->xmax = running->nextXid; /* >= are running */
1430 : :
1431 : : /* so we can safely use the faster comparisons */
3695 rhaas@postgresql.org 1432 [ - + ]: 14 : Assert(TransactionIdIsNormal(builder->xmin));
1433 [ - + ]: 14 : Assert(TransactionIdIsNormal(builder->xmax));
1434 : :
1435 [ + - ]: 14 : ereport(LOG,
1436 : : (errmsg("logical decoding found initial starting point at %X/%X",
1437 : : LSN_FORMAT_ARGS(lsn)),
1438 : : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1439 : : running->xcnt, running->nextXid)));
1440 : :
2528 andres@anarazel.de 1441 : 14 : SnapBuildWaitSnapshot(running, running->nextXid);
1442 : : }
1443 : :
1444 : : /*
1445 : : * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1446 : : *
1447 : : * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1448 : : * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
1449 : : * means all transactions starting afterwards have enough information to
1450 : : * be decoded. Switch to FULL_SNAPSHOT.
1451 : : */
1452 [ + + + - ]: 15 : else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1154 1453 : 5 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1454 : : running->oldestRunningXid))
1455 : : {
2528 1456 : 5 : builder->state = SNAPBUILD_FULL_SNAPSHOT;
1154 1457 : 5 : builder->next_phase_at = running->nextXid;
1458 : :
2528 1459 [ + - ]: 5 : ereport(LOG,
1460 : : (errmsg("logical decoding found initial consistent point at %X/%X",
1461 : : LSN_FORMAT_ARGS(lsn)),
1462 : : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1463 : : running->xcnt, running->nextXid)));
1464 : :
1465 : 5 : SnapBuildWaitSnapshot(running, running->nextXid);
1466 : : }
1467 : :
1468 : : /*
1469 : : * c) transition from FULL_SNAPSHOT to CONSISTENT.
1470 : : *
1471 : : * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
1472 : : * >= than nextXid from when we switched to FULL_SNAPSHOT. This means all
1473 : : * transactions that are currently in progress have a catalog snapshot,
1474 : : * and all their changes have been collected. Switch to CONSISTENT.
1475 : : */
1476 [ + - + - ]: 10 : else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1154 1477 : 5 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1478 : : running->oldestRunningXid))
1479 : : {
2528 1480 : 5 : builder->state = SNAPBUILD_CONSISTENT;
1154 1481 : 5 : builder->next_phase_at = InvalidTransactionId;
1482 : :
2528 1483 [ + - ]: 5 : ereport(LOG,
1484 : : (errmsg("logical decoding found consistent point at %X/%X",
1485 : : LSN_FORMAT_ARGS(lsn)),
1486 : : errdetail("There are no old transactions anymore.")));
1487 : : }
1488 : :
1489 : : /*
1490 : : * We already started to track running xacts and need to wait for all
1491 : : * in-progress ones to finish. We fall through to the normal processing of
1492 : : * records so incremental cleanup can be performed.
1493 : : */
3695 rhaas@postgresql.org 1494 : 22 : return true;
1495 : : }
1496 : :
1497 : : /* ---
1498 : : * Iterate through xids in record, wait for all older than the cutoff to
1499 : : * finish. Then, if possible, log a new xl_running_xacts record.
1500 : : *
1501 : : * This isn't required for the correctness of decoding, but to:
1502 : : * a) allow isolationtester to notice that we're currently waiting for
1503 : : * something.
1504 : : * b) log a new xl_running_xacts record where it'd be helpful, without having
1505 : : * to wait for bgwriter or checkpointer.
1506 : : * ---
1507 : : */
1508 : : static void
2528 andres@anarazel.de 1509 : 19 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1510 : : {
1511 : : int off;
1512 : :
1513 [ + + ]: 36 : for (off = 0; off < running->xcnt; off++)
1514 : : {
1515 : 19 : TransactionId xid = running->xids[off];
1516 : :
1517 : : /*
1518 : : * Upper layers should prevent that we ever need to wait on ourselves.
1519 : : * Check anyway, since failing to do so would either result in an
1520 : : * endless wait or an Assert() failure.
1521 : : */
1522 [ - + ]: 19 : if (TransactionIdIsCurrentTransactionId(xid))
2528 andres@anarazel.de 1523 [ # # ]:UBC 0 : elog(ERROR, "waiting for ourselves");
1524 : :
2528 andres@anarazel.de 1525 [ - + ]:CBC 19 : if (TransactionIdFollows(xid, cutoff))
2528 andres@anarazel.de 1526 :UBC 0 : continue;
1527 : :
2528 andres@anarazel.de 1528 :CBC 19 : XactLockTableWait(xid, NULL, NULL, XLTW_None);
1529 : : }
1530 : :
1531 : : /*
1532 : : * All transactions we needed to finish finished - try to ensure there is
1533 : : * another xl_running_xacts record in a timely manner, without having to
1534 : : * wait for bgwriter or checkpointer to log one. During recovery we can't
1535 : : * enforce that, so we'll have to wait.
1536 : : */
1537 [ + - ]: 17 : if (!RecoveryInProgress())
1538 : : {
1539 : 17 : LogStandbySnapshot();
1540 : : }
1541 : 17 : }
1542 : :
1543 : : /* -----------------------------------
1544 : : * Snapshot serialization support
1545 : : * -----------------------------------
1546 : : */
1547 : :
1548 : : /*
1549 : : * We store current state of struct SnapBuild on disk in the following manner:
1550 : : *
1551 : : * struct SnapBuildOnDisk;
1552 : : * TransactionId * committed.xcnt; (*not xcnt_space*)
1553 : : * TransactionId * catchange.xcnt;
1554 : : *
1555 : : */
1556 : : typedef struct SnapBuildOnDisk
1557 : : {
1558 : : /* first part of this struct needs to be version independent */
1559 : :
1560 : : /* data not covered by checksum */
1561 : : uint32 magic;
1562 : : pg_crc32c checksum;
1563 : :
1564 : : /* data covered by checksum */
1565 : :
1566 : : /* version, in case we want to support pg_upgrade */
1567 : : uint32 version;
1568 : : /* how large is the on disk data, excluding the constant sized part */
1569 : : uint32 length;
1570 : :
1571 : : /* version dependent part */
1572 : : SnapBuild builder;
1573 : :
1574 : : /* variable amount of TransactionIds follows */
1575 : : } SnapBuildOnDisk;
1576 : :
1577 : : #define SnapBuildOnDiskConstantSize \
1578 : : offsetof(SnapBuildOnDisk, builder)
1579 : : #define SnapBuildOnDiskNotChecksummedSize \
1580 : : offsetof(SnapBuildOnDisk, version)
1581 : :
1582 : : #define SNAPBUILD_MAGIC 0x51A1E001
1583 : : #define SNAPBUILD_VERSION 5
1584 : :
1585 : : /*
1586 : : * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1587 : : *
1588 : : * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1589 : : * a record that's a potential location for a serialized snapshot.
1590 : : */
1591 : : void
3695 rhaas@postgresql.org 1592 : 43 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1593 : : {
1594 [ - + ]: 43 : if (builder->state < SNAPBUILD_CONSISTENT)
3695 rhaas@postgresql.org 1595 :UBC 0 : SnapBuildRestore(builder, lsn);
1596 : : else
3695 rhaas@postgresql.org 1597 :CBC 43 : SnapBuildSerialize(builder, lsn);
1598 : 43 : }
1599 : :
1600 : : /*
1601 : : * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1602 : : * been done by another decoding process.
1603 : : */
1604 : : static void
1605 : 430 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1606 : : {
1607 : : Size needed_length;
1187 akapila@postgresql.o 1608 : 430 : SnapBuildOnDisk *ondisk = NULL;
612 1609 : 430 : TransactionId *catchange_xip = NULL;
1610 : : MemoryContext old_ctx;
1611 : : size_t catchange_xcnt;
1612 : : char *ondisk_c;
1613 : : int fd;
1614 : : char tmppath[MAXPGPATH];
1615 : : char path[MAXPGPATH];
1616 : : int ret;
1617 : : struct stat stat_buf;
1618 : : Size sz;
1619 : :
3695 rhaas@postgresql.org 1620 [ - + ]: 430 : Assert(lsn != InvalidXLogRecPtr);
1621 [ + + - + ]: 430 : Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
1622 : : builder->last_serialized_snapshot <= lsn);
1623 : :
1624 : : /*
1625 : : * no point in serializing if we cannot continue to work immediately after
1626 : : * restoring the snapshot
1627 : : */
1628 [ - + ]: 430 : if (builder->state < SNAPBUILD_CONSISTENT)
3695 rhaas@postgresql.org 1629 :UBC 0 : return;
1630 : :
1631 : : /* consistent snapshots have no next phase */
1154 andres@anarazel.de 1632 [ - + ]:CBC 430 : Assert(builder->next_phase_at == InvalidTransactionId);
1633 : :
1634 : : /*
1635 : : * We identify snapshots by the LSN they are valid for. We don't need to
1636 : : * include timelines in the name as each LSN maps to exactly one timeline
1637 : : * unless the user used pg_resetwal or similar. If a user did so, there's
1638 : : * no hope continuing to decode anyway.
1639 : : */
3574 1640 : 430 : sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1146 peter@eisentraut.org 1641 : 430 : LSN_FORMAT_ARGS(lsn));
1642 : :
1643 : : /*
1644 : : * first check whether some other backend already has written the snapshot
1645 : : * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1646 : : * as a valid state. Everything else is an unexpected error.
1647 : : */
3695 rhaas@postgresql.org 1648 : 430 : ret = stat(path, &stat_buf);
1649 : :
1650 [ + + - + ]: 430 : if (ret != 0 && errno != ENOENT)
3695 rhaas@postgresql.org 1651 [ # # ]:UBC 0 : ereport(ERROR,
1652 : : (errcode_for_file_access(),
1653 : : errmsg("could not stat file \"%s\": %m", path)));
1654 : :
3695 rhaas@postgresql.org 1655 [ + + ]:CBC 430 : else if (ret == 0)
1656 : : {
1657 : : /*
1658 : : * somebody else has already serialized to this point, don't overwrite
1659 : : * but remember location, so we don't need to read old data again.
1660 : : *
1661 : : * To be sure it has been synced to disk after the rename() from the
1662 : : * tempfile filename to the real filename, we just repeat the fsync.
1663 : : * That ought to be cheap because in most scenarios it should already
1664 : : * be safely on disk.
1665 : : */
1666 : 133 : fsync_fname(path, false);
3574 andres@anarazel.de 1667 : 133 : fsync_fname("pg_logical/snapshots", true);
1668 : :
3695 rhaas@postgresql.org 1669 : 133 : builder->last_serialized_snapshot = lsn;
1670 : 133 : goto out;
1671 : : }
1672 : :
1673 : : /*
1674 : : * there is an obvious race condition here between the time we stat(2) the
1675 : : * file and us writing the file. But we rename the file into place
1676 : : * atomically and all files created need to contain the same data anyway,
1677 : : * so this is perfectly fine, although a bit of a resource waste. Locking
1678 : : * seems like pointless complication.
1679 : : */
1680 [ + + ]: 297 : elog(DEBUG1, "serializing snapshot to %s", path);
1681 : :
1682 : : /* to make sure only we will write to this tempfile, include pid */
1091 peter@eisentraut.org 1683 : 297 : sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%d.tmp",
1146 1684 : 297 : LSN_FORMAT_ARGS(lsn), MyProcPid);
1685 : :
1686 : : /*
1687 : : * Unlink temporary file if it already exists, needs to have been before a
1688 : : * crash/error since we won't enter this function twice from within a
1689 : : * single decoding slot/backend and the temporary file contains the pid of
1690 : : * the current process.
1691 : : */
3695 rhaas@postgresql.org 1692 [ + - - + ]: 297 : if (unlink(tmppath) != 0 && errno != ENOENT)
3695 rhaas@postgresql.org 1693 [ # # ]:UBC 0 : ereport(ERROR,
1694 : : (errcode_for_file_access(),
1695 : : errmsg("could not remove file \"%s\": %m", tmppath)));
1696 : :
612 akapila@postgresql.o 1697 :CBC 297 : old_ctx = MemoryContextSwitchTo(builder->context);
1698 : :
1699 : : /* Get the catalog modifying transactions that are yet not committed */
1700 : 297 : catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
529 drowley@postgresql.o 1701 : 297 : catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
1702 : :
3695 rhaas@postgresql.org 1703 : 297 : needed_length = sizeof(SnapBuildOnDisk) +
612 akapila@postgresql.o 1704 : 297 : sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1705 : :
1706 : 297 : ondisk_c = palloc0(needed_length);
3695 rhaas@postgresql.org 1707 : 297 : ondisk = (SnapBuildOnDisk *) ondisk_c;
1708 : 297 : ondisk->magic = SNAPBUILD_MAGIC;
1709 : 297 : ondisk->version = SNAPBUILD_VERSION;
1710 : 297 : ondisk->length = needed_length;
3449 heikki.linnakangas@i 1711 : 297 : INIT_CRC32C(ondisk->checksum);
1712 : 297 : COMP_CRC32C(ondisk->checksum,
1713 : : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1714 : : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
3695 rhaas@postgresql.org 1715 : 297 : ondisk_c += sizeof(SnapBuildOnDisk);
1716 : :
1717 : 297 : memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1718 : : /* NULL-ify memory-only data */
1719 : 297 : ondisk->builder.context = NULL;
1720 : 297 : ondisk->builder.snapshot = NULL;
1721 : 297 : ondisk->builder.reorder = NULL;
1722 : 297 : ondisk->builder.committed.xip = NULL;
612 akapila@postgresql.o 1723 : 297 : ondisk->builder.catchange.xip = NULL;
1724 : : /* update catchange only on disk data */
1725 : 297 : ondisk->builder.catchange.xcnt = catchange_xcnt;
1726 : :
3449 heikki.linnakangas@i 1727 : 297 : COMP_CRC32C(ondisk->checksum,
1728 : : &ondisk->builder,
1729 : : sizeof(SnapBuild));
1730 : :
1731 : : /* copy committed xacts */
612 akapila@postgresql.o 1732 [ + + ]: 297 : if (builder->committed.xcnt > 0)
1733 : : {
1734 : 49 : sz = sizeof(TransactionId) * builder->committed.xcnt;
1735 : 49 : memcpy(ondisk_c, builder->committed.xip, sz);
1736 : 49 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1737 : 49 : ondisk_c += sz;
1738 : : }
1739 : :
1740 : : /* copy catalog modifying xacts */
1741 [ + + ]: 297 : if (catchange_xcnt > 0)
1742 : : {
1743 : 8 : sz = sizeof(TransactionId) * catchange_xcnt;
1744 : 8 : memcpy(ondisk_c, catchange_xip, sz);
1745 : 8 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1746 : 8 : ondisk_c += sz;
1747 : : }
1748 : :
3441 andres@anarazel.de 1749 : 297 : FIN_CRC32C(ondisk->checksum);
1750 : :
1751 : : /* we have valid data now, open tempfile and write it there */
3695 rhaas@postgresql.org 1752 : 297 : fd = OpenTransientFile(tmppath,
1753 : : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1754 [ - + ]: 297 : if (fd < 0)
3695 rhaas@postgresql.org 1755 [ # # ]:UBC 0 : ereport(ERROR,
1756 : : (errcode_for_file_access(),
1757 : : errmsg("could not open file \"%s\": %m", tmppath)));
1758 : :
2079 michael@paquier.xyz 1759 :CBC 297 : errno = 0;
2584 rhaas@postgresql.org 1760 : 297 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
3695 1761 [ - + ]: 297 : if ((write(fd, ondisk, needed_length)) != needed_length)
1762 : : {
2120 michael@paquier.xyz 1763 :UBC 0 : int save_errno = errno;
1764 : :
3695 rhaas@postgresql.org 1765 : 0 : CloseTransientFile(fd);
1766 : :
1767 : : /* if write didn't set errno, assume problem is no disk space */
2120 michael@paquier.xyz 1768 [ # # ]: 0 : errno = save_errno ? save_errno : ENOSPC;
3695 rhaas@postgresql.org 1769 [ # # ]: 0 : ereport(ERROR,
1770 : : (errcode_for_file_access(),
1771 : : errmsg("could not write to file \"%s\": %m", tmppath)));
1772 : : }
2584 rhaas@postgresql.org 1773 :CBC 297 : pgstat_report_wait_end();
1774 : :
1775 : : /*
1776 : : * fsync the file before renaming so that even if we crash after this we
1777 : : * have either a fully valid file or nothing.
1778 : : *
1779 : : * It's safe to just ERROR on fsync() here because we'll retry the whole
1780 : : * operation including the writes.
1781 : : *
1782 : : * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1783 : : * some noticeable overhead since it's performed synchronously during
1784 : : * decoding?
1785 : : */
1786 : 297 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
3695 1787 [ - + ]: 297 : if (pg_fsync(fd) != 0)
1788 : : {
2120 michael@paquier.xyz 1789 :UBC 0 : int save_errno = errno;
1790 : :
3695 rhaas@postgresql.org 1791 : 0 : CloseTransientFile(fd);
2120 michael@paquier.xyz 1792 : 0 : errno = save_errno;
3695 rhaas@postgresql.org 1793 [ # # ]: 0 : ereport(ERROR,
1794 : : (errcode_for_file_access(),
1795 : : errmsg("could not fsync file \"%s\": %m", tmppath)));
1796 : : }
2584 rhaas@postgresql.org 1797 :CBC 297 : pgstat_report_wait_end();
1798 : :
1744 peter@eisentraut.org 1799 [ - + ]: 297 : if (CloseTransientFile(fd) != 0)
1863 michael@paquier.xyz 1800 [ # # ]:UBC 0 : ereport(ERROR,
1801 : : (errcode_for_file_access(),
1802 : : errmsg("could not close file \"%s\": %m", tmppath)));
1803 : :
3574 andres@anarazel.de 1804 :CBC 297 : fsync_fname("pg_logical/snapshots", true);
1805 : :
1806 : : /*
1807 : : * We may overwrite the work from some other backend, but that's ok, our
1808 : : * snapshot is valid as well, we'll just have done some superfluous work.
1809 : : */
3695 rhaas@postgresql.org 1810 [ - + ]: 297 : if (rename(tmppath, path) != 0)
1811 : : {
3695 rhaas@postgresql.org 1812 [ # # ]:UBC 0 : ereport(ERROR,
1813 : : (errcode_for_file_access(),
1814 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
1815 : : tmppath, path)));
1816 : : }
1817 : :
1818 : : /* make sure we persist */
3695 rhaas@postgresql.org 1819 :CBC 297 : fsync_fname(path, false);
3574 andres@anarazel.de 1820 : 297 : fsync_fname("pg_logical/snapshots", true);
1821 : :
1822 : : /*
1823 : : * Now there's no way we can lose the dumped state anymore, remember this
1824 : : * as a serialization point.
1825 : : */
3695 rhaas@postgresql.org 1826 : 297 : builder->last_serialized_snapshot = lsn;
1827 : :
612 akapila@postgresql.o 1828 : 297 : MemoryContextSwitchTo(old_ctx);
1829 : :
3695 rhaas@postgresql.org 1830 : 430 : out:
1831 : 430 : ReorderBufferSetRestartPoint(builder->reorder,
1832 : : builder->last_serialized_snapshot);
1833 : : /* be tidy */
1187 akapila@postgresql.o 1834 [ + + ]: 430 : if (ondisk)
1835 : 297 : pfree(ondisk);
612 1836 [ + + ]: 430 : if (catchange_xip)
1837 : 8 : pfree(catchange_xip);
1838 : : }
1839 : :
1840 : : /*
1841 : : * Restore a snapshot into 'builder' if previously one has been stored at the
1842 : : * location indicated by 'lsn'. Returns true if successful, false otherwise.
1843 : : */
1844 : : static bool
3695 rhaas@postgresql.org 1845 : 30 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1846 : : {
1847 : : SnapBuildOnDisk ondisk;
1848 : : int fd;
1849 : : char path[MAXPGPATH];
1850 : : Size sz;
1851 : : pg_crc32c checksum;
1852 : :
1853 : : /* no point in loading a snapshot if we're already there */
1854 [ - + ]: 30 : if (builder->state == SNAPBUILD_CONSISTENT)
3695 rhaas@postgresql.org 1855 :UBC 0 : return false;
1856 : :
3574 andres@anarazel.de 1857 :CBC 30 : sprintf(path, "pg_logical/snapshots/%X-%X.snap",
1146 peter@eisentraut.org 1858 : 30 : LSN_FORMAT_ARGS(lsn));
1859 : :
2395 peter_e@gmx.net 1860 : 30 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1861 : :
3695 rhaas@postgresql.org 1862 [ + + + - ]: 30 : if (fd < 0 && errno == ENOENT)
1863 : 23 : return false;
1864 [ - + ]: 7 : else if (fd < 0)
3695 rhaas@postgresql.org 1865 [ # # ]:UBC 0 : ereport(ERROR,
1866 : : (errcode_for_file_access(),
1867 : : errmsg("could not open file \"%s\": %m", path)));
1868 : :
1869 : : /* ----
1870 : : * Make sure the snapshot had been stored safely to disk, that's normally
1871 : : * cheap.
1872 : : * Note that we do not need PANIC here, nobody will be able to use the
1873 : : * slot without fsyncing, and saving it won't succeed without an fsync()
1874 : : * either...
1875 : : * ----
1876 : : */
3695 rhaas@postgresql.org 1877 :CBC 7 : fsync_fname(path, false);
3574 andres@anarazel.de 1878 : 7 : fsync_fname("pg_logical/snapshots", true);
1879 : :
1880 : :
1881 : : /* read statically sized portion of snapshot */
612 akapila@postgresql.o 1882 : 7 : SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);
1883 : :
3695 rhaas@postgresql.org 1884 [ - + ]: 7 : if (ondisk.magic != SNAPBUILD_MAGIC)
3695 rhaas@postgresql.org 1885 [ # # ]:UBC 0 : ereport(ERROR,
1886 : : (errcode(ERRCODE_DATA_CORRUPTED),
1887 : : errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1888 : : path, ondisk.magic, SNAPBUILD_MAGIC)));
1889 : :
3695 rhaas@postgresql.org 1890 [ - + ]:CBC 7 : if (ondisk.version != SNAPBUILD_VERSION)
3695 rhaas@postgresql.org 1891 [ # # ]:UBC 0 : ereport(ERROR,
1892 : : (errcode(ERRCODE_DATA_CORRUPTED),
1893 : : errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1894 : : path, ondisk.version, SNAPBUILD_VERSION)));
1895 : :
3449 heikki.linnakangas@i 1896 :CBC 7 : INIT_CRC32C(checksum);
1897 : 7 : COMP_CRC32C(checksum,
1898 : : ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
1899 : : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1900 : :
1901 : : /* read SnapBuild */
612 akapila@postgresql.o 1902 : 7 : SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
3449 heikki.linnakangas@i 1903 : 7 : COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
1904 : :
1905 : : /* restore committed xacts information */
612 akapila@postgresql.o 1906 [ + + ]: 7 : if (ondisk.builder.committed.xcnt > 0)
1907 : : {
1908 : 2 : sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
1909 : 2 : ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
1910 : 2 : SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
1911 : 2 : COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
1912 : : }
1913 : :
1914 : : /* restore catalog modifying xacts information */
1915 [ + + ]: 7 : if (ondisk.builder.catchange.xcnt > 0)
1916 : : {
1917 : 3 : sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
1918 : 3 : ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
1919 : 3 : SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
1920 : 3 : COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
1921 : : }
1922 : :
1744 peter@eisentraut.org 1923 [ - + ]: 7 : if (CloseTransientFile(fd) != 0)
1863 michael@paquier.xyz 1924 [ # # ]:UBC 0 : ereport(ERROR,
1925 : : (errcode_for_file_access(),
1926 : : errmsg("could not close file \"%s\": %m", path)));
1927 : :
3441 andres@anarazel.de 1928 :CBC 7 : FIN_CRC32C(checksum);
1929 : :
1930 : : /* verify checksum of what we've read */
3449 heikki.linnakangas@i 1931 [ - + ]: 7 : if (!EQ_CRC32C(checksum, ondisk.checksum))
3695 rhaas@postgresql.org 1932 [ # # ]:UBC 0 : ereport(ERROR,
1933 : : (errcode(ERRCODE_DATA_CORRUPTED),
1934 : : errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1935 : : path, checksum, ondisk.checksum)));
1936 : :
1937 : : /*
1938 : : * ok, we now have a sensible snapshot here, figure out if it has more
1939 : : * information than we have.
1940 : : */
1941 : :
1942 : : /*
1943 : : * We are only interested in consistent snapshots for now, comparing
1944 : : * whether one incomplete snapshot is more "advanced" seems to be
1945 : : * unnecessarily complex.
1946 : : */
3695 rhaas@postgresql.org 1947 [ - + ]:CBC 7 : if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
3695 rhaas@postgresql.org 1948 :UBC 0 : goto snapshot_not_interesting;
1949 : :
1950 : : /*
1951 : : * Don't use a snapshot that requires an xmin that we cannot guarantee to
1952 : : * be available.
1953 : : */
3695 rhaas@postgresql.org 1954 [ - + ]:CBC 7 : if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
3695 rhaas@postgresql.org 1955 :UBC 0 : goto snapshot_not_interesting;
1956 : :
1957 : : /*
1958 : : * Consistent snapshots have no next phase. Reset next_phase_at as it is
1959 : : * possible that an old value may remain.
1960 : : */
1154 andres@anarazel.de 1961 [ - + ]:CBC 7 : Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
285 dgustafsson@postgres 1962 : 7 : builder->next_phase_at = InvalidTransactionId;
1963 : :
1964 : : /* ok, we think the snapshot is sensible, copy over everything important */
3695 rhaas@postgresql.org 1965 : 7 : builder->xmin = ondisk.builder.xmin;
1966 : 7 : builder->xmax = ondisk.builder.xmax;
1967 : 7 : builder->state = ondisk.builder.state;
1968 : :
1969 : 7 : builder->committed.xcnt = ondisk.builder.committed.xcnt;
1970 : : /* We only allocated/stored xcnt, not xcnt_space xids ! */
1971 : : /* don't overwrite preallocated xip, if we don't have anything here */
1972 [ + + ]: 7 : if (builder->committed.xcnt > 0)
1973 : : {
1974 : 2 : pfree(builder->committed.xip);
1975 : 2 : builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1976 : 2 : builder->committed.xip = ondisk.builder.committed.xip;
1977 : : }
1978 : 7 : ondisk.builder.committed.xip = NULL;
1979 : :
1980 : : /* set catalog modifying transactions */
612 akapila@postgresql.o 1981 [ - + ]: 7 : if (builder->catchange.xip)
612 akapila@postgresql.o 1982 :UBC 0 : pfree(builder->catchange.xip);
612 akapila@postgresql.o 1983 :CBC 7 : builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1984 : 7 : builder->catchange.xip = ondisk.builder.catchange.xip;
1985 : 7 : ondisk.builder.catchange.xip = NULL;
1986 : :
1987 : : /* our snapshot is not interesting anymore, build a new one */
3695 rhaas@postgresql.org 1988 [ - + ]: 7 : if (builder->snapshot != NULL)
1989 : : {
3695 rhaas@postgresql.org 1990 :UBC 0 : SnapBuildSnapDecRefcount(builder->snapshot);
1991 : : }
2496 andres@anarazel.de 1992 :CBC 7 : builder->snapshot = SnapBuildBuildSnapshot(builder);
3695 rhaas@postgresql.org 1993 : 7 : SnapBuildSnapIncRefcount(builder->snapshot);
1994 : :
1995 : 7 : ReorderBufferSetRestartPoint(builder->reorder, lsn);
1996 : :
1997 [ - + ]: 7 : Assert(builder->state == SNAPBUILD_CONSISTENT);
1998 : :
1999 [ + - ]: 7 : ereport(LOG,
2000 : : (errmsg("logical decoding found consistent point at %X/%X",
2001 : : LSN_FORMAT_ARGS(lsn)),
2002 : : errdetail("Logical decoding will begin using saved snapshot.")));
2003 : 7 : return true;
2004 : :
3695 rhaas@postgresql.org 2005 :UBC 0 : snapshot_not_interesting:
2006 [ # # ]: 0 : if (ondisk.builder.committed.xip != NULL)
2007 : 0 : pfree(ondisk.builder.committed.xip);
612 akapila@postgresql.o 2008 [ # # ]: 0 : if (ondisk.builder.catchange.xip != NULL)
2009 : 0 : pfree(ondisk.builder.catchange.xip);
3695 rhaas@postgresql.org 2010 : 0 : return false;
2011 : : }
2012 : :
2013 : : /*
2014 : : * Read the contents of the serialized snapshot to 'dest'.
2015 : : */
2016 : : static void
612 akapila@postgresql.o 2017 :CBC 19 : SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
2018 : : {
2019 : : int readBytes;
2020 : :
2021 : 19 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
2022 : 19 : readBytes = read(fd, dest, size);
2023 : 19 : pgstat_report_wait_end();
2024 [ - + ]: 19 : if (readBytes != size)
2025 : : {
612 akapila@postgresql.o 2026 :UBC 0 : int save_errno = errno;
2027 : :
2028 : 0 : CloseTransientFile(fd);
2029 : :
2030 [ # # ]: 0 : if (readBytes < 0)
2031 : : {
2032 : 0 : errno = save_errno;
2033 [ # # ]: 0 : ereport(ERROR,
2034 : : (errcode_for_file_access(),
2035 : : errmsg("could not read file \"%s\": %m", path)));
2036 : : }
2037 : : else
2038 [ # # ]: 0 : ereport(ERROR,
2039 : : (errcode(ERRCODE_DATA_CORRUPTED),
2040 : : errmsg("could not read file \"%s\": read %d of %zu",
2041 : : path, readBytes, size)));
2042 : : }
612 akapila@postgresql.o 2043 :CBC 19 : }
2044 : :
2045 : : /*
2046 : : * Remove all serialized snapshots that are not required anymore because no
2047 : : * slot can need them. This doesn't actually have to run during a checkpoint,
2048 : : * but it's a convenient point to schedule this.
2049 : : *
2050 : : * NB: We run this during checkpoints even if logical decoding is disabled so
2051 : : * we cleanup old slots at some point after it got disabled.
2052 : : */
2053 : : void
3695 rhaas@postgresql.org 2054 : 1153 : CheckPointSnapBuild(void)
2055 : : {
2056 : : XLogRecPtr cutoff;
2057 : : XLogRecPtr redo;
2058 : : DIR *snap_dir;
2059 : : struct dirent *snap_de;
2060 : : char path[MAXPGPATH + 21];
2061 : :
2062 : : /*
2063 : : * We start off with a minimum of the last redo pointer. No new
2064 : : * replication slot will start before that, so that's a safe upper bound
2065 : : * for removal.
2066 : : */
2067 : 1153 : redo = GetRedoRecPtr();
2068 : :
2069 : : /* now check for the restart ptrs from existing slots */
2070 : 1153 : cutoff = ReplicationSlotsComputeLogicalRestartLSN();
2071 : :
2072 : : /* don't start earlier than the restart lsn */
2073 [ - + ]: 1153 : if (redo < cutoff)
3695 rhaas@postgresql.org 2074 :UBC 0 : cutoff = redo;
2075 : :
3574 andres@anarazel.de 2076 :CBC 1153 : snap_dir = AllocateDir("pg_logical/snapshots");
2077 [ + + ]: 3707 : while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
2078 : : {
2079 : : uint32 hi;
2080 : : uint32 lo;
2081 : : XLogRecPtr lsn;
2082 : : PGFileType de_type;
2083 : :
3695 rhaas@postgresql.org 2084 [ + + ]: 2554 : if (strcmp(snap_de->d_name, ".") == 0 ||
2085 [ + + ]: 1401 : strcmp(snap_de->d_name, "..") == 0)
2086 : 2306 : continue;
2087 : :
2560 peter_e@gmx.net 2088 : 248 : snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
590 michael@paquier.xyz 2089 : 248 : de_type = get_dirent_type(path, snap_de, false, DEBUG1);
2090 : :
2091 [ + - - + ]: 248 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
2092 : : {
3695 rhaas@postgresql.org 2093 [ # # ]:UBC 0 : elog(DEBUG1, "only regular files expected: %s", path);
2094 : 0 : continue;
2095 : : }
2096 : :
2097 : : /*
2098 : : * temporary filenames from SnapBuildSerialize() include the LSN and
2099 : : * everything but are postfixed by .$pid.tmp. We can just remove them
2100 : : * the same as other files because there can be none that are
2101 : : * currently being written that are older than cutoff.
2102 : : *
2103 : : * We just log a message if a file doesn't fit the pattern, it's
2104 : : * probably some editors lock/state file or similar...
2105 : : */
3695 rhaas@postgresql.org 2106 [ - + ]:CBC 248 : if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2107 : : {
3695 rhaas@postgresql.org 2108 [ # # ]:UBC 0 : ereport(LOG,
2109 : : (errmsg("could not parse file name \"%s\"", path)));
2110 : 0 : continue;
2111 : : }
2112 : :
3695 rhaas@postgresql.org 2113 :CBC 248 : lsn = ((uint64) hi) << 32 | lo;
2114 : :
2115 : : /* check whether we still need it */
2116 [ + + + + ]: 248 : if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
2117 : : {
2118 [ + + ]: 165 : elog(DEBUG1, "removing snapbuild snapshot %s", path);
2119 : :
2120 : : /*
2121 : : * It's not particularly harmful, though strange, if we can't
2122 : : * remove the file here. Don't prevent the checkpoint from
2123 : : * completing, that'd be a cure worse than the disease.
2124 : : */
2125 [ - + ]: 165 : if (unlink(path) < 0)
2126 : : {
3695 rhaas@postgresql.org 2127 [ # # ]:UBC 0 : ereport(LOG,
2128 : : (errcode_for_file_access(),
2129 : : errmsg("could not remove file \"%s\": %m",
2130 : : path)));
2131 : 0 : continue;
2132 : : }
2133 : : }
2134 : : }
3695 rhaas@postgresql.org 2135 :CBC 1153 : FreeDir(snap_dir);
2136 : 1153 : }
2137 : :
2138 : : /*
2139 : : * Check if a logical snapshot at the specified point has been serialized.
2140 : : */
2141 : : bool
11 akapila@postgresql.o 2142 :GNC 10 : SnapBuildSnapshotExists(XLogRecPtr lsn)
2143 : : {
2144 : : char path[MAXPGPATH];
2145 : : int ret;
2146 : : struct stat stat_buf;
2147 : :
2148 : 10 : sprintf(path, "pg_logical/snapshots/%X-%X.snap",
2149 : 10 : LSN_FORMAT_ARGS(lsn));
2150 : :
2151 : 10 : ret = stat(path, &stat_buf);
2152 : :
2153 [ + + - + ]: 10 : if (ret != 0 && errno != ENOENT)
11 akapila@postgresql.o 2154 [ # # ]:UNC 0 : ereport(ERROR,
2155 : : (errcode_for_file_access(),
2156 : : errmsg("could not stat file \"%s\": %m", path)));
2157 : :
11 akapila@postgresql.o 2158 :GNC 10 : return ret == 0;
2159 : : }
|