LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - snapbuild.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 81.8 % 570 466 11 20 60 13 20 258 77 111 63 273 8 62
Current Date: 2023-04-08 17:13:01 Functions: 96.7 % 30 29 1 26 3 1 29
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (120,180] days: 84.6 % 13 11 2 1 9 1 1
Legend: Lines: hit not hit (180,240] days: 100.0 % 14 14 14 2
(240..) days: 81.2 % 543 441 9 20 60 13 20 257 54 110 63 270
Function coverage date bins:
(180,240] days: 100.0 % 1 1 1
(240..) days: 47.5 % 59 28 1 26 2 1 29

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * snapbuild.c
                                  4                 :  *
                                  5                 :  *    Infrastructure for building historic catalog snapshots based on contents
                                  6                 :  *    of the WAL, for the purpose of decoding heapam.c style values in the
                                  7                 :  *    WAL.
                                  8                 :  *
                                  9                 :  * NOTES:
                                 10                 :  *
                                 11                 :  * We build snapshots which can *only* be used to read catalog contents and we
                                 12                 :  * do so by reading and interpreting the WAL stream. The aim is to build a
                                 13                 :  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
                                 14                 :  * at the time the XLogRecord was generated.
                                 15                 :  *
                                 16                 :  * To build the snapshots we reuse the infrastructure built for Hot
                                 17                 :  * Standby. The in-memory snapshots we build look different than HS' because
                                 18                 :  * we have different needs. To successfully decode data from the WAL we only
                                 19                 :  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
                                 20                 :  * tables since the data we decode is wholly contained in the WAL
                                 21                 :  * records. Also, our snapshots need to be different in comparison to normal
                                 22                 :  * MVCC ones because in contrast to those we cannot fully rely on the clog and
                                 23                 :  * pg_subtrans for information about committed transactions because they might
                                 24                 :  * commit in the future from the POV of the WAL entry we're currently
                                 25                 :  * decoding. This definition has the advantage that we only need to prevent
                                 26                 :  * removal of catalog rows, while normal tables' rows can still be
                                 27                 :  * removed. This is achieved by using the replication slot mechanism.
                                 28                 :  *
                                 29                 :  * As the percentage of transactions modifying the catalog normally is fairly
                                 30                 :  * small in comparison to ones only manipulating user data, we keep track of
                                 31                 :  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
                                 32                 :  * track of all running transactions as is done in a normal snapshot. Note
                                 33                 :  * that we're generally only looking at transactions that have acquired an
                                 34                 :  * xid. That is, we keep a list of transactions between snapshot->(xmin, xmax)
                                 35                 :  * that we consider committed, everything else is considered aborted/in
                                 36                 :  * progress. That also allows us not to care about subtransactions before they
                                 37                 :  * have committed which means this module, in contrast to HS, doesn't have to
                                 38                 :  * care about suboverflowed subtransactions and similar.
                                 39                 :  *
                                 40                 :  * One complexity of doing this is that to e.g. handle mixed DDL/DML
                                 41                 :  * transactions we need Snapshots that see intermediate versions of the
                                 42                 :  * catalog in a transaction. During normal operation this is achieved by using
                                 43                 :  * CommandIds/cmin/cmax. The problem with that, however, is that for space
                                 44                 :  * efficiency reasons only one of the two values is stored
                                 45                 :  * (cf. combocid.c). Since combo CIDs are only available in memory we log
                                 46                 :  * additional information which allows us to get the original (cmin, cmax)
                                 47                 :  * pair during visibility checks. See the comment in reorderbuffer.c above
                                 48                 :  * ResolveCminCmaxDuringDecoding() for details.
                                 49                 :  *
                                 50                 :  * To facilitate all this we need our own visibility routine, as the normal
                                 51                 :  * ones are optimized for different use cases.
                                 52                 :  *
                                 53                 :  * To replace the normal catalog snapshots with decoding ones use the
                                 54                 :  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
                                 55                 :  *
                                 56                 :  *
                                 57                 :  *
                                 58                 :  * The snapbuild machinery starts up in several stages, as illustrated
                                 59                 :  * by the following graph describing the SnapBuild->state transitions:
                                 60                 :  *
                                 61                 :  *         +-------------------------+
                                 62                 :  *    +----|         START           |-------------+
                                 63                 :  *    |    +-------------------------+             |
                                 64                 :  *    |                 |                          |
                                 65                 :  *    |                 |                          |
                                 66                 :  *    |        running_xacts #1                    |
                                 67                 :  *    |                 |                          |
                                 68                 :  *    |                 |                          |
                                 69                 :  *    |                 v                          |
                                 70                 :  *    |    +-------------------------+             v
                                 71                 :  *    |    |   BUILDING_SNAPSHOT     |------------>|
                                 72                 :  *    |    +-------------------------+             |
                                 73                 :  *    |                 |                          |
                                 74                 :  *    |                 |                          |
                                 75                 :  *    | running_xacts #2, xacts from #1 finished   |
                                 76                 :  *    |                 |                          |
                                 77                 :  *    |                 |                          |
                                 78                 :  *    |                 v                          |
                                 79                 :  *    |    +-------------------------+             v
                                 80                 :  *    |    |       FULL_SNAPSHOT     |------------>|
                                 81                 :  *    |    +-------------------------+             |
                                 82                 :  *    |                 |                          |
                                 83                 :  * running_xacts        |                      saved snapshot
                                 84                 :  * with zero xacts      |                 at running_xacts's lsn
                                 85                 :  *    |                 |                          |
                                 86                 :  *    | running_xacts with xacts from #2 finished  |
                                 87                 :  *    |                 |                          |
                                 88                 :  *    |                 v                          |
                                 89                 :  *    |    +-------------------------+             |
                                 90                 :  *    +--->|   SNAPBUILD_CONSISTENT  |<------------+
                                 91                 :  *         +-------------------------+
                                 92                 :  *
                                 93                 :  * Initially the machinery is in the START stage. When an xl_running_xacts
                                 94                 :  * record is read that is sufficiently new (above the safe xmin horizon),
                                 95                 :  * there's a state transition. If there were no running xacts when the
                                 96                 :  * xl_running_xacts record was generated, we'll directly go into CONSISTENT
                                 97                 :  * state, otherwise we'll switch to the BUILDING_SNAPSHOT state and, once
                                 98                 :  * those xacts have finished, to FULL_SNAPSHOT. Having a full snapshot means
                                 99                 :  * that all transactions that start henceforth can be decoded in their
                                100                 :  * entirety, but transactions that started previously can't. In FULL_SNAPSHOT
                                101                 :  * we'll switch into CONSISTENT once those have all committed or aborted.
                                102                 :  *
                                103                 :  * Only transactions that commit after CONSISTENT state has been reached will
                                104                 :  * be replayed, even though they might have started while still in
                                105                 :  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
                                106                 :  * changes have been exported, but all the following ones will be. That point
                                107                 :  * is a convenient point to initialize replication from, which is why we
                                108                 :  * export a snapshot at that point, which *can* be used to read normal data.
                                109                 :  *
                                110                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
                                111                 :  *
                                112                 :  * IDENTIFICATION
                                113                 :  *    src/backend/replication/logical/snapbuild.c
                                114                 :  *
                                115                 :  *-------------------------------------------------------------------------
                                116                 :  */
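
The state diagram in the header comment above can be read as a small transition function. The following is a minimal sketch of that reading only, not the logic of the real SnapBuildFindSnapshot(): every Sketch* and sketch_* name is hypothetical, each xl_running_xacts record is reduced to four assumed booleans, and the xmin-horizon guard is assumed to apply in every state.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for this sketch; not the real SnapBuildState values. */
typedef enum
{
    SKETCH_START,
    SKETCH_BUILDING_SNAPSHOT,
    SKETCH_FULL_SNAPSHOT,
    SKETCH_CONSISTENT
} SketchState;

/* What one xl_running_xacts record tells us, reduced to booleans (assumed). */
typedef struct
{
    bool above_xmin_horizon;    /* record is new enough to be usable */
    bool zero_running_xacts;    /* no transactions were running */
    bool saved_snapshot_at_lsn; /* a serialized snapshot exists at this LSN */
    bool earlier_xacts_done;    /* xacts running at the previous step finished */
} SketchRunningXacts;

/* Return the state reached after processing one record in state "cur". */
SketchState
sketch_next_state(SketchState cur, const SketchRunningXacts *rec)
{
    assert(rec != NULL);

    /* CONSISTENT is terminal; too-old records never cause a transition. */
    if (cur == SKETCH_CONSISTENT || !rec->above_xmin_horizon)
        return cur;

    /* Right-hand edge of the diagram: reuse a saved snapshot. */
    if (rec->saved_snapshot_at_lsn)
        return SKETCH_CONSISTENT;

    switch (cur)
    {
        case SKETCH_START:
            /* Left-hand edge: nothing was running, so skip the middle states. */
            return rec->zero_running_xacts ? SKETCH_CONSISTENT
                                           : SKETCH_BUILDING_SNAPSHOT;
        case SKETCH_BUILDING_SNAPSHOT:
            /* "running_xacts #2, xacts from #1 finished" */
            return rec->earlier_xacts_done ? SKETCH_FULL_SNAPSHOT : cur;
        case SKETCH_FULL_SNAPSHOT:
            /* "running_xacts with xacts from #2 finished" */
            return rec->earlier_xacts_done ? SKETCH_CONSISTENT : cur;
        default:
            return cur;
    }
}
```

Feeding records one at a time through sketch_next_state() walks the diagram from top to bottom. The two shortcut edges are checked before the step-by-step path, which matches how the diagram draws them as bypasses.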
                                117                 : 
                                118                 : #include "postgres.h"
                                119                 : 
                                120                 : #include <sys/stat.h>
                                121                 : #include <unistd.h>
                                122                 : 
                                123                 : #include "access/heapam_xlog.h"
                                124                 : #include "access/transam.h"
                                125                 : #include "access/xact.h"
                                126                 : #include "common/file_utils.h"
                                127                 : #include "miscadmin.h"
                                128                 : #include "pgstat.h"
                                129                 : #include "replication/logical.h"
                                130                 : #include "replication/reorderbuffer.h"
                                131                 : #include "replication/snapbuild.h"
                                132                 : #include "storage/block.h"        /* debugging output */
                                133                 : #include "storage/fd.h"
                                134                 : #include "storage/lmgr.h"
                                135                 : #include "storage/proc.h"
                                136                 : #include "storage/procarray.h"
                                137                 : #include "storage/standby.h"
                                138                 : #include "utils/builtins.h"
                                139                 : #include "utils/memutils.h"
                                140                 : #include "utils/snapmgr.h"
                                141                 : #include "utils/snapshot.h"
                                142                 : 
                                143                 : /*
                                144                 :  * This struct contains the current state of the snapshot building
                                145                 :  * machinery. Besides a forward declaration in the header, it is not exposed
                                146                 :  * to the public, so we can easily change its contents.
                                147                 :  */
                                148                 : struct SnapBuild
                                149                 : {
                                150                 :     /* how far are we along building our first full snapshot */
                                151                 :     SnapBuildState state;
                                152                 : 
                                153                 :     /* private memory context used to allocate memory for this module. */
                                154                 :     MemoryContext context;
                                155                 : 
                                156                 :     /* all transactions < this have committed/aborted */
                                157                 :     TransactionId xmin;
                                158                 : 
                                159                 :     /* all transactions >= this are uncommitted */
                                160                 :     TransactionId xmax;
                                161                 : 
                                162                 :     /*
                                163                 :      * Don't replay commits from an LSN < this LSN. This can be set externally
                                164                 :      * but it will also be advanced (never retreat) from within snapbuild.c.
                                165                 :      */
                                166                 :     XLogRecPtr  start_decoding_at;
                                167                 : 
                                168                 :     /*
                                169                 :      * LSN at which two-phase decoding was enabled or LSN at which we found a
                                170                 :      * consistent point at the time of slot creation.
                                171                 :      *
                                172                 :      * Prepared transactions that were skipped, either because two-phase was
                                173                 :      * not yet enabled or because they are not covered by the initial
                                174                 :      * snapshot, need to be sent later along with commit prepared, and they
                                175                 :      * must be before this point.
                                176                 :      */
                                177                 :     XLogRecPtr  two_phase_at;
                                178                 : 
                                179                 :     /*
                                180                 :      * Don't start decoding WAL until the "xl_running_xacts" information
                                181                 :      * indicates there are no running xids with an xid smaller than this.
                                182                 :      */
                                183                 :     TransactionId initial_xmin_horizon;
                                184                 : 
                                185                 :     /* Indicates whether we build a full snapshot or just a catalog one. */
                                186                 :     bool        building_full_snapshot;
                                187                 : 
                                188                 :     /*
                                189                 :      * Snapshot that's valid to see the catalog state seen at this moment.
                                190                 :      */
                                191                 :     Snapshot    snapshot;
                                192                 : 
                                193                 :     /*
                                194                 :      * LSN of the last location we are sure a snapshot has been serialized to.
                                195                 :      */
                                196                 :     XLogRecPtr  last_serialized_snapshot;
                                197                 : 
                                198                 :     /*
                                199                 :      * The reorderbuffer we need to update with usable snapshots et al.
                                200                 :      */
                                201                 :     ReorderBuffer *reorder;
                                202                 : 
                                203                 :     /*
                                204                 :      * TransactionId at which the next phase of initial snapshot building will
                                205                 :      * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or
                                206                 :      * when no next phase is necessary (SNAPBUILD_CONSISTENT).
                                207                 :      */
                                208                 :     TransactionId next_phase_at;
                                209                 : 
                                210                 :     /*
                                211                 :      * Array of transactions which could have catalog changes that committed
                                212                 :      * between xmin and xmax.
                                213                 :      */
                                214                 :     struct
                                215                 :     {
                                216                 :         /* number of committed transactions */
                                217                 :         size_t      xcnt;
                                218                 : 
                                219                 :         /* available space for committed transactions */
                                220                 :         size_t      xcnt_space;
                                221                 : 
                                222                 :         /*
                                223                 :          * Until we reach a CONSISTENT state, we record commits of all
                                224                 :          * transactions, not just the catalog changing ones. Record when that
                                225                 :          * changes so we know we cannot export a snapshot safely anymore.
                                226                 :          */
                                227                 :         bool        includes_all_transactions;
                                228                 : 
                                229                 :         /*
                                230                 :          * Array of committed transactions that have modified the catalog.
                                231                 :          *
                                232                 :          * As this array is frequently modified we do *not* keep it in
                                233                 :          * xidComparator order. Instead we sort the array when building &
                                234                 :          * distributing a snapshot.
                                235                 :          *
                                236                 :          * TODO: It's unclear whether that reasoning has much merit. Every
                                237                 :          * addition here after becoming consistent also requires distributing
                                238                 :          * a snapshot. Storing them sorted would potentially also make it
                                239                 :          * easier to purge (but more complicated wrt wraparound?). Should be
                                240                 :          * improved if sorting while building the snapshot shows up in
                                241                 :          * profiles.
                                242                 :          */
                                243                 :         TransactionId *xip;
                                244                 :     }           committed;
                                245                 : 
                                246                 :     /*
                                247                 :      * Array of transactions and subtransactions that had modified catalogs
                                248                 :      * and were running when the snapshot was serialized.
                                249                 :      *
                                250                 :      * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
                                251                 :      * if the transaction has changed the catalog. But it could happen that
                                252                 :      * the logical decoding decodes only the commit record of the transaction
                                253                 :      * after restoring the previously serialized snapshot in which case we
                                254                 :      * will miss adding the xid to the snapshot and end up looking at the
                                255                 :      * catalogs with the wrong snapshot.
                                256                 :      *
                                257                 :      * Now to avoid the above problem, we serialize the transactions that had
                                258                 :      * modified the catalogs and are still running at the time of snapshot
                                259                 :      * serialization. We fill this array while restoring the snapshot and then
                                260                 :      * refer to it while decoding a commit to check whether the xact has
                                261                 :      * modified the catalog. We discard this array when all the xids in the
                                262                 :      * list become too old to matter. See SnapBuildPurgeOlderTxn for details.
                                263                 :      */
                                264                 :     struct
                                265                 :     {
                                266                 :         /* number of transactions */
                                267                 :         size_t      xcnt;
                                268                 : 
                                269                 :         /* This array must be sorted in xidComparator order */
                                270                 :         TransactionId *xip;
                                271                 :     }           catchange;
                                272                 : };
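
The xmin, xmax and committed.xip fields above suggest a three-way check for a single transaction ID. The following is a minimal sketch under stated assumptions, not the visibility routine the decoding code actually uses: all Sketch* and sketch_* names are hypothetical, a plain unsigned sort order stands in for xidComparator, and the wraparound-aware comparison is simplified and ignores special xids.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t SketchXid;     /* hypothetical stand-in for TransactionId */

typedef enum
{
    SKETCH_XID_FINISHED,        /* before xmin: committed or aborted, the
                                 * sketch does not say which */
    SKETCH_XID_COMMITTED,       /* in [xmin, xmax) and listed as committed */
    SKETCH_XID_NOT_COMMITTED    /* aborted, in progress, or >= xmax */
} SketchXidStatus;

typedef struct
{
    SketchXid   xmin;
    SketchXid   xmax;
    SketchXid  *xip;            /* committed xids; sorted before lookups */
    size_t      xcnt;
} SketchSnapshot;

/* Plain unsigned ordering, usable with qsort() and bsearch(). */
int
sketch_xid_cmp(const void *a, const void *b)
{
    SketchXid   xa = *(const SketchXid *) a;
    SketchXid   xb = *(const SketchXid *) b;

    return (xa > xb) - (xa < xb);
}

/* Circular "a precedes b" test (simplified: no special xids). */
bool
sketch_xid_precedes(SketchXid a, SketchXid b)
{
    return (int32_t) (SketchXid) (a - b) < 0;
}

/* The builder keeps the array unsorted; sort once when a snapshot is built. */
void
sketch_sort_committed(SketchSnapshot *snap)
{
    assert(snap != NULL);
    if (snap->xcnt > 1)
        qsort(snap->xip, snap->xcnt, sizeof(SketchXid), sketch_xid_cmp);
}

/* Classify xid against the [xmin, xmax) window and the committed list. */
SketchXidStatus
sketch_classify(const SketchSnapshot *snap, SketchXid xid)
{
    assert(snap != NULL);

    if (sketch_xid_precedes(xid, snap->xmin))
        return SKETCH_XID_FINISHED;
    if (!sketch_xid_precedes(xid, snap->xmax))
        return SKETCH_XID_NOT_COMMITTED;
    if (snap->xcnt > 0 &&
        bsearch(&xid, snap->xip, snap->xcnt, sizeof(SketchXid),
                sketch_xid_cmp) != NULL)
        return SKETCH_XID_COMMITTED;
    return SKETCH_XID_NOT_COMMITTED;
}
```

Sorting once per built snapshot and then using binary search reflects the trade-off described in the struct comment: appends to the builder's array stay cheap, while lookups against a finished snapshot are logarithmic. The same sorted-array lookup would apply to catchange.xip, which the struct comment requires to be kept sorted.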
                                273                 : 
                                274                 : /*
                                275                 :  * Starting a transaction -- which we need to do while exporting a snapshot --
                                276                 :  * removes knowledge about the previously used resowner, so we save it here.
                                277                 :  */
                                278                 : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
                                279                 : static bool ExportInProgress = false;
                                280                 : 
                                281                 : /* ->committed and ->catchange manipulation */
                                282                 : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
                                283                 : 
                                284                 : /* snapshot building/manipulation/distribution functions */
                                285                 : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
                                286                 : 
                                287                 : static void SnapBuildFreeSnapshot(Snapshot snap);
                                288                 : 
                                289                 : static void SnapBuildSnapIncRefcount(Snapshot snap);
                                290                 : 
                                291                 : static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
                                292                 : 
                                293                 : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
                                294                 :                                                  uint32 xinfo);
                                295                 : 
                                296                 : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
                                297                 : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
                                298                 : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
                                299                 : 
                                300                 : /* serialization functions */
                                301                 : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
                                302                 : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
                                303                 : static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);
                                304                 : 
                                305                 : /*
                                306                 :  * Allocate a new snapshot builder.
                                307                 :  *
                                308                 :  * xmin_horizon is the xid >= which we can be sure no catalog rows have been
                                309                 :  * removed, start_lsn is the LSN >= which we want to replay commits.
                                310                 :  */
                                311                 : SnapBuild *
 3324 rhaas                     312 GIC         821 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
                                313                 :                         TransactionId xmin_horizon,
 2173 andres                    314 ECB             :                         XLogRecPtr start_lsn,
                                315                 :                         bool need_full_snapshot,
                                316                 :                         XLogRecPtr two_phase_at)
                                317                 : {
                                318                 :     MemoryContext context;
                                319                 :     MemoryContext oldcontext;
                                320                 :     SnapBuild  *builder;
                                321                 : 
                                322                 :     /* allocate memory in its own context, for better accounting */
 3324 rhaas                     323 GIC         821 :     context = AllocSetContextCreate(CurrentMemoryContext,
                                324                 :                                     "snapshot builder context",
 2416 tgl                       325 ECB             :                                     ALLOCSET_DEFAULT_SIZES);
 3324 rhaas                     326 GIC         821 :     oldcontext = MemoryContextSwitchTo(context);
                                327                 : 
 3324 rhaas                     328 CBC         821 :     builder = palloc0(sizeof(SnapBuild));
                                329                 : 
                                330             821 :     builder->state = SNAPBUILD_START;
 3324 rhaas                     331 GIC         821 :     builder->context = context;
 3324 rhaas                     332 CBC         821 :     builder->reorder = reorder;
 3324 rhaas                     333 ECB             :     /* Other struct members initialized by zeroing via palloc0 above */
                                334                 : 
 3324 rhaas                     335 GIC         821 :     builder->committed.xcnt = 0;
 2118 tgl                       336             821 :     builder->committed.xcnt_space = 128; /* arbitrary number */
 3324 rhaas                     337 CBC         821 :     builder->committed.xip =
                                338             821 :         palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
                                339             821 :     builder->committed.includes_all_transactions = true;
 3257 rhaas                     340 ECB             : 
  241 akapila                   341 GNC         821 :     builder->catchange.xcnt = 0;
                                342             821 :     builder->catchange.xip = NULL;
                                343                 : 
 3324 rhaas                     344 CBC         821 :     builder->initial_xmin_horizon = xmin_horizon;
 3230 andres                    345 GIC         821 :     builder->start_decoding_at = start_lsn;
 2173 andres                    346 CBC         821 :     builder->building_full_snapshot = need_full_snapshot;
  634 akapila                   347             821 :     builder->two_phase_at = two_phase_at;
                                348                 : 
 3324 rhaas                     349             821 :     MemoryContextSwitchTo(oldcontext);
 3324 rhaas                     350 ECB             : 
 3324 rhaas                     351 CBC         821 :     return builder;
                                352                 : }
 3324 rhaas                     353 ECB             : 
                                354                 : /*
                                355                 :  * Free a snapshot builder.
                                356                 :  */
                                357                 : void
 3324 rhaas                     358 GIC         679 : FreeSnapshotBuilder(SnapBuild *builder)
                                359                 : {
 3324 rhaas                     360 CBC         679 :     MemoryContext context = builder->context;
                                361                 : 
 3324 rhaas                     362 ECB             :     /* free snapshot explicitly, since that path contains some error checking */
 3324 rhaas                     363 GIC         679 :     if (builder->snapshot != NULL)
                                364                 :     {
 3324 rhaas                     365 CBC         184 :         SnapBuildSnapDecRefcount(builder->snapshot);
 3324 rhaas                     366 GIC         184 :         builder->snapshot = NULL;
 3324 rhaas                     367 ECB             :     }
                                368                 : 
                                369                 :     /* other resources are deallocated by deleting the memory context */
 3324 rhaas                     370 GIC         679 :     MemoryContextDelete(context);
                                371             679 : }
                                372                 : 
                                373                 : /*
                                374                 :  * Free an unreferenced snapshot that has previously been built by us.
 3324 rhaas                     375 ECB             :  */
                                376                 : static void
 3324 rhaas                     377 GIC        1148 : SnapBuildFreeSnapshot(Snapshot snap)
 3324 rhaas                     378 ECB             : {
                                379                 :     /* make sure we don't get passed an external snapshot */
 1539 andres                    380 GIC        1148 :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
 3324 rhaas                     381 ECB             : 
                                382                 :     /* make sure nobody modified our snapshot */
 3324 rhaas                     383 CBC        1148 :     Assert(snap->curcid == FirstCommandId);
                                384            1148 :     Assert(!snap->suboverflowed);
 3324 rhaas                     385 GIC        1148 :     Assert(!snap->takenDuringRecovery);
 2915 heikki.linnakangas        386            1148 :     Assert(snap->regd_count == 0);
 3324 rhaas                     387 ECB             : 
 3324 rhaas                     388 EUB             :     /* slightly more likely, so it's checked even without casserts */
 3324 rhaas                     389 GIC        1148 :     if (snap->copied)
 3324 rhaas                     390 LBC           0 :         elog(ERROR, "cannot free a copied snapshot");
 3324 rhaas                     391 EUB             : 
 3324 rhaas                     392 GIC        1148 :     if (snap->active_count)
 3324 rhaas                     393 LBC           0 :         elog(ERROR, "cannot free an active snapshot");
 3324 rhaas                     394 ECB             : 
 3324 rhaas                     395 GIC        1148 :     pfree(snap);
                                396            1148 : }
                                397                 : 
                                398                 : /*
                                399                 :  * In which state of snapshot building are we?
 3324 rhaas                     400 ECB             :  */
                                401                 : SnapBuildState
 3324 rhaas                     402 CBC     2409461 : SnapBuildCurrentState(SnapBuild *builder)
                                403                 : {
 3324 rhaas                     404 GIC     2409461 :     return builder->state;
                                405                 : }
                                406                 : 
                                407                 : /*
                                408                 :  * Return the LSN at which the two-phase decoding was first enabled.
  769 akapila                   409 ECB             :  */
                                410                 : XLogRecPtr
  634 akapila                   411 CBC          29 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
                                412                 : {
  634 akapila                   413 GIC          29 :     return builder->two_phase_at;
                                414                 : }
                                415                 : 
                                416                 : /*
                                417                 :  * Set the LSN at which two-phase decoding is enabled.
  634 akapila                   418 ECB             :  */
                                419                 : void
  634 akapila                   420 CBC           4 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
  634 akapila                   421 ECB             : {
  634 akapila                   422 GIC           4 :     builder->two_phase_at = ptr;
  769                           423               4 : }
                                424                 : 
                                425                 : /*
                                426                 :  * Should the contents of the transaction ending at 'ptr' be skipped, i.e. not decoded?
 3324 rhaas                     427 ECB             :  */
                                428                 : bool
 3324 rhaas                     429 CBC      497915 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
                                430                 : {
 3230 andres                    431 GIC      497915 :     return ptr < builder->start_decoding_at;
                                432                 : }
                                433                 : 
                                434                 : /*
                                435                 :  * Increase refcount of a snapshot.
                                436                 :  *
                                437                 :  * This is used when handing out a snapshot to some external resource or when
                                438                 :  * adding a Snapshot as builder->snapshot.
 3324 rhaas                     439 ECB             :  */
                                440                 : static void
 3324 rhaas                     441 CBC        4907 : SnapBuildSnapIncRefcount(Snapshot snap)
 3324 rhaas                     442 ECB             : {
 3324 rhaas                     443 GIC        4907 :     snap->active_count++;
                                444            4907 : }
                                445                 : 
                                446                 : /*
                                447                 :  * Decrease refcount of a snapshot and free if the refcount reaches zero.
                                448                 :  *
                                449                 :  * Externally visible, so that external resources that have been handed an
                                450                 :  * IncRef'ed Snapshot can adjust its refcount easily.
 3324 rhaas                     451 ECB             :  */
                                452                 : void
 3324 rhaas                     453 GIC        4742 : SnapBuildSnapDecRefcount(Snapshot snap)
 3324 rhaas                     454 ECB             : {
                                455                 :     /* make sure we don't get passed an external snapshot */
 1539 andres                    456 GIC        4742 :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
 3324 rhaas                     457 ECB             : 
                                458                 :     /* make sure nobody modified our snapshot */
 3324 rhaas                     459 CBC        4742 :     Assert(snap->curcid == FirstCommandId);
 3324 rhaas                     460 GIC        4742 :     Assert(!snap->suboverflowed);
 3324 rhaas                     461 CBC        4742 :     Assert(!snap->takenDuringRecovery);
                                462                 : 
 2915 heikki.linnakangas        463            4742 :     Assert(snap->regd_count == 0);
                                464                 : 
 2915 heikki.linnakangas        465 GIC        4742 :     Assert(snap->active_count > 0);
 3324 rhaas                     466 ECB             : 
 3286 heikki.linnakangas        467 EUB             :     /* slightly more likely, so it's checked even without casserts */
 3324 rhaas                     468 GIC        4742 :     if (snap->copied)
 3324 rhaas                     469 LBC           0 :         elog(ERROR, "cannot free a copied snapshot");
 3324 rhaas                     470 ECB             : 
 3324 rhaas                     471 CBC        4742 :     snap->active_count--;
 2915 heikki.linnakangas        472            4742 :     if (snap->active_count == 0)
 3324 rhaas                     473 GIC        1148 :         SnapBuildFreeSnapshot(snap);
                                474            4742 : }
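
The increment/decrement pair above follows a plain reference-counting scheme: each holder takes one reference, and whoever drops the last one frees the object. A minimal standalone sketch of that pattern is given below. The names `Snap`, `snap_new`, `snap_incref`, `snap_decref` and `freed_count` are hypothetical stand-ins for illustration only, and `calloc`/`free` replace PostgreSQL's memory contexts.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, simplified stand-in for a refcounted snapshot. */
typedef struct Snap
{
    int         active_count;   /* number of holders */
} Snap;

static int  freed_count = 0;    /* illustration only: counts frees */

/* Allocate a zeroed snapshot; its refcount starts at 0, as in the builder. */
static Snap *
snap_new(void)
{
    return calloc(1, sizeof(Snap));
}

/* Take one reference. */
static void
snap_incref(Snap *snap)
{
    snap->active_count++;
}

/*
 * Drop one reference and free the snapshot when the count reaches zero.
 * Returns 1 if the snapshot was freed, 0 otherwise.
 */
static int
snap_decref(Snap *snap)
{
    assert(snap->active_count > 0);

    snap->active_count--;
    if (snap->active_count == 0)
    {
        free(snap);
        freed_count++;
        return 1;
    }
    return 0;
}
```

In this sketch a caller would pair every `snap_incref` with exactly one `snap_decref`; the pointer must not be touched after the call that returns 1.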
                                475                 : 
                                476                 : /*
                                477                 :  * Build a new snapshot, based on currently committed catalog-modifying
                                478                 :  * transactions.
                                479                 :  *
                                480                 :  * In-progress transactions with catalog access are *not* allowed to modify
                                481                 :  * these snapshots; they have to copy them and fill in appropriate ->curcid
                                482                 :  * and ->subxip/subxcnt values.
 3324 rhaas                     483 ECB             :  */
                                484                 : static Snapshot
 2125 andres                    485 GIC        1453 : SnapBuildBuildSnapshot(SnapBuild *builder)
                                486                 : {
                                487                 :     Snapshot    snapshot;
 3324 rhaas                     488 ECB             :     Size        ssize;
                                489                 : 
 3324 rhaas                     490 CBC        1453 :     Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
 3324 rhaas                     491 ECB             : 
 3324 rhaas                     492 CBC        1453 :     ssize = sizeof(SnapshotData)
 3324 rhaas                     493 GIC        1453 :         + sizeof(TransactionId) * builder->committed.xcnt
 3324 rhaas                     494 CBC        1453 :         + sizeof(TransactionId) * 1 /* toplevel xid */ ;
                                495                 : 
                                496            1453 :     snapshot = MemoryContextAllocZero(builder->context, ssize);
                                497                 : 
 1539 andres                    498 GIC        1453 :     snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
                                499                 : 
                                500                 :     /*
                                501                 :      * We misuse the original meaning of SnapshotData's xip and subxip fields
                                502                 :      * to make them more fitting for our needs.
                                503                 :      *
                                504                 :      * In the 'xip' array we store transactions that have to be treated as
                                505                 :      * committed. Since we will only ever look at tuples from transactions
                                506                 :      * that have modified the catalog it's more efficient to store those few
                                507                 :      * that exist between xmin and xmax (frequently there are none).
                                508                 :      *
                                509                 :      * Snapshots that are used in transactions that have modified the catalog
                                510                 :      * also use the 'subxip' array to store their toplevel xid and all the
                                511                 :      * subtransaction xids so we can recognize when we need to treat rows as
                                512                 :      * visible that are not in xip but still need to be visible. Subxip only
                                513                 :      * gets filled when the transaction is copied into the context of a
                                514                 :      * catalog modifying transaction since we otherwise share a snapshot
                                515                 :      * between transactions. As long as a txn hasn't modified the catalog it
                                516                 :      * doesn't need to treat any uncommitted rows as visible, so there is no
                                517                 :      * need for those xids.
                                518                 :      *
 3324 rhaas                     519 ECB             :      * Both arrays are qsort'ed so that we can use bsearch() on them.
                                520                 :      */
 3324 rhaas                     521 GIC        1453 :     Assert(TransactionIdIsNormal(builder->xmin));
 3324 rhaas                     522 CBC        1453 :     Assert(TransactionIdIsNormal(builder->xmax));
 3324 rhaas                     523 ECB             : 
 3324 rhaas                     524 GIC        1453 :     snapshot->xmin = builder->xmin;
                                525            1453 :     snapshot->xmax = builder->xmax;
 3324 rhaas                     526 ECB             : 
                                527                 :     /* store all transactions to be treated as committed by this snapshot */
 3324 rhaas                     528 CBC        1453 :     snapshot->xip =
                                529            1453 :         (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
                                530            1453 :     snapshot->xcnt = builder->committed.xcnt;
                                531            1453 :     memcpy(snapshot->xip,
 3324 rhaas                     532 GIC        1453 :            builder->committed.xip,
                                533            1453 :            builder->committed.xcnt * sizeof(TransactionId));
 3324 rhaas                     534 ECB             : 
                                535                 :     /* sort so we can bsearch() */
 3324 rhaas                     536 GIC        1453 :     qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
                                537                 : 
                                538                 :     /*
                                539                 :      * Initially, subxip is empty, i.e. it's a snapshot to be used by
                                540                 :      * transactions that don't modify the catalog. Will be filled by
 3324 rhaas                     541 ECB             :      * ReorderBufferCopySnap() if necessary.
                                542                 :      */
 3324 rhaas                     543 GIC        1453 :     snapshot->subxcnt = 0;
 3324 rhaas                     544 CBC        1453 :     snapshot->subxip = NULL;
 3324 rhaas                     545 ECB             : 
 3324 rhaas                     546 CBC        1453 :     snapshot->suboverflowed = false;
                                547            1453 :     snapshot->takenDuringRecovery = false;
                                548            1453 :     snapshot->copied = false;
                                549            1453 :     snapshot->curcid = FirstCommandId;
                                550            1453 :     snapshot->active_count = 0;
 2915 heikki.linnakangas        551 GIC        1453 :     snapshot->regd_count = 0;
  965 andres                    552 CBC        1453 :     snapshot->snapXactCompletionCount = 0;
                                553                 : 
 3324 rhaas                     554 GIC        1453 :     return snapshot;
                                555                 : }
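
The function above sizes one allocation to hold both the snapshot struct and its trailing xid array, points `xip` just past the struct, copies the committed xids in, and sorts them for later `bsearch()` lookups. The sketch below reproduces that layout in isolation. `MiniSnapshot`, `Xid`, `xid_cmp` and `mini_snapshot_build` are hypothetical names, `calloc` replaces `MemoryContextAllocZero`, and the comparator uses plain integer ordering rather than PostgreSQL's `xidComparator`.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t Xid;           /* hypothetical stand-in for TransactionId */

/* Hypothetical, reduced version of a snapshot with a trailing xid array. */
typedef struct MiniSnapshot
{
    Xid         xmin;
    Xid         xmax;
    Xid        *xip;            /* points into the same allocation */
    uint32_t    xcnt;
} MiniSnapshot;

/* Plain ascending comparator for qsort()/bsearch(). */
static int
xid_cmp(const void *a, const void *b)
{
    Xid         x = *(const Xid *) a;
    Xid         y = *(const Xid *) b;

    return (x > y) - (x < y);
}

/*
 * Build a snapshot in a single allocation: struct first, xid array right
 * behind it. Returns NULL on allocation failure. One free() releases both.
 */
static MiniSnapshot *
mini_snapshot_build(const Xid *committed, uint32_t xcnt, Xid xmin, Xid xmax)
{
    size_t      size = sizeof(MiniSnapshot) + sizeof(Xid) * xcnt;
    MiniSnapshot *snap = calloc(1, size);

    if (snap == NULL)
        return NULL;

    snap->xmin = xmin;
    snap->xmax = xmax;
    snap->xip = (Xid *) ((char *) snap + sizeof(MiniSnapshot));
    snap->xcnt = xcnt;
    if (xcnt > 0)
        memcpy(snap->xip, committed, sizeof(Xid) * xcnt);

    /* sort so that lookups can use bsearch() */
    qsort(snap->xip, snap->xcnt, sizeof(Xid), xid_cmp);

    return snap;
}
```

Keeping the array inside the struct's allocation means the snapshot can be released with one call, which is what lets the freeing code above get away with a single `pfree(snap)`.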
                                556                 : 
                                557                 : /*
                                558                 :  * Build the initial slot snapshot and convert it to a normal snapshot that
                                559                 :  * is understood by HeapTupleSatisfiesMVCC.
                                560                 :  *
                                561                 :  * The snapshot will be usable directly in the current transaction or can be
                                562                 :  * exported for loading in a different transaction.
 3324 rhaas                     563 ECB             :  */
                                564                 : Snapshot
 2205 tgl                       565 GIC         155 : SnapBuildInitialSnapshot(SnapBuild *builder)
                                566                 : {
                                567                 :     Snapshot    snap;
                                568                 :     TransactionId xid;
                                569                 :     TransactionId safeXid;
 3324 rhaas                     570 ECB             :     TransactionId *newxip;
 3324 rhaas                     571 GIC         155 :     int         newxcnt = 0;
 3324 rhaas                     572 ECB             : 
 2205 tgl                       573 GIC         155 :     Assert(XactIsoLevel == XACT_REPEATABLE_READ);
  139 akapila                   574 GNC         155 :     Assert(builder->building_full_snapshot);
                                575                 : 
                                576                 :     /* don't allow older snapshots */
                                577             155 :     InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
                                578             155 :     if (HaveRegisteredOrActiveSnapshot())
  139 akapila                   579 UNC           0 :         elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
  139 akapila                   580 GNC         155 :     Assert(!HistoricSnapshotActive());
                                581                 : 
 3324 rhaas                     582 CBC         155 :     if (builder->state != SNAPBUILD_CONSISTENT)
 2208 peter_e                   583 LBC           0 :         elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
 3324 rhaas                     584 EUB             : 
 3324 rhaas                     585 CBC         155 :     if (!builder->committed.includes_all_transactions)
 2208 peter_e                   586 UIC           0 :         elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
 3324 rhaas                     587 ECB             : 
 3324 rhaas                     588 EUB             :     /* so we don't overwrite the existing value */
  969 andres                    589 GIC         155 :     if (TransactionIdIsValid(MyProc->xmin))
  969 andres                    590 LBC           0 :         elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
 3324 rhaas                     591 EUB             : 
 2125 andres                    592 GIC         155 :     snap = SnapBuildBuildSnapshot(builder);
                                593                 : 
 3324 rhaas                     594 ECB             :     /*
 3324 rhaas                     595 EUB             :      * We know that snap->xmin is alive, enforced by the logical xmin
                                596                 :      * mechanism. Due to that we can do this without locks, we're only
 3324 rhaas                     597 ECB             :      * changing our own value.
                                598                 :      *
                                599                 :      * Building an initial snapshot is expensive and an unenforced xmin
                                600                 :      * horizon would have bad consequences, therefore always double-check that
                                601                 :      * the horizon is enforced.
                                602                 :      */
  139 akapila                   603 GNC         155 :     LWLockAcquire(ProcArrayLock, LW_SHARED);
                                604             155 :     safeXid = GetOldestSafeDecodingTransactionId(false);
                                605             155 :     LWLockRelease(ProcArrayLock);
                                606                 : 
                                607             155 :     if (TransactionIdFollows(safeXid, snap->xmin))
  139 akapila                   608 UNC           0 :         elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
                                609                 :              safeXid, snap->xmin);
 2177 andres                    610 ECB             : 
  969 andres                    611 GIC         155 :     MyProc->xmin = snap->xmin;
 3324 rhaas                     612 ECB             : 
 3324 rhaas                     613 EUB             :     /* allocate in transaction context */
                                614                 :     newxip = (TransactionId *)
 3324 rhaas                     615 GIC         155 :         palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
 3324 rhaas                     616 ECB             : 
                                617                 :     /*
                                618                 :      * snapbuild.c builds snapshots in an "inverted" manner, which means it
                                619                 :      * stores committed transactions in ->xip, not ones in progress. Build a
                                620                 :      * classical snapshot by marking all non-committed transactions as
                                621                 :      * in-progress. This can be expensive.
                                622                 :      */
 3324 rhaas                     623 GIC         155 :     for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
                                624                 :     {
                                625                 :         void       *test;
                                626                 : 
                                627                 :         /*
 3324 rhaas                     628 ECB             :          * Check whether transaction committed using the decoding snapshot
                                629                 :          * meaning of ->xip.
                                630                 :          */
 3324 rhaas                     631 UIC           0 :         test = bsearch(&xid, snap->xip, snap->xcnt,
                                632                 :                        sizeof(TransactionId), xidComparator);
                                633                 : 
                                634               0 :         if (test == NULL)
                                635                 :         {
 3324 rhaas                     636 UBC           0 :             if (newxcnt >= GetMaxSnapshotXidCount())
 2208 peter_e                   637 UIC           0 :                 ereport(ERROR,
                                638                 :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 2153 bruce                     639 EUB             :                          errmsg("initial slot snapshot too large")));
                                640                 : 
 3324 rhaas                     641 UBC           0 :             newxip[newxcnt++] = xid;
 3324 rhaas                     642 EUB             :         }
                                643                 : 
 3324 rhaas                     644 UIC           0 :         TransactionIdAdvance(xid);
                                645                 :     }
 3324 rhaas                     646 EUB             : 
                                647                 :     /* adjust remaining snapshot fields as needed */
 1509 michael                   648 GIC         155 :     snap->snapshot_type = SNAPSHOT_MVCC;
 3324 rhaas                     649 GBC         155 :     snap->xcnt = newxcnt;
 3324 rhaas                     650 GIC         155 :     snap->xip = newxip;
                                651                 : 
 2208 peter_e                   652             155 :     return snap;
 2208 peter_e                   653 ECB             : }
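
The conversion loop in the function above walks every xid in [xmin, xmax) and records the ones that are absent from the sorted "committed" array as in-progress. A minimal standalone sketch of that inversion follows. `Xid`, `xid_cmp` and `invert_committed` are hypothetical names, and the sketch compares xids as plain integers, so it deliberately ignores the wraparound handling that `NormalTransactionIdPrecedes` and `TransactionIdAdvance` provide.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Xid;           /* hypothetical stand-in for TransactionId */

/* Plain ascending comparator for bsearch(); no wraparound logic. */
static int
xid_cmp(const void *a, const void *b)
{
    Xid         x = *(const Xid *) a;
    Xid         y = *(const Xid *) b;

    return (x > y) - (x < y);
}

/*
 * committed[] must be sorted ascending. Every xid in [xmin, xmax) that is
 * not found in committed[] is appended to in_progress[]. Returns the number
 * of entries written, or -1 if more than max_out entries would be needed
 * (the analogue of the "initial slot snapshot too large" error).
 */
static int
invert_committed(const Xid *committed, size_t ncommitted,
                 Xid xmin, Xid xmax,
                 Xid *in_progress, size_t max_out)
{
    size_t      n = 0;

    for (Xid xid = xmin; xid < xmax; xid++)
    {
        if (bsearch(&xid, committed, ncommitted,
                    sizeof(Xid), xid_cmp) != NULL)
            continue;           /* committed, so not in progress */

        if (n >= max_out)
            return -1;

        in_progress[n++] = xid;
    }

    return (int) n;
}
```

The cost is one binary search per xid in the range, which is why the source comment warns that building the classical snapshot can be expensive when xmin and xmax are far apart.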
                                654                 : 
                                655                 : /*
                                656                 :  * Export a snapshot so it can be set in another session with SET TRANSACTION
                                657                 :  * SNAPSHOT.
                                658                 :  *
                                659                 :  * For that we need to start a transaction in the current backend as the
                                660                 :  * importing side checks whether the source transaction is still open to make
                                661                 :  * sure the xmin horizon hasn't advanced since then.
                                662                 :  */
                                663                 : const char *
 2208 peter_e                   664 UIC           0 : SnapBuildExportSnapshot(SnapBuild *builder)
                                665                 : {
                                666                 :     Snapshot    snap;
                                667                 :     char       *snapname;
                                668                 : 
 2208 peter_e                   669 UBC           0 :     if (IsTransactionOrTransactionBlock())
 2208 peter_e                   670 UIC           0 :         elog(ERROR, "cannot export a snapshot from within a transaction");
                                671                 : 
                                672               0 :     if (SavedResourceOwnerDuringExport)
                                673               0 :         elog(ERROR, "can only export one snapshot at a time");
 2208 peter_e                   674 EUB             : 
 2208 peter_e                   675 UBC           0 :     SavedResourceOwnerDuringExport = CurrentResourceOwner;
 2208 peter_e                   676 UIC           0 :     ExportInProgress = true;
 2208 peter_e                   677 EUB             : 
 2208 peter_e                   678 UBC           0 :     StartTransactionCommand();
                                679                 : 
 2208 peter_e                   680 EUB             :     /* There doesn't seem to be a nice API to set these */
 2208 peter_e                   681 UBC           0 :     XactIsoLevel = XACT_REPEATABLE_READ;
 2208 peter_e                   682 UIC           0 :     XactReadOnly = true;
 2208 peter_e                   683 EUB             : 
 2205 tgl                       684 UIC           0 :     snap = SnapBuildInitialSnapshot(builder);
                                685                 : 
 3324 rhaas                     686 EUB             :     /*
 2208 peter_e                   687                 :      * now that we've built a plain snapshot, make it active and use the
                                688                 :      * normal mechanisms for exporting it
 3324 rhaas                     689                 :      */
 3324 rhaas                     690 UIC           0 :     snapname = ExportSnapshot(snap);
                                691                 : 
                                692               0 :     ereport(LOG,
                                693                 :             (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
                                694                 :                            "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
 3071 peter_e                   695 EUB             :                            snap->xcnt,
                                696                 :                            snapname, snap->xcnt)));
 3324 rhaas                     697 UBC           0 :     return snapname;
                                698                 : }
                                699                 : 
                                700                 : /*
                                701                 :  * Ensure there is a snapshot and, if not, build one for the current transaction.
 2559 simon                     702 EUB             :  */
                                703                 : Snapshot
  195 akapila                   704 GNC           6 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
                                705                 : {
 2559 simon                     706 GIC           6 :     Assert(builder->state == SNAPBUILD_CONSISTENT);
                                707                 : 
                                708                 :     /* only build a new snapshot if we don't have a prebuilt one */
 2559 simon                     709 CBC           6 :     if (builder->snapshot == NULL)
                                710                 :     {
 2125 andres                    711 LBC           0 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
                                712                 :         /* increase refcount for the snapshot builder */
 2559 simon                     713 UIC           0 :         SnapBuildSnapIncRefcount(builder->snapshot);
 2559 simon                     714 ECB             :     }
                                715                 : 
 2559 simon                     716 GBC           6 :     return builder->snapshot;
                                717                 : }
 2559 simon                     718 EUB             : 
                                719                 : /*
                                720                 :  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
 3324 rhaas                     721 ECB             :  * any. Aborts the previously started transaction and resets the resource
                                722                 :  * owner back to its original value.
                                723                 :  */
                                724                 : void
 2794 andres                    725 GIC        3912 : SnapBuildClearExportedSnapshot(void)
                                726                 : {
                                727                 :     ResourceOwner tmpResOwner;
                                728                 : 
                                729                 :     /* nothing exported, that is the usual case */
 3324 rhaas                     730 CBC        3912 :     if (!ExportInProgress)
 3324 rhaas                     731 GIC        3912 :         return;
                                732                 : 
 3324 rhaas                     733 UIC           0 :     if (!IsTransactionState())
                                734               0 :         elog(ERROR, "clearing exported snapshot in wrong transaction state");
 3324 rhaas                     735 ECB             : 
  538 michael                   736                 :     /*
                                737                 :      * AbortCurrentTransaction() takes care of resetting the snapshot state,
  538 michael                   738 EUB             :      * so remember SavedResourceOwnerDuringExport.
                                739                 :      */
  538 michael                   740 UIC           0 :     tmpResOwner = SavedResourceOwnerDuringExport;
                                741                 : 
                                742                 :     /* make sure nothing could have ever happened */
 3324 rhaas                     743               0 :     AbortCurrentTransaction();
                                744                 : 
  538 michael                   745 UBC           0 :     CurrentResourceOwner = tmpResOwner;
                                746                 : }
                                747                 : 
  538 michael                   748 EUB             : /*
                                749                 :  * Clear snapshot export state during transaction abort.
                                750                 :  */
                                751                 : void
  538 michael                   752 GIC       20125 : SnapBuildResetExportedSnapshotState(void)
                                753                 : {
 3324 rhaas                     754           20125 :     SavedResourceOwnerDuringExport = NULL;
                                755           20125 :     ExportInProgress = false;
                                756           20125 : }
 3324 rhaas                     757 ECB             : 
                                758                 : /*
                                759                 :  * Handle the effects of a single heap change, appropriate to the current state
                                760                 :  * of the snapshot builder and return whether changes made at (xid, lsn) can
                                761                 :  * be decoded.
                                762                 :  */
                                763                 : bool
 3324 rhaas                     764 GIC     1694388 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
                                765                 : {
                                766                 :     /*
                                767                 :      * We can't handle data in transactions if we haven't built a snapshot
                                768                 :      * yet, so don't store them.
 3324 rhaas                     769 ECB             :      */
 3324 rhaas                     770 GIC     1694388 :     if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
 3324 rhaas                     771 UIC           0 :         return false;
                                772                 : 
                                773                 :     /*
                                774                 :      * No point in keeping track of changes in transactions that we don't have
 3324 rhaas                     775 ECB             :      * enough information about to decode. This means that they started before
 3324 rhaas                     776 EUB             :      * we got into the SNAPBUILD_FULL_SNAPSHOT state.
                                777                 :      */
 3324 rhaas                     778 GIC     1694399 :     if (builder->state < SNAPBUILD_CONSISTENT &&
  783 andres                    779              11 :         TransactionIdPrecedes(xid, builder->next_phase_at))
 3324 rhaas                     780               4 :         return false;
                                781                 : 
                                782                 :     /*
 3324 rhaas                     783 ECB             :      * If the reorderbuffer doesn't yet have a snapshot, add one now; it will
                                784                 :      * be needed to decode the change we're currently processing.
                                785                 :      */
 2591 andres                    786 GIC     1694384 :     if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
                                787                 :     {
                                788                 :         /* only build a new snapshot if we don't have a prebuilt one */
 3324 rhaas                     789            2598 :         if (builder->snapshot == NULL)
                                790                 :         {
 2125 andres                    791 CBC         297 :             builder->snapshot = SnapBuildBuildSnapshot(builder);
                                792                 :             /* increase refcount for the snapshot builder */
 3324 rhaas                     793 GIC         297 :             SnapBuildSnapIncRefcount(builder->snapshot);
 3324 rhaas                     794 ECB             :         }
                                795                 : 
                                796                 :         /*
                                797                 :          * Increase refcount for the transaction we're handing the snapshot
                                798                 :          * out to.
                                799                 :          */
 3324 rhaas                     800 GIC        2598 :         SnapBuildSnapIncRefcount(builder->snapshot);
                                801            2598 :         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
                                802                 :                                      builder->snapshot);
                                803                 :     }
                                804                 : 
 3324 rhaas                     805 CBC     1694384 :     return true;
 3324 rhaas                     806 ECB             : }
                                807                 : 
                                808                 : /*
                                809                 :  * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
 2881 heikki.linnakangas        810                 :  * This implies that a transaction has done some form of write to system
                                811                 :  * catalogs.
                                812                 :  */
                                813                 : void
 3324 rhaas                     814 GIC       22808 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
                                815                 :                        XLogRecPtr lsn, xl_heap_new_cid *xlrec)
                                816                 : {
                                817                 :     CommandId   cid;
                                818                 : 
 3324 rhaas                     819 ECB             :     /*
                                820                 :      * we only log new_cid's if a catalog tuple was modified, so mark the
                                821                 :      * transaction as containing catalog modifications
                                822                 :      */
 3260 bruce                     823 GIC       22808 :     ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
                                824                 : 
 3324 rhaas                     825           22808 :     ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
                                826                 :                                  xlrec->target_locator, xlrec->target_tid,
                                827                 :                                  xlrec->cmin, xlrec->cmax,
 3324 rhaas                     828 ECB             :                                  xlrec->combocid);
                                829                 : 
                                830                 :     /* figure out new command id */
 3324 rhaas                     831 GIC       22808 :     if (xlrec->cmin != InvalidCommandId &&
                                832           19055 :         xlrec->cmax != InvalidCommandId)
                                833            3030 :         cid = Max(xlrec->cmin, xlrec->cmax);
                                834           19778 :     else if (xlrec->cmax != InvalidCommandId)
                                835            3753 :         cid = xlrec->cmax;
 3324 rhaas                     836 CBC       16025 :     else if (xlrec->cmin != InvalidCommandId)
                                837           16025 :         cid = xlrec->cmin;
 3324 rhaas                     838 ECB             :     else
                                839                 :     {
 3260 bruce                     840 LBC           0 :         cid = InvalidCommandId; /* silence compiler */
 3324 rhaas                     841               0 :         elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
 3324 rhaas                     842 ECB             :     }
                                843                 : 
 3324 rhaas                     844 GIC       22808 :     ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
 3324 rhaas                     845 GBC       22808 : }
 3324 rhaas                     846 EUB             : 
                                847                 : /*
                                848                 :  * Add a new Snapshot to all transactions we're decoding that currently are
 3324 rhaas                     849 ECB             :  * in-progress so they can see new catalog contents made by the transaction
                                850                 :  * that just committed. This is necessary because those in-progress
                                851                 :  * transactions will use the new catalog's contents from here on (at the very
                                852                 :  * least everything they do needs to be compatible with newer catalog
                                853                 :  * contents).
                                854                 :  */
                                855                 : static void
 3324 rhaas                     856 GIC         995 : SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
                                857                 : {
                                858                 :     dlist_iter  txn_i;
                                859                 :     ReorderBufferTXN *txn;
                                860                 : 
 3324 rhaas                     861 ECB             :     /*
                                862                 :      * Iterate through all toplevel transactions. This can include
                                863                 :      * subtransactions which we just don't yet know to be that, but that's
                                864                 :      * fine, they will just get an unnecessary snapshot queued.
                                865                 :      */
 3324 rhaas                     866 GIC        2031 :     dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
                                867                 :     {
                                868            1036 :         txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
                                869                 : 
                                870            1036 :         Assert(TransactionIdIsValid(txn->xid));
 3324 rhaas                     871 ECB             : 
                                872                 :         /*
                                873                 :          * If we don't have a base snapshot yet, there are no changes in this
                                874                 :          * transaction which in turn implies we don't yet need a snapshot at
 3310 fujii                     875                 :          * all. We'll add a snapshot when the first change gets queued.
                                876                 :          *
                                877                 :          * NB: This works correctly even for subtransactions because
                                878                 :          * ReorderBufferAssignChild() takes care to transfer the base snapshot
                                879                 :          * to the top-level transaction, and while iterating the changequeue
                                880                 :          * we'll get the change from the subtxn.
                                881                 :          */
 3324 rhaas                     882 GIC        1036 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
                                883              12 :             continue;
                                884                 : 
                                885                 :         /*
                                886                 :          * We don't need to add a snapshot to prepared transactions as they
  825 akapila                   887 ECB             :          * should not see the new catalog contents.
                                888                 :          */
  825 akapila                   889 GIC        1024 :         if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
                                890              22 :             continue;
                                891                 : 
 3324 rhaas                     892            1002 :         elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
                                893                 :              txn->xid, LSN_FORMAT_ARGS(lsn));
 3324 rhaas                     894 ECB             : 
                                895                 :         /*
                                896                 :          * increase the snapshot's refcount for the transaction we are handing
                                897                 :          * it out to
                                898                 :          */
 3324 rhaas                     899 GIC        1002 :         SnapBuildSnapIncRefcount(builder->snapshot);
                                900            1002 :         ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
                                901                 :                                  builder->snapshot);
                                902                 :     }
                                903             995 : }
 3324 rhaas                     904 ECB             : 
                                905                 : /*
                                906                 :  * Keep track of a new catalog changing transaction that has committed.
                                907                 :  */
                                908                 : static void
 3324 rhaas                     909 GIC        1004 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
                                910                 : {
                                911            1004 :     Assert(TransactionIdIsValid(xid));
                                912                 : 
                                913            1004 :     if (builder->committed.xcnt == builder->committed.xcnt_space)
 3324 rhaas                     914 ECB             :     {
 3324 rhaas                     915 UIC           0 :         builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
 3324 rhaas                     916 ECB             : 
 3324 rhaas                     917 UIC           0 :         elog(DEBUG1, "increasing space for committed transactions to %u",
 3324 rhaas                     918 ECB             :              (uint32) builder->committed.xcnt_space);
                                919                 : 
 3324 rhaas                     920 UBC           0 :         builder->committed.xip = repalloc(builder->committed.xip,
 2118 tgl                       921 UIC           0 :                                           builder->committed.xcnt_space * sizeof(TransactionId));
 3324 rhaas                     922 EUB             :     }
                                923                 : 
                                924                 :     /*
                                925                 :      * TODO: It might make sense to keep the array sorted here instead of
                                926                 :      * doing it every time we build a new snapshot. On the other hand this
                                927                 :      * gets called repeatedly when a transaction with subtransactions commits.
                                928                 :      */
 3324 rhaas                     929 GIC        1004 :     builder->committed.xip[builder->committed.xcnt++] = xid;
                                930            1004 : }
                                931                 : 
                                932                 : /*
                                933                 :  * Remove knowledge about transactions we treat as committed or containing catalog
                                934                 :  * changes that are smaller than ->xmin. Those won't ever get checked via
                                935                 :  * the ->committed or ->catchange array, respectively. The committed xids will
                                936                 :  * get checked via the clog machinery.
                                937                 :  *
                                938                 :  * We can ideally remove the transaction from catchange array once it is
                                939                 :  * finished (committed/aborted) but that could be costly as we need to maintain
                                940                 :  * the xid order in the array.
                                941                 :  */
                                942                 : static void
  241 akapila                   943             274 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
                                944                 : {
                                945                 :     int         off;
                                946                 :     TransactionId *workspace;
 3324 rhaas                     947             274 :     int         surviving_xids = 0;
 3324 rhaas                     948 ECB             : 
                                949                 :     /* not ready yet */
 3324 rhaas                     950 GIC         274 :     if (!TransactionIdIsNormal(builder->xmin))
 3324 rhaas                     951 UIC           0 :         return;
 3324 rhaas                     952 ECB             : 
                                953                 :     /* TODO: Neater algorithm than just copying and iterating? */
                                954                 :     workspace =
 3324 rhaas                     955 CBC         274 :         MemoryContextAlloc(builder->context,
 3324 rhaas                     956 GBC         274 :                            builder->committed.xcnt * sizeof(TransactionId));
                                957                 : 
                                958                 :     /* copy xids that still are interesting to workspace */
 3324 rhaas                     959 GIC         481 :     for (off = 0; off < builder->committed.xcnt; off++)
 3324 rhaas                     960 ECB             :     {
 3324 rhaas                     961 CBC         207 :         if (NormalTransactionIdPrecedes(builder->committed.xip[off],
                                962                 :                                         builder->xmin))
                                963                 :             ;                   /* remove */
 3324 rhaas                     964 ECB             :         else
 3324 rhaas                     965 UIC           0 :             workspace[surviving_xids++] = builder->committed.xip[off];
 3324 rhaas                     966 ECB             :     }
                                967                 : 
                                968                 :     /* copy workspace back to persistent state */
 3324 rhaas                     969 GIC         274 :     memcpy(builder->committed.xip, workspace,
 3324 rhaas                     970 EUB             :            surviving_xids * sizeof(TransactionId));
                                971                 : 
 3324 rhaas                     972 GIC         274 :     elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
                                973                 :          (uint32) builder->committed.xcnt, (uint32) surviving_xids,
 3324 rhaas                     974 ECB             :          builder->xmin, builder->xmax);
 3324 rhaas                     975 GIC         274 :     builder->committed.xcnt = surviving_xids;
                                976                 : 
 3324 rhaas                     977 CBC         274 :     pfree(workspace);
                                978                 : 
  241 akapila                   979 ECB             :     /*
                                980                 :      * Purge xids in ->catchange as well. The purged array must also be sorted
                                981                 :      * in xidComparator order.
                                982                 :      */
  223 akapila                   983 GNC         274 :     if (builder->catchange.xcnt > 0)
                                984                 :     {
                                985                 :         /*
                                986                 :          * Since catchange.xip is sorted, we find the lower bound of xids that
                                987                 :          * are still interesting.
                                988                 :          */
                                989               7 :         for (off = 0; off < builder->catchange.xcnt; off++)
                                990                 :         {
                                991               5 :             if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
                                992                 :                                              builder->xmin))
                                993               1 :                 break;
                                994                 :         }
                                995                 : 
                                996               3 :         surviving_xids = builder->catchange.xcnt - off;
                                997                 : 
                                998               3 :         if (surviving_xids > 0)
                                999                 :         {
                               1000               1 :             memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
                               1001                 :                     surviving_xids * sizeof(TransactionId));
                               1002                 :         }
  223 akapila                  1003 ECB             :         else
                               1004                 :         {
  223 akapila                  1005 GNC           2 :             pfree(builder->catchange.xip);
                               1006               2 :             builder->catchange.xip = NULL;
                               1007                 :         }
                               1008                 : 
                               1009               3 :         elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
                               1010                 :              (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
                               1011                 :              builder->xmin, builder->xmax);
                               1012               3 :         builder->catchange.xcnt = surviving_xids;
  223 akapila                  1013 ECB             :     }
                               1014                 : }
                               1015                 : 
                               1016                 : /*
                               1017                 :  * Handle everything that needs to be done when a transaction commits.
 3324 rhaas                    1018                 :  */
                               1019                 : void
 3324 rhaas                    1020 GIC        2542 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
                               1021                 :                    int nsubxacts, TransactionId *subxacts, uint32 xinfo)
                               1022                 : {
 3324 rhaas                    1023 ECB             :     int         nxact;
                               1024                 : 
 2157 andres                   1025 CBC        2542 :     bool        needs_snapshot = false;
 2157 andres                   1026 GIC        2542 :     bool        needs_timetravel = false;
 3324 rhaas                    1027 CBC        2542 :     bool        sub_needs_timetravel = false;
                               1028                 : 
 3324 rhaas                    1029 GIC        2542 :     TransactionId xmax = xid;
                               1030                 : 
                               1031                 :     /*
                               1032                 :      * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
 2157 andres                   1033 ECB             :      * will they be part of a snapshot.  So we don't need to record anything.
 3324 rhaas                    1034                 :      */
 2157 andres                   1035 GBC        2542 :     if (builder->state == SNAPBUILD_START ||
 2157 andres                   1036 GIC        2542 :         (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
  783 andres                   1037 UIC           0 :          TransactionIdPrecedes(xid, builder->next_phase_at)))
 2157 andres                   1038 EUB             :     {
                               1039                 :         /* ensure that only commits after this are getting replayed */
 2157 andres                   1040 UBC           0 :         if (builder->start_decoding_at <= lsn)
 2157 andres                   1041 UIC           0 :             builder->start_decoding_at = lsn + 1;
                               1042               0 :         return;
 2157 andres                   1043 ECB             :     }
                               1044                 : 
 3324 rhaas                    1045 GIC        2542 :     if (builder->state < SNAPBUILD_CONSISTENT)
 3324 rhaas                    1046 ECB             :     {
                               1047                 :         /* ensure that only commits after this are getting replayed */
 3230 andres                   1048 GIC          11 :         if (builder->start_decoding_at <= lsn)
                               1049               5 :             builder->start_decoding_at = lsn + 1;
                               1050                 : 
                               1051                 :         /*
                               1052                 :          * If building an exportable snapshot, force xid to be tracked, even
 2157 andres                   1053 ECB             :          * if the transaction didn't modify the catalog.
                               1054                 :          */
 2157 andres                   1055 GBC          11 :         if (builder->building_full_snapshot)
                               1056                 :         {
 2157 andres                   1057 UIC           0 :             needs_timetravel = true;
                               1058                 :         }
 3324 rhaas                    1059 ECB             :     }
                               1060                 : 
 3324 rhaas                    1061 CBC        3790 :     for (nxact = 0; nxact < nsubxacts; nxact++)
                               1062                 :     {
 3324 rhaas                    1063 GIC        1248 :         TransactionId subxid = subxacts[nxact];
                               1064                 : 
                               1065                 :         /*
                               1066                 :          * Add subtransaction to base snapshot if catalog modifying; we don't
 2157 andres                   1067 ECB             :          * distinguish it from toplevel transactions there.
                               1068                 :          */
  241 akapila                  1069 GNC        1248 :         if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
 3324 rhaas                    1070 ECB             :         {
 2157 andres                   1071 GIC           9 :             sub_needs_timetravel = true;
 2157 andres                   1072 CBC           9 :             needs_snapshot = true;
                               1073                 : 
 2157 andres                   1074 GIC           9 :             elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
 2157 andres                   1075 ECB             :                  xid, subxid);
                               1076                 : 
 3324 rhaas                    1077 CBC           9 :             SnapBuildAddCommittedTxn(builder, subxid);
 2157 andres                   1078 ECB             : 
 3324 rhaas                    1079 GIC           9 :             if (NormalTransactionIdFollows(subxid, xmax))
                               1080               9 :                 xmax = subxid;
                               1081                 :         }
                               1082                 : 
                               1083                 :         /*
                               1084                 :          * If we're forcing timetravel we also need visibility information
                               1085                 :          * about the subtransaction, so keep track of its state, even
                               1086                 :          * if not catalog modifying.  Don't need to distribute a snapshot in
 2157 andres                   1087 ECB             :          * that case.
                               1088                 :          */
 2157 andres                   1089 GBC        1239 :         else if (needs_timetravel)
 3324 rhaas                    1090 EUB             :         {
 3324 rhaas                    1091 UBC           0 :             SnapBuildAddCommittedTxn(builder, subxid);
 3324 rhaas                    1092 UIC           0 :             if (NormalTransactionIdFollows(subxid, xmax))
                               1093               0 :                 xmax = subxid;
                               1094                 :         }
                               1095                 :     }
 3324 rhaas                    1096 ECB             : 
                               1097                 :     /* if top-level modified catalog, it'll need a snapshot */
  241 akapila                  1098 GNC        2542 :     if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
                               1099                 :     {
 2157 andres                   1100 CBC         994 :         elog(DEBUG2, "found top level transaction %u, with catalog changes",
 2157 andres                   1101 ECB             :              xid);
 2157 andres                   1102 CBC         994 :         needs_snapshot = true;
 2157 andres                   1103 GIC         994 :         needs_timetravel = true;
 3324 rhaas                    1104 CBC         994 :         SnapBuildAddCommittedTxn(builder, xid);
                               1105                 :     }
 2157 andres                   1106 GIC        1548 :     else if (sub_needs_timetravel)
 3324 rhaas                    1107 ECB             :     {
                               1108                 :         /* track toplevel txn as well, subxact alone isn't meaningful */
  171 akapila                  1109 CBC           1 :         elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
  171 akapila                  1110 ECB             :              xid);
  171 akapila                  1111 GIC           1 :         needs_timetravel = true;
 3324 rhaas                    1112 CBC           1 :         SnapBuildAddCommittedTxn(builder, xid);
                               1113                 :     }
 2157 andres                   1114 GBC        1547 :     else if (needs_timetravel)
                               1115                 :     {
 2157 andres                   1116 UBC           0 :         elog(DEBUG2, "forced transaction %u to do timetravel", xid);
                               1117                 : 
 3324 rhaas                    1118 UIC           0 :         SnapBuildAddCommittedTxn(builder, xid);
 3324 rhaas                    1119 ECB             :     }
                               1120                 : 
 2157 andres                   1121 GIC        2542 :     if (!needs_timetravel)
 3324 rhaas                    1122 ECB             :     {
                               1123                 :         /* record that we cannot export a general snapshot anymore */
 2157 andres                   1124 GIC        1547 :         builder->committed.includes_all_transactions = false;
 2157 andres                   1125 ECB             :     }
                               1126                 : 
 2157 andres                   1127 GIC        2542 :     Assert(!needs_snapshot || needs_timetravel);
                               1128                 : 
                               1129                 :     /*
                               1130                 :      * Adjust xmax of the snapshot builder, we only do that for committed,
                               1131                 :      * catalog modifying, transactions, everything else isn't interesting for
 2153 bruce                    1132 ECB             :      * us since we'll never look at the respective rows.
 2157 andres                   1133                 :      */
 2157 andres                   1134 CBC        2542 :     if (needs_timetravel &&
 2157 andres                   1135 GIC        1990 :         (!TransactionIdIsValid(builder->xmax) ||
 2157 andres                   1136 CBC         995 :          TransactionIdFollowsOrEquals(xmax, builder->xmax)))
 2157 andres                   1137 ECB             :     {
 2157 andres                   1138 GIC         991 :         builder->xmax = xmax;
                               1139             991 :         TransactionIdAdvance(builder->xmax);
                               1140                 :     }
 2157 andres                   1141 ECB             : 
                               1142                 :     /* if there's any reason to build a historic snapshot, do so now */
 2157 andres                   1143 GIC        2542 :     if (needs_snapshot)
                               1144                 :     {
                               1145                 :         /*
                               1146                 :          * If we haven't built a complete snapshot yet there's no need to hand
 3324 rhaas                    1147 ECB             :          * it out, it wouldn't (and couldn't) be used anyway.
 3324 rhaas                    1148 EUB             :          */
 3324 rhaas                    1149 GIC         995 :         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
 3324 rhaas                    1150 UIC           0 :             return;
                               1151                 : 
                               1152                 :         /*
                               1153                 :          * Decrease the snapshot builder's refcount of the old snapshot, note
                               1154                 :          * that it still will be used if it has been handed out to the
 3324 rhaas                    1155 ECB             :          * reorderbuffer earlier.
                               1156                 :          */
 3324 rhaas                    1157 GIC         995 :         if (builder->snapshot)
 3324 rhaas                    1158 CBC         994 :             SnapBuildSnapDecRefcount(builder->snapshot);
                               1159                 : 
 2125 andres                   1160 GIC         995 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
 3324 rhaas                    1161 ECB             : 
                               1162                 :         /* we might need to execute invalidations, add snapshot */
 3324 rhaas                    1163 CBC         995 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
 3324 rhaas                    1164 ECB             :         {
 3324 rhaas                    1165 GIC           9 :             SnapBuildSnapIncRefcount(builder->snapshot);
                               1166               9 :             ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
                               1167                 :                                          builder->snapshot);
                               1168                 :         }
 3324 rhaas                    1169 ECB             : 
                               1170                 :         /* refcount of the snapshot builder for the new snapshot */
 3324 rhaas                    1171 GIC         995 :         SnapBuildSnapIncRefcount(builder->snapshot);
 3324 rhaas                    1172 ECB             : 
                               1173                 :         /* add a new catalog snapshot to all currently running transactions */
 3324 rhaas                    1174 GIC         995 :         SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
                               1175                 :     }
                               1176                 : }
                               1177                 : 
                               1178                 : /*
                               1179                 :  * Check the reorder buffer and the snapshot to see if the given transaction has
                               1180                 :  * modified catalogs.
                               1181                 :  */
                               1182                 : static inline bool
  241 akapila                  1183 GNC        3790 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
                               1184                 :                               uint32 xinfo)
                               1185                 : {
                               1186            3790 :     if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
                               1187             999 :         return true;
                               1188                 : 
                               1189                 :     /*
                               1190                 :      * The transactions that have changed catalogs must have invalidation
                               1191                 :      * info.
                               1192                 :      */
                               1193            2791 :     if (!(xinfo & XACT_XINFO_HAS_INVALS))
                               1194            2783 :         return false;
                               1195                 : 
                               1196                 :     /* Check the catchange XID array */
                               1197              12 :     return ((builder->catchange.xcnt > 0) &&
                               1198               4 :             (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
                               1199                 :                      sizeof(TransactionId), xidComparator) != NULL));
                               1200                 : }
                               1201                 : 
                               1202                 : /* -----------------------------------
                               1203                 :  * Snapshot building functions dealing with xlog records
 3324 rhaas                    1204 ECB             :  * -----------------------------------
                               1205                 :  */
                               1206                 : 
                               1207                 : /*
 3287 heikki.linnakangas       1208                 :  * Process a running xacts record, and use its information to first build a
                               1209                 :  * historic snapshot and later to release resources that aren't needed
                               1210                 :  * anymore.
                               1211                 :  */
                               1212                 : void
 3324 rhaas                    1213 GIC        1026 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 3324 rhaas                    1214 ECB             : {
                               1215                 :     ReorderBufferTXN *txn;
                               1216                 :     TransactionId xmin;
                               1217                 : 
                               1218                 :     /*
                               1219                 :      * If we're not consistent yet, inspect the record to see whether it
                               1220                 :      * allows us to get closer to being consistent. If we are consistent, dump
                               1221                 :      * our snapshot so others or we, after a restart, can use it.
                               1222                 :      */
 3324 rhaas                    1223 GIC        1026 :     if (builder->state < SNAPBUILD_CONSISTENT)
                               1224                 :     {
                               1225                 :         /* returns false if there's no point in performing cleanup just yet */
                               1226             776 :         if (!SnapBuildFindSnapshot(builder, lsn, running))
                               1227             750 :             return;
                               1228                 :     }
                               1229                 :     else
                               1230             250 :         SnapBuildSerialize(builder, lsn);
                               1231                 : 
                               1232                 :     /*
                               1233                 :      * Update range of interesting xids based on the running xacts
 3324 rhaas                    1234 ECB             :      * information. We don't increase ->xmax using it, because once we are in
                               1235                 :      * a consistent state we can do that ourselves and much more efficiently
                               1236                 :      * so, because we only need to do it for catalog transactions since we
                               1237                 :      * only ever look at those.
                               1238                 :      *
                               1239                 :      * NB: We only increase xmax when a catalog modifying transaction commits
                               1240                 :      * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
                               1241                 :      * xmin, which looks odd but is correct and actually more efficient, since
                               1242                 :      * we hit fast paths in heapam_visibility.c.
                               1243                 :      */
 3324 rhaas                    1244 CBC         274 :     builder->xmin = running->oldestRunningXid;
                               1245                 : 
                               1246                 :     /* Remove transactions we don't need to keep track of anymore */
  241 akapila                  1247             274 :     SnapBuildPurgeOlderTxn(builder);
 3324 rhaas                    1248 ECB             : 
                               1249                 :     /*
                               1250                 :      * Advance the xmin limit for the current replication slot, to allow
 1748 alvherre                 1251                 :      * vacuum to clean up the tuples this slot has been protecting.
                               1252                 :      *
                               1253                 :      * The reorderbuffer might have an xmin among the currently running
                               1254                 :      * snapshots; use it if so.  If not, we need only consider the snapshots
                               1255                 :      * we'll produce later, which can't be less than the oldest running xid in
                               1256                 :      * the record we're reading now.
                               1257                 :      */
 1748 alvherre                 1258 GIC         274 :     xmin = ReorderBufferGetOldestXmin(builder->reorder);
                               1259             274 :     if (xmin == InvalidTransactionId)
                               1260             242 :         xmin = running->oldestRunningXid;
                               1261             274 :     elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
                               1262                 :          builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
                               1263             274 :     LogicalIncreaseXminForSlot(lsn, xmin);
                               1264                 : 
 3324 rhaas                    1265 ECB             :     /*
                               1266                 :      * Also tell the slot where we can restart decoding from. We don't want to
                               1267                 :      * do that after every commit because changing that implies an fsync of
                               1268                 :      * the logical slot's state file, so we only do it every time we see a
                               1269                 :      * running xacts record.
                               1270                 :      *
                               1271                 :      * Do so by looking for the oldest in progress transaction (determined by
                               1272                 :      * the first LSN of any of its relevant records). Every transaction
                               1273                 :      * remembers the last location we stored the snapshot to disk before its
                               1274                 :      * beginning. That point is where we can restart from.
                               1275                 :      */
                               1276                 : 
                               1277                 :     /*
                               1278                 :      * Can't know about a serialized snapshot's location if we're not
                               1279                 :      * consistent.
                               1280                 :      */
 3324 rhaas                    1281 CBC         274 :     if (builder->state < SNAPBUILD_CONSISTENT)
                               1282              19 :         return;
                               1283                 : 
                               1284             255 :     txn = ReorderBufferGetOldestTXN(builder->reorder);
                               1285                 : 
                               1286                 :     /*
                               1287                 :      * oldest ongoing txn might have started when we didn't yet serialize
                               1288                 :      * anything because we hadn't reached a consistent state yet.
                               1289                 :      */
 3324 rhaas                    1290 GIC         255 :     if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
                               1291              13 :         LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
                               1292                 : 
                               1293                 :     /*
                               1294                 :      * No in-progress transaction, can reuse the last serialized snapshot if
                               1295                 :      * we have one.
                               1296                 :      */
                               1297             242 :     else if (txn == NULL &&
 2118 tgl                      1298             223 :              builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
 3324 rhaas                    1299             221 :              builder->last_serialized_snapshot != InvalidXLogRecPtr)
                               1300             221 :         LogicalIncreaseRestartDecodingForSlot(lsn,
                               1301                 :                                               builder->last_serialized_snapshot);
 3324 rhaas                    1302 ECB             : }
                               1303                 : 
                               1304                 : 
                               1305                 : /*
                               1306                 :  * Build the start of a snapshot that's capable of decoding the catalog.
                               1307                 :  *
                               1308                 :  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
                               1309                 :  * consistent.
                               1310                 :  *
                               1311                 :  * Returns true if there is a point in performing internal maintenance/cleanup
                               1312                 :  * using the xl_running_xacts record.
                               1313                 :  */
                               1314                 : static bool
 3324 rhaas                    1315 GIC         776 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
                               1316                 : {
                               1317                 :     /* ---
 3324 rhaas                    1318 ECB             :      * Build catalog decoding snapshot incrementally using information about
                               1319                 :      * the currently running transactions. There are several ways to do that:
                               1320                 :      *
                               1321                 :      * a) There were no running transactions when the xl_running_xacts record
                               1322                 :      *    was inserted, jump to CONSISTENT immediately. We might find such a
                               1323                 :      *    state while waiting on c)'s sub-states.
                               1324                 :      *
                               1325                 :      * b) This (in a previous run) or another decoding slot serialized a
                               1326                 :      *    snapshot to disk that we can use.  Can't use this method for the
                               1327                 :      *    initial snapshot when slot is being created and needs full snapshot
                               1328                 :      *    for export or direct use, as that snapshot will only contain catalog
                               1329                 :      *    modifying transactions.
                               1330                 :      *
                               1331                 :      * c) First incrementally build a snapshot for catalog tuples
                               1332                 :      *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
                               1333                 :      *    transactions to finish.  Every transaction starting after that
                               1334                 :      *    (FULL_SNAPSHOT state), has enough information to be decoded.  But
                               1335                 :      *    for older running transactions no viable snapshot exists yet, so
 2153 bruce                    1336                 :      *    CONSISTENT will only be reached once all of those have finished.
                               1337                 :      * ---
                               1338                 :      */
                               1339                 : 
                               1340                 :     /*
                               1341                 :      * xl_running_xacts record is older than what we can use, we might not have
                               1342                 :      * all necessary catalog rows anymore.
                               1343                 :      */
 3324 rhaas                    1344 GIC         776 :     if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
                               1345             344 :         NormalTransactionIdPrecedes(running->oldestRunningXid,
                               1346                 :                                     builder->initial_xmin_horizon))
                               1347                 :     {
 3324 rhaas                    1348 UIC           0 :         ereport(DEBUG1,
                               1349                 :                 (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
                               1350                 :                                  LSN_FORMAT_ARGS(lsn)),
                               1351                 :                  errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
                               1352                 :                                     builder->initial_xmin_horizon, running->oldestRunningXid)));
                               1353                 : 
                               1354                 : 
 2157 andres                   1355               0 :         SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
                               1356                 : 
 3324 rhaas                    1357               0 :         return true;
                               1358                 :     }
                               1359                 : 
                               1360                 :     /*
                               1361                 :      * a) No transactions were running, we can jump to consistent.
                               1362                 :      *
                               1363                 :      * This is not affected by races around xl_running_xacts, because we can
                               1364                 :      * miss transaction commits, but currently not transactions starting.
 2157 andres                   1365 ECB             :      *
 3324 rhaas                    1366                 :      * NB: We might have already started to incrementally assemble a snapshot,
                               1367                 :      * so we need to be careful to deal with that.
                               1368                 :      */
 2157 andres                   1369 GBC         776 :     if (running->oldestRunningXid == running->nextXid)
                               1370                 :     {
 3230 andres                   1371 GIC         744 :         if (builder->start_decoding_at == InvalidXLogRecPtr ||
                               1372             414 :             builder->start_decoding_at <= lsn)
                               1373                 :             /* can decode everything after this */
                               1374             331 :             builder->start_decoding_at = lsn + 1;
                               1375                 : 
 3069 andres                   1376 EUB             :         /* As no transactions were running xmin/xmax can be trivially set. */
 2118 tgl                      1377 GIC         744 :         builder->xmin = running->nextXid; /* < are finished */
 2118 tgl                      1378 GBC         744 :         builder->xmax = running->nextXid; /* >= are running */
                               1379                 : 
                               1380                 :         /* so we can safely use the faster comparisons */
 3324 rhaas                    1381 GIC         744 :         Assert(TransactionIdIsNormal(builder->xmin));
                               1382             744 :         Assert(TransactionIdIsNormal(builder->xmax));
                               1383                 : 
                               1384             744 :         builder->state = SNAPBUILD_CONSISTENT;
  783 andres                   1385             744 :         builder->next_phase_at = InvalidTransactionId;
                               1386                 : 
 3324 rhaas                    1387             744 :         ereport(LOG,
                               1388                 :                 (errmsg("logical decoding found consistent point at %X/%X",
                               1389                 :                         LSN_FORMAT_ARGS(lsn)),
 3069 peter_e                  1390 ECB             :                  errdetail("There are no running transactions.")));
                               1391                 : 
 3324 rhaas                    1392 CBC         744 :         return false;
 3324 rhaas                    1393 ECB             :     }
                               1394                 :     /* b) valid on disk state and not building full snapshot */
 2173 andres                   1395 CBC          63 :     else if (!builder->building_full_snapshot &&
 2173 andres                   1396 GIC          31 :              SnapBuildRestore(builder, lsn))
                               1397                 :     {
                               1398                 :         /* there won't be any state to cleanup */
 3324 rhaas                    1399 CBC           6 :         return false;
                               1400                 :     }
                               1401                 : 
 3324 rhaas                    1402 ECB             :     /*
 2157 andres                   1403                 :      * c) transition from START to BUILDING_SNAPSHOT.
                               1404                 :      *
                               1405                 :      * We are in START state and an xl_running_xacts record with running xacts is
                               1406                 :      * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
                               1407                 :      * record xl_running_xacts->nextXid.  Once all running xacts have finished
                               1408                 :      * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
                               1409                 :      * might look as if we could use xl_running_xacts's ->xids information to
                               1410                 :      * get there quicker, but that is problematic because transactions marked
                               1411                 :      * as running, might already have inserted their commit record - it's
                               1412                 :      * infeasible to change that with locking.
                               1413                 :      */
 2157 andres                   1414 GIC          26 :     else if (builder->state == SNAPBUILD_START)
                               1415                 :     {
                               1416              14 :         builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
  783                          1417              14 :         builder->next_phase_at = running->nextXid;
                               1418                 : 
                               1419                 :         /*
                               1420                 :          * Start with an xmin/xmax that's correct for the future, when all the
 3069 andres                   1421 ECB             :          * currently running transactions have finished. We'll update both
                               1422                 :          * while waiting for the pending transactions to finish.
                               1423                 :          */
 2118 tgl                      1424 CBC          14 :         builder->xmin = running->nextXid; /* < are finished */
 2118 tgl                      1425 GIC          14 :         builder->xmax = running->nextXid; /* >= are running */
                               1426                 : 
                               1427                 :         /* so we can safely use the faster comparisons */
 3324 rhaas                    1428              14 :         Assert(TransactionIdIsNormal(builder->xmin));
                               1429              14 :         Assert(TransactionIdIsNormal(builder->xmax));
                               1430                 : 
 3324 rhaas                    1431 CBC          14 :         ereport(LOG,
 2118 tgl                      1432 ECB             :                 (errmsg("logical decoding found initial starting point at %X/%X",
                               1433                 :                         LSN_FORMAT_ARGS(lsn)),
                               1434                 :                  errdetail("Waiting for transactions (approximately %d) older than %u to end.",
                               1435                 :                            running->xcnt, running->nextXid)));
 3324 rhaas                    1436                 : 
 2157 andres                   1437 GIC          14 :         SnapBuildWaitSnapshot(running, running->nextXid);
 2157 andres                   1438 ECB             :     }
                               1439                 : 
                               1440                 :     /*
                               1441                 :      * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
                               1442                 :      *
                               1443                 :      * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
                               1444                 :      * is >= the nextXid from when we switched to BUILDING_SNAPSHOT.  This
                               1445                 :      * means all transactions starting afterwards have enough information to
                               1446                 :      * be decoded.  Switch to FULL_SNAPSHOT.
                               1447                 :      */
 2157 andres                   1448 GIC          19 :     else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
  783                          1449               7 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
                               1450                 :                                            running->oldestRunningXid))
                               1451                 :     {
 2157                          1452               7 :         builder->state = SNAPBUILD_FULL_SNAPSHOT;
  783                          1453               7 :         builder->next_phase_at = running->nextXid;
                               1454                 : 
 2157 andres                   1455 CBC           7 :         ereport(LOG,
 2118 tgl                      1456 ECB             :                 (errmsg("logical decoding found initial consistent point at %X/%X",
                               1457                 :                         LSN_FORMAT_ARGS(lsn)),
                               1458                 :                  errdetail("Waiting for transactions (approximately %d) older than %u to end.",
                               1459                 :                            running->xcnt, running->nextXid)));
 3324 rhaas                    1460                 : 
 2157 andres                   1461 GIC           7 :         SnapBuildWaitSnapshot(running, running->nextXid);
 2157 andres                   1462 ECB             :     }
                               1463                 : 
                               1464                 :     /*
                               1465                 :      * c) transition from FULL_SNAPSHOT to CONSISTENT.
                               1466                 :      *
                               1467                 :      * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
                               1468                 :      * >= the nextXid from when we switched to FULL_SNAPSHOT.  This means all
                               1469                 :      * transactions that are currently in progress have a catalog snapshot,
                               1470                 :      * and all their changes have been collected.  Switch to CONSISTENT.
                               1471                 :      */
 2157 andres                   1472 GIC          10 :     else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
  783                          1473               5 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
                               1474                 :                                            running->oldestRunningXid))
                               1475                 :     {
 2157                          1476               5 :         builder->state = SNAPBUILD_CONSISTENT;
  783                          1477               5 :         builder->next_phase_at = InvalidTransactionId;
 3324 rhaas                    1478 ECB             : 
 2157 andres                   1479 CBC           5 :         ereport(LOG,
                               1480                 :                 (errmsg("logical decoding found consistent point at %X/%X",
                               1481                 :                         LSN_FORMAT_ARGS(lsn)),
 2157 andres                   1482 ECB             :                  errdetail("There are no old transactions anymore.")));
 3324 rhaas                    1483                 :     }
                               1484                 : 
                               1485                 :     /*
                               1486                 :      * We already started to track running xacts and need to wait for all
                               1487                 :      * in-progress ones to finish. We fall through to the normal processing of
                               1488                 :      * records so incremental cleanup can be performed.
                               1489                 :      */
 3324 rhaas                    1490 GIC          24 :     return true;
                               1491                 : }
                               1492                 : 
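A minimal sketch, not part of snapbuild.c, of the three transitions described
above, modelled as a small standalone state machine.  Every Sketch/sketch_
name is hypothetical; the earlier branches of the real function are omitted,
and 0 stands in for InvalidTransactionId.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t Xid;           /* stand-in for TransactionId */

typedef enum
{
    SKETCH_START,
    SKETCH_BUILDING_SNAPSHOT,
    SKETCH_FULL_SNAPSHOT,
    SKETCH_CONSISTENT
} SketchState;

typedef struct
{
    SketchState state;
    Xid         next_phase_at;  /* xid that must become "old" before advancing */
} SketchBuilder;

/* Wraparound-aware "a <= b" for normal xids (modulo-2^32 comparison). */
int
sketch_xid_precedes_or_equals(Xid a, Xid b)
{
    return (int32_t) (a - b) <= 0;
}

/*
 * Feed one running-xacts record, reduced to its nextXid and
 * oldestRunningXid, and return the state afterwards.  At most one
 * transition happens per record, as in the else-if chain above.
 */
SketchState
sketch_advance(SketchBuilder *b, Xid next_xid, Xid oldest_running_xid)
{
    assert(b != 0);

    if (b->state == SKETCH_START)
    {
        b->state = SKETCH_BUILDING_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == SKETCH_BUILDING_SNAPSHOT &&
             sketch_xid_precedes_or_equals(b->next_phase_at,
                                           oldest_running_xid))
    {
        b->state = SKETCH_FULL_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == SKETCH_FULL_SNAPSHOT &&
             sketch_xid_precedes_or_equals(b->next_phase_at,
                                           oldest_running_xid))
    {
        b->state = SKETCH_CONSISTENT;
        b->next_phase_at = 0;   /* consistent snapshots have no next phase */
    }

    return b->state;
}
```

Feeding records with (nextXid, oldestRunningXid) of (100, 90), (110, 100) and
(130, 110) to a builder that starts in SKETCH_START moves it through
BUILDING_SNAPSHOT, FULL_SNAPSHOT and CONSISTENT in turn.
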
                               1493                 : /* ---
                               1494                 :  * Iterate through xids in record, wait for all older than the cutoff to
                               1495                 :  * finish.  Then, if possible, log a new xl_running_xacts record.
 2157 andres                   1496 ECB             :  *
                               1497                 :  * This isn't required for the correctness of decoding, but to:
                               1498                 :  * a) allow isolationtester to notice that we're currently waiting for
                               1499                 :  *    something.
                               1500                 :  * b) log a new xl_running_xacts record where it'd be helpful, without having
                               1501                 :  *    to wait for bgwriter or checkpointer.
                               1502                 :  * ---
                               1503                 :  */
                               1504                 : static void
 2157 andres                   1505 GIC          21 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
                               1506                 : {
                               1507                 :     int         off;
                               1508                 : 
                               1509              40 :     for (off = 0; off < running->xcnt; off++)
                               1510                 :     {
 2157 andres                   1511 CBC          21 :         TransactionId xid = running->xids[off];
                               1512                 : 
                               1513                 :         /*
                               1514                 :          * Upper layers should ensure that we never need to wait on ourselves.
 2153 bruce                    1515 ECB             :          * Check anyway, since failing to do so would either result in an
                               1516                 :          * endless wait or an Assert() failure.
 2157 andres                   1517                 :          */
 2157 andres                   1518 GIC          21 :         if (TransactionIdIsCurrentTransactionId(xid))
 2157 andres                   1519 UIC           0 :             elog(ERROR, "waiting for ourselves");
                               1520                 : 
 2157 andres                   1521 GIC          21 :         if (TransactionIdFollows(xid, cutoff))
 2157 andres                   1522 UIC           0 :             continue;
                               1523                 : 
 2157 andres                   1524 CBC          21 :         XactLockTableWait(xid, NULL, NULL, XLTW_None);
 2157 andres                   1525 EUB             :     }
                               1526                 : 
 2157 andres                   1527 ECB             :     /*
 2157 andres                   1528 EUB             :      * All transactions we needed to wait for have finished - try to ensure there is
                               1529                 :      * another xl_running_xacts record in a timely manner, without having to
  697 tgl                      1530 ECB             :      * wait for bgwriter or checkpointer to log one.  During recovery we can't
                               1531                 :      * enforce that, so we'll have to wait.
                               1532                 :      */
 2157 andres                   1533 GIC          19 :     if (!RecoveryInProgress())
                               1534                 :     {
                               1535              19 :         LogStandbySnapshot();
                               1536                 :     }
                               1537              19 : }
                               1538                 : 
 3324 rhaas                    1539 ECB             : /* -----------------------------------
                               1540                 :  * Snapshot serialization support
                               1541                 :  * -----------------------------------
                               1542                 :  */
                               1543                 : 
                               1544                 : /*
                               1545                 :  * We store the current state of struct SnapBuild on disk in the following manner:
                               1546                 :  *
                               1547                 :  * struct SnapBuildOnDisk;
                               1548                 :  * TransactionId * committed.xcnt; (*not xcnt_space*)
                               1549                 :  * TransactionId * catchange.xcnt;
                               1550                 :  *
                               1551                 :  */
                               1552                 : typedef struct SnapBuildOnDisk
                               1553                 : {
                               1554                 :     /* first part of this struct needs to be version independent */
                               1555                 : 
                               1556                 :     /* data not covered by checksum */
                               1557                 :     uint32      magic;
                               1558                 :     pg_crc32c   checksum;
                               1559                 : 
                               1560                 :     /* data covered by checksum */
                               1561                 : 
                               1562                 :     /* version, in case we want to support pg_upgrade */
                               1563                 :     uint32      version;
                               1564                 :     /* how large is the on disk data, excluding the constant sized part */
                               1565                 :     uint32      length;
                               1566                 : 
                               1567                 :     /* version dependent part */
                               1568                 :     SnapBuild   builder;
                               1569                 : 
                               1570                 :     /* variable amount of TransactionIds follows */
                               1571                 : } SnapBuildOnDisk;
                               1572                 : 
                               1573                 : #define SnapBuildOnDiskConstantSize \
                               1574                 :     offsetof(SnapBuildOnDisk, builder)
                               1575                 : #define SnapBuildOnDiskNotChecksummedSize \
                               1576                 :     offsetof(SnapBuildOnDisk, version)
                               1577                 : 
                               1578                 : #define SNAPBUILD_MAGIC 0x51A1E001
                               1579                 : #define SNAPBUILD_VERSION 5
                               1580                 : 
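A minimal sketch, not part of snapbuild.c, of how the two offsetof() macros
above split a header into a part excluded from the checksum and a part covered
by it.  SketchOnDisk and the sketch_/SKETCH_ names are hypothetical, the
payload array stands in for the version-dependent part, and a simple
multiplicative hash replaces CRC-32C purely for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct
{
    uint32_t    magic;          /* not covered by checksum */
    uint32_t    checksum;       /* not covered by checksum */
    uint32_t    version;        /* checksummed data starts here */
    uint32_t    length;
    uint32_t    payload[4];     /* hypothetical version-dependent part */
} SketchOnDisk;

/* Size of the version-independent header. */
#define SKETCH_CONSTANT_SIZE offsetof(SketchOnDisk, payload)
/* Leading bytes that the checksum skips. */
#define SKETCH_NOT_CHECKSUMMED_SIZE offsetof(SketchOnDisk, version)

/*
 * Toy checksum over everything after the magic and checksum fields.
 * This is NOT CRC-32C; it only shows which byte range is covered.
 */
uint32_t
sketch_checksum(const SketchOnDisk *d)
{
    const unsigned char *p;
    size_t      n;
    size_t      i;
    uint32_t    sum = 0;

    assert(d != NULL);

    p = (const unsigned char *) d + SKETCH_NOT_CHECKSUMMED_SIZE;
    n = sizeof(SketchOnDisk) - SKETCH_NOT_CHECKSUMMED_SIZE;

    for (i = 0; i < n; i++)
        sum = sum * 31u + p[i];

    return sum;
}
```

Because the covered range starts at the version field, rewriting magic or
checksum leaves the result unchanged, while any change to version, length or
the payload alters it.
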
                               1581                 : /*
                               1582                 :  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
                               1583                 :  *
                               1584                 :  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
                               1585                 :  * a record that's a potential location for a serialized snapshot.
                               1586                 :  */
                               1587                 : void
 3324 rhaas                    1588 GIC          23 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
                               1589                 : {
                               1590              23 :     if (builder->state < SNAPBUILD_CONSISTENT)
 3324 rhaas                    1591 UIC           0 :         SnapBuildRestore(builder, lsn);
                               1592                 :     else
 3324 rhaas                    1593 GIC          23 :         SnapBuildSerialize(builder, lsn);
                               1594              23 : }
 3324 rhaas                    1595 ECB             : 
                               1596                 : /*
                               1597                 :  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
 3324 rhaas                    1598 EUB             :  * been done by another decoding process.
                               1599                 :  */
 3324 rhaas                    1600 ECB             : static void
 3324 rhaas                    1601 CBC         273 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
                               1602                 : {
                               1603                 :     Size        needed_length;
  816 akapila                  1604 GIC         273 :     SnapBuildOnDisk *ondisk = NULL;
  241 akapila                  1605 GNC         273 :     TransactionId *catchange_xip = NULL;
                               1606                 :     MemoryContext old_ctx;
                               1607                 :     size_t      catchange_xcnt;
                               1608                 :     char       *ondisk_c;
                               1609                 :     int         fd;
                               1610                 :     char        tmppath[MAXPGPATH];
 3324 rhaas                    1611 ECB             :     char        path[MAXPGPATH];
                               1612                 :     int         ret;
                               1613                 :     struct stat stat_buf;
 3261 heikki.linnakangas       1614                 :     Size        sz;
 3324 rhaas                    1615                 : 
 3324 rhaas                    1616 GIC         273 :     Assert(lsn != InvalidXLogRecPtr);
                               1617             273 :     Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
                               1618                 :            builder->last_serialized_snapshot <= lsn);
                               1619                 : 
                               1620                 :     /*
                               1621                 :      * no point in serializing if we cannot continue to work immediately after
                               1622                 :      * restoring the snapshot
                               1623                 :      */
                               1624             273 :     if (builder->state < SNAPBUILD_CONSISTENT)
 3324 rhaas                    1625 UIC           0 :         return;
 3324 rhaas                    1626 ECB             : 
  783 andres                   1627                 :     /* consistent snapshots have no next phase */
  783 andres                   1628 GIC         273 :     Assert(builder->next_phase_at == InvalidTransactionId);
                               1629                 : 
                               1630                 :     /*
                               1631                 :      * We identify snapshots by the LSN they are valid for. We don't need to
                               1632                 :      * include timelines in the name as each LSN maps to exactly one timeline
                               1633                 :      * unless the user used pg_resetwal or similar. If a user did so, there's
 3324 rhaas                    1634 ECB             :      * no hope of continuing to decode anyway.
 3324 rhaas                    1635 EUB             :      */
 3203 andres                   1636 GIC         273 :     sprintf(path, "pg_logical/snapshots/%X-%X.snap",
  775 peter                    1637             273 :             LSN_FORMAT_ARGS(lsn));
 3324 rhaas                    1638 ECB             : 
                               1639                 :     /*
                               1640                 :      * first check whether some other backend already has written the snapshot
                               1641                 :      * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
                               1642                 :      * as a valid state. Everything else is an unexpected error.
                               1643                 :      */
 3324 rhaas                    1644 GIC         273 :     ret = stat(path, &stat_buf);
                               1645                 : 
 3324 rhaas                    1646 CBC         273 :     if (ret != 0 && errno != ENOENT)
 3324 rhaas                    1647 LBC           0 :         ereport(ERROR,
                               1648                 :                 (errcode_for_file_access(),
                               1649                 :                  errmsg("could not stat file \"%s\": %m", path)));
                               1650                 : 
 3324 rhaas                    1651 GIC         273 :     else if (ret == 0)
                               1652                 :     {
                               1653                 :         /*
 3324 rhaas                    1654 ECB             :          * somebody else has already serialized to this point, don't overwrite
                               1655                 :          * but remember location, so we don't need to read old data again.
                               1656                 :          *
 3324 rhaas                    1657 EUB             :          * To be sure it has been synced to disk after the rename() from the
                               1658                 :          * tempfile filename to the real filename, we just repeat the fsync.
                               1659                 :          * That ought to be cheap because in most scenarios it should already
                               1660                 :          * be safely on disk.
 3324 rhaas                    1661 ECB             :          */
 3324 rhaas                    1662 GIC          65 :         fsync_fname(path, false);
 3203 andres                   1663              65 :         fsync_fname("pg_logical/snapshots", true);
                               1664                 : 
 3324 rhaas                    1665              65 :         builder->last_serialized_snapshot = lsn;
                               1666              65 :         goto out;
                               1667                 :     }
                               1668                 : 
                               1669                 :     /*
                               1670                 :      * there is an obvious race condition here between the time we stat(2) the
                               1671                 :      * file and us writing the file. But we rename the file into place
 3324 rhaas                    1672 ECB             :      * atomically and all files created need to contain the same data anyway,
                               1673                 :      * so this is perfectly fine, although a bit of a resource waste. Locking
                               1674                 :      * seems like pointless complication.
                               1675                 :      */
 3324 rhaas                    1676 CBC         208 :     elog(DEBUG1, "serializing snapshot to %s", path);
                               1677                 : 
                               1678                 :     /* to make sure only we will write to this tempfile, include pid */
  720 peter                    1679 GIC         208 :     sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%d.tmp",
  775                          1680             208 :             LSN_FORMAT_ARGS(lsn), MyProcPid);
                               1681                 : 
                               1682                 :     /*
                               1683                 :      * Unlink the temporary file if it already exists.  It must be left over
                               1684                 :      * from before a crash/error, since we won't enter this function twice
                               1685                 :      * from within a single decoding slot/backend and the temporary file name
 3324 rhaas                    1686 ECB             :      * contains the pid of the current process.
                               1687                 :      */
 3324 rhaas                    1688 GIC         208 :     if (unlink(tmppath) != 0 && errno != ENOENT)
 3324 rhaas                    1689 LBC           0 :         ereport(ERROR,
 3324 rhaas                    1690 ECB             :                 (errcode_for_file_access(),
                               1691                 :                  errmsg("could not remove file \"%s\": %m", tmppath)));
                               1692                 : 
  241 akapila                  1693 GNC         208 :     old_ctx = MemoryContextSwitchTo(builder->context);
                               1694                 : 
                               1695                 :     /* Get the catalog modifying transactions that are not yet committed */
                               1696             208 :     catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
  158 drowley                  1697             208 :     catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
                               1698                 : 
 3324 rhaas                    1699             208 :     needed_length = sizeof(SnapBuildOnDisk) +
  241 akapila                  1700             208 :         sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
                               1701                 : 
                               1702             208 :     ondisk_c = palloc0(needed_length);
 3324 rhaas                    1703 GIC         208 :     ondisk = (SnapBuildOnDisk *) ondisk_c;
 3324 rhaas                    1704 CBC         208 :     ondisk->magic = SNAPBUILD_MAGIC;
 3324 rhaas                    1705 GBC         208 :     ondisk->version = SNAPBUILD_VERSION;
 3324 rhaas                    1706 GIC         208 :     ondisk->length = needed_length;
 3078 heikki.linnakangas       1707             208 :     INIT_CRC32C(ondisk->checksum);
                               1708             208 :     COMP_CRC32C(ondisk->checksum,
 3078 heikki.linnakangas       1709 ECB             :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
                               1710                 :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
 3324 rhaas                    1711 GIC         208 :     ondisk_c += sizeof(SnapBuildOnDisk);
 3324 rhaas                    1712 ECB             : 
 3324 rhaas                    1713 CBC         208 :     memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
                               1714                 :     /* NULL-ify memory-only data */
                               1715             208 :     ondisk->builder.context = NULL;
                               1716             208 :     ondisk->builder.snapshot = NULL;
 3324 rhaas                    1717 GIC         208 :     ondisk->builder.reorder = NULL;
 3324 rhaas                    1718 CBC         208 :     ondisk->builder.committed.xip = NULL;
  241 akapila                  1719 GNC         208 :     ondisk->builder.catchange.xip = NULL;
                               1720                 :     /* update catchange.xcnt only in the on-disk data */
                               1721             208 :     ondisk->builder.catchange.xcnt = catchange_xcnt;
 3324 rhaas                    1722 ECB             : 
 3078 heikki.linnakangas       1723 CBC         208 :     COMP_CRC32C(ondisk->checksum,
 3078 heikki.linnakangas       1724 ECB             :                 &ondisk->builder,
                               1725                 :                 sizeof(SnapBuild));
 3324 rhaas                    1726                 : 
                               1727                 :     /* copy committed xacts */
  241 akapila                  1728 GNC         208 :     if (builder->committed.xcnt > 0)
                               1729                 :     {
                               1730              44 :         sz = sizeof(TransactionId) * builder->committed.xcnt;
                               1731              44 :         memcpy(ondisk_c, builder->committed.xip, sz);
                               1732              44 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
                               1733              44 :         ondisk_c += sz;
                               1734                 :     }
                               1735                 : 
                               1736                 :     /* copy catalog modifying xacts */
                               1737             208 :     if (catchange_xcnt > 0)
                               1738                 :     {
                               1739               6 :         sz = sizeof(TransactionId) * catchange_xcnt;
                               1740               6 :         memcpy(ondisk_c, catchange_xip, sz);
                               1741               6 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
                               1742               6 :         ondisk_c += sz;
                               1743                 :     }
 3324 rhaas                    1744 ECB             : 
 3070 andres                   1745 GIC         208 :     FIN_CRC32C(ondisk->checksum);
 3070 andres                   1746 ECB             : 
 3324 rhaas                    1747                 :     /* we have valid data now, open tempfile and write it there */
 3324 rhaas                    1748 CBC         208 :     fd = OpenTransientFile(tmppath,
 2024 peter_e                  1749 ECB             :                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
 3324 rhaas                    1750 CBC         208 :     if (fd < 0)
 3324 rhaas                    1751 UIC           0 :         ereport(ERROR,
 1517 tgl                      1752 ECB             :                 (errcode_for_file_access(),
                               1753                 :                  errmsg("could not open file \"%s\": %m", tmppath)));
 3324 rhaas                    1754                 : 
 1708 michael                  1755 GIC         208 :     errno = 0;
 2213 rhaas                    1756             208 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
 3324                          1757             208 :     if ((write(fd, ondisk, needed_length)) != needed_length)
                               1758                 :     {
 1749 michael                  1759 LBC           0 :         int         save_errno = errno;
                               1760                 : 
 3324 rhaas                    1761               0 :         CloseTransientFile(fd);
 1749 michael                  1762 ECB             : 
                               1763                 :         /* if write didn't set errno, assume problem is no disk space */
 1749 michael                  1764 LBC           0 :         errno = save_errno ? save_errno : ENOSPC;
 3324 rhaas                    1765 UIC           0 :         ereport(ERROR,
                               1766                 :                 (errcode_for_file_access(),
                               1767                 :                  errmsg("could not write to file \"%s\": %m", tmppath)));
 3324 rhaas                    1768 ECB             :     }
 2213 rhaas                    1769 GIC         208 :     pgstat_report_wait_end();
 3324 rhaas                    1770 ECB             : 
                               1771                 :     /*
                               1772                 :      * fsync the file before renaming so that even if we crash after this we
                               1773                 :      * have either a fully valid file or nothing.
                               1774                 :      *
                               1775                 :      * It's safe to just ERROR on fsync() here because we'll retry the whole
 1602 tmunro                   1776                 :      * operation including the writes.
                               1777                 :      *
                               1778                 :      * TODO: Do the fsync() via checkpoints/restartpoints? Doing it here has
 3324 rhaas                    1779                 :      * some noticeable overhead since it's performed synchronously during
                               1780                 :      * decoding.
                               1781                 :      */
 2213 rhaas                    1782 GBC         208 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
 3324 rhaas                    1783 GIC         208 :     if (pg_fsync(fd) != 0)
                               1784                 :     {
 1749 michael                  1785 UIC           0 :         int         save_errno = errno;
 1749 michael                  1786 ECB             : 
 3324 rhaas                    1787 LBC           0 :         CloseTransientFile(fd);
 1749 michael                  1788               0 :         errno = save_errno;
 3324 rhaas                    1789 UIC           0 :         ereport(ERROR,
 3324 rhaas                    1790 EUB             :                 (errcode_for_file_access(),
                               1791                 :                  errmsg("could not fsync file \"%s\": %m", tmppath)));
                               1792                 :     }
 2213 rhaas                    1793 GIC         208 :     pgstat_report_wait_end();
                               1794                 : 
 1373 peter                    1795 GBC         208 :     if (CloseTransientFile(fd) != 0)
 1492 michael                  1796 UBC           0 :         ereport(ERROR,
                               1797                 :                 (errcode_for_file_access(),
                               1798                 :                  errmsg("could not close file \"%s\": %m", tmppath)));
                               1799                 : 
 3203 andres                   1800 CBC         208 :     fsync_fname("pg_logical/snapshots", true);
                               1801                 : 
                               1802                 :     /*
                               1803                 :      * We may overwrite the work from some other backend, but that's ok, our
                               1804                 :      * snapshot is valid as well, we'll just have done some superfluous work.
                               1805                 :      */
 3324 rhaas                    1806 GIC         208 :     if (rename(tmppath, path) != 0)
                               1807                 :     {
 3324 rhaas                    1808 UIC           0 :         ereport(ERROR,
                               1809                 :                 (errcode_for_file_access(),
                               1810                 :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
                               1811                 :                         tmppath, path)));
                               1812                 :     }
 3324 rhaas                    1813 ECB             : 
                               1814                 :     /* make sure we persist */
 3324 rhaas                    1815 GIC         208 :     fsync_fname(path, false);
 3203 andres                   1816 GBC         208 :     fsync_fname("pg_logical/snapshots", true);
                               1817                 : 
 3324 rhaas                    1818 EUB             :     /*
                               1819                 :      * Now there's no way we can lose the dumped state anymore, remember this
 3260 bruce                    1820                 :      * as a serialization point.
                               1821                 :      */
 3324 rhaas                    1822 GIC         208 :     builder->last_serialized_snapshot = lsn;
                               1823                 : 
  241 akapila                  1824 GNC         208 :     MemoryContextSwitchTo(old_ctx);
                               1825                 : 
 3324 rhaas                    1826 CBC         273 : out:
 3324 rhaas                    1827 GIC         273 :     ReorderBufferSetRestartPoint(builder->reorder,
 3324 rhaas                    1828 ECB             :                                  builder->last_serialized_snapshot);
  816 akapila                  1829 EUB             :     /* be tidy */
  816 akapila                  1830 GIC         273 :     if (ondisk)
                               1831             208 :         pfree(ondisk);
  241 akapila                  1832 GNC         273 :     if (catchange_xip)
                               1833               6 :         pfree(catchange_xip);
                               1834                 : }
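
The tail of SnapBuildSerialize() above follows a write, fsync, rename, fsync-directory sequence so that a crash leaves either a complete snapshot file or none. The block below is a minimal standalone sketch of that pattern using plain POSIX calls rather than PostgreSQL's OpenTransientFile(), pg_fsync() and fsync_fname() wrappers. The names durable_write_file(), fsync_dir() and fail_and_cleanup() are hypothetical, and unlinking the temporary file on failure is a choice made for this sketch, not something the listing does.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical helper: flush a directory so a rename inside it is durable. */
static int
fsync_dir(const char *dir)
{
    int         fd = open(dir, O_RDONLY);
    int         rc;
    int         save_errno;

    if (fd < 0)
        return -1;
    rc = fsync(fd);
    save_errno = errno;         /* close() must not clobber fsync's errno */
    close(fd);
    errno = save_errno;
    return rc;
}

/* Hypothetical helper: close/unlink the temp file while preserving errno. */
static int
fail_and_cleanup(int fd, const char *tmppath)
{
    int         save_errno = errno;

    if (fd >= 0)
        close(fd);
    unlink(tmppath);
    errno = save_errno;
    return -1;
}

/*
 * Hypothetical helper: write 'len' bytes to dir/name via a temporary file.
 * Returns 0 on success, -1 with errno set on failure.
 */
static int
durable_write_file(const char *dir, const char *name,
                   const void *data, size_t len)
{
    char        path[1024];
    char        tmppath[1024];
    int         fd;
    ssize_t     written;

    assert(dir != NULL && name != NULL && data != NULL);
    snprintf(path, sizeof(path), "%s/%s", dir, name);
    snprintf(tmppath, sizeof(tmppath), "%s/%s.%d.tmp", dir, name, (int) getpid());

    fd = open(tmppath, O_CREAT | O_EXCL | O_WRONLY, 0600);
    if (fd < 0)
        return -1;

    errno = 0;
    written = write(fd, data, len);
    if (written < 0 || (size_t) written != len)
    {
        /* if write didn't set errno, assume problem is no disk space */
        if (errno == 0)
            errno = ENOSPC;
        return fail_and_cleanup(fd, tmppath);
    }

    /* the data must be on disk before the file becomes visible under 'path' */
    if (fsync(fd) != 0)
        return fail_and_cleanup(fd, tmppath);
    if (close(fd) != 0)
        return fail_and_cleanup(-1, tmppath);

    if (rename(tmppath, path) != 0)
        return fail_and_cleanup(-1, tmppath);

    /* persist the rename itself */
    return fsync_dir(dir);
}
```

A caller would invoke something like durable_write_file("pg_logical/snapshots", "0-16D8.snap", buf, len). The listing additionally fsyncs the directory before the rename and the final path after it; the sketch keeps only the steps needed to show the ordering.
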
 3324 rhaas                    1835 ECB             : 
                               1836                 : /*
                               1837                 :  * Restore a snapshot into 'builder' if previously one has been stored at the
                               1838                 :  * location indicated by 'lsn'. Returns true if successful, false otherwise.
                               1839                 :  */
                               1840                 : static bool
 3324 rhaas                    1841 CBC          31 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
                               1842                 : {
 3324 rhaas                    1843 EUB             :     SnapBuildOnDisk ondisk;
                               1844                 :     int         fd;
                               1845                 :     char        path[MAXPGPATH];
                               1846                 :     Size        sz;
                               1847                 :     pg_crc32c   checksum;
                               1848                 : 
 3324 rhaas                    1849 ECB             :     /* no point in loading a snapshot if we're already there */
 3324 rhaas                    1850 CBC          31 :     if (builder->state == SNAPBUILD_CONSISTENT)
 3324 rhaas                    1851 UIC           0 :         return false;
                               1852                 : 
 3203 andres                   1853 GIC          31 :     sprintf(path, "pg_logical/snapshots/%X-%X.snap",
  775 peter                    1854              31 :             LSN_FORMAT_ARGS(lsn));
                               1855                 : 
 2024 peter_e                  1856 CBC          31 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
                               1857                 : 
 3324 rhaas                    1858              31 :     if (fd < 0 && errno == ENOENT)
 3324 rhaas                    1859 GIC          25 :         return false;
 3324 rhaas                    1860 CBC           6 :     else if (fd < 0)
 3324 rhaas                    1861 LBC           0 :         ereport(ERROR,
                               1862                 :                 (errcode_for_file_access(),
                               1863                 :                  errmsg("could not open file \"%s\": %m", path)));
 3324 rhaas                    1864 ECB             : 
                               1865                 :     /* ----
                               1866                 :      * Make sure the snapshot has been stored safely to disk; that's normally
                               1867                 :      * cheap.
                               1868                 :      * Note that we do not need PANIC here, nobody will be able to use the
                               1869                 :      * slot without fsyncing, and saving it won't succeed without an fsync()
                               1870                 :      * either...
                               1871                 :      * ----
                               1872                 :      */
 3324 rhaas                    1873 GIC           6 :     fsync_fname(path, false);
 3203 andres                   1874               6 :     fsync_fname("pg_logical/snapshots", true);
 3324 rhaas                    1875 ECB             : 
                               1876                 : 
                               1877                 :     /* read statically sized portion of snapshot */
  241 akapila                  1878 GNC           6 :     SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);
                               1879                 : 
 3324 rhaas                    1880 GIC           6 :     if (ondisk.magic != SNAPBUILD_MAGIC)
 3324 rhaas                    1881 UIC           0 :         ereport(ERROR,
                               1882                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
                               1883                 :                  errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
                               1884                 :                         path, ondisk.magic, SNAPBUILD_MAGIC)));
 3324 rhaas                    1885 ECB             : 
 3324 rhaas                    1886 CBC           6 :     if (ondisk.version != SNAPBUILD_VERSION)
 3324 rhaas                    1887 UIC           0 :         ereport(ERROR,
                               1888                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
                               1889                 :                  errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
 3324 rhaas                    1890 ECB             :                         path, ondisk.version, SNAPBUILD_VERSION)));
                               1891                 : 
 3078 heikki.linnakangas       1892 CBC           6 :     INIT_CRC32C(checksum);
 3078 heikki.linnakangas       1893 GBC           6 :     COMP_CRC32C(checksum,
                               1894                 :                 ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
                               1895                 :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
                               1896                 : 
                               1897                 :     /* read SnapBuild */
  241 akapila                  1898 GNC           6 :     SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
 3078 heikki.linnakangas       1899 GIC           6 :     COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
                               1900                 : 
                               1901                 :     /* restore committed xacts information */
  241 akapila                  1902 GNC           6 :     if (ondisk.builder.committed.xcnt > 0)
                               1903                 :     {
                               1904               2 :         sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
                               1905               2 :         ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
                               1906               2 :         SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
                               1907               2 :         COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
                               1908                 :     }
                               1909                 : 
                               1910                 :     /* restore catalog modifying xacts information */
                               1911               6 :     if (ondisk.builder.catchange.xcnt > 0)
                               1912                 :     {
                               1913               3 :         sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
                               1914               3 :         ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
                               1915               3 :         SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
                               1916               3 :         COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
                               1917                 :     }
                               1918                 : 
 1373 peter                    1919 GIC           6 :     if (CloseTransientFile(fd) != 0)
 1492 michael                  1920 UIC           0 :         ereport(ERROR,
                               1921                 :                 (errcode_for_file_access(),
                               1922                 :                  errmsg("could not close file \"%s\": %m", path)));
                               1923                 : 
 3070 andres                   1924 GIC           6 :     FIN_CRC32C(checksum);
 3070 andres                   1925 ECB             : 
 3324 rhaas                    1926 EUB             :     /* verify checksum of what we've read */
 3078 heikki.linnakangas       1927 GIC           6 :     if (!EQ_CRC32C(checksum, ondisk.checksum))
 3324 rhaas                    1928 UIC           0 :         ereport(ERROR,
                               1929                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
                               1930                 :                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
                               1931                 :                         path, checksum, ondisk.checksum)));
 3324 rhaas                    1932 ECB             : 
 3324 rhaas                    1933 EUB             :     /*
                               1934                 :      * ok, we now have a sensible snapshot here, figure out if it has more
                               1935                 :      * information than we have.
 3324 rhaas                    1936 ECB             :      */
                               1937                 : 
                               1938                 :     /*
                               1939                 :      * We are only interested in consistent snapshots for now; comparing
 2881 heikki.linnakangas       1940                 :      * whether one incomplete snapshot is more "advanced" seems to be
 3324 rhaas                    1941                 :      * unnecessarily complex.
                               1942                 :      */
 3324 rhaas                    1943 CBC           6 :     if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
 3324 rhaas                    1944 UIC           0 :         goto snapshot_not_interesting;
                               1945                 : 
 3324 rhaas                    1946 ECB             :     /*
                               1947                 :      * Don't use a snapshot that requires an xmin that we cannot guarantee to
                               1948                 :      * be available.
                               1949                 :      */
 3324 rhaas                    1950 CBC           6 :     if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
 3324 rhaas                    1951 UIC           0 :         goto snapshot_not_interesting;
 3324 rhaas                    1952 ECB             : 
                               1953                 :     /* consistent snapshots have no next phase */
  783 andres                   1954 GIC           6 :     Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
 3324 rhaas                    1955 ECB             : 
 3324 rhaas                    1956 EUB             :     /* ok, we think the snapshot is sensible, copy over everything important */
 3324 rhaas                    1957 CBC           6 :     builder->xmin = ondisk.builder.xmin;
                               1958               6 :     builder->xmax = ondisk.builder.xmax;
                               1959               6 :     builder->state = ondisk.builder.state;
                               1960                 : 
 3324 rhaas                    1961 GIC           6 :     builder->committed.xcnt = ondisk.builder.committed.xcnt;
 3324 rhaas                    1962 ECB             :     /* We only allocated/stored xcnt, not xcnt_space xids! */
                               1963                 :     /* don't overwrite preallocated xip, if we don't have anything here */
 3324 rhaas                    1964 GBC           6 :     if (builder->committed.xcnt > 0)
                               1965                 :     {
 3324 rhaas                    1966 CBC           2 :         pfree(builder->committed.xip);
                               1967               2 :         builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
 3324 rhaas                    1968 GIC           2 :         builder->committed.xip = ondisk.builder.committed.xip;
 3324 rhaas                    1969 ECB             :     }
 3324 rhaas                    1970 GIC           6 :     ondisk.builder.committed.xip = NULL;
 3324 rhaas                    1971 ECB             : 
                               1972                 :     /* set catalog modifying transactions */
  241 akapila                  1973 GNC           6 :     if (builder->catchange.xip)
  241 akapila                  1974 UNC           0 :         pfree(builder->catchange.xip);
  241 akapila                  1975 GNC           6 :     builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
                               1976               6 :     builder->catchange.xip = ondisk.builder.catchange.xip;
                               1977               6 :     ondisk.builder.catchange.xip = NULL;
                               1978                 : 
                               1979                 :     /* our snapshot is not interesting anymore, build a new one */
 3324 rhaas                    1980 CBC           6 :     if (builder->snapshot != NULL)
                               1981                 :     {
 3324 rhaas                    1982 UIC           0 :         SnapBuildSnapDecRefcount(builder->snapshot);
                               1983                 :     }
 2125 andres                   1984 CBC           6 :     builder->snapshot = SnapBuildBuildSnapshot(builder);
 3324 rhaas                    1985 GIC           6 :     SnapBuildSnapIncRefcount(builder->snapshot);
 3324 rhaas                    1986 EUB             : 
 3324 rhaas                    1987 GBC           6 :     ReorderBufferSetRestartPoint(builder->reorder, lsn);
 3324 rhaas                    1988 EUB             : 
 3324 rhaas                    1989 GBC           6 :     Assert(builder->state == SNAPBUILD_CONSISTENT);
 3324 rhaas                    1990 EUB             : 
 3324 rhaas                    1991 GBC           6 :     ereport(LOG,
                               1992                 :             (errmsg("logical decoding found consistent point at %X/%X",
                               1993                 :                     LSN_FORMAT_ARGS(lsn)),
                               1994                 :              errdetail("Logical decoding will begin using saved snapshot.")));
 3324 rhaas                    1995 GIC           6 :     return true;
                               1996                 : 
 3324 rhaas                    1997 UIC           0 : snapshot_not_interesting:
 3324 rhaas                    1998 LBC           0 :     if (ondisk.builder.committed.xip != NULL)
 3324 rhaas                    1999 UIC           0 :         pfree(ondisk.builder.committed.xip);
  241 akapila                  2000 UNC           0 :     if (ondisk.builder.catchange.xip != NULL)
                               2001               0 :         pfree(ondisk.builder.catchange.xip);
 3324 rhaas                    2002 UIC           0 :     return false;
                               2003                 : }
 3324 rhaas                    2004 ECB             : 
                               2005                 : /*
                               2006                 :  * Read the contents of the serialized snapshot to 'dest'.
                               2007                 :  */
                               2008                 : static void
  241 akapila                  2009 GNC          17 : SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
                               2010                 : {
                               2011                 :     int         readBytes;
                               2012                 : 
                               2013              17 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
                               2014              17 :     readBytes = read(fd, dest, size);
                               2015              17 :     pgstat_report_wait_end();
                               2016              17 :     if (readBytes != size)
                               2017                 :     {
  241 akapila                  2018 UNC           0 :         int         save_errno = errno;
                               2019                 : 
                               2020               0 :         CloseTransientFile(fd);
                               2021                 : 
                               2022               0 :         if (readBytes < 0)
                               2023                 :         {
                               2024               0 :             errno = save_errno;
                               2025               0 :             ereport(ERROR,
                               2026                 :                     (errcode_for_file_access(),
                               2027                 :                      errmsg("could not read file \"%s\": %m", path)));
                               2028                 :         }
                               2029                 :         else
                               2030               0 :             ereport(ERROR,
                               2031                 :                     (errcode(ERRCODE_DATA_CORRUPTED),
                               2032                 :                      errmsg("could not read file \"%s\": read %d of %zu",
                               2033                 :                             path, readBytes, size)));
                               2034                 :     }
  241 akapila                  2035 GNC          17 : }
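
SnapBuildRestoreContents() above distinguishes two failure modes of a single read(): an I/O error (negative return, errno meaningful) and a short read (fewer bytes than requested, treated as corruption). The following is a minimal sketch of that classification outside PostgreSQL. The ReadStatus enum and read_exact() are hypothetical names introduced for illustration; like the listing, the sketch issues one read() and does not retry.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical result codes mirroring the two error branches in the listing. */
typedef enum
{
    READ_OK,                    /* got exactly the requested number of bytes */
    READ_IO_ERROR,              /* read() failed; errno describes why */
    READ_SHORT                  /* file ended early; caller treats as corrupt */
} ReadStatus;

/*
 * Hypothetical helper: read exactly 'size' bytes from 'fd' into 'dest'.
 * '*nread' receives the raw return value of read() so that the caller can
 * report "read N of SIZE" against the size that was actually requested.
 */
static ReadStatus
read_exact(int fd, void *dest, size_t size, ssize_t *nread)
{
    ssize_t     n;

    assert(nread != NULL);

    n = read(fd, dest, size);
    *nread = n;

    if (n < 0)
        return READ_IO_ERROR;
    if ((size_t) n != size)
        return READ_SHORT;
    return READ_OK;
}
```

Returning the byte count alongside the status lets the error message quote the requested size rather than a fixed structure size, which matters because the same helper is used for both the fixed header and the variable-length xid arrays.
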
                               2036                 : 
 3324 rhaas                    2037 ECB             : /*
                               2038                 :  * Remove all serialized snapshots that are not required anymore because no
                               2039                 :  * slot can need them. This doesn't actually have to run during a checkpoint,
                               2040                 :  * but it's a convenient point to schedule this.
 3324 rhaas                    2041 EUB             :  *
                               2042                 :  * NB: We run this during checkpoints even if logical decoding is disabled so
                               2043                 :  * we clean up old slots at some point after it got disabled.
                               2044                 :  */
                               2045                 : void
 3324 rhaas                    2046 GIC        2363 : CheckPointSnapBuild(void)
 3324 rhaas                    2047 EUB             : {
                               2048                 :     XLogRecPtr  cutoff;
                               2049                 :     XLogRecPtr  redo;
                               2050                 :     DIR        *snap_dir;
                               2051                 :     struct dirent *snap_de;
                               2052                 :     char        path[MAXPGPATH + 21];
                               2053                 : 
                               2054                 :     /*
                               2055                 :      * We start off with a minimum of the last redo pointer. No new
                               2056                 :      * replication slot will start before that, so that's a safe upper bound
                               2057                 :      * for removal.
 3324 rhaas                    2058 ECB             :      */
 3324 rhaas                    2059 GIC        2363 :     redo = GetRedoRecPtr();
                               2060                 : 
                               2061                 :     /* now check for the restart ptrs from existing slots */
                               2062            2363 :     cutoff = ReplicationSlotsComputeLogicalRestartLSN();
                               2063                 : 
                               2064                 :     /* don't start earlier than the restart lsn */
                               2065            2363 :     if (redo < cutoff)
 3324 rhaas                    2066 UIC           0 :         cutoff = redo;
                               2067                 : 
 3203 andres                   2068 GIC        2363 :     snap_dir = AllocateDir("pg_logical/snapshots");
 3203 andres                   2069 CBC        7279 :     while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
                               2070                 :     {
                               2071                 :         uint32      hi;
                               2072                 :         uint32      lo;
                               2073                 :         XLogRecPtr  lsn;
                               2074                 :         PGFileType  de_type;
                               2075                 : 
 3324 rhaas                    2076 GIC        4916 :         if (strcmp(snap_de->d_name, ".") == 0 ||
                               2077            2553 :             strcmp(snap_de->d_name, "..") == 0)
                               2078            4726 :             continue;
                               2079                 : 
 2189 peter_e                  2080             190 :         snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
  219 michael                  2081 GNC         190 :         de_type = get_dirent_type(path, snap_de, false, DEBUG1);
                               2082                 : 
                               2083             190 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
                               2084                 :         {
 3324 rhaas                    2085 UIC           0 :             elog(DEBUG1, "only regular files expected: %s", path);
 3324 rhaas                    2086 LBC           0 :             continue;
                               2087                 :         }
                               2088                 : 
 3324 rhaas                    2089 ECB             :         /*
 3324 rhaas                    2090 EUB             :          * Temporary filenames from SnapBuildSerialize() include the LSN and
                               2091                 :          * everything, but are suffixed with .$pid.tmp. We can just remove them
 3260 bruce                    2092 ECB             :          * the same as other files because there can be none that are
                               2093                 :          * currently being written that are older than cutoff.
                               2094                 :          *
                               2095                 :          * We just log a message if a file doesn't fit the pattern; it's
                               2096                 :          * probably some editor's lock/state file or similar...
                               2097                 :          */
 3324 rhaas                    2098 GIC         190 :         if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
                               2099                 :         {
 3324 rhaas                    2100 LBC           0 :             ereport(LOG,
 3138 peter_e                  2101 ECB             :                     (errmsg("could not parse file name \"%s\"", path)));
 3324 rhaas                    2102 LBC           0 :             continue;
                               2103                 :         }
 3324 rhaas                    2104 ECB             : 
 3324 rhaas                    2105 CBC         190 :         lsn = ((uint64) hi) << 32 | lo;
                               2106                 : 
 3324 rhaas                    2107 ECB             :         /* check whether we still need it */
 3324 rhaas                    2108 GIC         190 :         if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
 3324 rhaas                    2109 EUB             :         {
 3324 rhaas                    2110 GBC         147 :             elog(DEBUG1, "removing snapbuild snapshot %s", path);
                               2111                 : 
                               2112                 :             /*
                               2113                 :              * It's not particularly harmful, though strange, if we can't
                               2114                 :              * remove the file here. Don't prevent the checkpoint from
                               2115                 :              * completing, that'd be a cure worse than the disease.
                               2116                 :              */
 3324 rhaas                    2117 GIC         147 :             if (unlink(path) < 0)
                               2118                 :             {
 3324 rhaas                    2119 UIC           0 :                 ereport(LOG,
                               2120                 :                         (errcode_for_file_access(),
                               2121                 :                          errmsg("could not remove file \"%s\": %m",
 3324 rhaas                    2122 ECB             :                                 path)));
 3324 rhaas                    2123 UIC           0 :                 continue;
 3324 rhaas                    2124 EUB             :             }
                               2125                 :         }
                               2126                 :     }
 3324 rhaas                    2127 GIC        2363 :     FreeDir(snap_dir);
                               2128            2363 : }
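
The cleanup logic in CheckPointSnapBuild() above boils down to three steps: choose a cutoff, recover an LSN from each file name, and remove files below the cutoff. Here is a minimal sketch of those steps as pure functions, assuming a 64-bit Lsn typedef as a stand-in for XLogRecPtr and INVALID_LSN (0) as a stand-in for InvalidXLogRecPtr. The function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t Lsn;           /* stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)   /* stand-in for InvalidXLogRecPtr */

/* The cutoff is the older of the redo pointer and the slots' restart LSN. */
static Lsn
effective_cutoff(Lsn redo, Lsn slot_restart)
{
    Lsn         cutoff = slot_restart;

    if (redo < cutoff)
        cutoff = redo;
    return cutoff;
}

/*
 * Parse "<hi>-<lo>.snap" (both halves hexadecimal) into a 64-bit LSN.
 * As in the listing, only the two numbers must match, so temporary names
 * such as "<hi>-<lo>.snap.<pid>.tmp" parse successfully as well.
 */
static bool
parse_snapshot_name(const char *name, Lsn *lsn)
{
    unsigned int hi;
    unsigned int lo;

    assert(name != NULL && lsn != NULL);

    if (sscanf(name, "%X-%X.snap", &hi, &lo) != 2)
        return false;

    *lsn = ((Lsn) hi << 32) | lo;
    return true;
}

/* A file is removable if it is older than the cutoff or no slot needs any. */
static bool
snapshot_is_removable(Lsn lsn, Lsn cutoff)
{
    return lsn < cutoff || cutoff == INVALID_LSN;
}
```

Keeping the decision separate from directory iteration and unlink() makes the rule easy to check in isolation. Note that an invalid cutoff makes every parsed file removable, which matches the listing's behavior when no logical slot exists.
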
        

Generated by: LCOV version v1.16-55-g56c0a2a