LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - snapbuild.c (source / functions)
Current: Differential Code Coverage HEAD vs 15
Current Date: 2023-04-08 15:15:32
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Coverage: Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Lines: 81.8 % 570 466 11 20 60 13 20 258 77 111 63 273 8 62
Functions: 96.7 % 30 29 1 26 3 1 29
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * snapbuild.c
       4                 :  *
       5                 :  *    Infrastructure for building historic catalog snapshots based on contents
       6                 :  *    of the WAL, for the purpose of decoding heapam.c style values in the
       7                 :  *    WAL.
       8                 :  *
       9                 :  * NOTES:
      10                 :  *
      11                 :  * We build snapshots which can *only* be used to read catalog contents and we
      12                 :  * do so by reading and interpreting the WAL stream. The aim is to build a
      13                 :  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
      14                 :  * at the time the XLogRecord was generated.
      15                 :  *
      16                 :  * To build the snapshots we reuse the infrastructure built for Hot
      17                 :  * Standby. The in-memory snapshots we build look different than HS' because
      18                 :  * we have different needs. To successfully decode data from the WAL we only
      19                 :  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
      20                 :  * tables since the data we decode is wholly contained in the WAL
      21                 :  * records. Also, our snapshots need to be different in comparison to normal
      22                 :  * MVCC ones because in contrast to those we cannot fully rely on the clog and
      23                 :  * pg_subtrans for information about committed transactions because they might
      24                 :  * commit in the future from the POV of the WAL entry we're currently
      25                 :  * decoding. This definition has the advantage that we only need to prevent
      26                 :  * removal of catalog rows, while normal tables' rows can still be
      27                 :  * removed. This is achieved by using the replication slot mechanism.
      28                 :  *
      29                 :  * As the percentage of transactions modifying the catalog normally is fairly
      30                 :  * small in comparison to ones only manipulating user data, we keep track of
      31                 :  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
      32                 :  * track of all running transactions like it's done in a normal snapshot. Note
      33                 :  * that we're generally only looking at transactions that have acquired an
      34                 :  * xid. That is, we keep a list of transactions between snapshot->(xmin, xmax)
      35                 :  * that we consider committed, everything else is considered aborted/in
      36                 :  * progress. That also allows us not to care about subtransactions before they
      37                 :  * have committed which means this module, in contrast to HS, doesn't have to
      38                 :  * care about suboverflowed subtransactions and similar.
      39                 :  *
      40                 :  * One complexity of doing this is that to e.g. handle mixed DDL/DML
      41                 :  * transactions we need Snapshots that see intermediate versions of the
      42                 :  * catalog in a transaction. During normal operation this is achieved by using
      43                 :  * CommandIds/cmin/cmax. The problem with that however is that for space
      44                 :  * efficiency reasons only one value of that is stored
      45                 :  * (cf. combocid.c). Since combo CIDs are only available in memory we log
      46                 :  * additional information which allows us to get the original (cmin, cmax)
      47                 :  * pair during visibility checks. See the comment above
      48                 :  * ResolveCminCmaxDuringDecoding() in reorderbuffer.c for details.
      49                 :  *
      50                 :  * To facilitate all this we need our own visibility routine, as the normal
      51                 :  * ones are optimized for different use cases.
      52                 :  *
      53                 :  * To replace the normal catalog snapshots with decoding ones use the
      54                 :  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
      55                 :  *
      56                 :  *
      57                 :  *
      58                 :  * The snapbuild machinery starts up in several stages, as illustrated
      59                 :  * by the following graph describing the SnapBuild->state transitions:
      60                 :  *
      61                 :  *         +-------------------------+
      62                 :  *    +----|         START           |-------------+
      63                 :  *    |    +-------------------------+             |
      64                 :  *    |                 |                          |
      65                 :  *    |                 |                          |
      66                 :  *    |        running_xacts #1                    |
      67                 :  *    |                 |                          |
      68                 :  *    |                 |                          |
      69                 :  *    |                 v                          |
      70                 :  *    |    +-------------------------+             v
      71                 :  *    |    |   BUILDING_SNAPSHOT     |------------>|
      72                 :  *    |    +-------------------------+             |
      73                 :  *    |                 |                          |
      74                 :  *    |                 |                          |
      75                 :  *    | running_xacts #2, xacts from #1 finished   |
      76                 :  *    |                 |                          |
      77                 :  *    |                 |                          |
      78                 :  *    |                 v                          |
      79                 :  *    |    +-------------------------+             v
      80                 :  *    |    |       FULL_SNAPSHOT     |------------>|
      81                 :  *    |    +-------------------------+             |
      82                 :  *    |                 |                          |
      83                 :  * running_xacts        |                      saved snapshot
      84                 :  * with zero xacts      |                 at running_xacts's lsn
      85                 :  *    |                 |                          |
      86                 :  *    | running_xacts with xacts from #2 finished  |
      87                 :  *    |                 |                          |
      88                 :  *    |                 v                          |
      89                 :  *    |    +-------------------------+             |
      90                 :  *    +--->|  SNAPBUILD_CONSISTENT   |<------------+
      91                 :  *         +-------------------------+
      92                 :  *
      93                 :  * Initially the machinery is in the START stage. When an xl_running_xacts
      94                 :  * record is read that is sufficiently new (above the safe xmin horizon),
      95                 :  * there's a state transition. If there were no running xacts when the
      96                 :  * xl_running_xacts record was generated, we'll directly go into CONSISTENT
      97                 :  * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
      98                 :  * snapshot means that all transactions that start henceforth can be decoded
      99                 :  * in their entirety, but transactions that started previously can't. In
     100                 :  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
     101                 :  * running transactions have committed or aborted.
     102                 :  *
     103                 :  * Only transactions that commit after CONSISTENT state has been reached will
     104                 :  * be replayed, even though they might have started while still in
     105                 :  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
     106                 :  * changes have been exported, but all the following ones will be. That point
     107                 :  * is a convenient point to initialize replication from, which is why we
     108                 :  * export a snapshot at that point, which *can* be used to read normal data.
     109                 :  *
     110                 :  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
     111                 :  *
     112                 :  * IDENTIFICATION
     113                 :  *    src/backend/replication/logical/snapbuild.c
     114                 :  *
     115                 :  *-------------------------------------------------------------------------
     116                 :  */
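
The startup stages drawn in the header comment above can be read as a small state machine. The sketch below is a simplified, hypothetical model of the main path in that diagram, not the logic in snapbuild.c: the `Toy*` names are invented for illustration, and the saved-snapshot shortcut and the xmin horizon check are deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy mirror of the states in the diagram; these names are hypothetical. */
typedef enum
{
    TOY_START,
    TOY_BUILDING_SNAPSHOT,
    TOY_FULL_SNAPSHOT,
    TOY_CONSISTENT
} ToyState;

/*
 * Advance the toy state machine on one running_xacts record.
 *
 * no_running_xacts:       the record lists zero running transactions.
 * earlier_xacts_finished: every transaction that was running at the record
 *                         which caused the previous transition has finished.
 *
 * Assumption: the serialized-snapshot path and the xmin horizon check from
 * the real module are not modelled here.
 */
static ToyState
toy_next_state(ToyState cur, bool no_running_xacts, bool earlier_xacts_finished)
{
    switch (cur)
    {
        case TOY_START:
            /* nothing running: jump straight to the consistent state */
            return no_running_xacts ? TOY_CONSISTENT : TOY_BUILDING_SNAPSHOT;
        case TOY_BUILDING_SNAPSHOT:
            /* xacts from running_xacts #1 finished: snapshot is now full */
            return earlier_xacts_finished ? TOY_FULL_SNAPSHOT : cur;
        case TOY_FULL_SNAPSHOT:
            /* xacts from running_xacts #2 finished: consistent point reached */
            return earlier_xacts_finished ? TOY_CONSISTENT : cur;
        default:
            /* once consistent, the toy model never leaves that state */
            assert(cur == TOY_CONSISTENT);
            return TOY_CONSISTENT;
    }
}
```

In this model, at least three running_xacts records are needed to reach the consistent state when transactions were running at the first one, which matches the three numbered edges on the central path of the diagram.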
     117                 : 
     118                 : #include "postgres.h"
     119                 : 
     120                 : #include <sys/stat.h>
     121                 : #include <unistd.h>
     122                 : 
     123                 : #include "access/heapam_xlog.h"
     124                 : #include "access/transam.h"
     125                 : #include "access/xact.h"
     126                 : #include "common/file_utils.h"
     127                 : #include "miscadmin.h"
     128                 : #include "pgstat.h"
     129                 : #include "replication/logical.h"
     130                 : #include "replication/reorderbuffer.h"
     131                 : #include "replication/snapbuild.h"
     132                 : #include "storage/block.h"        /* debugging output */
     133                 : #include "storage/fd.h"
     134                 : #include "storage/lmgr.h"
     135                 : #include "storage/proc.h"
     136                 : #include "storage/procarray.h"
     137                 : #include "storage/standby.h"
     138                 : #include "utils/builtins.h"
     139                 : #include "utils/memutils.h"
     140                 : #include "utils/snapmgr.h"
     141                 : #include "utils/snapshot.h"
     142                 : 
     143                 : /*
     144                 :  * This struct contains the current state of the snapshot building
     145                 :  * machinery. Besides a forward declaration in the header, it is not exposed
     146                 :  * to the public, so we can easily change its contents.
     147                 :  */
     148                 : struct SnapBuild
     149                 : {
     150                 :     /* how far are we along building our first full snapshot */
     151                 :     SnapBuildState state;
     152                 : 
     153                 :     /* private memory context used to allocate memory for this module. */
     154                 :     MemoryContext context;
     155                 : 
     156                 :     /* all transactions < this have committed/aborted */
     157                 :     TransactionId xmin;
     158                 : 
     159                 :     /* all transactions >= this are uncommitted */
     160                 :     TransactionId xmax;
     161                 : 
     162                 :     /*
     163                 :      * Don't replay commits from an LSN < this LSN. This can be set externally
     164                 :      * but it will also be advanced (never retreat) from within snapbuild.c.
     165                 :      */
     166                 :     XLogRecPtr  start_decoding_at;
     167                 : 
     168                 :     /*
     169                 :      * LSN at which two-phase decoding was enabled or LSN at which we found a
     170                 :      * consistent point at the time of slot creation.
     171                 :      *
     172                 :      * The prepared transactions, that were skipped because previously
     173                 :      * two-phase was not enabled or are not covered by initial snapshot, need
     174                 :      * to be sent later along with commit prepared and they must be before
     175                 :      * this point.
     176                 :      */
     177                 :     XLogRecPtr  two_phase_at;
     178                 : 
     179                 :     /*
     180                 :      * Don't start decoding WAL until the "xl_running_xacts" information
     181                 :      * indicates there are no running xids with an xid smaller than this.
     182                 :      */
     183                 :     TransactionId initial_xmin_horizon;
     184                 : 
     185                 :     /* Indicates if we are building full snapshot or just catalog one. */
     186                 :     bool        building_full_snapshot;
     187                 : 
     188                 :     /*
     189                 :      * Snapshot that's valid to see the catalog state seen at this moment.
     190                 :      */
     191                 :     Snapshot    snapshot;
     192                 : 
     193                 :     /*
     194                 :      * LSN of the last location we are sure a snapshot has been serialized to.
     195                 :      */
     196                 :     XLogRecPtr  last_serialized_snapshot;
     197                 : 
     198                 :     /*
     199                 :      * The reorderbuffer we need to update with usable snapshots et al.
     200                 :      */
     201                 :     ReorderBuffer *reorder;
     202                 : 
     203                 :     /*
     204                 :      * TransactionId at which the next phase of initial snapshot building will
     205                 :      * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or
     206                 :      * when no next phase necessary (SNAPBUILD_CONSISTENT).
     207                 :      */
     208                 :     TransactionId next_phase_at;
     209                 : 
     210                 :     /*
     211                 :      * Array of transactions which could have catalog changes that committed
     212                 :      * between xmin and xmax.
     213                 :      */
     214                 :     struct
     215                 :     {
     216                 :         /* number of committed transactions */
     217                 :         size_t      xcnt;
     218                 : 
     219                 :         /* available space for committed transactions */
     220                 :         size_t      xcnt_space;
     221                 : 
     222                 :         /*
     223                 :          * Until we reach a CONSISTENT state, we record commits of all
     224                 :          * transactions, not just the catalog changing ones. Record when that
     225                 :          * changes so we know we cannot export a snapshot safely anymore.
     226                 :          */
     227                 :         bool        includes_all_transactions;
     228                 : 
     229                 :         /*
     230                 :          * Array of committed transactions that have modified the catalog.
     231                 :          *
     232                 :          * As this array is frequently modified we do *not* keep it in
     233                 :          * xidComparator order. Instead we sort the array when building &
     234                 :          * distributing a snapshot.
     235                 :          *
     236                 :          * TODO: It's unclear whether that reasoning has much merit. Every
     237                 :          * time we add something here after becoming consistent will also
     238                 :          * require distributing a snapshot. Storing them sorted would
     239                 :          * potentially also make it easier to purge (but more complicated wrt
     240                 :          * wraparound?). Should be improved if sorting while building the
     241                 :          * snapshot shows up in profiles.
     242                 :          */
     243                 :         TransactionId *xip;
     244                 :     }           committed;
     245                 : 
     246                 :     /*
     247                 :      * Array of transactions and subtransactions that had modified catalogs
     248                 :      * and were running when the snapshot was serialized.
     249                 :      *
     250                 :      * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
     251                 :      * if the transaction has changed the catalog. But it could happen that
     252                 :      * the logical decoding decodes only the commit record of the transaction
     253                 :      * after restoring the previously serialized snapshot in which case we
     254                 :      * will miss adding the xid to the snapshot and end up looking at the
     255                 :      * catalogs with the wrong snapshot.
     256                 :      *
     257                 :      * Now to avoid the above problem, we serialize the transactions that had
     258                 :      * modified the catalogs and are still running at the time of snapshot
     259                 :      * serialization. We fill this array while restoring the snapshot and then
     260                 :      * refer to it while decoding commit to check whether the xact has modified
     261                 :      * the catalog. We discard this array when all the xids in the list become
     262                 :      * old enough to no longer matter. See SnapBuildPurgeOlderTxn for details.
     263                 :      */
     264                 :     struct
     265                 :     {
     266                 :         /* number of transactions */
     267                 :         size_t      xcnt;
     268                 : 
     269                 :         /* This array must be sorted in xidComparator order */
     270                 :         TransactionId *xip;
     271                 :     }           catchange;
     272                 : };
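
To make the roles of xmin, xmax and the committed array in the struct above more concrete, here is a minimal sketch of how an xid could be classified against them. It is an illustration under stated assumptions, not the visibility routine PostgreSQL uses: the `Toy*` names are hypothetical, xids are compared as plain unsigned integers (wraparound is ignored), and the array is sorted once up front so that `bsearch()` can be used.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t ToyXid;        /* hypothetical stand-in for TransactionId */

typedef enum
{
    TOY_FINISHED_BEFORE_XMIN,   /* below xmin: already committed or aborted */
    TOY_TREAT_AS_COMMITTED,     /* listed in the committed array */
    TOY_NOT_COMMITTED           /* everything else: aborted or in progress */
} ToyXidStatus;

/* Plain numeric comparison; assumption: no xid wraparound handling. */
static int
toy_xid_cmp(const void *a, const void *b)
{
    ToyXid      xa = *(const ToyXid *) a;
    ToyXid      xb = *(const ToyXid *) b;

    return (xa > xb) - (xa < xb);
}

/* Sort the array once, e.g. when a snapshot is built from it. */
static void
toy_sort_xids(ToyXid *xip, size_t xcnt)
{
    qsort(xip, xcnt, sizeof(ToyXid), toy_xid_cmp);
}

/* Classify xid against [xmin, xmax) and a sorted committed array. */
static ToyXidStatus
toy_classify(ToyXid xid, ToyXid xmin, ToyXid xmax,
             const ToyXid *sorted_xip, size_t xcnt)
{
    if (xid < xmin)
        return TOY_FINISHED_BEFORE_XMIN;
    if (xid >= xmax)
        return TOY_NOT_COMMITTED;
    if (xcnt > 0 &&
        bsearch(&xid, sorted_xip, xcnt, sizeof(ToyXid), toy_xid_cmp) != NULL)
        return TOY_TREAT_AS_COMMITTED;
    return TOY_NOT_COMMITTED;
}
```

Because only catalog-modifying commits inside [xmin, xmax) are tracked, the array in this sketch stays small and a binary search over it is cheap.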
     273                 : 
     274                 : /*
     275                 :  * Starting a transaction -- which we need to do while exporting a snapshot --
     276                 :  * removes knowledge about the previously used resowner, so we save it here.
     277                 :  */
     278                 : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
     279                 : static bool ExportInProgress = false;
     280                 : 
     281                 : /* ->committed and ->catchange manipulation */
     282                 : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
     283                 : 
     284                 : /* snapshot building/manipulation/distribution functions */
     285                 : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
     286                 : 
     287                 : static void SnapBuildFreeSnapshot(Snapshot snap);
     288                 : 
     289                 : static void SnapBuildSnapIncRefcount(Snapshot snap);
     290                 : 
     291                 : static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
     292                 : 
     293                 : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
     294                 :                                                  uint32 xinfo);
     295                 : 
     296                 : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
     297                 : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
     298                 : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
     299                 : 
     300                 : /* serialization functions */
     301                 : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
     302                 : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
     303                 : static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);
     304                 : 
     305                 : /*
     306                 :  * Allocate a new snapshot builder.
     307                 :  *
     308                 :  * xmin_horizon is the xid >= which we can be sure no catalog rows have been
     309                 :  * removed, start_lsn is the LSN >= which we want to replay commits.
     310                 :  */
     311                 : SnapBuild *
     312 GIC         821 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
     313                 :                         TransactionId xmin_horizon,
     314 ECB             :                         XLogRecPtr start_lsn,
     315                 :                         bool need_full_snapshot,
     316                 :                         XLogRecPtr two_phase_at)
     317                 : {
     318                 :     MemoryContext context;
     319                 :     MemoryContext oldcontext;
     320                 :     SnapBuild  *builder;
     321                 : 
     322                 :     /* allocate memory in own context, to have better accountability */
     323 GIC         821 :     context = AllocSetContextCreate(CurrentMemoryContext,
     324                 :                                     "snapshot builder context",
     325 ECB             :                                     ALLOCSET_DEFAULT_SIZES);
     326 GIC         821 :     oldcontext = MemoryContextSwitchTo(context);
     327                 : 
     328 CBC         821 :     builder = palloc0(sizeof(SnapBuild));
     329                 : 
     330             821 :     builder->state = SNAPBUILD_START;
     331 GIC         821 :     builder->context = context;
     332 CBC         821 :     builder->reorder = reorder;
     333 ECB             :     /* Other struct members initialized by zeroing via palloc0 above */
     334                 : 
     335 GIC         821 :     builder->committed.xcnt = 0;
     336             821 :     builder->committed.xcnt_space = 128; /* arbitrary number */
     337 CBC         821 :     builder->committed.xip =
     338             821 :         palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
     339             821 :     builder->committed.includes_all_transactions = true;
     340 ECB             : 
     341 GNC         821 :     builder->catchange.xcnt = 0;
     342             821 :     builder->catchange.xip = NULL;
     343                 : 
     344 CBC         821 :     builder->initial_xmin_horizon = xmin_horizon;
     345 GIC         821 :     builder->start_decoding_at = start_lsn;
     346 CBC         821 :     builder->building_full_snapshot = need_full_snapshot;
     347             821 :     builder->two_phase_at = two_phase_at;
     348                 : 
     349             821 :     MemoryContextSwitchTo(oldcontext);
     350 ECB             : 
     351 CBC         821 :     return builder;
     352                 : }
     353 ECB             : 
     354                 : /*
     355                 :  * Free a snapshot builder.
     356                 :  */
     357                 : void
     358 GIC         679 : FreeSnapshotBuilder(SnapBuild *builder)
     359                 : {
     360 CBC         679 :     MemoryContext context = builder->context;
     361                 : 
     362 ECB             :     /* free snapshot explicitly, that contains some error checking */
     363 GIC         679 :     if (builder->snapshot != NULL)
     364                 :     {
     365 CBC         184 :         SnapBuildSnapDecRefcount(builder->snapshot);
     366 GIC         184 :         builder->snapshot = NULL;
     367 ECB             :     }
     368                 : 
     369                 :     /* other resources are deallocated via memory context deletion */
     370 GIC         679 :     MemoryContextDelete(context);
     371             679 : }
     372                 : 
     373                 : /*
     374                 :  * Free an unreferenced snapshot that has previously been built by us.
     375 ECB             :  */
     376                 : static void
     377 GIC        1148 : SnapBuildFreeSnapshot(Snapshot snap)
     378 ECB             : {
     379                 :     /* make sure we don't get passed an external snapshot */
     380 GIC        1148 :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     381 ECB             : 
     382                 :     /* make sure nobody modified our snapshot */
     383 CBC        1148 :     Assert(snap->curcid == FirstCommandId);
     384            1148 :     Assert(!snap->suboverflowed);
     385 GIC        1148 :     Assert(!snap->takenDuringRecovery);
     386            1148 :     Assert(snap->regd_count == 0);
     387 ECB             : 
     388 EUB             :     /* slightly more likely, so it's checked even without casserts */
     389 GIC        1148 :     if (snap->copied)
     390 LBC           0 :         elog(ERROR, "cannot free a copied snapshot");
     391 EUB             : 
     392 GIC        1148 :     if (snap->active_count)
     393 LBC           0 :         elog(ERROR, "cannot free an active snapshot");
     394 ECB             : 
     395 GIC        1148 :     pfree(snap);
     396            1148 : }
     397                 : 
     398                 : /*
     399                 :  * In which state of snapshot building are we?
     400 ECB             :  */
     401                 : SnapBuildState
     402 CBC     2409461 : SnapBuildCurrentState(SnapBuild *builder)
     403                 : {
     404 GIC     2409461 :     return builder->state;
     405                 : }
     406                 : 
     407                 : /*
     408                 :  * Return the LSN at which the two-phase decoding was first enabled.
     409 ECB             :  */
     410                 : XLogRecPtr
     411 CBC          29 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
     412                 : {
     413 GIC          29 :     return builder->two_phase_at;
     414                 : }
     415                 : 
     416                 : /*
     417                 :  * Set the LSN at which two-phase decoding is enabled.
     418 ECB             :  */
     419                 : void
     420 CBC           4 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
     421 ECB             : {
     422 GIC           4 :     builder->two_phase_at = ptr;
     423               4 : }
     424                 : 
     425                 : /*
     426                 :  * Should the contents of transaction ending at 'ptr' be decoded?
     427 ECB             :  */
     428                 : bool
     429 CBC      497915 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
     430                 : {
     431 GIC      497915 :     return ptr < builder->start_decoding_at;
     432                 : }
     433                 : 
     434                 : /*
     435                 :  * Increase refcount of a snapshot.
     436                 :  *
     437                 :  * This is used when handing out a snapshot to some external resource or when
     438                 :  * adding a Snapshot as builder->snapshot.
     439 ECB             :  */
     440                 : static void
     441 CBC        4907 : SnapBuildSnapIncRefcount(Snapshot snap)
     442 ECB             : {
     443 GIC        4907 :     snap->active_count++;
     444            4907 : }
     445                 : 
     446                 : /*
     447                 :  * Decrease refcount of a snapshot and free if the refcount reaches zero.
     448                 :  *
     449                 :  * Externally visible, so that external resources that have been handed an
     450                 :  * IncRef'ed Snapshot can adjust its refcount easily.
     451 ECB             :  */
     452                 : void
     453 GIC        4742 : SnapBuildSnapDecRefcount(Snapshot snap)
     454 ECB             : {
     455                 :     /* make sure we don't get passed an external snapshot */
     456 GIC        4742 :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     457 ECB             : 
     458                 :     /* make sure nobody modified our snapshot */
     459 CBC        4742 :     Assert(snap->curcid == FirstCommandId);
     460 GIC        4742 :     Assert(!snap->suboverflowed);
     461 CBC        4742 :     Assert(!snap->takenDuringRecovery);
     462                 : 
     463            4742 :     Assert(snap->regd_count == 0);
     464                 : 
     465 GIC        4742 :     Assert(snap->active_count > 0);
     466 ECB             : 
     467 EUB             :     /* slightly more likely, so it's checked even without casserts */
     468 GIC        4742 :     if (snap->copied)
     469 LBC           0 :         elog(ERROR, "cannot free a copied snapshot");
     470 ECB             : 
     471 CBC        4742 :     snap->active_count--;
     472            4742 :     if (snap->active_count == 0)
     473 GIC        1148 :         SnapBuildFreeSnapshot(snap);
     474            4742 : }
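
The increment/decrement pair above follows a common reference-counting pattern: every holder takes one reference, and whoever drops the last one frees the object. A minimal standalone sketch of that pattern is shown below; the `ToySnap` type and function names are hypothetical, and plain `calloc()`/`free()` stand in for the memory-context allocation used in the listing.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for a snapshot that only carries its refcount. */
typedef struct ToySnap
{
    unsigned    active_count;
} ToySnap;

/* Allocate a zeroed snapshot with no references; may return NULL. */
static ToySnap *
toy_snap_new(void)
{
    return calloc(1, sizeof(ToySnap));
}

/* Take one reference, e.g. when handing the snapshot to another user. */
static void
toy_snap_incref(ToySnap *snap)
{
    snap->active_count++;
}

/*
 * Drop one reference. Returns true if this was the last reference and the
 * snapshot has been freed, in which case the pointer must not be used again.
 */
static bool
toy_snap_decref(ToySnap *snap)
{
    assert(snap->active_count > 0);

    snap->active_count--;
    if (snap->active_count == 0)
    {
        free(snap);
        return true;
    }
    return false;
}
```

Returning whether the object was freed is a choice made for this sketch so that the behaviour is easy to observe; the function in the listing returns void.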
     475                 : 
     476                 : /*
     477                 :  * Build a new snapshot, based on currently committed catalog-modifying
     478                 :  * transactions.
     479                 :  *
     480                 :  * In-progress transactions with catalog access are *not* allowed to modify
     481                 :  * these snapshots; they have to copy them and fill in appropriate ->curcid
     482                 :  * and ->subxip/subxcnt values.
     483 ECB             :  */
     484                 : static Snapshot
     485 GIC        1453 : SnapBuildBuildSnapshot(SnapBuild *builder)
     486                 : {
     487                 :     Snapshot    snapshot;
     488 ECB             :     Size        ssize;
     489                 : 
     490 CBC        1453 :     Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
     491 ECB             : 
     492 CBC        1453 :     ssize = sizeof(SnapshotData)
     493 GIC        1453 :         + sizeof(TransactionId) * builder->committed.xcnt
     494 CBC        1453 :         + sizeof(TransactionId) * 1 /* toplevel xid */ ;
     495                 : 
     496            1453 :     snapshot = MemoryContextAllocZero(builder->context, ssize);
     497                 : 
     498 GIC        1453 :     snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
     499                 : 
     500                 :     /*
     501                 :      * We misuse the original meaning of SnapshotData's xip and subxip fields
     502                 :      * to make them more fitting for our needs.
     503                 :      *
     504                 :      * In the 'xip' array we store transactions that have to be treated as
     505                 :      * committed. Since we will only ever look at tuples from transactions
     506                 :      * that have modified the catalog it's more efficient to store those few
     507                 :      * that exist between xmin and xmax (frequently there are none).
     508                 :      *
     509                 :      * Snapshots that are used in transactions that have modified the catalog
     510                 :      * also use the 'subxip' array to store their toplevel xid and all the
     511                 :      * subtransaction xids so we can recognize when we need to treat rows as
     512                 :      * visible that are not in xip but still need to be visible. Subxip only
     513                 :      * gets filled when the snapshot is copied into the context of a
     514                 :      * catalog modifying transaction since we otherwise share a snapshot
     515                 :      * between transactions. As long as a txn hasn't modified the catalog it
     516                 :      * doesn't need to treat any uncommitted rows as visible, so there is no
     517                 :      * need for those xids.
     518                 :      *
     519 ECB             :      * Both arrays are qsort'ed so that we can use bsearch() on them.
     520                 :      */
     521 GIC        1453 :     Assert(TransactionIdIsNormal(builder->xmin));
     522 CBC        1453 :     Assert(TransactionIdIsNormal(builder->xmax));
     523 ECB             : 
     524 GIC        1453 :     snapshot->xmin = builder->xmin;
     525            1453 :     snapshot->xmax = builder->xmax;
     526 ECB             : 
     527                 :     /* store all transactions to be treated as committed by this snapshot */
     528 CBC        1453 :     snapshot->xip =
     529            1453 :         (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
     530            1453 :     snapshot->xcnt = builder->committed.xcnt;
     531            1453 :     memcpy(snapshot->xip,
     532 GIC        1453 :            builder->committed.xip,
     533            1453 :            builder->committed.xcnt * sizeof(TransactionId));
     534 ECB             : 
     535                 :     /* sort so we can bsearch() */
     536 GIC        1453 :     qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
     537                 : 
     538                 :     /*
     539                 :      * Initially, subxip is empty, i.e. it's a snapshot to be used by
     540                 :      * transactions that don't modify the catalog. Will be filled by
     541 ECB             :      * ReorderBufferCopySnap() if necessary.
     542                 :      */
     543 GIC        1453 :     snapshot->subxcnt = 0;
     544 CBC        1453 :     snapshot->subxip = NULL;
     545 ECB             : 
     546 CBC        1453 :     snapshot->suboverflowed = false;
     547            1453 :     snapshot->takenDuringRecovery = false;
     548            1453 :     snapshot->copied = false;
     549            1453 :     snapshot->curcid = FirstCommandId;
     550            1453 :     snapshot->active_count = 0;
     551 GIC        1453 :     snapshot->regd_count = 0;
     552 CBC        1453 :     snapshot->snapXactCompletionCount = 0;
     553                 : 
     554 GIC        1453 :     return snapshot;
     555                 : }
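
The comment in SnapBuildBuildSnapshot() says the xip array is qsort'ed so that bsearch() can be used on it. The following is a minimal standalone sketch of that idea, not the PostgreSQL code itself: `xid_t`, `xid_cmp`, `sort_xids` and `xid_in_sorted` are hypothetical stand-ins for TransactionId, xidComparator and the lookups done elsewhere, and the comparator is assumed to be a plain numeric ordering.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t xid_t;         /* hypothetical stand-in for TransactionId */

/* Plain numeric ordering; qsort() and bsearch() must use the same one. */
static int
xid_cmp(const void *a, const void *b)
{
    xid_t       x = *(const xid_t *) a;
    xid_t       y = *(const xid_t *) b;

    if (x > y)
        return 1;
    if (x < y)
        return -1;
    return 0;
}

/* Sort the "treated as committed" array once, when the snapshot is built. */
static void
sort_xids(xid_t *xip, size_t xcnt)
{
    assert(xip != NULL);
    qsort(xip, xcnt, sizeof(xid_t), xid_cmp);
}

/* Binary-search membership test against the sorted array. */
static bool
xid_in_sorted(xid_t xid, const xid_t *xip, size_t xcnt)
{
    assert(xip != NULL || xcnt == 0);
    if (xcnt == 0)
        return false;
    return bsearch(&xid, xip, xcnt, sizeof(xid_t), xid_cmp) != NULL;
}
```

Sorting once at build time keeps each later visibility check at O(log n), which presumably matters because the same snapshot is shared by many transactions.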
     556                 : 
     557                 : /*
     558                 :  * Build the initial slot snapshot and convert it to a normal snapshot that
     559                 :  * is understood by HeapTupleSatisfiesMVCC.
     560                 :  *
     561                 :  * The snapshot will be usable directly in current transaction or exported
     562                 :  * for loading in different transaction.
     563 ECB             :  */
     564                 : Snapshot
     565 GIC         155 : SnapBuildInitialSnapshot(SnapBuild *builder)
     566                 : {
     567                 :     Snapshot    snap;
     568                 :     TransactionId xid;
     569                 :     TransactionId safeXid;
     570 ECB             :     TransactionId *newxip;
     571 GIC         155 :     int         newxcnt = 0;
     572 ECB             : 
     573 GIC         155 :     Assert(XactIsoLevel == XACT_REPEATABLE_READ);
     574 GNC         155 :     Assert(builder->building_full_snapshot);
     575                 : 
     576                 :     /* don't allow older snapshots */
     577             155 :     InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
     578             155 :     if (HaveRegisteredOrActiveSnapshot())
     579 UNC           0 :         elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
     580 GNC         155 :     Assert(!HistoricSnapshotActive());
     581                 : 
     582 CBC         155 :     if (builder->state != SNAPBUILD_CONSISTENT)
     583 LBC           0 :         elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
     584 EUB             : 
     585 CBC         155 :     if (!builder->committed.includes_all_transactions)
     586 UIC           0 :         elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
     587 ECB             : 
     588 EUB             :     /* so we don't overwrite the existing value */
     589 GIC         155 :     if (TransactionIdIsValid(MyProc->xmin))
     590 LBC           0 :         elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
     591 EUB             : 
     592 GIC         155 :     snap = SnapBuildBuildSnapshot(builder);
     593                 : 
     594 ECB             :     /*
     595 EUB             :      * We know that snap->xmin is alive, enforced by the logical xmin
     596                 :      * mechanism. Due to that we can do this without locks; we're only
     597 ECB             :      * changing our own value.
     598                 :      *
     599                 :      * Building an initial snapshot is expensive and an unenforced xmin
     600                 :      * horizon would have bad consequences, therefore always double-check that
     601                 :      * the horizon is enforced.
     602                 :      */
     603 GNC         155 :     LWLockAcquire(ProcArrayLock, LW_SHARED);
     604             155 :     safeXid = GetOldestSafeDecodingTransactionId(false);
     605             155 :     LWLockRelease(ProcArrayLock);
     606                 : 
     607             155 :     if (TransactionIdFollows(safeXid, snap->xmin))
     608 UNC           0 :         elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
     609                 :              safeXid, snap->xmin);
     610 ECB             : 
     611 GIC         155 :     MyProc->xmin = snap->xmin;
     612 ECB             : 
     613 EUB             :     /* allocate in transaction context */
     614                 :     newxip = (TransactionId *)
     615 GIC         155 :         palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
     616 ECB             : 
     617                 :     /*
     618                 :      * snapbuild.c builds snapshots in an "inverted" manner, which means it
     619                 :      * stores committed transactions in ->xip, not ones in progress. Build a
     620                 :      * classical snapshot by marking all non-committed transactions as
     621                 :      * in-progress. This can be expensive.
     622                 :      */
     623 GIC         155 :     for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
     624                 :     {
     625                 :         void       *test;
     626                 : 
     627                 :         /*
     628 ECB             :          * Check whether transaction committed using the decoding snapshot
     629                 :          * meaning of ->xip.
     630                 :          */
     631 UIC           0 :         test = bsearch(&xid, snap->xip, snap->xcnt,
     632                 :                        sizeof(TransactionId), xidComparator);
     633                 : 
     634               0 :         if (test == NULL)
     635                 :         {
     636 UBC           0 :             if (newxcnt >= GetMaxSnapshotXidCount())
     637 UIC           0 :                 ereport(ERROR,
     638                 :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     639 EUB             :                          errmsg("initial slot snapshot too large")));
     640                 : 
     641 UBC           0 :             newxip[newxcnt++] = xid;
     642 EUB             :         }
     643                 : 
     644 UIC           0 :         TransactionIdAdvance(xid);
     645                 :     }
     646 EUB             : 
     647                 :     /* adjust remaining snapshot fields as needed */
     648 GIC         155 :     snap->snapshot_type = SNAPSHOT_MVCC;
     649 GBC         155 :     snap->xcnt = newxcnt;
     650 GIC         155 :     snap->xip = newxip;
     651                 : 
     652             155 :     return snap;
     653 ECB             : }
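
SnapBuildInitialSnapshot() converts the "inverted" representation (committed xids in ->xip) into a classical one (in-progress xids in ->xip). The sketch below illustrates that conversion under simplifying assumptions: xids are plain integers with no wraparound and no special values, and `xid_t`, `xid_cmp` and `invert_committed` are hypothetical names, not PostgreSQL APIs.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t xid_t;         /* hypothetical stand-in for TransactionId */

static int
xid_cmp(const void *a, const void *b)
{
    xid_t       x = *(const xid_t *) a;
    xid_t       y = *(const xid_t *) b;

    return (x > y) - (x < y);
}

/*
 * Write every xid in [xmin, xmax) that is NOT in the sorted committed
 * array to out[]. Returns the number of xids written, or -1 if out[] is
 * too small (the real code raises an error in that case).
 */
static int
invert_committed(xid_t xmin, xid_t xmax,
                 const xid_t *committed, size_t ncommitted,
                 xid_t *out, size_t out_cap)
{
    size_t      n = 0;

    assert(xmin <= xmax);       /* assumption: this sketch ignores wraparound */

    for (xid_t xid = xmin; xid < xmax; xid++)
    {
        if (ncommitted > 0 &&
            bsearch(&xid, committed, ncommitted, sizeof(xid_t), xid_cmp) != NULL)
            continue;           /* committed, so not in progress */

        if (n >= out_cap)
            return -1;
        out[n++] = xid;
    }
    return (int) n;
}
```

The cost grows with the width of the xmin..xmax window rather than with the number of committed xids, which is consistent with the source comment's warning that this step can be expensive.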
     654                 : 
     655                 : /*
     656                 :  * Export a snapshot so it can be set in another session with SET TRANSACTION
     657                 :  * SNAPSHOT.
     658                 :  *
     659                 :  * For that we need to start a transaction in the current backend as the
     660                 :  * importing side checks whether the source transaction is still open to make
     661                 :  * sure the xmin horizon hasn't advanced since then.
     662                 :  */
     663                 : const char *
     664 UIC           0 : SnapBuildExportSnapshot(SnapBuild *builder)
     665                 : {
     666                 :     Snapshot    snap;
     667                 :     char       *snapname;
     668                 : 
     669 UBC           0 :     if (IsTransactionOrTransactionBlock())
     670 UIC           0 :         elog(ERROR, "cannot export a snapshot from within a transaction");
     671                 : 
     672               0 :     if (SavedResourceOwnerDuringExport)
     673               0 :         elog(ERROR, "can only export one snapshot at a time");
     674 EUB             : 
     675 UBC           0 :     SavedResourceOwnerDuringExport = CurrentResourceOwner;
     676 UIC           0 :     ExportInProgress = true;
     677 EUB             : 
     678 UBC           0 :     StartTransactionCommand();
     679                 : 
     680 EUB             :     /* There doesn't seem to be a nice API to set these */
     681 UBC           0 :     XactIsoLevel = XACT_REPEATABLE_READ;
     682 UIC           0 :     XactReadOnly = true;
     683 EUB             : 
     684 UIC           0 :     snap = SnapBuildInitialSnapshot(builder);
     685                 : 
     686 EUB             :     /*
     687                 :      * now that we've built a plain snapshot, make it active and use the
     688                 :      * normal mechanisms for exporting it
     689                 :      */
     690 UIC           0 :     snapname = ExportSnapshot(snap);
     691                 : 
     692               0 :     ereport(LOG,
     693                 :             (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
     694                 :                            "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
     695 EUB             :                            snap->xcnt,
     696                 :                            snapname, snap->xcnt)));
     697 UBC           0 :     return snapname;
     698                 : }
     699                 : 
     700                 : /*
     701                 :  * Ensure there is a snapshot and if not build one for current transaction.
     702 EUB             :  */
     703                 : Snapshot
     704 GNC           6 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
     705                 : {
     706 GIC           6 :     Assert(builder->state == SNAPBUILD_CONSISTENT);
     707                 : 
     708                 :     /* only build a new snapshot if we don't have a prebuilt one */
     709 CBC           6 :     if (builder->snapshot == NULL)
     710                 :     {
     711 LBC           0 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
     712                 :         /* increase refcount for the snapshot builder */
     713 UIC           0 :         SnapBuildSnapIncRefcount(builder->snapshot);
     714 ECB             :     }
     715                 : 
     716 GBC           6 :     return builder->snapshot;
     717                 : }
     718 EUB             : 
     719                 : /*
     720                 :  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
     721 ECB             :  * any. Aborts the previously started transaction and resets the resource
     722                 :  * owner back to its original value.
     723                 :  */
     724                 : void
     725 GIC        3912 : SnapBuildClearExportedSnapshot(void)
     726                 : {
     727                 :     ResourceOwner tmpResOwner;
     728                 : 
     729                 :     /* nothing exported, that is the usual case */
     730 CBC        3912 :     if (!ExportInProgress)
     731 GIC        3912 :         return;
     732                 : 
     733 UIC           0 :     if (!IsTransactionState())
     734               0 :         elog(ERROR, "clearing exported snapshot in wrong transaction state");
     735 ECB             : 
     736                 :     /*
     737                 :      * AbortCurrentTransaction() takes care of resetting the snapshot state,
     738 EUB             :      * so remember SavedResourceOwnerDuringExport.
     739                 :      */
     740 UIC           0 :     tmpResOwner = SavedResourceOwnerDuringExport;
     741                 : 
     742                 :     /* abort, so that nothing done during the export can take effect */
     743               0 :     AbortCurrentTransaction();
     744                 : 
     745 UBC           0 :     CurrentResourceOwner = tmpResOwner;
     746                 : }
     747                 : 
     748 EUB             : /*
     749                 :  * Clear snapshot export state during transaction abort.
     750                 :  */
     751                 : void
     752 GIC       20125 : SnapBuildResetExportedSnapshotState(void)
     753                 : {
     754           20125 :     SavedResourceOwnerDuringExport = NULL;
     755           20125 :     ExportInProgress = false;
     756           20125 : }
     757 ECB             : 
     758                 : /*
     759                 :  * Handle the effects of a single heap change, appropriate to the current state
     760                 :  * of the snapshot builder and return whether changes made at (xid, lsn) can
     761                 :  * be decoded.
     762                 :  */
     763                 : bool
     764 GIC     1694388 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
     765                 : {
     766                 :     /*
     767                 :      * We can't handle data in transactions if we haven't built a snapshot
     768                 :      * yet, so don't store them.
     769 ECB             :      */
     770 GIC     1694388 :     if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
     771 UIC           0 :         return false;
     772                 : 
     773                 :     /*
     774                 :      * No point in keeping track of changes in transactions that we don't have
     775 ECB             :      * enough information about to decode. This means that they started before
     776 EUB             :      * we got into the SNAPBUILD_FULL_SNAPSHOT state.
     777                 :      */
     778 GIC     1694399 :     if (builder->state < SNAPBUILD_CONSISTENT &&
     779              11 :         TransactionIdPrecedes(xid, builder->next_phase_at))
     780               4 :         return false;
     781                 : 
     782                 :     /*
     783 ECB             :      * If the reorderbuffer doesn't yet have a snapshot, add one now; it will
     784                 :      * be needed to decode the change we're currently processing.
     785                 :      */
     786 GIC     1694384 :     if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
     787                 :     {
     788                 :         /* only build a new snapshot if we don't have a prebuilt one */
     789            2598 :         if (builder->snapshot == NULL)
     790                 :         {
     791 CBC         297 :             builder->snapshot = SnapBuildBuildSnapshot(builder);
     792                 :             /* increase refcount for the snapshot builder */
     793 GIC         297 :             SnapBuildSnapIncRefcount(builder->snapshot);
     794 ECB             :         }
     795                 : 
     796                 :         /*
     797                 :          * Increase refcount for the transaction we're handing the snapshot
     798                 :          * out to.
     799                 :          */
     800 GIC        2598 :         SnapBuildSnapIncRefcount(builder->snapshot);
     801            2598 :         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
     802                 :                                      builder->snapshot);
     803                 :     }
     804                 : 
     805 CBC     1694384 :     return true;
     806 ECB             : }
     807                 : 
     808                 : /*
     809                 :  * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
     810                 :  * This implies that a transaction has done some form of write to system
     811                 :  * catalogs.
     812                 :  */
     813                 : void
     814 GIC       22808 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
     815                 :                        XLogRecPtr lsn, xl_heap_new_cid *xlrec)
     816                 : {
     817                 :     CommandId   cid;
     818                 : 
     819 ECB             :     /*
     820                 :      * we only log new_cid's if a catalog tuple was modified, so mark the
     821                 :      * transaction as containing catalog modifications
     822                 :      */
     823 GIC       22808 :     ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
     824                 : 
     825           22808 :     ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
     826                 :                                  xlrec->target_locator, xlrec->target_tid,
     827                 :                                  xlrec->cmin, xlrec->cmax,
     828 ECB             :                                  xlrec->combocid);
     829                 : 
     830                 :     /* figure out new command id */
     831 GIC       22808 :     if (xlrec->cmin != InvalidCommandId &&
     832           19055 :         xlrec->cmax != InvalidCommandId)
     833            3030 :         cid = Max(xlrec->cmin, xlrec->cmax);
     834           19778 :     else if (xlrec->cmax != InvalidCommandId)
     835            3753 :         cid = xlrec->cmax;
     836 CBC       16025 :     else if (xlrec->cmin != InvalidCommandId)
     837           16025 :         cid = xlrec->cmin;
     838 ECB             :     else
     839                 :     {
     840 LBC           0 :         cid = InvalidCommandId; /* silence compiler */
     841               0 :         elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
     842 ECB             :     }
     843                 : 
     844 GIC       22808 :     ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
     845 GBC       22808 : }
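
The "figure out new command id" branch in SnapBuildProcessNewCid() can be read as a small pure function: take whichever of cmin/cmax is valid (the larger if both are), then add one. A hedged standalone sketch follows. `INVALID_CID` and `next_command_id` are hypothetical names, and the all-ones value mirrors how InvalidCommandId is defined in PostgreSQL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define INVALID_CID UINT32_MAX  /* hypothetical stand-in for InvalidCommandId */

/*
 * Compute the command id to register after a new_cid record.
 * Returns false when neither cmin nor cmax is valid, the case in which
 * the original code reports an error.
 */
static bool
next_command_id(uint32_t cmin, uint32_t cmax, uint32_t *next)
{
    uint32_t    cid;

    assert(next != NULL);

    if (cmin != INVALID_CID && cmax != INVALID_CID)
        cid = (cmin > cmax) ? cmin : cmax;
    else if (cmax != INVALID_CID)
        cid = cmax;
    else if (cmin != INVALID_CID)
        cid = cmin;
    else
        return false;

    *next = cid + 1;
    return true;
}
```

In this report the error branch has a hit count of 0, so only the three valid combinations are exercised by the test suite.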
     846 EUB             : 
     847                 : /*
     848                 :  * Add a new Snapshot to all transactions we're decoding that currently are
     849 ECB             :  * in-progress so they can see new catalog contents made by the transaction
     850                 :  * that just committed. This is necessary because those in-progress
     851                 :  * transactions will use the new catalog's contents from here on (at the very
     852                 :  * least everything they do needs to be compatible with newer catalog
     853                 :  * contents).
     854                 :  */
     855                 : static void
     856 GIC         995 : SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
     857                 : {
     858                 :     dlist_iter  txn_i;
     859                 :     ReorderBufferTXN *txn;
     860                 : 
     861 ECB             :     /*
     862                 :      * Iterate through all toplevel transactions. This can include
     863                 :      * subtransactions which we just don't yet know to be that, but that's
     864                 :      * fine, they will just get an unnecessary snapshot queued.
     865                 :      */
     866 GIC        2031 :     dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
     867                 :     {
     868            1036 :         txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
     869                 : 
     870            1036 :         Assert(TransactionIdIsValid(txn->xid));
     871 ECB             : 
     872                 :         /*
     873                 :          * If we don't have a base snapshot yet, there are no changes in this
     874                 :          * transaction which in turn implies we don't yet need a snapshot at
     875                 :          * all. We'll add a snapshot when the first change gets queued.
     876                 :          *
     877                 :          * NB: This works correctly even for subtransactions because
     878                 :          * ReorderBufferAssignChild() takes care to transfer the base snapshot
     879                 :          * to the top-level transaction, and while iterating the changequeue
     880                 :          * we'll get the change from the subtxn.
     881                 :          */
     882 GIC        1036 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
     883              12 :             continue;
     884                 : 
     885                 :         /*
     886                 :          * We don't need to add a snapshot to prepared transactions as they
     887 ECB             :          * should not see the new catalog contents.
     888                 :          */
     889 GIC        1024 :         if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
     890              22 :             continue;
     891                 : 
     892            1002 :         elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
     893                 :              txn->xid, LSN_FORMAT_ARGS(lsn));
     894 ECB             : 
     895                 :         /*
     896                 :          * increase the snapshot's refcount for the transaction we are handing
     897                 :          * it out to
     898                 :          */
     899 GIC        1002 :         SnapBuildSnapIncRefcount(builder->snapshot);
     900            1002 :         ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
     901                 :                                  builder->snapshot);
     902                 :     }
     903             995 : }
     904 ECB             : 
     905                 : /*
     906                 :  * Keep track of a new catalog changing transaction that has committed.
     907                 :  */
     908                 : static void
     909 GIC        1004 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
     910                 : {
     911            1004 :     Assert(TransactionIdIsValid(xid));
     912                 : 
     913            1004 :     if (builder->committed.xcnt == builder->committed.xcnt_space)
     914 ECB             :     {
     915 UIC           0 :         builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
     916 ECB             : 
     917 UIC           0 :         elog(DEBUG1, "increasing space for committed transactions to %u",
     918 ECB             :              (uint32) builder->committed.xcnt_space);
     919                 : 
     920 UBC           0 :         builder->committed.xip = repalloc(builder->committed.xip,
     921 UIC           0 :                                           builder->committed.xcnt_space * sizeof(TransactionId));
     922 EUB             :     }
     923                 : 
     924                 :     /*
     925                 :      * TODO: It might make sense to keep the array sorted here instead of
     926                 :      * doing it every time we build a new snapshot. On the other hand this
     927                 :      * gets called repeatedly when a transaction with subtransactions commits.
     928                 :      */
     929 GIC        1004 :     builder->committed.xip[builder->committed.xcnt++] = xid;
     930            1004 : }
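
SnapBuildAddCommittedTxn() grows its array with the rule `xcnt_space * 2 + 1`, which also works from an initial capacity of zero. The sketch below reproduces that growth rule with the C standard allocator instead of repalloc(). The `xid_array` struct and `xid_array_append` are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t xid_t;         /* hypothetical stand-in for TransactionId */

typedef struct
{
    xid_t      *xip;            /* unsorted list of xids */
    size_t      xcnt;           /* entries in use */
    size_t      xcnt_space;     /* entries allocated */
} xid_array;

/* Append one xid, growing the array by "2n + 1" when it is full. */
static bool
xid_array_append(xid_array *a, xid_t xid)
{
    assert(a != NULL);

    if (a->xcnt == a->xcnt_space)
    {
        size_t      new_space = a->xcnt_space * 2 + 1;
        xid_t      *p = realloc(a->xip, new_space * sizeof(xid_t));

        if (p == NULL)
            return false;       /* caller keeps the old, still valid array */
        a->xip = p;
        a->xcnt_space = new_space;
    }

    a->xip[a->xcnt++] = xid;
    return true;
}
```

Starting from an empty array, the capacity sequence under this rule is 1, 3, 7, 15 and so on, so appends are amortized constant time.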
     931                 : 
     932                 : /*
     933                 :  * Remove knowledge about transactions we treat as committed or containing catalog
     934                 :  * changes that are smaller than ->xmin. Those won't ever get checked via
     935                 :  * the ->committed or ->catchange array, respectively. The committed xids will
     936                 :  * get checked via the clog machinery.
     937                 :  *
     938                 :  * We can ideally remove the transaction from catchange array once it is
     939                 :  * finished (committed/aborted) but that could be costly as we need to maintain
     940                 :  * the xids order in the array.
     941                 :  */
     942                 : static void
     943             274 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
     944                 : {
     945                 :     int         off;
     946                 :     TransactionId *workspace;
     947             274 :     int         surviving_xids = 0;
     948 ECB             : 
     949                 :     /* not ready yet */
     950 GIC         274 :     if (!TransactionIdIsNormal(builder->xmin))
     951 UIC           0 :         return;
     952 ECB             : 
     953                 :     /* TODO: Neater algorithm than just copying and iterating? */
     954                 :     workspace =
     955 CBC         274 :         MemoryContextAlloc(builder->context,
     956 GBC         274 :                            builder->committed.xcnt * sizeof(TransactionId));
     957                 : 
     958                 :     /* copy xids that still are interesting to workspace */
     959 GIC         481 :     for (off = 0; off < builder->committed.xcnt; off++)
     960 ECB             :     {
     961 CBC         207 :         if (NormalTransactionIdPrecedes(builder->committed.xip[off],
     962                 :                                         builder->xmin))
     963                 :             ;                   /* remove */
     964 ECB             :         else
     965 UIC           0 :             workspace[surviving_xids++] = builder->committed.xip[off];
     966 ECB             :     }
     967                 : 
     968                 :     /* copy workspace back to persistent state */
     969 GIC         274 :     memcpy(builder->committed.xip, workspace,
     970 EUB             :            surviving_xids * sizeof(TransactionId));
     971                 : 
     972 GIC         274 :     elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
     973                 :          (uint32) builder->committed.xcnt, (uint32) surviving_xids,
     974 ECB             :          builder->xmin, builder->xmax);
     975 GIC         274 :     builder->committed.xcnt = surviving_xids;
     976                 : 
     977 CBC         274 :     pfree(workspace);
     978                 : 
     979 ECB             :     /*
     980                 :      * Purge xids in ->catchange as well. The purged array must also be sorted
     981                 :      * in xidComparator order.
     982                 :      */
     983 GNC         274 :     if (builder->catchange.xcnt > 0)
     984                 :     {
     985                 :         /*
     986                 :          * Since catchange.xip is sorted, we find the lower bound of xids that
     987                 :          * are still interesting.
     988                 :          */
     989               7 :         for (off = 0; off < builder->catchange.xcnt; off++)
     990                 :         {
     991               5 :             if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
     992                 :                                              builder->xmin))
     993               1 :                 break;
     994                 :         }
     995                 : 
     996               3 :         surviving_xids = builder->catchange.xcnt - off;
     997                 : 
     998               3 :         if (surviving_xids > 0)
     999                 :         {
    1000               1 :             memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
    1001                 :                     surviving_xids * sizeof(TransactionId));
    1002                 :         }
    1003 ECB             :         else
    1004                 :         {
    1005 GNC           2 :             pfree(builder->catchange.xip);
    1006               2 :             builder->catchange.xip = NULL;
    1007                 :         }
    1008                 : 
    1009               3 :         elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
    1010                 :              (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
    1011                 :              builder->xmin, builder->xmax);
    1012               3 :         builder->catchange.xcnt = surviving_xids;
    1013 ECB             :     }
    1014                 : }
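
SnapBuildPurgeOlderTxn() filters the committed array through a temporary workspace, and its TODO asks whether a neater algorithm exists. One possible alternative, shown here only as a sketch and not as what the source does, is to compact the array in place. `xid_t`, `xid_precedes` and `purge_older` are hypothetical names, and the comparison assumes both xids are normal, using the modulo-2^32 ordering.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t xid_t;         /* hypothetical stand-in for TransactionId */

/* Wraparound-aware "a is older than b", valid for normal xids only. */
static bool
xid_precedes(xid_t a, xid_t b)
{
    return (int32_t) (a - b) < 0;
}

/*
 * Drop every xid older than xmin, keeping the relative order of the
 * survivors. Returns the new element count.
 */
static size_t
purge_older(xid_t *xip, size_t xcnt, xid_t xmin)
{
    size_t      kept = 0;

    assert(xip != NULL || xcnt == 0);

    for (size_t i = 0; i < xcnt; i++)
    {
        if (!xid_precedes(xip[i], xmin))
            xip[kept++] = xip[i];   /* kept <= i, so this never clobbers unread data */
    }
    return kept;
}
```

Because the write index never passes the read index, no workspace allocation is needed. Order is preserved, so the same loop would also work on a sorted array such as catchange.xip, although the lower-bound scan plus memmove() used in the source avoids examining the tail.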
    1015                 : 
    1016                 : /*
    1017                 :  * Handle everything that needs to be done when a transaction commits
    1018                 :  */
    1019                 : void
    1020 GIC        2542 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
    1021                 :                    int nsubxacts, TransactionId *subxacts, uint32 xinfo)
    1022                 : {
    1023 ECB             :     int         nxact;
    1024                 : 
    1025 CBC        2542 :     bool        needs_snapshot = false;
    1026 GIC        2542 :     bool        needs_timetravel = false;
    1027 CBC        2542 :     bool        sub_needs_timetravel = false;
    1028                 : 
    1029 GIC        2542 :     TransactionId xmax = xid;
    1030                 : 
    1031                 :     /*
    1032                 :      * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
    1033 ECB             :      * will they be part of a snapshot.  So we don't need to record anything.
    1034                 :      */
    1035 GBC        2542 :     if (builder->state == SNAPBUILD_START ||
    1036 GIC        2542 :         (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
    1037 UIC           0 :          TransactionIdPrecedes(xid, builder->next_phase_at)))
    1038 EUB             :     {
    1039                 :         /* ensure that only commits after this are getting replayed */
    1040 UBC           0 :         if (builder->start_decoding_at <= lsn)
    1041 UIC           0 :             builder->start_decoding_at = lsn + 1;
    1042               0 :         return;
    1043 ECB             :     }
    1044                 : 
    1045 GIC        2542 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1046 ECB             :     {
    1047                 :         /* ensure that only commits after this are getting replayed */
    1048 GIC          11 :         if (builder->start_decoding_at <= lsn)
    1049               5 :             builder->start_decoding_at = lsn + 1;
    1050                 : 
    1051                 :         /*
    1052                 :          * If building an exportable snapshot, force xid to be tracked, even
    1053 ECB             :          * if the transaction didn't modify the catalog.
    1054                 :          */
    1055 GBC          11 :         if (builder->building_full_snapshot)
    1056                 :         {
    1057 UIC           0 :             needs_timetravel = true;
    1058                 :         }
    1059 ECB             :     }
    1060                 : 
    1061 CBC        3790 :     for (nxact = 0; nxact < nsubxacts; nxact++)
    1062                 :     {
    1063 GIC        1248 :         TransactionId subxid = subxacts[nxact];
    1064                 : 
    1065                 :         /*
    1066                 :          * Add subtransaction to base snapshot if catalog modifying; we don't
    1067 ECB             :          * distinguish it from toplevel transactions there.
    1068                 :          */
    1069 GNC        1248 :         if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
    1070 ECB             :         {
    1071 GIC           9 :             sub_needs_timetravel = true;
    1072 CBC           9 :             needs_snapshot = true;
    1073                 : 
    1074 GIC           9 :             elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
    1075 ECB             :                  xid, subxid);
    1076                 : 
    1077 CBC           9 :             SnapBuildAddCommittedTxn(builder, subxid);
    1078 ECB             : 
    1079 GIC           9 :             if (NormalTransactionIdFollows(subxid, xmax))
    1080               9 :                 xmax = subxid;
    1081                 :         }
    1082                 : 
    1083                 :         /*
    1084                 :          * If we're forcing timetravel we also need visibility information
    1085                 :          * about subtransaction, so keep track of subtransaction's state, even
    1086                 :          * if not catalog modifying.  Don't need to distribute a snapshot in
    1087 ECB             :          * that case.
    1088                 :          */
    1089 GBC        1239 :         else if (needs_timetravel)
    1090 EUB             :         {
    1091 UBC           0 :             SnapBuildAddCommittedTxn(builder, subxid);
    1092 UIC           0 :             if (NormalTransactionIdFollows(subxid, xmax))
    1093               0 :                 xmax = subxid;
    1094                 :         }
    1095                 :     }
    1096 ECB             : 
    1097                 :     /* if top-level modified catalog, it'll need a snapshot */
    1098 GNC        2542 :     if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
    1099                 :     {
    1100 CBC         994 :         elog(DEBUG2, "found top level transaction %u, with catalog changes",
    1101 ECB             :              xid);
    1102 CBC         994 :         needs_snapshot = true;
    1103 GIC         994 :         needs_timetravel = true;
    1104 CBC         994 :         SnapBuildAddCommittedTxn(builder, xid);
    1105                 :     }
    1106 GIC        1548 :     else if (sub_needs_timetravel)
    1107 ECB             :     {
    1108                 :         /* track toplevel txn as well, subxact alone isn't meaningful */
    1109 CBC           1 :         elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
    1110 ECB             :              xid);
    1111 GIC           1 :         needs_timetravel = true;
    1112 CBC           1 :         SnapBuildAddCommittedTxn(builder, xid);
    1113                 :     }
    1114 GBC        1547 :     else if (needs_timetravel)
    1115                 :     {
    1116 UBC           0 :         elog(DEBUG2, "forced transaction %u to do timetravel", xid);
    1117                 : 
    1118 UIC           0 :         SnapBuildAddCommittedTxn(builder, xid);
    1119 ECB             :     }
    1120                 : 
    1121 GIC        2542 :     if (!needs_timetravel)
    1122 ECB             :     {
    1123                 :         /* record that we cannot export a general snapshot anymore */
    1124 GIC        1547 :         builder->committed.includes_all_transactions = false;
    1125 ECB             :     }
    1126                 : 
    1127 GIC        2542 :     Assert(!needs_snapshot || needs_timetravel);
    1128                 : 
    1129                 :     /*
    1130                 :      * Adjust xmax of the snapshot builder.  We only do that for committed,
    1131                 :      * catalog modifying transactions; everything else isn't interesting for
    1132 ECB             :      * us since we'll never look at the respective rows.
    1133                 :      */
    1134 CBC        2542 :     if (needs_timetravel &&
    1135 GIC        1990 :         (!TransactionIdIsValid(builder->xmax) ||
    1136 CBC         995 :          TransactionIdFollowsOrEquals(xmax, builder->xmax)))
    1137 ECB             :     {
    1138 GIC         991 :         builder->xmax = xmax;
    1139             991 :         TransactionIdAdvance(builder->xmax);
    1140                 :     }
    1141 ECB             : 
    1142                 :     /* if there's any reason to build a historic snapshot, do so now */
    1143 GIC        2542 :     if (needs_snapshot)
    1144                 :     {
    1145                 :         /*
    1146                 :          * If we haven't built a complete snapshot yet there's no need to hand
    1147 ECB             :          * it out, it wouldn't (and couldn't) be used anyway.
    1148 EUB             :          */
    1149 GIC         995 :         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
    1150 UIC           0 :             return;
    1151                 : 
    1152                 :         /*
    1153                 :          * Decrease the snapshot builder's refcount of the old snapshot, note
    1154                 :          * that it still will be used if it has been handed out to the
    1155 ECB             :          * reorderbuffer earlier.
    1156                 :          */
    1157 GIC         995 :         if (builder->snapshot)
    1158 CBC         994 :             SnapBuildSnapDecRefcount(builder->snapshot);
    1159                 : 
    1160 GIC         995 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
    1161 ECB             : 
    1162                 :         /* we might need to execute invalidations, add snapshot */
    1163 CBC         995 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
    1164 ECB             :         {
    1165 GIC           9 :             SnapBuildSnapIncRefcount(builder->snapshot);
    1166               9 :             ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
    1167                 :                                          builder->snapshot);
    1168                 :         }
    1169 ECB             : 
    1170                 :         /* refcount of the snapshot builder for the new snapshot */
    1171 GIC         995 :         SnapBuildSnapIncRefcount(builder->snapshot);
    1172 ECB             : 
    1173                 :         /* add a new catalog snapshot to all currently running transactions */
    1174 GIC         995 :         SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
    1175                 :     }
    1176                 : }
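
The xmax adjustment near the end of SnapBuildCommitTxn() can be illustrated on its own. The following is a sketch under stated assumptions: `xid_t`, `INVALID_XID`, `FIRST_NORMAL_XID` and `advance_builder_xmax` are hypothetical names, the values 0 and 3 are assumed to mirror the invalid and first normal transaction ids, and the comparison helper only models normal xids.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t xid_t;             /* hypothetical stand-in for TransactionId */

#define INVALID_XID       ((xid_t) 0)   /* assumed value of the invalid xid */
#define FIRST_NORMAL_XID  ((xid_t) 3)   /* assumed first non-special xid */

/* Wraparound-aware "a >= b" for normal xids. */
static int
xid_follows_or_equals(xid_t a, xid_t b)
{
    return (int32_t) (a - b) >= 0;
}

/*
 * Move the builder's xmax to one past commit_xmax, but only if xmax is
 * unset or commit_xmax is at least as new.  Special xids are skipped when
 * the increment wraps around.
 */
static void
advance_builder_xmax(xid_t *builder_xmax, xid_t commit_xmax)
{
    assert(builder_xmax != NULL);

    if (*builder_xmax == INVALID_XID ||
        xid_follows_or_equals(commit_xmax, *builder_xmax))
    {
        *builder_xmax = commit_xmax + 1;
        if (*builder_xmax < FIRST_NORMAL_XID)
            *builder_xmax = FIRST_NORMAL_XID;
    }
}
```

In the listing above this step runs only when needs_timetravel is set, so commits that never touched the catalog leave xmax alone.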
    1177                 : 
    1178                 : /*
    1179                 :  * Check the reorder buffer and the snapshot to see if the given transaction has
    1180                 :  * modified catalogs.
    1181                 :  */
    1182                 : static inline bool
    1183 GNC        3790 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
    1184                 :                               uint32 xinfo)
    1185                 : {
    1186            3790 :     if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
    1187             999 :         return true;
    1188                 : 
    1189                 :     /*
    1190                 :      * The transactions that have changed catalogs must have invalidation
    1191                 :      * info.
    1192                 :      */
    1193            2791 :     if (!(xinfo & XACT_XINFO_HAS_INVALS))
    1194            2783 :         return false;
    1195                 : 
    1196                 :     /* Check the catchange XID array */
    1197              12 :     return ((builder->catchange.xcnt > 0) &&
    1198               4 :             (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
    1199                 :                      sizeof(TransactionId), xidComparator) != NULL));
    1200                 : }
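
The final lookup in SnapBuildXidHasCatalogChanges() is a binary search over a sorted xid array. A minimal sketch of that pattern follows; `xid_t`, `xid_cmp` and `xid_in_sorted_array` are hypothetical names, and the comparator is assumed to order xids as plain unsigned integers, which is what a sorted array searched with bsearch() requires.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t xid_t;     /* hypothetical stand-in for TransactionId */

/* Plain unsigned ordering; deliberately not wraparound-aware. */
static int
xid_cmp(const void *a, const void *b)
{
    xid_t       x = *(const xid_t *) a;
    xid_t       y = *(const xid_t *) b;

    return (x > y) - (x < y);
}

/* Returns 1 if xid occurs in the sorted array xip[0..xcnt), else 0. */
static int
xid_in_sorted_array(xid_t xid, const xid_t *xip, size_t xcnt)
{
    /* Check the count first so an empty (possibly NULL) array is never searched. */
    if (xcnt == 0)
        return 0;

    assert(xip != NULL);

    return bsearch(&xid, xip, xcnt, sizeof(xid_t), xid_cmp) != NULL;
}
```

The count check mirrors the `xcnt > 0` guard in the listing, which matters because the array pointer is set to NULL once every entry has been purged.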
    1201                 : 
    1202                 : /* -----------------------------------
    1203                 :  * Snapshot building functions dealing with xlog records
    1204 ECB             :  * -----------------------------------
    1205                 :  */
    1206                 : 
    1207                 : /*
    1208                 :  * Process a running xacts record, and use its information to first build a
    1209                 :  * historic snapshot and later to release resources that aren't needed
    1210                 :  * anymore.
    1211                 :  */
    1212                 : void
    1213 GIC        1026 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1214 ECB             : {
    1215                 :     ReorderBufferTXN *txn;
    1216                 :     TransactionId xmin;
    1217                 : 
    1218                 :     /*
    1219                 :      * If we're not consistent yet, inspect the record to see whether it
    1220                 :      * allows us to get closer to being consistent. If we are consistent, dump
    1221                 :      * our snapshot so others or we, after a restart, can use it.
    1222                 :      */
    1223 GIC        1026 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1224                 :     {
    1225                 :         /* returns false if there's no point in performing cleanup just yet */
    1226             776 :         if (!SnapBuildFindSnapshot(builder, lsn, running))
    1227             750 :             return;
    1228                 :     }
    1229                 :     else
    1230             250 :         SnapBuildSerialize(builder, lsn);
    1231                 : 
    1232                 :     /*
    1233                 :      * Update range of interesting xids based on the running xacts
    1234 ECB             :      * information. We don't increase ->xmax using it, because once we are in
    1235                 :      * a consistent state we can do that ourselves and much more efficiently
    1236                 :      * so, because we only need to do it for catalog transactions since we
    1237                 :      * only ever look at those.
    1238                 :      *
    1239                 :      * NB: We only increase xmax when a catalog modifying transaction commits
    1240                 :      * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
    1241                 :      * xmin, which looks odd but is correct and actually more efficient, since
    1242                 :      * we hit fast paths in heapam_visibility.c.
    1243                 :      */
    1244 CBC         274 :     builder->xmin = running->oldestRunningXid;
    1245                 : 
    1246                 :     /* Remove transactions we don't need to keep track of anymore */
    1247             274 :     SnapBuildPurgeOlderTxn(builder);
    1248 ECB             : 
    1249                 :     /*
    1250                 :      * Advance the xmin limit for the current replication slot, to allow
    1251                 :      * vacuum to clean up the tuples this slot has been protecting.
    1252                 :      *
    1253                 :      * The reorderbuffer might have an xmin among the currently running
    1254                 :      * snapshots; use it if so.  If not, we need only consider the snapshots
    1255                 :      * we'll produce later, which can't be less than the oldest running xid in
    1256                 :      * the record we're reading now.
    1257                 :      */
    1258 GIC         274 :     xmin = ReorderBufferGetOldestXmin(builder->reorder);
    1259             274 :     if (xmin == InvalidTransactionId)
    1260             242 :         xmin = running->oldestRunningXid;
    1261             274 :     elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
    1262                 :          builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
    1263             274 :     LogicalIncreaseXminForSlot(lsn, xmin);
    1264                 : 
    1265 ECB             :     /*
    1266                 :      * Also tell the slot where we can restart decoding from. We don't want to
    1267                 :      * do that after every commit because changing that implies an fsync of
    1268                 :      * the logical slot's state file, so we only do it every time we see a
    1269                 :      * running xacts record.
    1270                 :      *
    1271                 :      * Do so by looking for the oldest in progress transaction (determined by
    1272                 :      * the first LSN of any of its relevant records). Every transaction
    1273                 :      * remembers the last location we stored the snapshot to disk before its
    1274                 :      * beginning. That point is where we can restart from.
    1275                 :      */
    1276                 : 
    1277                 :     /*
    1278                 :      * Can't know about a serialized snapshot's location if we're not
    1279                 :      * consistent.
    1280                 :      */
    1281 CBC         274 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1282              19 :         return;
    1283                 : 
    1284             255 :     txn = ReorderBufferGetOldestTXN(builder->reorder);
    1285                 : 
    1286                 :     /*
    1287                 :      * oldest ongoing txn might have started when we didn't yet serialize
    1288                 :      * anything because we hadn't reached a consistent state yet.
    1289                 :      */
    1290 GIC         255 :     if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
    1291              13 :         LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
    1292                 : 
    1293                 :     /*
    1294                 :      * No in-progress transaction, can reuse the last serialized snapshot if
    1295                 :      * we have one.
    1296                 :      */
    1297             242 :     else if (txn == NULL &&
    1298             223 :              builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
    1299             221 :              builder->last_serialized_snapshot != InvalidXLogRecPtr)
    1300             221 :         LogicalIncreaseRestartDecodingForSlot(lsn,
    1301                 :                                               builder->last_serialized_snapshot);
    1302 ECB             : }
    1303                 : 
    1304                 : 
    1305                 : /*
    1306                 :  * Build the start of a snapshot that's capable of decoding the catalog.
    1307                 :  *
    1308                 :  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
    1309                 :  * consistent.
    1310                 :  *
    1311                 :  * Returns true if there is a point in performing internal maintenance/cleanup
    1312                 :  * using the xl_running_xacts record.
    1313                 :  */
    1314                 : static bool
    1315 GIC         776 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1316                 : {
    1317                 :     /* ---
    1318 ECB             :      * Build catalog decoding snapshot incrementally using information about
    1319                 :      * the currently running transactions. There are several ways to do that:
    1320                 :      *
    1321                 :      * a) There were no running transactions when the xl_running_xacts record
    1322                 :      *    was inserted, jump to CONSISTENT immediately. We might find such a
    1323                 :      *    state while waiting on c)'s sub-states.
    1324                 :      *
    1325                 :      * b) This (in a previous run) or another decoding slot serialized a
    1326                 :      *    snapshot to disk that we can use.  Can't use this method for the
    1327                 :      *    initial snapshot when slot is being created and needs full snapshot
    1328                 :      *    for export or direct use, as that snapshot will only contain catalog
    1329                 :      *    modifying transactions.
    1330                 :      *
    1331                 :      * c) First incrementally build a snapshot for catalog tuples
    1332                 :      *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
    1333                 :      *    transactions to finish.  Every transaction starting after that
    1334                 :      *    (FULL_SNAPSHOT state), has enough information to be decoded.  But
    1335                 :      *    for older running transactions no viable snapshot exists yet, so
    1336                 :      *    CONSISTENT will only be reached once all of those have finished.
    1337                 :      * ---
    1338                 :      */
    1339                 : 
    1340                 :     /*
    1341                 :      * xl_running_xacts record is older than what we can use, we might not have
    1342                 :      * all necessary catalog rows anymore.
    1343                 :      */
    1344 GIC         776 :     if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
    1345             344 :         NormalTransactionIdPrecedes(running->oldestRunningXid,
    1346                 :                                     builder->initial_xmin_horizon))
    1347                 :     {
    1348 UIC           0 :         ereport(DEBUG1,
    1349                 :                 (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
    1350                 :                                  LSN_FORMAT_ARGS(lsn)),
    1351                 :                  errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
    1352                 :                                     builder->initial_xmin_horizon, running->oldestRunningXid)));
    1353                 : 
    1354                 : 
    1355               0 :         SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
    1356                 : 
    1357               0 :         return true;
    1358                 :     }
    1359                 : 
    1360                 :     /*
    1361                 :      * a) No transactions were running, we can jump to consistent.
    1362                 :      *
    1363                 :      * This is not affected by races around xl_running_xacts, because we can
    1364                 :      * miss transaction commits, but currently not transactions starting.
    1365 ECB             :      *
    1366                 :      * NB: We might have already started to incrementally assemble a snapshot,
    1367                 :      * so we need to be careful to deal with that.
    1368                 :      */
    1369 GBC         776 :     if (running->oldestRunningXid == running->nextXid)
    1370                 :     {
    1371 GIC         744 :         if (builder->start_decoding_at == InvalidXLogRecPtr ||
    1372             414 :             builder->start_decoding_at <= lsn)
    1373                 :             /* can decode everything after this */
    1374             331 :             builder->start_decoding_at = lsn + 1;
    1375                 : 
    1376 EUB             :         /* As no transactions were running xmin/xmax can be trivially set. */
    1377 GIC         744 :         builder->xmin = running->nextXid; /* < are finished */
    1378 GBC         744 :         builder->xmax = running->nextXid; /* >= are running */
    1379                 : 
    1380                 :         /* so we can safely use the faster comparisons */
    1381 GIC         744 :         Assert(TransactionIdIsNormal(builder->xmin));
    1382             744 :         Assert(TransactionIdIsNormal(builder->xmax));
    1383                 : 
    1384             744 :         builder->state = SNAPBUILD_CONSISTENT;
    1385             744 :         builder->next_phase_at = InvalidTransactionId;
    1386                 : 
    1387             744 :         ereport(LOG,
    1388                 :                 (errmsg("logical decoding found consistent point at %X/%X",
    1389                 :                         LSN_FORMAT_ARGS(lsn)),
    1390 ECB             :                  errdetail("There are no running transactions.")));
    1391                 : 
    1392 CBC         744 :         return false;
    1393 ECB             :     }
    1394                 :     /* b) valid on disk state and not building full snapshot */
    1395 CBC          63 :     else if (!builder->building_full_snapshot &&
    1396 GIC          31 :              SnapBuildRestore(builder, lsn))
    1397                 :     {
    1398                 :         /* there won't be any state to cleanup */
    1399 CBC           6 :         return false;
    1400                 :     }
    1401                 : 
    1402 ECB             :     /*
    1403                 :      * c) transition from START to BUILDING_SNAPSHOT.
    1404                 :      *
    1405                 :      * In START state, and an xl_running_xacts record with running xacts is
    1406                 :      * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
    1407                 :      * record xl_running_xacts->nextXid.  Once all running xacts have finished
    1408                 :      * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
    1409                 :      * might look as if we could use xl_running_xacts's ->xids information to
    1410                 :      * get there quicker, but that is problematic because transactions marked
    1411                 :      * as running, might already have inserted their commit record - it's
    1412                 :      * infeasible to change that with locking.
    1413                 :      */
    1414 GIC          26 :     else if (builder->state == SNAPBUILD_START)
    1415                 :     {
    1416              14 :         builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
    1417              14 :         builder->next_phase_at = running->nextXid;
    1418                 : 
    1419                 :         /*
    1420                 :          * Start with an xmin/xmax that's correct for future, when all the
    1421 ECB             :          * currently running transactions have finished. We'll update both
    1422                 :          * while waiting for the pending transactions to finish.
    1423                 :          */
    1424 CBC          14 :         builder->xmin = running->nextXid; /* < are finished */
    1425 GIC          14 :         builder->xmax = running->nextXid; /* >= are running */
    1426                 : 
    1427                 :         /* so we can safely use the faster comparisons */
    1428              14 :         Assert(TransactionIdIsNormal(builder->xmin));
    1429              14 :         Assert(TransactionIdIsNormal(builder->xmax));
    1430                 : 
    1431 CBC          14 :         ereport(LOG,
    1432 ECB             :                 (errmsg("logical decoding found initial starting point at %X/%X",
    1433                 :                         LSN_FORMAT_ARGS(lsn)),
    1434                 :                  errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1435                 :                            running->xcnt, running->nextXid)));
    1436                 : 
    1437 GIC          14 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1438 ECB             :     }
    1439                 : 
    1440                 :     /*
    1441                 :      * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
    1442                 :      *
    1443                 :      * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
    1444                 :      * is >= nextXid from when we switched to BUILDING_SNAPSHOT.  This
    1445                 :      * means all transactions starting afterwards have enough information to
    1446                 :      * be decoded.  Switch to FULL_SNAPSHOT.
    1447                 :      */
    1448 GIC          19 :     else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
    1449               7 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1450                 :                                            running->oldestRunningXid))
    1451                 :     {
    1452               7 :         builder->state = SNAPBUILD_FULL_SNAPSHOT;
    1453               7 :         builder->next_phase_at = running->nextXid;
    1454                 : 
    1455 CBC           7 :         ereport(LOG,
    1456 ECB             :                 (errmsg("logical decoding found initial consistent point at %X/%X",
    1457                 :                         LSN_FORMAT_ARGS(lsn)),
    1458                 :                  errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1459                 :                            running->xcnt, running->nextXid)));
    1460                 : 
    1461 GIC           7 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1462 ECB             :     }
    1463                 : 
    1464                 :     /*
    1465                 :      * c) transition from FULL_SNAPSHOT to CONSISTENT.
    1466                 :      *
    1467                 :      * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
    1468                 :      * >= nextXid from when we switched to FULL_SNAPSHOT.  This means all
    1469                 :      * transactions that are currently in progress have a catalog snapshot,
    1470                 :      * and all their changes have been collected.  Switch to CONSISTENT.
    1471                 :      */
    1472 GIC          10 :     else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
    1473               5 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1474                 :                                            running->oldestRunningXid))
    1475                 :     {
    1476               5 :         builder->state = SNAPBUILD_CONSISTENT;
    1477               5 :         builder->next_phase_at = InvalidTransactionId;
    1478 ECB             : 
    1479 CBC           5 :         ereport(LOG,
    1480                 :                 (errmsg("logical decoding found consistent point at %X/%X",
    1481                 :                         LSN_FORMAT_ARGS(lsn)),
    1482 ECB             :                  errdetail("There are no old transactions anymore.")));
    1483                 :     }
    1484                 : 
    1485                 :     /*
    1486                 :      * We already started to track running xacts and need to wait for all
    1487                 :      * in-progress ones to finish. We fall through to the normal processing of
    1488                 :      * records so incremental cleanup can be performed.
    1489                 :      */
    1490 GIC          24 :     return true;
    1491                 : }
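
The state transitions described in SnapBuildFindSnapshot() can be condensed into a toy state machine. This is a simplified sketch, not the real function: it models only paths a) and c), leaving out the restore of a serialized snapshot (path b), the initial xmin horizon check, logging, and waiting on running transactions. All names (`toy_builder`, `toy_process_running_xacts`, the `ST_*` states) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t xid_t;     /* hypothetical stand-in for TransactionId */

typedef enum
{
    ST_START,
    ST_BUILDING_SNAPSHOT,
    ST_FULL_SNAPSHOT,
    ST_CONSISTENT
} toy_state;

typedef struct
{
    toy_state   state;
    xid_t       next_phase_at;  /* xid that oldest_running must reach */
} toy_builder;

/* Wraparound-aware "a <= b" for normal xids. */
static int
xid_precedes_or_equals(xid_t a, xid_t b)
{
    return (int32_t) (a - b) <= 0;
}

/* Feed one running-xacts record (oldest running xid, next xid) to the builder. */
static toy_state
toy_process_running_xacts(toy_builder *b, xid_t oldest_running, xid_t next_xid)
{
    assert(b != NULL);

    if (b->state == ST_CONSISTENT)
        return b->state;

    if (oldest_running == next_xid)
    {
        /* a) nothing was running: jump straight to CONSISTENT */
        b->state = ST_CONSISTENT;
        b->next_phase_at = 0;
    }
    else if (b->state == ST_START)
    {
        /* c) START -> BUILDING_SNAPSHOT */
        b->state = ST_BUILDING_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == ST_BUILDING_SNAPSHOT &&
             xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        /* c) BUILDING_SNAPSHOT -> FULL_SNAPSHOT */
        b->state = ST_FULL_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == ST_FULL_SNAPSHOT &&
             xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        /* c) FULL_SNAPSHOT -> CONSISTENT */
        b->state = ST_CONSISTENT;
        b->next_phase_at = 0;
    }

    return b->state;
}
```

Each phase records the next xid at the moment of the switch and advances only once a later record reports an oldest running xid at or beyond that value, so reaching CONSISTENT through path c) takes at least three records.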
    1492                 : 
    1493                 : /* ---
    1494                 :  * Iterate through xids in record, wait for all older than the cutoff to
    1495                 :  * finish.  Then, if possible, log a new xl_running_xacts record.
    1496 ECB             :  *
    1497                 :  * This isn't required for the correctness of decoding, but to:
    1498                 :  * a) allow isolationtester to notice that we're currently waiting for
    1499                 :  *    something.
    1500                 :  * b) log a new xl_running_xacts record where it'd be helpful, without having
    1501                 :  *    to wait for bgwriter or checkpointer.
    1502                 :  * ---
    1503                 :  */
    1504                 : static void
    1505 GIC          21 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
    1506                 : {
    1507                 :     int         off;
    1508                 : 
    1509              40 :     for (off = 0; off < running->xcnt; off++)
    1510                 :     {
    1511 CBC          21 :         TransactionId xid = running->xids[off];
    1512                 : 
    1513                 :         /*
    1514                 :          * Upper layers should ensure that we never need to wait on ourselves.
    1515 ECB             :          * Check anyway, since failing to do so would either result in an
    1516                 :          * endless wait or an Assert() failure.
    1517                 :          */
    1518 GIC          21 :         if (TransactionIdIsCurrentTransactionId(xid))
    1519 UIC           0 :             elog(ERROR, "waiting for ourselves");
    1520                 : 
    1521 GIC          21 :         if (TransactionIdFollows(xid, cutoff))
    1522 UIC           0 :             continue;
    1523                 : 
    1524 CBC          21 :         XactLockTableWait(xid, NULL, NULL, XLTW_None);
    1525 EUB             :     }
    1526                 : 
    1527 ECB             :     /*
    1528 EUB             :      * All transactions we were waiting for have finished - try to ensure there is
    1529                 :      * another xl_running_xacts record in a timely manner, without having to
    1530 ECB             :      * wait for bgwriter or checkpointer to log one.  During recovery we can't
    1531                 :      * enforce that, so we'll have to wait.
    1532                 :      */
    1533 GIC          19 :     if (!RecoveryInProgress())
    1534                 :     {
    1535              19 :         LogStandbySnapshot();
    1536                 :     }
    1537              19 : }
    1538                 : 
    1539 ECB             : /* -----------------------------------
    1540                 :  * Snapshot serialization support
    1541                 :  * -----------------------------------
    1542                 :  */
    1543                 : 
    1544                 : /*
    1545                 :  * We store current state of struct SnapBuild on disk in the following manner:
    1546                 :  *
    1547                 :  * struct SnapBuildOnDisk;
    1548                 :  * TransactionId * committed.xcnt; (*not xcnt_space*)
    1549                 :  * TransactionId * catchange.xcnt;
    1550                 :  *
    1551                 :  */
    1552                 : typedef struct SnapBuildOnDisk
    1553                 : {
    1554                 :     /* first part of this struct needs to be version independent */
    1555                 : 
    1556                 :     /* data not covered by checksum */
    1557                 :     uint32      magic;
    1558                 :     pg_crc32c   checksum;
    1559                 : 
    1560                 :     /* data covered by checksum */
    1561                 : 
    1562                 :     /* version, in case we want to support pg_upgrade */
    1563                 :     uint32      version;
    1564                 :     /* how large is the on disk data, excluding the constant sized part */
    1565                 :     uint32      length;
    1566                 : 
    1567                 :     /* version dependent part */
    1568                 :     SnapBuild   builder;
    1569                 : 
    1570                 :     /* variable amount of TransactionIds follows */
    1571                 : } SnapBuildOnDisk;
    1572                 : 
    1573                 : #define SnapBuildOnDiskConstantSize \
    1574                 :     offsetof(SnapBuildOnDisk, builder)
    1575                 : #define SnapBuildOnDiskNotChecksummedSize \
    1576                 :     offsetof(SnapBuildOnDisk, version)
    1577                 : 
    1578                 : #define SNAPBUILD_MAGIC 0x51A1E001
    1579                 : #define SNAPBUILD_VERSION 5
    1580                 : 
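The struct and the two offsetof() macros above split the file header into a part that is not checksummed (magic, checksum) and a part that is (version, length, then the builder). A minimal sketch of that layout follows, assuming a simplified stand-in struct. DemoOnDisk, toy_checksum and demo_checksum are hypothetical names, and the FNV-1a hash only stands in for CRC-32C, which snapbuild.c actually uses.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for SnapBuildOnDisk. */
typedef struct DemoOnDisk
{
    uint32_t    magic;      /* not covered by the checksum */
    uint32_t    checksum;   /* not covered by the checksum */
    uint32_t    version;    /* first checksummed field */
    uint32_t    length;
    uint64_t    payload;    /* stands in for the embedded SnapBuild */
} DemoOnDisk;

#define DEMO_NOT_CHECKSUMMED_SIZE offsetof(DemoOnDisk, version)
#define DEMO_CONSTANT_SIZE        offsetof(DemoOnDisk, payload)

/* Toy FNV-1a hash; an assumption used here instead of CRC-32C. */
static uint32_t
toy_checksum(uint32_t acc, const void *data, size_t len)
{
    const unsigned char *p = data;
    size_t      i;

    for (i = 0; i < len; i++)
    {
        acc ^= p[i];
        acc *= 16777619u;
    }
    return acc;
}

/* Checksum the constant-size header minus its unchecksummed prefix, then the payload. */
static uint32_t
demo_checksum(const DemoOnDisk *d)
{
    uint32_t    acc = 2166136261u;

    assert(d != NULL);
    acc = toy_checksum(acc,
                       (const char *) d + DEMO_NOT_CHECKSUMMED_SIZE,
                       DEMO_CONSTANT_SIZE - DEMO_NOT_CHECKSUMMED_SIZE);
    acc = toy_checksum(acc, &d->payload, sizeof(d->payload));
    return acc;
}
```

Because magic and checksum sit in front of the checksummed range, the checksum can be stored in place after it has been computed without invalidating itself.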
    1581                 : /*
    1582                 :  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
    1583                 :  *
    1584                 :  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
    1585                 :  * a record that's a potential location for a serialized snapshot.
    1586                 :  */
    1587                 : void
    1588 GIC          23 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
    1589                 : {
    1590              23 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1591 UIC           0 :         SnapBuildRestore(builder, lsn);
    1592                 :     else
    1593 GIC          23 :         SnapBuildSerialize(builder, lsn);
    1594              23 : }
    1595 ECB             : 
    1596                 : /*
    1597                 :  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
    1598 EUB             :  * been done by another decoding process.
    1599                 :  */
    1600 ECB             : static void
    1601 CBC         273 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
    1602                 : {
    1603                 :     Size        needed_length;
    1604 GIC         273 :     SnapBuildOnDisk *ondisk = NULL;
    1605 GNC         273 :     TransactionId *catchange_xip = NULL;
    1606                 :     MemoryContext old_ctx;
    1607                 :     size_t      catchange_xcnt;
    1608                 :     char       *ondisk_c;
    1609                 :     int         fd;
    1610                 :     char        tmppath[MAXPGPATH];
    1611 ECB             :     char        path[MAXPGPATH];
    1612                 :     int         ret;
    1613                 :     struct stat stat_buf;
    1614                 :     Size        sz;
    1615                 : 
    1616 GIC         273 :     Assert(lsn != InvalidXLogRecPtr);
    1617             273 :     Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
    1618                 :            builder->last_serialized_snapshot <= lsn);
    1619                 : 
    1620                 :     /*
    1621                 :      * no point in serializing if we cannot continue to work immediately after
    1622                 :      * restoring the snapshot
    1623                 :      */
    1624             273 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1625 UIC           0 :         return;
    1626 ECB             : 
    1627                 :     /* consistent snapshots have no next phase */
    1628 GIC         273 :     Assert(builder->next_phase_at == InvalidTransactionId);
    1629                 : 
    1630                 :     /*
    1631                 :      * We identify snapshots by the LSN they are valid for. We don't need to
    1632                 :      * include timelines in the name as each LSN maps to exactly one timeline
    1633                 :      * unless the user used pg_resetwal or similar. If a user did so, there's
    1634 ECB             :      * no hope of continuing to decode anyway.
    1635 EUB             :      */
    1636 GIC         273 :     sprintf(path, "pg_logical/snapshots/%X-%X.snap",
    1637             273 :             LSN_FORMAT_ARGS(lsn));
    1638 ECB             : 
    1639                 :     /*
    1640                 :      * first check whether some other backend already has written the snapshot
    1641                 :      * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
    1642                 :      * as a valid state. Everything else is an unexpected error.
    1643                 :      */
    1644 GIC         273 :     ret = stat(path, &stat_buf);
    1645                 : 
    1646 CBC         273 :     if (ret != 0 && errno != ENOENT)
    1647 LBC           0 :         ereport(ERROR,
    1648                 :                 (errcode_for_file_access(),
    1649                 :                  errmsg("could not stat file \"%s\": %m", path)));
    1650                 : 
    1651 GIC         273 :     else if (ret == 0)
    1652                 :     {
    1653                 :         /*
    1654 ECB             :          * somebody else has already serialized to this point, don't overwrite
    1655                 :          * but remember location, so we don't need to read old data again.
    1656                 :          *
    1657 EUB             :          * To be sure it has been synced to disk after the rename() from the
    1658                 :          * tempfile filename to the real filename, we just repeat the fsync.
    1659                 :          * That ought to be cheap because in most scenarios it should already
    1660                 :          * be safely on disk.
    1661 ECB             :          */
    1662 GIC          65 :         fsync_fname(path, false);
    1663              65 :         fsync_fname("pg_logical/snapshots", true);
    1664                 : 
    1665              65 :         builder->last_serialized_snapshot = lsn;
    1666              65 :         goto out;
    1667                 :     }
    1668                 : 
    1669                 :     /*
    1670                 :      * there is an obvious race condition here between the time we stat(2) the
    1671                 :      * file and us writing the file. But we rename the file into place
    1672 ECB             :      * atomically and all files created need to contain the same data anyway,
    1673                 :      * so this is perfectly fine, although a bit of a resource waste. Locking
    1674                 :      * seems like pointless complication.
    1675                 :      */
    1676 CBC         208 :     elog(DEBUG1, "serializing snapshot to %s", path);
    1677                 : 
    1678                 :     /* to make sure only we will write to this tempfile, include pid */
    1679 GIC         208 :     sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%d.tmp",
    1680             208 :             LSN_FORMAT_ARGS(lsn), MyProcPid);
    1681                 : 
    1682                 :     /*
    1683                 :      * Unlink the temporary file if it already exists. It can only be left
    1684                 :      * over from an earlier crash/error, since we won't enter this function
    1685                 :      * twice from within a single decoding slot/backend and the temporary
    1686 ECB             :      * file's name contains the pid of the current process.
    1687                 :      */
    1688 GIC         208 :     if (unlink(tmppath) != 0 && errno != ENOENT)
    1689 LBC           0 :         ereport(ERROR,
    1690 ECB             :                 (errcode_for_file_access(),
    1691                 :                  errmsg("could not remove file \"%s\": %m", tmppath)));
    1692                 : 
    1693 GNC         208 :     old_ctx = MemoryContextSwitchTo(builder->context);
    1694                 : 
    1695                 :     /* Get the catalog modifying transactions that are not yet committed */
    1696             208 :     catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
    1697             208 :     catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
    1698                 : 
    1699             208 :     needed_length = sizeof(SnapBuildOnDisk) +
    1700             208 :         sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
    1701                 : 
    1702             208 :     ondisk_c = palloc0(needed_length);
    1703 GIC         208 :     ondisk = (SnapBuildOnDisk *) ondisk_c;
    1704 CBC         208 :     ondisk->magic = SNAPBUILD_MAGIC;
    1705 GBC         208 :     ondisk->version = SNAPBUILD_VERSION;
    1706 GIC         208 :     ondisk->length = needed_length;
    1707             208 :     INIT_CRC32C(ondisk->checksum);
    1708             208 :     COMP_CRC32C(ondisk->checksum,
    1709 ECB             :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1710                 :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1711 GIC         208 :     ondisk_c += sizeof(SnapBuildOnDisk);
    1712 ECB             : 
    1713 CBC         208 :     memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
    1714                 :     /* NULL-ify memory-only data */
    1715             208 :     ondisk->builder.context = NULL;
    1716             208 :     ondisk->builder.snapshot = NULL;
    1717 GIC         208 :     ondisk->builder.reorder = NULL;
    1718 CBC         208 :     ondisk->builder.committed.xip = NULL;
    1719 GNC         208 :     ondisk->builder.catchange.xip = NULL;
    1720                 :     /* update catchange only on disk data */
    1721             208 :     ondisk->builder.catchange.xcnt = catchange_xcnt;
    1722 ECB             : 
    1723 CBC         208 :     COMP_CRC32C(ondisk->checksum,
    1724 ECB             :                 &ondisk->builder,
    1725                 :                 sizeof(SnapBuild));
    1726                 : 
    1727                 :     /* copy committed xacts */
    1728 GNC         208 :     if (builder->committed.xcnt > 0)
    1729                 :     {
    1730              44 :         sz = sizeof(TransactionId) * builder->committed.xcnt;
    1731              44 :         memcpy(ondisk_c, builder->committed.xip, sz);
    1732              44 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1733              44 :         ondisk_c += sz;
    1734                 :     }
    1735                 : 
    1736                 :     /* copy catalog modifying xacts */
    1737             208 :     if (catchange_xcnt > 0)
    1738                 :     {
    1739               6 :         sz = sizeof(TransactionId) * catchange_xcnt;
    1740               6 :         memcpy(ondisk_c, catchange_xip, sz);
    1741               6 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1742               6 :         ondisk_c += sz;
    1743                 :     }
    1744 ECB             : 
    1745 GIC         208 :     FIN_CRC32C(ondisk->checksum);
    1746 ECB             : 
    1747                 :     /* we have valid data now, open tempfile and write it there */
    1748 CBC         208 :     fd = OpenTransientFile(tmppath,
    1749 ECB             :                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1750 CBC         208 :     if (fd < 0)
    1751 UIC           0 :         ereport(ERROR,
    1752 ECB             :                 (errcode_for_file_access(),
    1753                 :                  errmsg("could not open file \"%s\": %m", tmppath)));
    1754                 : 
    1755 GIC         208 :     errno = 0;
    1756             208 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
    1757             208 :     if ((write(fd, ondisk, needed_length)) != needed_length)
    1758                 :     {
    1759 LBC           0 :         int         save_errno = errno;
    1760                 : 
    1761               0 :         CloseTransientFile(fd);
    1762 ECB             : 
    1763                 :         /* if write didn't set errno, assume problem is no disk space */
    1764 LBC           0 :         errno = save_errno ? save_errno : ENOSPC;
    1765 UIC           0 :         ereport(ERROR,
    1766                 :                 (errcode_for_file_access(),
    1767                 :                  errmsg("could not write to file \"%s\": %m", tmppath)));
    1768 ECB             :     }
    1769 GIC         208 :     pgstat_report_wait_end();
    1770 ECB             : 
    1771                 :     /*
    1772                 :      * fsync the file before renaming so that even if we crash after this we
    1773                 :      * have either a fully valid file or nothing.
    1774                 :      *
    1775                 :      * It's safe to just ERROR on fsync() here because we'll retry the whole
    1776                 :      * operation including the writes.
    1777                 :      *
    1778                 :      * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
    1779                 :      * some noticeable overhead since it's performed synchronously during
    1780                 :      * decoding?
    1781                 :      */
    1782 GBC         208 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
    1783 GIC         208 :     if (pg_fsync(fd) != 0)
    1784                 :     {
    1785 UIC           0 :         int         save_errno = errno;
    1786 ECB             : 
    1787 LBC           0 :         CloseTransientFile(fd);
    1788               0 :         errno = save_errno;
    1789 UIC           0 :         ereport(ERROR,
    1790 EUB             :                 (errcode_for_file_access(),
    1791                 :                  errmsg("could not fsync file \"%s\": %m", tmppath)));
    1792                 :     }
    1793 GIC         208 :     pgstat_report_wait_end();
    1794                 : 
    1795 GBC         208 :     if (CloseTransientFile(fd) != 0)
    1796 UBC           0 :         ereport(ERROR,
    1797                 :                 (errcode_for_file_access(),
    1798                 :                  errmsg("could not close file \"%s\": %m", tmppath)));
    1799                 : 
    1800 CBC         208 :     fsync_fname("pg_logical/snapshots", true);
    1801                 : 
    1802                 :     /*
    1803                 :      * We may overwrite the work from some other backend, but that's ok, our
    1804                 :      * snapshot is valid as well, we'll just have done some superfluous work.
    1805                 :      */
    1806 GIC         208 :     if (rename(tmppath, path) != 0)
    1807                 :     {
    1808 UIC           0 :         ereport(ERROR,
    1809                 :                 (errcode_for_file_access(),
    1810                 :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1811                 :                         tmppath, path)));
    1812                 :     }
    1813 ECB             : 
    1814                 :     /* make sure we persist */
    1815 GIC         208 :     fsync_fname(path, false);
    1816 GBC         208 :     fsync_fname("pg_logical/snapshots", true);
    1817                 : 
    1818 EUB             :     /*
    1819                 :      * Now there's no way we can lose the dumped state anymore, remember this
    1820                 :      * as a serialization point.
    1821                 :      */
    1822 GIC         208 :     builder->last_serialized_snapshot = lsn;
    1823                 : 
    1824 GNC         208 :     MemoryContextSwitchTo(old_ctx);
    1825                 : 
    1826 CBC         273 : out:
    1827 GIC         273 :     ReorderBufferSetRestartPoint(builder->reorder,
    1828 ECB             :                                  builder->last_serialized_snapshot);
    1829 EUB             :     /* be tidy */
    1830 GIC         273 :     if (ondisk)
    1831             208 :         pfree(ondisk);
    1832 GNC         273 :     if (catchange_xip)
    1833               6 :         pfree(catchange_xip);
    1834                 : }
    1835 ECB             : 
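SnapBuildSerialize() above follows a write-to-tempfile, fsync, rename, fsync-directory sequence so that a crash leaves either a complete snapshot file or none. A minimal sketch of that pattern in plain POSIX C follows, under stated assumptions: sync_dir and write_file_atomically are hypothetical helpers, not PostgreSQL functions, and errors are returned as -1 instead of being raised with ereport(). The sketch also omits the extra fsync calls the original issues before the rename and on the final file.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: fsync a directory so a rename() inside it is durable. */
static int
sync_dir(const char *dir)
{
    int         fd = open(dir, O_RDONLY);
    int         ret;

    if (fd < 0)
        return -1;
    ret = fsync(fd);
    close(fd);
    return ret;
}

/*
 * Hypothetical helper: write 'len' bytes to dir/name via a pid-tagged
 * temporary file.  Returns 0 on success, -1 on failure.
 */
static int
write_file_atomically(const char *dir, const char *name,
                      const void *data, size_t len)
{
    char        path[1024];
    char        tmppath[1024];
    int         fd;

    assert(data != NULL || len == 0);

    snprintf(path, sizeof(path), "%s/%s", dir, name);
    snprintf(tmppath, sizeof(tmppath), "%s/%s.%d.tmp",
             dir, name, (int) getpid());

    /* A leftover tempfile carrying our pid can only stem from an earlier failure. */
    if (unlink(tmppath) != 0 && errno != ENOENT)
        return -1;

    fd = open(tmppath, O_CREAT | O_EXCL | O_WRONLY, 0600);
    if (fd < 0)
        return -1;

    /* Write and flush the data before the file becomes visible under its real name. */
    if (write(fd, data, len) != (ssize_t) len || fsync(fd) != 0)
    {
        close(fd);
        unlink(tmppath);
        return -1;
    }
    if (close(fd) != 0)
        return -1;

    /* rename() replaces the target atomically; concurrent writers produce the same data. */
    if (rename(tmppath, path) != 0)
        return -1;

    return sync_dir(dir);
}
```

Including the pid in the temporary name keeps concurrent writers from clobbering each other's partial files, which is why the original needs no locking around the stat()/write race.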
    1836                 : /*
    1837                 :  * Restore a snapshot into 'builder' if previously one has been stored at the
    1838                 :  * location indicated by 'lsn'. Returns true if successful, false otherwise.
    1839                 :  */
    1840                 : static bool
    1841 CBC          31 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
    1842                 : {
    1843 EUB             :     SnapBuildOnDisk ondisk;
    1844                 :     int         fd;
    1845                 :     char        path[MAXPGPATH];
    1846                 :     Size        sz;
    1847                 :     pg_crc32c   checksum;
    1848                 : 
    1849 ECB             :     /* no point in loading a snapshot if we're already there */
    1850 CBC          31 :     if (builder->state == SNAPBUILD_CONSISTENT)
    1851 UIC           0 :         return false;
    1852                 : 
    1853 GIC          31 :     sprintf(path, "pg_logical/snapshots/%X-%X.snap",
    1854              31 :             LSN_FORMAT_ARGS(lsn));
    1855                 : 
    1856 CBC          31 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
    1857                 : 
    1858              31 :     if (fd < 0 && errno == ENOENT)
    1859 GIC          25 :         return false;
    1860 CBC           6 :     else if (fd < 0)
    1861 LBC           0 :         ereport(ERROR,
    1862                 :                 (errcode_for_file_access(),
    1863                 :                  errmsg("could not open file \"%s\": %m", path)));
    1864 ECB             : 
    1865                 :     /* ----
    1866                 :      * Make sure the snapshot has been stored safely to disk, that's normally
    1867                 :      * cheap.
    1868                 :      * Note that we do not need PANIC here, nobody will be able to use the
    1869                 :      * slot without fsyncing, and saving it won't succeed without an fsync()
    1870                 :      * either...
    1871                 :      * ----
    1872                 :      */
    1873 GIC           6 :     fsync_fname(path, false);
    1874               6 :     fsync_fname("pg_logical/snapshots", true);
    1875 ECB             : 
    1876                 : 
    1877                 :     /* read statically sized portion of snapshot */
    1878 GNC           6 :     SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);
    1879                 : 
    1880 GIC           6 :     if (ondisk.magic != SNAPBUILD_MAGIC)
    1881 UIC           0 :         ereport(ERROR,
    1882                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1883                 :                  errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
    1884                 :                         path, ondisk.magic, SNAPBUILD_MAGIC)));
    1885 ECB             : 
    1886 CBC           6 :     if (ondisk.version != SNAPBUILD_VERSION)
    1887 UIC           0 :         ereport(ERROR,
    1888                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1889                 :                  errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
    1890 ECB             :                         path, ondisk.version, SNAPBUILD_VERSION)));
    1891                 : 
    1892 CBC           6 :     INIT_CRC32C(checksum);
    1893 GBC           6 :     COMP_CRC32C(checksum,
    1894                 :                 ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1895                 :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1896                 : 
    1897                 :     /* read SnapBuild */
    1898 GNC           6 :     SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
    1899 GIC           6 :     COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
    1900                 : 
    1901                 :     /* restore committed xacts information */
    1902 GNC           6 :     if (ondisk.builder.committed.xcnt > 0)
    1903                 :     {
    1904               2 :         sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
    1905               2 :         ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
    1906               2 :         SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
    1907               2 :         COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
    1908                 :     }
    1909                 : 
    1910                 :     /* restore catalog modifying xacts information */
    1911               6 :     if (ondisk.builder.catchange.xcnt > 0)
    1912                 :     {
    1913               3 :         sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
    1914               3 :         ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
    1915               3 :         SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
    1916               3 :         COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
    1917                 :     }
    1918                 : 
    1919 GIC           6 :     if (CloseTransientFile(fd) != 0)
    1920 UIC           0 :         ereport(ERROR,
    1921                 :                 (errcode_for_file_access(),
    1922                 :                  errmsg("could not close file \"%s\": %m", path)));
    1923                 : 
    1924 GIC           6 :     FIN_CRC32C(checksum);
    1925 ECB             : 
    1926 EUB             :     /* verify checksum of what we've read */
    1927 GIC           6 :     if (!EQ_CRC32C(checksum, ondisk.checksum))
    1928 UIC           0 :         ereport(ERROR,
    1929                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1930                 :                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
    1931                 :                         path, checksum, ondisk.checksum)));
    1932 ECB             : 
    1933 EUB             :     /*
    1934                 :      * ok, we now have a sensible snapshot here, figure out if it has more
    1935                 :      * information than we have.
    1936 ECB             :      */
    1937                 : 
    1938                 :     /*
    1939                 :      * We are only interested in consistent snapshots for now, comparing
    1940                 :      * whether one incomplete snapshot is more "advanced" seems to be
    1941                 :      * unnecessarily complex.
    1942                 :      */
    1943 CBC           6 :     if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
    1944 UIC           0 :         goto snapshot_not_interesting;
    1945                 : 
    1946 ECB             :     /*
    1947                 :      * Don't use a snapshot that requires an xmin that we cannot guarantee to
    1948                 :      * be available.
    1949                 :      */
    1950 CBC           6 :     if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
    1951 UIC           0 :         goto snapshot_not_interesting;
    1952 ECB             : 
    1953                 :     /* consistent snapshots have no next phase */
    1954 GIC           6 :     Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
    1955 ECB             : 
    1956 EUB             :     /* ok, we think the snapshot is sensible, copy over everything important */
    1957 CBC           6 :     builder->xmin = ondisk.builder.xmin;
    1958               6 :     builder->xmax = ondisk.builder.xmax;
    1959               6 :     builder->state = ondisk.builder.state;
    1960                 : 
    1961 GIC           6 :     builder->committed.xcnt = ondisk.builder.committed.xcnt;
    1962 ECB             :     /* We only allocated/stored xcnt, not xcnt_space xids ! */
    1963                 :     /* don't overwrite preallocated xip, if we don't have anything here */
    1964 GBC           6 :     if (builder->committed.xcnt > 0)
    1965                 :     {
    1966 CBC           2 :         pfree(builder->committed.xip);
    1967               2 :         builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
    1968 GIC           2 :         builder->committed.xip = ondisk.builder.committed.xip;
    1969 ECB             :     }
    1970 GIC           6 :     ondisk.builder.committed.xip = NULL;
    1971 ECB             : 
    1972                 :     /* set catalog modifying transactions */
    1973 GNC           6 :     if (builder->catchange.xip)
    1974 UNC           0 :         pfree(builder->catchange.xip);
    1975 GNC           6 :     builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
    1976               6 :     builder->catchange.xip = ondisk.builder.catchange.xip;
    1977               6 :     ondisk.builder.catchange.xip = NULL;
    1978                 : 
    1979                 :     /* our snapshot is not interesting anymore, build a new one */
    1980 CBC           6 :     if (builder->snapshot != NULL)
    1981                 :     {
    1982 UIC           0 :         SnapBuildSnapDecRefcount(builder->snapshot);
    1983                 :     }
    1984 CBC           6 :     builder->snapshot = SnapBuildBuildSnapshot(builder);
    1985 GIC           6 :     SnapBuildSnapIncRefcount(builder->snapshot);
    1986 EUB             : 
    1987 GBC           6 :     ReorderBufferSetRestartPoint(builder->reorder, lsn);
    1988 EUB             : 
    1989 GBC           6 :     Assert(builder->state == SNAPBUILD_CONSISTENT);
    1990 EUB             : 
    1991 GBC           6 :     ereport(LOG,
    1992                 :             (errmsg("logical decoding found consistent point at %X/%X",
    1993                 :                     LSN_FORMAT_ARGS(lsn)),
    1994                 :              errdetail("Logical decoding will begin using saved snapshot.")));
    1995 GIC           6 :     return true;
    1996                 : 
    1997 UIC           0 : snapshot_not_interesting:
    1998 LBC           0 :     if (ondisk.builder.committed.xip != NULL)
    1999 UIC           0 :         pfree(ondisk.builder.committed.xip);
    2000 UNC           0 :     if (ondisk.builder.catchange.xip != NULL)
    2001               0 :         pfree(ondisk.builder.catchange.xip);
    2002 UIC           0 :     return false;
    2003                 : }
    2004 ECB             : 
    2005                 : /*
    2006                 :  * Read the contents of the serialized snapshot to 'dest'.
    2007                 :  */
    2008                 : static void
    2009 GNC          17 : SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
    2010                 : {
    2011                 :     int         readBytes;
    2012                 : 
    2013              17 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
    2014              17 :     readBytes = read(fd, dest, size);
    2015              17 :     pgstat_report_wait_end();
    2016              17 :     if (readBytes != size)
    2017                 :     {
    2018 UNC           0 :         int         save_errno = errno;
    2019                 : 
    2020               0 :         CloseTransientFile(fd);
    2021                 : 
    2022               0 :         if (readBytes < 0)
    2023                 :         {
    2024               0 :             errno = save_errno;
    2025               0 :             ereport(ERROR,
    2026                 :                     (errcode_for_file_access(),
    2027                 :                      errmsg("could not read file \"%s\": %m", path)));
    2028                 :         }
    2029                 :         else
    2030               0 :             ereport(ERROR,
    2031                 :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2032                 :                      errmsg("could not read file \"%s\": read %d of %zu",
    2033                 :                             path, readBytes, size)));
    2034                 :     }
    2035 GNC          17 : }
    2036                 : 
    2037 ECB             : /*
    2038                 :  * Remove all serialized snapshots that are not required anymore because no
    2039                 :  * slot can need them. This doesn't actually have to run during a checkpoint,
    2040                 :  * but it's a convenient point to schedule this.
    2041 EUB             :  *
    2042                 :  * NB: We run this during checkpoints even if logical decoding is disabled so
    2043                 :  * we clean up old slots at some point after it got disabled.
    2044                 :  */
    2045                 : void
    2046 GIC        2363 : CheckPointSnapBuild(void)
    2047 EUB             : {
    2048                 :     XLogRecPtr  cutoff;
    2049                 :     XLogRecPtr  redo;
    2050                 :     DIR        *snap_dir;
    2051                 :     struct dirent *snap_de;
    2052                 :     char        path[MAXPGPATH + 21];
    2053                 : 
    2054                 :     /*
    2055                 :      * We start off with a minimum of the last redo pointer. No new
    2056                 :      * replication slot will start before that, so that's a safe upper bound
    2057                 :      * for removal.
    2058 ECB             :      */
    2059 GIC        2363 :     redo = GetRedoRecPtr();
    2060                 : 
    2061                 :     /* now check for the restart ptrs from existing slots */
    2062            2363 :     cutoff = ReplicationSlotsComputeLogicalRestartLSN();
    2063                 : 
    2064                 :     /* don't start earlier than the restart lsn */
    2065            2363 :     if (redo < cutoff)
    2066 UIC           0 :         cutoff = redo;
    2067                 : 
    2068 GIC        2363 :     snap_dir = AllocateDir("pg_logical/snapshots");
    2069 CBC        7279 :     while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
    2070                 :     {
    2071                 :         uint32      hi;
    2072                 :         uint32      lo;
    2073                 :         XLogRecPtr  lsn;
    2074                 :         PGFileType  de_type;
    2075                 : 
    2076 GIC        4916 :         if (strcmp(snap_de->d_name, ".") == 0 ||
    2077            2553 :             strcmp(snap_de->d_name, "..") == 0)
    2078            4726 :             continue;
    2079                 : 
    2080             190 :         snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
    2081 GNC         190 :         de_type = get_dirent_type(path, snap_de, false, DEBUG1);
    2082                 : 
    2083             190 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
    2084                 :         {
    2085 UIC           0 :             elog(DEBUG1, "only regular files expected: %s", path);
    2086 LBC           0 :             continue;
    2087                 :         }
    2088                 : 
    2089 ECB             :         /*
    2090 EUB             :          * temporary filenames from SnapBuildSerialize() include the LSN and
    2091                 :          * everything but are postfixed by .$pid.tmp. We can just remove them
    2092 ECB             :          * the same as other files because there can be none that are
    2093                 :          * currently being written that are older than cutoff.
    2094                 :          *
    2095                 :          * We just log a message if a file doesn't fit the pattern; it's
    2096                 :          * probably some editor's lock/state file or similar...
    2097                 :          */
    2098 GIC         190 :         if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
    2099                 :         {
    2100 LBC           0 :             ereport(LOG,
    2101 ECB             :                     (errmsg("could not parse file name \"%s\"", path)));
    2102 LBC           0 :             continue;
    2103                 :         }
    2104 ECB             : 
    2105 CBC         190 :         lsn = ((uint64) hi) << 32 | lo;
    2106                 : 
    2107 ECB             :         /* check whether we still need it */
    2108 GIC         190 :         if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
    2109 EUB             :         {
    2110 GBC         147 :             elog(DEBUG1, "removing snapbuild snapshot %s", path);
    2111                 : 
    2112                 :             /*
    2113                 :              * It's not particularly harmful, though strange, if we can't
    2114                 :              * remove the file here. Don't prevent the checkpoint from
    2115                 :              * completing; that'd be a cure worse than the disease.
    2116                 :              */
    2117 GIC         147 :             if (unlink(path) < 0)
    2118                 :             {
    2119 UIC           0 :                 ereport(LOG,
    2120                 :                         (errcode_for_file_access(),
    2121                 :                          errmsg("could not remove file \"%s\": %m",
    2122 ECB             :                                 path)));
    2123 UIC           0 :                 continue;
    2124 EUB             :             }
    2125                 :         }
    2126                 :     }
    2127 GIC        2363 :     FreeDir(snap_dir);
    2128            2363 : }
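
The file-name handling in CheckPointSnapBuild() can be tried in isolation. The following is a minimal sketch, not the PostgreSQL implementation: DemoLsn, DEMO_INVALID_LSN, demo_parse_snapshot_name() and demo_should_remove() are hypothetical names introduced here, standing in for XLogRecPtr, InvalidXLogRecPtr and the inline logic of the loop above. It shows why temporary files with a trailing .$pid.tmp still pass the sscanf() check: sscanf() only counts successful conversions, so a mismatch after the second %X does not lower the result.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-ins for XLogRecPtr and InvalidXLogRecPtr. */
typedef uint64_t DemoLsn;
#define DEMO_INVALID_LSN ((DemoLsn) 0)

/*
 * Parse a file name of the form "%X-%X.snap" into a 64-bit LSN.
 * Returns false when the name does not begin with two hex fields
 * separated by '-'. Trailing text after the second field is not
 * validated, mirroring the sscanf() check in the listing.
 */
static bool
demo_parse_snapshot_name(const char *name, DemoLsn *lsn)
{
    unsigned int hi;
    unsigned int lo;

    if (sscanf(name, "%X-%X.snap", &hi, &lo) != 2)
        return false;

    /* High 32 bits come from the first field, low 32 bits from the second. */
    *lsn = ((DemoLsn) hi) << 32 | lo;
    return true;
}

/*
 * Decide whether a serialized snapshot at 'lsn' may be removed:
 * either it is older than the cutoff, or the cutoff is invalid
 * (assumed here to mean that no logical slot needs any snapshot).
 */
static bool
demo_should_remove(DemoLsn lsn, DemoLsn cutoff)
{
    return lsn < cutoff || cutoff == DEMO_INVALID_LSN;
}
```

A caller would first parse each directory entry with demo_parse_snapshot_name() and, on success, pass the result and the computed cutoff to demo_should_remove(). Names that fail to parse are only logged in the listing above, never removed.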
        

Generated by: LCOV version v1.16-55-g56c0a2a