LCOV - differential code coverage report
Current view: top level - src/backend/access/transam - twophase.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 90.7 % 778 706 5 49 18 16 273 13 404 38 283 14
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 39 39 26 2 11 26
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (60,120] days: 100.0 % 2 2 2
Legend: Lines: hit not hit (120,180] days: 100.0 % 1 1 1
(240..) days: 90.7 % 775 703 5 49 18 16 273 10 404 38 283
Function coverage date bins:
(240..) days: 60.0 % 65 39 26 2 11 26

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * twophase.c
                                  4                 :  *      Two-phase commit support functions.
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  7                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                  8                 :  *
                                  9                 :  * IDENTIFICATION
                                 10                 :  *      src/backend/access/transam/twophase.c
                                 11                 :  *
                                 12                 :  * NOTES
                                 13                 :  *      Each global transaction is associated with a global transaction
                                 14                 :  *      identifier (GID). The client assigns a GID to a postgres
                                 15                 :  *      transaction with the PREPARE TRANSACTION command.
                                 16                 :  *
                                 17                 :  *      We keep all active global transactions in a shared memory array.
                                 18                 :  *      When the PREPARE TRANSACTION command is issued, the GID is
                                 19                 :  *      reserved for the transaction in the array. This is done before
                                 20                 :  *      a WAL entry is made, because the reservation checks for duplicate
                                 21                 :  *      GIDs and aborts the transaction if there already is a global
                                 22                 :  *      transaction in prepared state with the same GID.
                                 23                 :  *
                                 24                 :  *      A global transaction (gxact) also has a dummy PGPROC; this is what keeps
                                 25                 :  *      the XID considered running by TransactionIdIsInProgress.  It is also
                                 26                 :  *      convenient as a PGPROC to hook the gxact's locks to.
                                 27                 :  *
                                 28                 :  *      Information to recover prepared transactions in case of crash is
                                 29                 :  *      now stored in WAL for the common case. In some cases there will be
                                 30                 :  *      an extended period between preparing a GXACT and commit/abort, in
                                 31                 :  *      which case we need to separately record prepared transaction data
                                 32                 :  *      in permanent storage. This includes locking information, pending
                                 33                 :  *      notifications etc. All that state information is written to the
                                 34                 :  *      per-transaction state file in the pg_twophase directory.
                                 35                 :  *      All prepared transactions will be written prior to shutdown.
                                 36                 :  *
                                 37                 :  *      The life cycle of state data is as follows:
                                 38                 :  *
                                 39                 :  *      * On PREPARE TRANSACTION, the backend writes state data only to the WAL and
                                 40                 :  *        stores a pointer to the start of the WAL record in
                                 41                 :  *        gxact->prepare_start_lsn.
                                 42                 :  *      * If COMMIT occurs before a checkpoint, the backend reads data from WAL
                                 43                 :  *        using prepare_start_lsn.
                                 44                 :  *      * On checkpoint, state data is copied to files in the pg_twophase directory
                                 45                 :  *        and fsynced.
                                 46                 :  *      * If COMMIT happens after a checkpoint, the backend reads state data from
                                 47                 :  *        files.
                                 48                 :  *
                                 49                 :  *      During replay and replication, TwoPhaseState also holds information
                                 50                 :  *      about active prepared transactions that haven't been moved to disk yet.
                                 51                 :  *
                                 52                 :  *      Replay of twophase records happens by the following rules:
                                 53                 :  *
                                 54                 :  *      * At the beginning of recovery, pg_twophase is scanned once, filling
                                 55                 :  *        TwoPhaseState with entries marked with gxact->inredo and
                                 56                 :  *        gxact->ondisk.  Two-phase file data older than the XID horizon of
                                 57                 :  *        the redo position are discarded.
                                 58                 :  *      * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
                                 59                 :  *        gxact->inredo is set to true for such entries.
                                 60                 :  *      * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
                                 61                 :  *        that have gxact->inredo set and are behind the redo_horizon. We
                                 62                 :  *        save them to disk and then switch gxact->ondisk to true.
                                 63                 :  *      * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
                                 64                 :  *        If gxact->ondisk is true, the corresponding entry from the disk
                                 65                 :  *        is additionally deleted.
                                 66                 :  *      * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
                                 67                 :  *        and PrescanPreparedTransactions() have been modified to go through
                                 68                 :  *        gxact->inredo entries that have not made it to disk.
                                 69                 :  *
                                 70                 :  *-------------------------------------------------------------------------
                                 71                 :  */
                                 72                 : #include "postgres.h"
                                 73                 : 
                                 74                 : #include <fcntl.h>
                                 75                 : #include <sys/stat.h>
                                 76                 : #include <time.h>
                                 77                 : #include <unistd.h>
                                 78                 : 
                                 79                 : #include "access/commit_ts.h"
                                 80                 : #include "access/htup_details.h"
                                 81                 : #include "access/subtrans.h"
                                 82                 : #include "access/transam.h"
                                 83                 : #include "access/twophase.h"
                                 84                 : #include "access/twophase_rmgr.h"
                                 85                 : #include "access/xact.h"
                                 86                 : #include "access/xlog.h"
                                 87                 : #include "access/xloginsert.h"
                                 88                 : #include "access/xlogreader.h"
                                 89                 : #include "access/xlogutils.h"
                                 90                 : #include "catalog/pg_type.h"
                                 91                 : #include "catalog/storage.h"
                                 92                 : #include "funcapi.h"
                                 93                 : #include "miscadmin.h"
                                 94                 : #include "pg_trace.h"
                                 95                 : #include "pgstat.h"
                                 96                 : #include "replication/origin.h"
                                 97                 : #include "replication/syncrep.h"
                                 98                 : #include "replication/walsender.h"
                                 99                 : #include "storage/fd.h"
                                100                 : #include "storage/ipc.h"
                                101                 : #include "storage/md.h"
                                102                 : #include "storage/predicate.h"
                                103                 : #include "storage/proc.h"
                                104                 : #include "storage/procarray.h"
                                105                 : #include "storage/sinvaladt.h"
                                106                 : #include "storage/smgr.h"
                                107                 : #include "utils/builtins.h"
                                108                 : #include "utils/memutils.h"
                                109                 : #include "utils/timestamp.h"
                                110                 : 
                                111                 : /*
                                112                 :  * Directory where Two-phase commit files reside within PGDATA
                                113                 :  */
                                114                 : #define TWOPHASE_DIR "pg_twophase"
                                115                 : 
                                116                 : /* GUC variable, can't be changed after startup */
                                117                 : int         max_prepared_xacts = 0;
                                118                 : 
                                119                 : /*
                                120                 :  * This struct describes one global transaction that is in prepared state
                                121                 :  * or attempting to become prepared.
                                122                 :  *
                                123                 :  * The lifecycle of a global transaction is:
                                124                 :  *
                                125                 :  * 1. After checking that the requested GID is not in use, set up an entry in
                                126                 :  * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
                                127                 :  * and mark it as locked by my backend.
                                128                 :  *
                                129                 :  * 2. After successfully completing prepare, set valid = true and enter the
                                130                 :  * referenced PGPROC into the global ProcArray.
                                131                 :  *
                                132                 :  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
                                133                 :  * valid and not locked, then mark the entry as locked by storing my current
                                134                 :  * backend ID into locking_backend.  This prevents concurrent attempts to
                                135                 :  * commit or rollback the same prepared xact.
                                136                 :  *
                                137                 :  * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
                                138                 :  * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
                                139                 :  * the freelist.
                                140                 :  *
                                141                 :  * Note that if the preparing transaction fails between steps 1 and 2, the
                                142                 :  * entry must be removed so that the GID and the GlobalTransaction struct
                                143                 :  * can be reused.  See AtAbort_Twophase().
                                144                 :  *
                                145                 :  * typedef struct GlobalTransactionData *GlobalTransaction appears in
                                146                 :  * twophase.h
                                147                 :  */
                                148                 : 
                                149                 : typedef struct GlobalTransactionData
                                150                 : {
                                151                 :     GlobalTransaction next;     /* list link for free list */
                                152                 :     int         pgprocno;       /* ID of associated dummy PGPROC */
                                153                 :     BackendId   dummyBackendId; /* similar to backend id for backends */
                                154                 :     TimestampTz prepared_at;    /* time of preparation */
                                155                 : 
                                156                 :     /*
                                157                 :      * Note that we need to keep track of two LSNs for each GXACT. We keep
                                158                 :      * track of the start LSN because this is the address we must use to read
                                159                 :      * state data back from WAL when committing a prepared GXACT. We keep
                                160                 :      * track of the end LSN because that is the LSN we need to wait for prior
                                161                 :      * to commit.
                                162                 :      */
                                163                 :     XLogRecPtr  prepare_start_lsn;  /* XLOG offset of prepare record start */
                                164                 :     XLogRecPtr  prepare_end_lsn;    /* XLOG offset of prepare record end */
                                165                 :     TransactionId xid;          /* The GXACT id */
                                166                 : 
                                167                 :     Oid         owner;          /* ID of user that executed the xact */
                                168                 :     BackendId   locking_backend;    /* backend currently working on the xact */
                                169                 :     bool        valid;          /* true if PGPROC entry is in proc array */
                                170                 :     bool        ondisk;         /* true if prepare state file is on disk */
                                171                 :     bool        inredo;         /* true if entry was added via xlog_redo */
                                172                 :     char        gid[GIDSIZE];   /* The GID assigned to the prepared xact */
                                173                 : }           GlobalTransactionData;
                                174                 : 
                                175                 : /*
                                176                 :  * Two Phase Commit shared state.  Access to this struct is protected
                                177                 :  * by TwoPhaseStateLock.
                                178                 :  */
                                179                 : typedef struct TwoPhaseStateData
                                180                 : {
                                181                 :     /* Head of linked list of free GlobalTransactionData structs */
                                182                 :     GlobalTransaction freeGXacts;
                                183                 : 
                                184                 :     /* Number of valid prepXacts entries. */
                                185                 :     int         numPrepXacts;
                                186                 : 
                                187                 :     /* There are max_prepared_xacts items in this array */
                                188                 :     GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
                                189                 : } TwoPhaseStateData;
                                190                 : 
                                191                 : static TwoPhaseStateData *TwoPhaseState;
                                192                 : 
                                193                 : /*
                                194                 :  * Global transaction entry currently locked by us, if any.  Note that any
                                195                 :  * access to the entry pointed to by this variable must be protected by
                                196                 :  * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
                                197                 :  * (since it's just local memory).
                                198                 :  */
                                199                 : static GlobalTransaction MyLockedGxact = NULL;
                                200                 : 
                                201                 : static bool twophaseExitRegistered = false;
                                202                 : 
                                203                 : static void RecordTransactionCommitPrepared(TransactionId xid,
                                204                 :                                             int nchildren,
                                205                 :                                             TransactionId *children,
                                206                 :                                             int nrels,
                                207                 :                                             RelFileLocator *rels,
                                208                 :                                             int nstats,
                                209                 :                                             xl_xact_stats_item *stats,
                                210                 :                                             int ninvalmsgs,
                                211                 :                                             SharedInvalidationMessage *invalmsgs,
                                212                 :                                             bool initfileinval,
                                213                 :                                             const char *gid);
                                214                 : static void RecordTransactionAbortPrepared(TransactionId xid,
                                215                 :                                            int nchildren,
                                216                 :                                            TransactionId *children,
                                217                 :                                            int nrels,
                                218                 :                                            RelFileLocator *rels,
                                219                 :                                            int nstats,
                                220                 :                                            xl_xact_stats_item *stats,
                                221                 :                                            const char *gid);
                                222                 : static void ProcessRecords(char *bufptr, TransactionId xid,
                                223                 :                            const TwoPhaseCallback callbacks[]);
                                224                 : static void RemoveGXact(GlobalTransaction gxact);
                                225                 : 
                                226                 : static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
                                227                 : static char *ProcessTwoPhaseBuffer(TransactionId xid,
                                228                 :                                    XLogRecPtr prepare_start_lsn,
                                229                 :                                    bool fromdisk, bool setParent, bool setNextXid);
                                230                 : static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
                                231                 :                                 const char *gid, TimestampTz prepared_at, Oid owner,
                                232                 :                                 Oid databaseid);
                                233                 : static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
                                234                 : static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
                                235                 : 
                                236                 : /*
                                237                 :  * Initialization of shared memory
                                238                 :  */
                                239                 : Size
 6505 tgl                       240 CBC        4564 : TwoPhaseShmemSize(void)
                                241                 : {
                                242                 :     Size        size;
                                243                 : 
                                244                 :     /* Need the fixed struct, the array of pointers, and the GTD structs */
 6441                           245            4564 :     size = offsetof(TwoPhaseStateData, prepXacts);
                                246            4564 :     size = add_size(size, mul_size(max_prepared_xacts,
                                247                 :                                    sizeof(GlobalTransaction)));
                                248            4564 :     size = MAXALIGN(size);
                                249            4564 :     size = add_size(size, mul_size(max_prepared_xacts,
                                250                 :                                    sizeof(GlobalTransactionData)));
                                251                 : 
                                252            4564 :     return size;
                                253                 : }
                                254                 : 
                                255                 : void
 6505                           256            1826 : TwoPhaseShmemInit(void)
                                257                 : {
                                258                 :     bool        found;
                                259                 : 
                                260            1826 :     TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
                                261                 :                                     TwoPhaseShmemSize(),
                                262                 :                                     &found);
                                263            1826 :     if (!IsUnderPostmaster)
                                264                 :     {
                                265                 :         GlobalTransaction gxacts;
                                266                 :         int         i;
                                267                 : 
                                268            1826 :         Assert(!found);
 5271                           269            1826 :         TwoPhaseState->freeGXacts = NULL;
 6505                           270            1826 :         TwoPhaseState->numPrepXacts = 0;
                                271                 : 
                                272                 :         /*
                                273                 :          * Initialize the linked list of free GlobalTransactionData structs
                                274                 :          */
                                275            1826 :         gxacts = (GlobalTransaction)
                                276            1826 :             ((char *) TwoPhaseState +
 6385 bruce                     277            1826 :              MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
                                278                 :                       sizeof(GlobalTransaction) * max_prepared_xacts));
 6505 tgl                       279            2591 :         for (i = 0; i < max_prepared_xacts; i++)
                                280                 :         {
                                281                 :             /* insert into linked list */
 4153 rhaas                     282             765 :             gxacts[i].next = TwoPhaseState->freeGXacts;
 5271 tgl                       283             765 :             TwoPhaseState->freeGXacts = &gxacts[i];
                                284                 : 
                                285                 :             /* associate it with a PGPROC assigned by InitProcGlobal */
 3896                           286             765 :             gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
                                287                 : 
                                288                 :             /*
                                289                 :              * Assign a unique ID for each dummy proc, so that the range of
                                290                 :              * dummy backend IDs immediately follows the range of normal
                                291                 :              * backend IDs. We don't dare to assign a real backend ID to dummy
                                292                 :              * procs, because prepared transactions don't take part in cache
                                293                 :              * invalidation like a real backend ID would imply, but having a
                                294                 :              * unique ID for them is nevertheless handy. This arrangement
                                295                 :              * allows you to allocate an array of size (MaxBackends +
                                296                 :              * max_prepared_xacts + 1), and have a slot for every backend and
                                297                 :              * prepared transaction. Currently multixact.c uses that
                                298                 :              * technique.
                                299                 :              */
  362 rhaas                     300             765 :             gxacts[i].dummyBackendId = MaxBackends + 1 + i;
                                301                 :         }
                                302                 :     }
                                303                 :     else
 6505 tgl                       304 UBC           0 :         Assert(found);
 6505 tgl                       305 CBC        1826 : }
                                306                 : 
                                307                 : /*
                                308                 :  * Exit hook to unlock the global transaction entry we're working on.
                                309                 :  */
                                310                 : static void
 3251 heikki.linnakangas        311             125 : AtProcExit_Twophase(int code, Datum arg)
                                312                 : {
                                313                 :     /* same logic as abort */
                                314             125 :     AtAbort_Twophase();
                                315             125 : }
                                316                 : 
                                317                 : /*
                                318                 :  * Abort hook to unlock the global transaction entry we're working on.
                                319                 :  */
                                320                 : void
                                321           20223 : AtAbort_Twophase(void)
                                322                 : {
                                323           20223 :     if (MyLockedGxact == NULL)
                                324           20221 :         return;
                                325                 : 
                                326                 :     /*
                                327                 :      * What to do with the locked global transaction entry?  If we were in the
                                328                 :      * process of preparing the transaction, but haven't written the WAL
                                329                 :      * record and state file yet, the transaction must not be considered as
                                330                 :      * prepared.  Likewise, if we are in the process of finishing an
                                331                 :      * already-prepared transaction, and fail after having already written the
                                332                 :      * 2nd phase commit or rollback record to the WAL, the transaction should
                                333                 :      * not be considered as prepared anymore.  In those cases, just remove the
                                334                 :      * entry from shared memory.
                                335                 :      *
                                336                 :      * Otherwise, the entry must be left in place so that the transaction can
                                337                 :      * be finished later, so just unlock it.
                                338                 :      *
                                339                 :      * If we abort during prepare, after having written the WAL record, we
                                340                 :      * might not have transferred all locks and other state to the prepared
                                341                 :      * transaction yet.  Likewise, if we abort during commit or rollback,
                                342                 :      * after having written the WAL record, we might not have released all the
                                343                 :      * resources held by the transaction yet.  In those cases, the in-memory
                                344                 :      * state can be wrong, but it's too late to back out.
                                345                 :      */
 2125 alvherre                  346               2 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 3251 heikki.linnakangas        347               2 :     if (!MyLockedGxact->valid)
                                348               2 :         RemoveGXact(MyLockedGxact);
                                349                 :     else
 3251 heikki.linnakangas        350 UBC           0 :         MyLockedGxact->locking_backend = InvalidBackendId;
 2125 alvherre                  351 CBC           2 :     LWLockRelease(TwoPhaseStateLock);
                                352                 : 
 3251 heikki.linnakangas        353               2 :     MyLockedGxact = NULL;
                                354                 : }
                                355                 : 
                                356                 : /*
                                357                 :  * This is called after we have finished transferring state to the prepared
                                358                 :  * PGPROC entry.
                                359                 :  */
                                360                 : void
 2794 andres                    361             384 : PostPrepare_Twophase(void)
                                362                 : {
 3251 heikki.linnakangas        363             384 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
                                364             384 :     MyLockedGxact->locking_backend = InvalidBackendId;
                                365             384 :     LWLockRelease(TwoPhaseStateLock);
                                366                 : 
                                367             384 :     MyLockedGxact = NULL;
                                368             384 : }
                                369                 : 
                                370                 : 
                                371                 : /*
                                372                 :  * MarkAsPreparing
                                373                 :  *      Reserve the GID for the given transaction.
                                374                 :  */
                                375                 : GlobalTransaction
 6504 tgl                       376             367 : MarkAsPreparing(TransactionId xid, const char *gid,
                                377                 :                 TimestampTz prepared_at, Oid owner, Oid databaseid)
                                378                 : {
                                379                 :     GlobalTransaction gxact;
                                380                 :     int         i;
                                381                 : 
 6505                           382             367 :     if (strlen(gid) >= GIDSIZE)
 6505 tgl                       383 UBC           0 :         ereport(ERROR,
                                384                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                385                 :                  errmsg("transaction identifier \"%s\" is too long",
                                386                 :                         gid)));
                                387                 : 
                                388                 :     /* fail immediately if feature is disabled */
 5099 tgl                       389 CBC         367 :     if (max_prepared_xacts == 0)
                                390               9 :         ereport(ERROR,
                                391                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                392                 :                  errmsg("prepared transactions are disabled"),
                                393                 :                  errhint("Set max_prepared_transactions to a nonzero value.")));
                                394                 : 
                                395                 :     /* on first call, register the exit hook */
 3251 heikki.linnakangas        396             358 :     if (!twophaseExitRegistered)
                                397                 :     {
                                398              65 :         before_shmem_exit(AtProcExit_Twophase, 0);
                                399              65 :         twophaseExitRegistered = true;
                                400                 :     }
                                401                 : 
                                402             358 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
                                403                 : 
                                404                 :     /* Check for conflicting GID */
 6505 tgl                       405             731 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                                406                 :     {
                                407             375 :         gxact = TwoPhaseState->prepXacts[i];
                                408             375 :         if (strcmp(gxact->gid, gid) == 0)
                                409                 :         {
                                410               2 :             ereport(ERROR,
                                411                 :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
                                412                 :                      errmsg("transaction identifier \"%s\" is already in use",
                                413                 :                             gid)));
                                414                 :         }
                                415                 :     }
                                416                 : 
                                417                 :     /* Get a free gxact from the freelist */
 5271                           418             356 :     if (TwoPhaseState->freeGXacts == NULL)
 6505 tgl                       419 UBC           0 :         ereport(ERROR,
                                420                 :                 (errcode(ERRCODE_OUT_OF_MEMORY),
                                421                 :                  errmsg("maximum number of prepared transactions reached"),
                                422                 :                  errhint("Increase max_prepared_transactions (currently %d).",
                                423                 :                          max_prepared_xacts)));
 5271 tgl                       424 CBC         356 :     gxact = TwoPhaseState->freeGXacts;
 3896                           425             356 :     TwoPhaseState->freeGXacts = gxact->next;
                                426                 : 
 2196 simon                     427             356 :     MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
                                428                 : 
                                429             356 :     gxact->ondisk = false;
                                430                 : 
                                431                 :     /* And insert it into the active array */
                                432             356 :     Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
                                433             356 :     TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
                                434                 : 
                                435             356 :     LWLockRelease(TwoPhaseStateLock);
                                436                 : 
                                437             356 :     return gxact;
                                438                 : }
                                439                 : 
                                440                 : /*
                                441                 :  * MarkAsPreparingGuts
                                442                 :  *
                                443                 :  * This initializes a gxact struct; it does not insert it into the active array.
                                444                 :  * NOTE: this is also used when reloading a gxact after a crash; so avoid
                                445                 :  * assuming that we can use very much backend context.
                                446                 :  *
                                447                 :  * Note: This function should be called with appropriate locks held.
                                448                 :  */
                                449                 : static void
                                450             386 : MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
                                451                 :                     TimestampTz prepared_at, Oid owner, Oid databaseid)
                                452                 : {
                                453                 :     PGPROC     *proc;
                                454                 :     int         i;
                                455                 : 
 2125 alvherre                  456             386 :     Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
                                457                 : 
 2196 simon                     458             386 :     Assert(gxact != NULL);
 4153 rhaas                     459             386 :     proc = &ProcGlobal->allProcs[gxact->pgprocno];
                                460                 : 
                                461                 :     /* Initialize the PGPROC entry */
                                462           42846 :     MemSet(proc, 0, sizeof(PGPROC));
                                463             386 :     proc->pgprocno = gxact->pgprocno;
   81 andres                    464 GNC         386 :     dlist_node_init(&proc->links);
 1026 peter                     465 CBC         386 :     proc->waitStatus = PROC_WAIT_STATUS_OK;
  533 noah                      466             386 :     if (LocalTransactionIdIsValid(MyProc->lxid))
                                467                 :     {
                                468                 :         /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
                                469             356 :         proc->lxid = MyProc->lxid;
                                470             356 :         proc->backendId = MyBackendId;
                                471                 :     }
                                472                 :     else
                                473                 :     {
                                474              30 :         Assert(AmStartupProcess() || !IsPostmasterEnvironment);
                                475                 :         /* GetLockConflicts() uses this to specify a wait on the XID */
                                476              30 :         proc->lxid = xid;
                                477              30 :         proc->backendId = InvalidBackendId;
                                478                 :     }
  968 andres                    479             386 :     proc->xid = xid;
  969                           480             386 :     Assert(proc->xmin == InvalidTransactionId);
  366 rhaas                     481             386 :     proc->delayChkptFlags = 0;
  874 alvherre                  482             386 :     proc->statusFlags = 0;
 4153 rhaas                     483             386 :     proc->pid = 0;
                                484             386 :     proc->databaseId = databaseid;
                                485             386 :     proc->roleId = owner;
 1700 michael                   486             386 :     proc->tempNamespaceId = InvalidOid;
 2258 andrew                    487             386 :     proc->isBackgroundWorker = false;
  140 andres                    488 GNC         386 :     proc->lwWaiting = LW_WS_NOT_WAITING;
 4087 heikki.linnakangas        489 CBC         386 :     proc->lwWaitMode = 0;
 4153 rhaas                     490             386 :     proc->waitLock = NULL;
                                491             386 :     proc->waitProcLock = NULL;
  776 fujii                     492             386 :     pg_atomic_init_u64(&proc->waitStart, 0);
 6328 tgl                       493            6562 :     for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
   81 andres                    494 GNC        6176 :         dlist_init(&proc->myProcLocks[i]);
                                495                 :     /* subxid data must be filled later by GXactLoadSubxactData */
  968 andres                    496 CBC         386 :     proc->subxidStatus.overflowed = false;
                                497             386 :     proc->subxidStatus.count = 0;
                                498                 : 
 6504 tgl                       499             386 :     gxact->prepared_at = prepared_at;
 2196 simon                     500             386 :     gxact->xid = xid;
 6505 tgl                       501             386 :     gxact->owner = owner;
 3251 heikki.linnakangas        502             386 :     gxact->locking_backend = MyBackendId;
 6505 tgl                       503             386 :     gxact->valid = false;
 2196 simon                     504             386 :     gxact->inredo = false;
 6505 tgl                       505             386 :     strcpy(gxact->gid, gid);
                                506                 : 
                                507                 :     /*
                                508                 :      * Remember that we have this GlobalTransaction entry locked for us. If we
                                509                 :      * abort after this, we must release it.
                                510                 :      */
 3251 heikki.linnakangas        511             386 :     MyLockedGxact = gxact;
 6505 tgl                       512             386 : }
                                513                 : 
                                514                 : /*
                                515                 :  * GXactLoadSubxactData
                                516                 :  *
                                517                 :  * If the transaction being persisted had any subtransactions, this must
                                518                 :  * be called before MarkAsPrepared() to load information into the dummy
                                519                 :  * PGPROC.
                                520                 :  */
                                521                 : static void
                                522             173 : GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
                                523                 :                      TransactionId *children)
                                524                 : {
 3955 bruce                     525             173 :     PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
                                526                 : 
                                527                 :     /* We need no extra lock since the GXACT isn't valid yet */
 6505 tgl                       528             173 :     if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
                                529                 :     {
  968 andres                    530               4 :         proc->subxidStatus.overflowed = true;
 6505 tgl                       531               4 :         nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
                                532                 :     }
                                533             173 :     if (nsubxacts > 0)
                                534                 :     {
 4153 rhaas                     535             158 :         memcpy(proc->subxids.xids, children,
                                536                 :                nsubxacts * sizeof(TransactionId));
  968 andres                    537             158 :         proc->subxidStatus.count = nsubxacts;
                                538                 :     }
 6505 tgl                       539             173 : }
                                540                 : 
                                541                 : /*
                                542                 :  * MarkAsPrepared
                                543                 :  *      Mark the GXACT as fully valid, and enter it into the global ProcArray.
                                544                 :  *
                                545                 :  * lock_held indicates whether caller already holds TwoPhaseStateLock.
                                546                 :  */
                                547                 : static void
 2125 alvherre                  548             384 : MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
                                549                 : {
                                550                 :     /* Lock here may be overkill, but I'm not convinced of that ... */
                                551             384 :     if (!lock_held)
                                552             354 :         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 6505 tgl                       553             384 :     Assert(!gxact->valid);
                                554             384 :     gxact->valid = true;
 2125 alvherre                  555             384 :     if (!lock_held)
                                556             354 :         LWLockRelease(TwoPhaseStateLock);
                                557                 : 
                                558                 :     /*
                                559                 :      * Put it into the global ProcArray so TransactionIdIsInProgress considers
                                560                 :      * the XID as still running.
                                561                 :      */
 4153 rhaas                     562             384 :     ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
 6505 tgl                       563             384 : }
                                564                 : 
                                565                 : /*
                                566                 :  * LockGXact
                                567                 :  *      Locate the prepared transaction and mark it busy for COMMIT or ROLLBACK.
                                568                 :  */
                                569                 : static GlobalTransaction
 6494                           570             369 : LockGXact(const char *gid, Oid user)
                                571                 : {
                                572                 :     int         i;
                                573                 : 
                                574                 :     /* on first call, register the exit hook */
 3251 heikki.linnakangas        575             369 :     if (!twophaseExitRegistered)
                                576                 :     {
                                577              60 :         before_shmem_exit(AtProcExit_Twophase, 0);
                                578              60 :         twophaseExitRegistered = true;
                                579                 :     }
                                580                 : 
 6505 tgl                       581             369 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
                                582                 : 
                                583             658 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                                584                 :     {
 6385 bruce                     585             652 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 3955                           586             652 :         PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
                                587                 : 
                                588                 :         /* Ignore not-yet-valid GIDs */
 6505 tgl                       589             652 :         if (!gxact->valid)
 6505 tgl                       590 UBC           0 :             continue;
 6505 tgl                       591 CBC         652 :         if (strcmp(gxact->gid, gid) != 0)
                                592             289 :             continue;
                                593                 : 
                                594                 :         /* Found it, but has someone else got it locked? */
 3251 heikki.linnakangas        595             363 :         if (gxact->locking_backend != InvalidBackendId)
 3251 heikki.linnakangas        596 UBC           0 :             ereport(ERROR,
                                597                 :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                598                 :                      errmsg("prepared transaction with identifier \"%s\" is busy",
                                599                 :                             gid)));
                                600                 : 
 6505 tgl                       601 CBC         363 :         if (user != gxact->owner && !superuser_arg(user))
 6505 tgl                       602 UBC           0 :             ereport(ERROR,
                                603                 :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                604                 :                      errmsg("permission denied to finish prepared transaction"),
                                605                 :                      errhint("Must be superuser or the user that prepared the transaction.")));
                                606                 : 
                                607                 :         /*
                                608                 :          * Note: it probably would be possible to allow committing from
                                609                 :          * another database; but at the moment NOTIFY is known not to work and
                                610                 :          * there may be some other issues as well.  Hence disallow until
                                611                 :          * someone gets motivated to make it work.
                                612                 :          */
 4153 rhaas                     613 CBC         363 :         if (MyDatabaseId != proc->databaseId)
 5899 tgl                       614 UBC           0 :             ereport(ERROR,
                                615                 :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                616                 :                      errmsg("prepared transaction belongs to another database"),
                                617                 :                      errhint("Connect to the database where the transaction was prepared to finish it.")));
                                618                 : 
                                619                 :         /* OK for me to lock it */
 3251 heikki.linnakangas        620 CBC         363 :         gxact->locking_backend = MyBackendId;
                                621             363 :         MyLockedGxact = gxact;
                                622                 : 
 6505 tgl                       623             363 :         LWLockRelease(TwoPhaseStateLock);
                                624                 : 
                                625             363 :         return gxact;
                                626                 :     }
                                627                 : 
                                628               6 :     LWLockRelease(TwoPhaseStateLock);
                                629                 : 
                                630               6 :     ereport(ERROR,
                                631                 :             (errcode(ERRCODE_UNDEFINED_OBJECT),
                                632                 :              errmsg("prepared transaction with identifier \"%s\" does not exist",
                                633                 :                     gid)));
                                634                 : 
                                635                 :     /* NOTREACHED */
                                636                 :     return NULL;
                                637                 : }
                                638                 : 
                                639                 : /*
                                640                 :  * RemoveGXact
                                641                 :  *      Remove the prepared transaction from the shared memory array.
                                642                 :  *
                                643                 :  * NB: caller should have already removed it from ProcArray
                                644                 :  */
                                645                 : static void
                                646             414 : RemoveGXact(GlobalTransaction gxact)
                                647                 : {
                                648                 :     int         i;
                                649                 : 
 2125 alvherre                  650             414 :     Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
                                651                 : 
 6505 tgl                       652             701 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                                653                 :     {
                                654             701 :         if (gxact == TwoPhaseState->prepXacts[i])
                                655                 :         {
                                656                 :             /* remove from the active array */
                                657             414 :             TwoPhaseState->numPrepXacts--;
                                658             414 :             TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
                                659                 : 
                                660                 :             /* and put it back in the freelist */
 4153 rhaas                     661             414 :             gxact->next = TwoPhaseState->freeGXacts;
 5271 tgl                       662             414 :             TwoPhaseState->freeGXacts = gxact;
                                663                 : 
 6505                           664             414 :             return;
                                665                 :         }
                                666                 :     }
                                667                 : 
 6505 tgl                       668 UBC           0 :     elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
                                669                 : }
                                670                 : 
                                671                 : /*
                                672                 :  * Returns an array of all prepared transactions for the user-level
                                673                 :  * function pg_prepared_xact.
                                674                 :  *
                                675                 :  * The returned array and all its elements are copies of internal data
                                676                 :  * structures, to minimize the time we need to hold the TwoPhaseStateLock.
                                677                 :  *
                                678                 :  * WARNING -- we return even those transactions that are not fully prepared
                                679                 :  * yet.  The caller should filter them out if it doesn't want them.
                                680                 :  *
                                681                 :  * The returned array is palloc'd.
                                682                 :  */
                                683                 : static int
 6505 tgl                       684 CBC          89 : GetPreparedTransactionList(GlobalTransaction *gxacts)
                                685                 : {
                                686                 :     GlobalTransaction array;
                                687                 :     int         num;
                                688                 :     int         i;
                                689                 : 
                                690              89 :     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
                                691                 : 
                                692              89 :     if (TwoPhaseState->numPrepXacts == 0)
                                693                 :     {
                                694              48 :         LWLockRelease(TwoPhaseStateLock);
                                695                 : 
                                696              48 :         *gxacts = NULL;
                                697              48 :         return 0;
                                698                 :     }
                                699                 : 
                                700              41 :     num = TwoPhaseState->numPrepXacts;
                                701              41 :     array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
                                702              41 :     *gxacts = array;
                                703              87 :     for (i = 0; i < num; i++)
                                704              46 :         memcpy(array + i, TwoPhaseState->prepXacts[i],
                                705                 :                sizeof(GlobalTransactionData));
                                706                 : 
                                707              41 :     LWLockRelease(TwoPhaseStateLock);
                                708                 : 
                                709              41 :     return num;
                                710                 : }
                                711                 : 
                                712                 : 
                                713                 : /* Working status for pg_prepared_xact */
                                714                 : typedef struct
                                715                 : {
                                716                 :     GlobalTransaction array;
                                717                 :     int         ngxacts;
                                718                 :     int         currIdx;
                                719                 : } Working_State;
                                720                 : 
                                721                 : /*
                                722                 :  * pg_prepared_xact
                                723                 :  *      Produce a view with one row per prepared transaction.
                                724                 :  *
                                725                 :  * This function is here so we don't have to export the
                                726                 :  * GlobalTransactionData struct definition.
                                727                 :  */
                                728                 : Datum
                                729             135 : pg_prepared_xact(PG_FUNCTION_ARGS)
                                730                 : {
                                731                 :     FuncCallContext *funcctx;
                                732                 :     Working_State *status;
                                733                 : 
                                734             135 :     if (SRF_IS_FIRSTCALL())
                                735                 :     {
                                736                 :         TupleDesc   tupdesc;
                                737                 :         MemoryContext oldcontext;
                                738                 : 
                                739                 :         /* create a function context for cross-call persistence */
                                740              89 :         funcctx = SRF_FIRSTCALL_INIT();
                                741                 : 
                                742                 :         /*
                                743                 :          * Switch to memory context appropriate for multiple function calls
                                744                 :          */
                                745              89 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
                                746                 : 
                                747                 :         /* build tupdesc for result tuples */
                                748                 :         /* this had better match pg_prepared_xacts view in system_views.sql */
 1601 andres                    749              89 :         tupdesc = CreateTemplateTupleDesc(5);
 6505 tgl                       750              89 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
                                751                 :                            XIDOID, -1, 0);
                                752              89 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
                                753                 :                            TEXTOID, -1, 0);
 6504                           754              89 :         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
                                755                 :                            TIMESTAMPTZOID, -1, 0);
                                756              89 :         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
                                757                 :                            OIDOID, -1, 0);
                                758              89 :         TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
                                759                 :                            OIDOID, -1, 0);
                                760                 : 
 6505                           761              89 :         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
                                762                 : 
                                763                 :         /*
                                764                 :          * Collect all the 2PC status information that we will format and send
                                765                 :          * out as a result set.
                                766                 :          */
                                767              89 :         status = (Working_State *) palloc(sizeof(Working_State));
                                768              89 :         funcctx->user_fctx = (void *) status;
                                769                 : 
                                770              89 :         status->ngxacts = GetPreparedTransactionList(&status->array);
                                771              89 :         status->currIdx = 0;
                                772                 : 
                                773              89 :         MemoryContextSwitchTo(oldcontext);
                                774                 :     }
                                775                 : 
                                776             135 :     funcctx = SRF_PERCALL_SETUP();
                                777             135 :     status = (Working_State *) funcctx->user_fctx;
                                778                 : 
                                779             135 :     while (status->array != NULL && status->currIdx < status->ngxacts)
                                780                 :     {
                                781              46 :         GlobalTransaction gxact = &status->array[status->currIdx++];
 3955 bruce                     782              46 :         PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
  267 peter                     783 GNC          46 :         Datum       values[5] = {0};
                                784              46 :         bool        nulls[5] = {0};
                                785                 :         HeapTuple   tuple;
                                786                 :         Datum       result;
                                787                 : 
 6505 tgl                       788 CBC          46 :         if (!gxact->valid)
 6505 tgl                       789 UBC           0 :             continue;
                                790                 : 
                                791                 :         /*
                                792                 :          * Form tuple with appropriate data.
                                793                 :          */
 6505 tgl                       794 ECB             : 
  968 andres                    795 CBC          46 :         values[0] = TransactionIdGetDatum(proc->xid);
 5493 tgl                       796              46 :         values[1] = CStringGetTextDatum(gxact->gid);
 6504                           797              46 :         values[2] = TimestampTzGetDatum(gxact->prepared_at);
 6494 tgl                       798 GIC          46 :         values[3] = ObjectIdGetDatum(gxact->owner);
 4153 rhaas                     799 CBC          46 :         values[4] = ObjectIdGetDatum(proc->databaseId);
 6505 tgl                       800 ECB             : 
 6505 tgl                       801 CBC          46 :         tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
 6505 tgl                       802 GIC          46 :         result = HeapTupleGetDatum(tuple);
                                803              46 :         SRF_RETURN_NEXT(funcctx, result);
 6505 tgl                       804 ECB             :     }
                                805                 : 
 6505 tgl                       806 GIC          89 :     SRF_RETURN_DONE(funcctx);
                                807                 : }
                                808                 : 
                                809                 : /*
                                810                 :  * TwoPhaseGetGXact
                                811                 :  *      Get the GlobalTransaction struct for a prepared transaction
                                812                 :  *      specified by XID
                                813                 :  *
                                814                 :  * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
                                815                 :  * caller had better hold it.
 4885 heikki.linnakangas        816 ECB             :  */
                                817                 : static GlobalTransaction
 1504 michael                   818 CBC        1386 : TwoPhaseGetGXact(TransactionId xid, bool lock_held)
                                819                 : {
 3896 tgl                       820 GIC        1386 :     GlobalTransaction result = NULL;
                                821                 :     int         i;
                                822                 : 
                                823                 :     static TransactionId cached_xid = InvalidTransactionId;
 3896 tgl                       824 ECB             :     static GlobalTransaction cached_gxact = NULL;
                                825                 : 
 1504 michael                   826 GIC        1386 :     Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
                                827                 : 
                                828                 :     /*
                                829                 :      * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
 6505 tgl                       830 ECB             :      * repeatedly for the same XID.  We can save work with a simple cache.
                                831                 :      */
 6505 tgl                       832 GIC        1386 :     if (xid == cached_xid)
 3896 tgl                       833 CBC         933 :         return cached_gxact;
 6505 tgl                       834 ECB             : 
 1504 michael                   835 GIC         453 :     if (!lock_held)
 1504 michael                   836 CBC         384 :         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
                                837                 : 
 6505 tgl                       838             830 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                                839                 :     {
 6385 bruce                     840             830 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
                                841                 : 
  968 andres                    842             830 :         if (gxact->xid == xid)
 6505 tgl                       843 ECB             :         {
 3896 tgl                       844 GIC         453 :             result = gxact;
 6505                           845             453 :             break;
                                846                 :         }
 6505 tgl                       847 ECB             :     }
                                848                 : 
 1504 michael                   849 GIC         453 :     if (!lock_held)
 1504 michael                   850 CBC         384 :         LWLockRelease(TwoPhaseStateLock);
 6505 tgl                       851 EUB             : 
 6505 tgl                       852 GIC         453 :     if (result == NULL)         /* should not happen */
 3896 tgl                       853 LBC           0 :         elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
 6505 tgl                       854 ECB             : 
 6505 tgl                       855 GIC         453 :     cached_xid = xid;
 3896 tgl                       856 CBC         453 :     cached_gxact = result;
                                857                 : 
 6505 tgl                       858 GIC         453 :     return result;
                                859                 : }
                                860                 : 
                                861                 : /*
                                862                 :  * TwoPhaseGetXidByVirtualXID
                                863                 :  *      Lookup VXID among xacts prepared since last startup.
                                864                 :  *
                                865                 :  * (This won't find recovered xacts.)  If more than one matches, return any
                                866                 :  * and set "have_more" to true.  To witness multiple matches, a single
                                867                 :  * BackendId must consume 2^32 LXIDs, with no intervening database restart.
  533 noah                      868 ECB             :  */
                                869                 : TransactionId
  533 noah                      870 GIC          98 : TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
                                871                 :                            bool *have_more)
  533 noah                      872 ECB             : {
                                873                 :     int         i;
  533 noah                      874 CBC          98 :     TransactionId result = InvalidTransactionId;
  533 noah                      875 ECB             : 
  533 noah                      876 GIC          98 :     Assert(VirtualTransactionIdIsValid(vxid));
  533 noah                      877 CBC          98 :     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
                                878                 : 
                                879             150 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                                880                 :     {
  533 noah                      881 GIC          52 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
                                882                 :         PGPROC     *proc;
  533 noah                      883 ECB             :         VirtualTransactionId proc_vxid;
                                884                 : 
  533 noah                      885 CBC          52 :         if (!gxact->valid)
                                886               2 :             continue;
                                887              50 :         proc = &ProcGlobal->allProcs[gxact->pgprocno];
  533 noah                      888 GIC          50 :         GET_VXID_FROM_PGPROC(proc_vxid, *proc);
                                889              50 :         if (VirtualTransactionIdEquals(vxid, proc_vxid))
  533 noah                      890 ECB             :         {
                                891                 :             /* Startup process sets proc->backendId to InvalidBackendId. */
  533 noah                      892 CBC           9 :             Assert(!gxact->inredo);
                                893                 : 
  533 noah                      894 GBC           9 :             if (result != InvalidTransactionId)
  533 noah                      895 EUB             :             {
  533 noah                      896 UIC           0 :                 *have_more = true;
  533 noah                      897 LBC           0 :                 break;
                                898                 :             }
  533 noah                      899 GIC           9 :             result = gxact->xid;
                                900                 :         }
  533 noah                      901 ECB             :     }
                                902                 : 
  533 noah                      903 CBC          98 :     LWLockRelease(TwoPhaseStateLock);
                                904                 : 
  533 noah                      905 GIC          98 :     return result;
                                906                 : }
                                907                 : 
                                908                 : /*
                                909                 :  * TwoPhaseGetDummyBackendId
                                910                 :  *      Get the dummy backend ID for prepared transaction specified by XID
                                911                 :  *
                                912                 :  * Dummy backend IDs are similar to real backend IDs of real backends.
                                913                 :  * They start at MaxBackends + 1, and are unique across all currently active
                                914                 :  * real backends and prepared transactions.  If lock_held is set to true,
                                915                 :  * TwoPhaseStateLock will not be taken, so the caller had better hold it.
 3896 tgl                       916 ECB             :  */
                                917                 : BackendId
 1504 michael                   918 CBC         107 : TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held)
                                919                 : {
                                920             107 :     GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
                                921                 : 
 3896 tgl                       922 GIC         107 :     return gxact->dummyBackendId;
                                923                 : }
                                924                 : 
                                925                 : /*
                                926                 :  * TwoPhaseGetDummyProc
                                927                 :  *      Get the PGPROC that represents a prepared transaction specified by XID
                                928                 :  *
                                929                 :  * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
                                930                 :  * caller had better hold it.
 3896 tgl                       931 ECB             :  */
                                932                 : PGPROC *
 1504 michael                   933 CBC        1279 : TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
                                934                 : {
                                935            1279 :     GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
                                936                 : 
  776 fujii                     937 GIC        1279 :     return &ProcGlobal->allProcs[gxact->pgprocno];
                                938                 : }
                                939                 : 
                                940                 : /************************************************************************/
                                941                 : /* State file support                                                   */
                                942                 : /************************************************************************/
                                943                 : 
                                944                 : #define TwoPhaseFilePath(path, xid) \
                                945                 :     snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
                                946                 : 
                                947                 : /*
                                948                 :  * 2PC state file format:
                                949                 :  *
                                950                 :  *  1. TwoPhaseFileHeader
                                951                 :  *  2. TransactionId[] (subtransactions)
                                952                 :  *  3. RelFileLocator[] (files to be deleted at commit)
                                953                 :  *  4. RelFileLocator[] (files to be deleted at abort)
                                954                 :  *  5. SharedInvalidationMessage[] (inval messages to be sent at commit)
                                955                 :  *  6. TwoPhaseRecordOnDisk
                                956                 :  *  7. ...
                                957                 :  *  8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
                                958                 :  *  9. checksum (CRC-32C)
                                959                 :  *
                                960                 :  * Each segment except the final checksum is MAXALIGN'd.
                                961                 :  */
                                962                 : 
                                963                 : /*
                                964                 :  * Header for a 2PC state file
                                965                 :  */
                                966                 : #define TWOPHASE_MAGIC  0x57F94534  /* format identifier */
                                967                 : 
                                968                 : typedef xl_xact_prepare TwoPhaseFileHeader;
                                969                 : 
                                970                 : /*
                                971                 :  * Header for each record in a state file
                                972                 :  *
                                973                 :  * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
                                974                 :  * The rmgr data will be stored starting on a MAXALIGN boundary.
                                975                 :  */
                                976                 : typedef struct TwoPhaseRecordOnDisk
                                977                 : {
                                978                 :     uint32      len;            /* length of rmgr data */
                                979                 :     TwoPhaseRmgrId rmid;        /* resource manager for this record */
                                980                 :     uint16      info;           /* flag bits for use by rmgr */
                                981                 : } TwoPhaseRecordOnDisk;
                                982                 : 
                                983                 : /*
                                984                 :  * During prepare, the state file is assembled in memory before writing it
                                985                 :  * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
                                986                 :  * for that.
                                987                 :  */
                                988                 : typedef struct StateFileChunk
                                989                 : {
                                990                 :     char       *data;
                                991                 :     uint32      len;
                                992                 :     struct StateFileChunk *next;
                                993                 : } StateFileChunk;
                                994                 : 
                                995                 : static struct xllist
                                996                 : {
                                997                 :     StateFileChunk *head;       /* first data block in the chain */
                                998                 :     StateFileChunk *tail;       /* last block in chain */
                                999                 :     uint32      num_chunks;
                               1000                 :     uint32      bytes_free;     /* free bytes left in tail block */
                               1001                 :     uint32      total_len;      /* total data bytes in chain */
                               1002                 : }           records;
                               1003                 : 
                               1004                 : 
                               1005                 : /*
                               1006                 :  * Append a block of data to records data structure.
                               1007                 :  *
                               1008                 :  * NB: each block is padded to a MAXALIGN multiple.  This must be
                               1009                 :  * accounted for when the file is later read!
                               1010                 :  *
                               1011                 :  * The data is copied, so the caller is free to modify it afterwards.
 6505 tgl                      1012 ECB             :  */
                               1013                 : static void
 6505 tgl                      1014 CBC        3889 : save_state_data(const void *data, uint32 len)
                               1015                 : {
 6385 bruce                    1016            3889 :     uint32      padlen = MAXALIGN(len);
                               1017                 : 
 6505 tgl                      1018            3889 :     if (padlen > records.bytes_free)
 6505 tgl                      1019 ECB             :     {
 3062 heikki.linnakangas       1020 CBC          40 :         records.tail->next = palloc0(sizeof(StateFileChunk));
 6505 tgl                      1021              40 :         records.tail = records.tail->next;
                               1022              40 :         records.tail->len = 0;
 6505 tgl                      1023 GIC          40 :         records.tail->next = NULL;
 3062 heikki.linnakangas       1024 CBC          40 :         records.num_chunks++;
 6505 tgl                      1025 ECB             : 
 6505 tgl                      1026 GIC          40 :         records.bytes_free = Max(padlen, 512);
                               1027              40 :         records.tail->data = palloc(records.bytes_free);
 6505 tgl                      1028 ECB             :     }
                               1029                 : 
 6505 tgl                      1030 CBC        3889 :     memcpy(((char *) records.tail->data) + records.tail->len, data, len);
                               1031            3889 :     records.tail->len += padlen;
                               1032            3889 :     records.bytes_free -= padlen;
 6505 tgl                      1033 GIC        3889 :     records.total_len += padlen;
                               1034            3889 : }
                               1035                 : 
                               1036                 : /*
                               1037                 :  * Start preparing a state file.
                               1038                 :  *
                               1039                 :  * Initializes data structure and inserts the 2PC file header record.
 6505 tgl                      1040 ECB             :  */
                               1041                 : void
 6505 tgl                      1042 CBC         356 : StartPrepare(GlobalTransaction gxact)
 6505 tgl                      1043 ECB             : {
 3955 bruce                    1044 GIC         356 :     PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
  968 andres                   1045             356 :     TransactionId xid = gxact->xid;
                               1046                 :     TwoPhaseFileHeader hdr;
                               1047                 :     TransactionId *children;
                               1048                 :     RelFileLocator *commitrels;
                               1049                 :     RelFileLocator *abortrels;
  368                          1050             356 :     xl_xact_stats_item *abortstats = NULL;
                               1051             356 :     xl_xact_stats_item *commitstats = NULL;
                               1052                 :     SharedInvalidationMessage *invalmsgs;
 6505 tgl                      1053 ECB             : 
                               1054                 :     /* Initialize linked list */
 3062 heikki.linnakangas       1055 CBC         356 :     records.head = palloc0(sizeof(StateFileChunk));
 6505 tgl                      1056 GIC         356 :     records.head->len = 0;
 6505 tgl                      1057 CBC         356 :     records.head->next = NULL;
 6505 tgl                      1058 ECB             : 
 6505 tgl                      1059 GIC         356 :     records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
 6505 tgl                      1060 CBC         356 :     records.head->data = palloc(records.bytes_free);
 6505 tgl                      1061 ECB             : 
 6505 tgl                      1062 GIC         356 :     records.tail = records.head;
 3062 heikki.linnakangas       1063 CBC         356 :     records.num_chunks = 1;
                               1064                 : 
 6505 tgl                      1065 GIC         356 :     records.total_len = 0;
 6505 tgl                      1066 ECB             : 
                               1067                 :     /* Create header */
 6505 tgl                      1068 CBC         356 :     hdr.magic = TWOPHASE_MAGIC;
                               1069             356 :     hdr.total_len = 0;          /* EndPrepare will fill this in */
                               1070             356 :     hdr.xid = xid;
 4153 rhaas                    1071             356 :     hdr.database = proc->databaseId;
 6504 tgl                      1072             356 :     hdr.prepared_at = gxact->prepared_at;
                               1073             356 :     hdr.owner = gxact->owner;
 6505                          1074             356 :     hdr.nsubxacts = xactGetCommittedChildren(&children);
 4622 rhaas                    1075             356 :     hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
                               1076             356 :     hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
  368 andres                   1077             356 :     hdr.ncommitstats =
                               1078             356 :         pgstat_get_transactional_drops(true, &commitstats);
                               1079             356 :     hdr.nabortstats =
  368 andres                   1080 GIC         356 :         pgstat_get_transactional_drops(false, &abortstats);
 4859 simon                    1081 CBC         356 :     hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
                               1082                 :                                                           &hdr.initfileinval);
 2118 tgl                      1083             356 :     hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
  419 michael                  1084 ECB             :     /* EndPrepare will fill the origin data, if necessary */
  419 michael                  1085 GIC         356 :     hdr.origin_lsn = InvalidXLogRecPtr;
  419 michael                  1086 CBC         356 :     hdr.origin_timestamp = 0;
 6505 tgl                      1087 ECB             : 
 6505 tgl                      1088 GIC         356 :     save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
 2586 simon                    1089             356 :     save_state_data(gxact->gid, hdr.gidlen);
                               1090                 : 
                               1091                 :     /*
                               1092                 :      * Add the additional info about subxacts, deletable files and cache
 4790 bruce                    1093 ECB             :      * invalidation messages.
                               1094                 :      */
 6505 tgl                      1095 CBC         356 :     if (hdr.nsubxacts > 0)
                               1096                 :     {
                               1097             143 :         save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
                               1098                 :         /* While we have the child-xact data, stuff it in the gxact too */
                               1099             143 :         GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
                               1100                 :     }
                               1101             356 :     if (hdr.ncommitrels > 0)
 6505 tgl                      1102 ECB             :     {
  277 rhaas                    1103 GNC           9 :         save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
 6505 tgl                      1104 CBC           9 :         pfree(commitrels);
                               1105                 :     }
                               1106             356 :     if (hdr.nabortrels > 0)
 6505 tgl                      1107 ECB             :     {
  277 rhaas                    1108 GNC          16 :         save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
 6505 tgl                      1109 CBC          16 :         pfree(abortrels);
                               1110                 :     }
  368 andres                   1111             356 :     if (hdr.ncommitstats > 0)
  368 andres                   1112 ECB             :     {
  368 andres                   1113 CBC           9 :         save_state_data(commitstats,
  368 andres                   1114 GIC           9 :                         hdr.ncommitstats * sizeof(xl_xact_stats_item));
  368 andres                   1115 CBC           9 :         pfree(commitstats);
                               1116                 :     }
                               1117             356 :     if (hdr.nabortstats > 0)
  368 andres                   1118 ECB             :     {
  368 andres                   1119 CBC          12 :         save_state_data(abortstats,
  332 tgl                      1120 GIC          12 :                         hdr.nabortstats * sizeof(xl_xact_stats_item));
  368 andres                   1121 CBC          12 :         pfree(abortstats);
                               1122                 :     }
 4859 simon                    1123             356 :     if (hdr.ninvalmsgs > 0)
 4859 simon                    1124 ECB             :     {
 4859 simon                    1125 CBC          22 :         save_state_data(invalmsgs,
 4859 simon                    1126 GIC          22 :                         hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
 4859 simon                    1127 CBC          22 :         pfree(invalmsgs);
                               1128                 :     }
 6505 tgl                      1129 GIC         356 : }
                               1130                 : 
                               1131                 : /*
                               1132                 :  * Finish preparing state data and writing it to WAL.
 6505 tgl                      1133 ECB             :  */
                               1134                 : void
 6505 tgl                      1135 GIC         354 : EndPrepare(GlobalTransaction gxact)
                               1136                 : {
                               1137                 :     TwoPhaseFileHeader *hdr;
                               1138                 :     StateFileChunk *record;
                               1139                 :     bool        replorigin;
 6505 tgl                      1140 ECB             : 
                               1141                 :     /* Add the end sentinel to the list of 2PC records */
 6505 tgl                      1142 GIC         354 :     RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
                               1143                 :                            NULL, 0);
 6505 tgl                      1144 ECB             : 
                               1145                 :     /* Go back and fill in total_len in the file header record */
 6505 tgl                      1146 CBC         354 :     hdr = (TwoPhaseFileHeader *) records.head->data;
 6505 tgl                      1147 GIC         354 :     Assert(hdr->magic == TWOPHASE_MAGIC);
 2917 heikki.linnakangas       1148 CBC         354 :     hdr->total_len = records.total_len + sizeof(pg_crc32c);
 6505 tgl                      1149 ECB             : 
 1838 simon                    1150 GIC         377 :     replorigin = (replorigin_session_origin != InvalidRepOriginId &&
 1838 simon                    1151 CBC          23 :                   replorigin_session_origin != DoNotReplicateId);
                               1152                 : 
                               1153             354 :     if (replorigin)
 1838 simon                    1154 ECB             :     {
 1838 simon                    1155 GIC          23 :         hdr->origin_lsn = replorigin_session_origin_lsn;
                               1156              23 :         hdr->origin_timestamp = replorigin_session_origin_timestamp;
                               1157                 :     }
                               1158                 : 
                               1159                 :     /*
                               1160                 :      * If the data size exceeds MaxAllocSize, we won't be able to read it in
                               1161                 :      * ReadTwoPhaseFile. Check for that now, rather than fail in the case
 2636 simon                    1162 ECB             :      * where we write data to file and then re-read at commit time.
 5438 heikki.linnakangas       1163 EUB             :      */
 5438 heikki.linnakangas       1164 GIC         354 :     if (hdr->total_len > MaxAllocSize)
 5438 heikki.linnakangas       1165 UIC           0 :         ereport(ERROR,
                               1166                 :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                               1167                 :                  errmsg("two-phase state file maximum length exceeded")));
                               1168                 : 
                               1169                 :     /*
                               1170                 :      * Now writing 2PC state data to WAL. We let the WAL's CRC protection
                               1171                 :      * cover us, so no need to calculate a separate CRC.
                               1172                 :      *
                               1173                 :      * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
                               1174                 :      * starting immediately after the WAL record is inserted could complete
                               1175                 :      * without fsync'ing our state file.  (This is essentially the same kind
                               1176                 :      * of race condition as the COMMIT-to-clog-write case that
                               1177                 :      * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
                               1178                 :      *
                               1179                 :      * We save the PREPARE record's location in the gxact for later use by
 6503 tgl                      1180 ECB             :      * CheckPointTwoPhase.
                               1181                 :      */
 3062 heikki.linnakangas       1182 CBC         354 :     XLogEnsureRecordSpace(0, records.num_chunks);
                               1183                 : 
 6505 tgl                      1184             354 :     START_CRIT_SECTION();
 6505 tgl                      1185 ECB             : 
  366 rhaas                    1186 GIC         354 :     Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
  366 rhaas                    1187 CBC         354 :     MyProc->delayChkptFlags |= DELAY_CHKPT_START;
 6505 tgl                      1188 ECB             : 
 3062 heikki.linnakangas       1189 CBC         354 :     XLogBeginInsert();
 3062 heikki.linnakangas       1190 GIC         748 :     for (record = records.head; record != NULL; record = record->next)
 3062 heikki.linnakangas       1191 CBC         394 :         XLogRegisterData(record->data, record->len);
                               1192                 : 
 1838 simon                    1193             354 :     XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
                               1194                 : 
 2636                          1195             354 :     gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
                               1196                 : 
 1838 simon                    1197 GIC         354 :     if (replorigin)
 1818 heikki.linnakangas       1198 ECB             :     {
                               1199                 :         /* Move LSNs forward for this replication origin */
 1838 simon                    1200 GIC          23 :         replorigin_session_advance(replorigin_session_origin_lsn,
                               1201                 :                                    gxact->prepare_end_lsn);
 1818 heikki.linnakangas       1202 ECB             :     }
                               1203                 : 
 2636 simon                    1204 GIC         354 :     XLogFlush(gxact->prepare_end_lsn);
                               1205                 : 
                               1206                 :     /* If we crash now, we have prepared: WAL replay will fix things */
 6505 tgl                      1207 ECB             : 
                               1208                 :     /* Store record's start location to read that later on Commit */
 2636 simon                    1209 GIC         354 :     gxact->prepare_start_lsn = ProcLastRecPtr;
                               1210                 : 
                               1211                 :     /*
                               1212                 :      * Mark the prepared transaction as valid.  As soon as xact.c marks MyProc
                               1213                 :      * as not running our XID (which it will do immediately after this
                               1214                 :      * function returns), others can commit/rollback the xact.
                               1215                 :      *
                               1216                 :      * NB: a side effect of this is to make a dummy ProcArray entry for the
                               1217                 :      * prepared XID.  This must happen before we clear the XID from MyProc /
                               1218                 :      * ProcGlobal->xids[], else there is a window where the XID is not running
                               1219                 :      * according to TransactionIdIsInProgress, and onlookers would be entitled
                               1220                 :      * to assume the xact crashed.  Instead we have a window where the same
  968 andres                   1221 ECB             :      * XID appears twice in ProcArray, which is OK.
                               1222                 :      */
 2125 alvherre                 1223 GIC         354 :     MarkAsPrepared(gxact, false);
                               1224                 : 
                               1225                 :     /*
                               1226                 :      * Now we can mark ourselves as out of the commit critical section: a
                               1227                 :      * checkpoint starting after this will certainly see the gxact as a
 5850 tgl                      1228 ECB             :      * candidate for fsyncing.
                               1229                 :      */
  366 rhaas                    1230 GIC         354 :     MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
                               1231                 : 
                               1232                 :     /*
                               1233                 :      * Remember that we have this GlobalTransaction entry locked for us.  If
                               1234                 :      * we crash after this point, it's too late to abort, but we must unlock
 3251 heikki.linnakangas       1235 ECB             :      * it so that the prepared transaction can be committed or rolled back.
                               1236                 :      */
 3251 heikki.linnakangas       1237 CBC         354 :     MyLockedGxact = gxact;
                               1238                 : 
 6505 tgl                      1239 GIC         354 :     END_CRIT_SECTION();
                               1240                 : 
                               1241                 :     /*
                               1242                 :      * Wait for synchronous replication, if required.
                               1243                 :      *
                               1244                 :      * Note that at this stage we have marked the prepare, but still show as
 4417 simon                    1245 ECB             :      * running in the procarray (twice!) and continue to hold locks.
                               1246                 :      */
 2567 rhaas                    1247 CBC         354 :     SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
 4417 simon                    1248 ECB             : 
 6505 tgl                      1249 CBC         354 :     records.tail = records.head = NULL;
 3062 heikki.linnakangas       1250 GIC         354 :     records.num_chunks = 0;
 6505 tgl                      1251             354 : }
                               1252                 : 
                               1253                 : /*
                               1254                 :  * Register a 2PC record to be written to state file.
 6505 tgl                      1255 ECB             :  */
                               1256                 : void
 6505 tgl                      1257 GIC        1660 : RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
                               1258                 :                        const void *data, uint32 len)
                               1259                 : {
 6505 tgl                      1260 ECB             :     TwoPhaseRecordOnDisk record;
                               1261                 : 
 6505 tgl                      1262 CBC        1660 :     record.rmid = rmid;
                               1263            1660 :     record.info = info;
                               1264            1660 :     record.len = len;
                               1265            1660 :     save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
                               1266            1660 :     if (len > 0)
 6505 tgl                      1267 GIC        1306 :         save_state_data(data, len);
                               1268            1660 : }
                               1269                 : 
                               1270                 : 
                               1271                 : /*
                               1272                 :  * Read and validate the state file for xid.
                               1273                 :  *
                               1274                 :  * If it looks OK (has a valid magic number and CRC), return the palloc'd
                               1275                 :  * contents of the file, issuing an error when finding corrupted data.  If
                               1276                 :  * missing_ok is true, which indicates that missing files can be safely
                               1277                 :  * ignored, then return NULL.  This state can be reached when doing recovery.
 6505 tgl                      1278 ECB             :  */
                               1279                 : static char *
 1675 michael                  1280 GIC          78 : ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
                               1281                 : {
                               1282                 :     char        path[MAXPGPATH];
                               1283                 :     char       *buf;
                               1284                 :     TwoPhaseFileHeader *hdr;
                               1285                 :     int         fd;
                               1286                 :     struct stat stat;
                               1287                 :     uint32      crc_offset;
                               1288                 :     pg_crc32c   calc_crc,
                               1289                 :                 file_crc;
 1726 michael                  1290 ECB             :     int         r;
                               1291                 : 
 6505 tgl                      1292 CBC          78 :     TwoPhaseFilePath(path, xid);
 6505 tgl                      1293 ECB             : 
 2024 peter_e                  1294 GIC          78 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
 6505 tgl                      1295 CBC          78 :     if (fd < 0)
 6505 tgl                      1296 ECB             :     {
 1675 michael                  1297 GIC          17 :         if (missing_ok && errno == ENOENT)
 1675 michael                  1298 GBC          17 :             return NULL;
                               1299                 : 
 1675 michael                  1300 UIC           0 :         ereport(ERROR,
                               1301                 :                 (errcode_for_file_access(),
                               1302                 :                  errmsg("could not open file \"%s\": %m", path)));
                               1303                 :     }
                               1304                 : 
                               1305                 :     /*
                               1306                 :      * Check file length.  We can determine a lower bound pretty easily. We
                               1307                 :      * set an upper bound to avoid palloc() failure on a corrupt file, though
                               1308                 :      * we can't guarantee that we won't get an out of memory error anyway,
 5438 heikki.linnakangas       1309 ECB             :      * even on a valid file.
 6505 tgl                      1310 EUB             :      */
 6505 tgl                      1311 GIC          61 :     if (fstat(fd, &stat))
 1675 michael                  1312 UIC           0 :         ereport(ERROR,
                               1313                 :                 (errcode_for_file_access(),
 1675 michael                  1314 ECB             :                  errmsg("could not stat file \"%s\": %m", path)));
                               1315                 : 
 6505 tgl                      1316 CBC          61 :     if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
 6505 tgl                      1317 ECB             :                         MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
 2917 heikki.linnakangas       1318 GBC          61 :                         sizeof(pg_crc32c)) ||
 5438 heikki.linnakangas       1319 GIC          61 :         stat.st_size > MaxAllocSize)
 1675 michael                  1320 UIC           0 :         ereport(ERROR,
                               1321                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
                               1322                 :                  errmsg_plural("incorrect size of file \"%s\": %lld byte",
                               1323                 :                                "incorrect size of file \"%s\": %lld bytes",
                               1324                 :                                (long long int) stat.st_size, path,
  927 peter                    1325 ECB             :                                (long long int) stat.st_size)));
 6505 tgl                      1326                 : 
 2917 heikki.linnakangas       1327 GBC          61 :     crc_offset = stat.st_size - sizeof(pg_crc32c);
 6505 tgl                      1328 GIC          61 :     if (crc_offset != MAXALIGN(crc_offset))
 1675 michael                  1329 UIC           0 :         ereport(ERROR,
                               1330                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
                               1331                 :                  errmsg("incorrect alignment of CRC offset for file \"%s\"",
                               1332                 :                         path)));
                               1333                 : 
                               1334                 :     /*
 6505 tgl                      1335 ECB             :      * OK, slurp in the file.
                               1336                 :      */
 6505 tgl                      1337 CBC          61 :     buf = (char *) palloc(stat.st_size);
 6505 tgl                      1338 ECB             : 
 2213 rhaas                    1339 CBC          61 :     pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
 1726 michael                  1340 GIC          61 :     r = read(fd, buf, stat.st_size);
 1726 michael                  1341 GBC          61 :     if (r != stat.st_size)
 6505 tgl                      1342 EUB             :     {
 1675 michael                  1343 UIC           0 :         if (r < 0)
                               1344               0 :             ereport(ERROR,
                               1345                 :                     (errcode_for_file_access(),
 1675 michael                  1346 EUB             :                      errmsg("could not read file \"%s\": %m", path)));
                               1347                 :         else
 1675 michael                  1348 UIC           0 :             ereport(ERROR,
                               1349                 :                     (errmsg("could not read file \"%s\": read %d of %lld",
                               1350                 :                             path, r, (long long int) stat.st_size)));
 6505 tgl                      1351 ECB             :     }
                               1352                 : 
 2213 rhaas                    1353 CBC          61 :     pgstat_report_wait_end();
 1492 michael                  1354 EUB             : 
 1373 peter                    1355 GIC          61 :     if (CloseTransientFile(fd) != 0)
 1492 michael                  1356 UIC           0 :         ereport(ERROR,
                               1357                 :                 (errcode_for_file_access(),
 1492 michael                  1358 ECB             :                  errmsg("could not close file \"%s\": %m", path)));
 6505 tgl                      1359                 : 
 6505 tgl                      1360 GBC          61 :     hdr = (TwoPhaseFileHeader *) buf;
 1675 michael                  1361 GIC          61 :     if (hdr->magic != TWOPHASE_MAGIC)
 1675 michael                  1362 UIC           0 :         ereport(ERROR,
                               1363                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
                               1364                 :                  errmsg("invalid magic number stored in file \"%s\"",
 1675 michael                  1365 ECB             :                         path)));
 1675 michael                  1366 EUB             : 
 1675 michael                  1367 GIC          61 :     if (hdr->total_len != stat.st_size)
 1675 michael                  1368 UIC           0 :         ereport(ERROR,
                               1369                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
                               1370                 :                  errmsg("invalid size stored in file \"%s\"",
 1675 michael                  1371 ECB             :                         path)));
 6505 tgl                      1372                 : 
 3078 heikki.linnakangas       1373 CBC          61 :     INIT_CRC32C(calc_crc);
 3078 heikki.linnakangas       1374 GIC          61 :     COMP_CRC32C(calc_crc, buf, crc_offset);
 3078 heikki.linnakangas       1375 CBC          61 :     FIN_CRC32C(calc_crc);
                               1376                 : 
 2917                          1377              61 :     file_crc = *((pg_crc32c *) (buf + crc_offset));
 6505 tgl                      1378 EUB             : 
 3078 heikki.linnakangas       1379 GIC          61 :     if (!EQ_CRC32C(calc_crc, file_crc))
 1675 michael                  1380 UIC           0 :         ereport(ERROR,
                               1381                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
                               1382                 :                  errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
 1675 michael                  1383 ECB             :                         path)));
                               1384                 : 
 6505 tgl                      1385 GIC          61 :     return buf;
                               1386                 : }
                               1387                 : 
                               1388                 : 
                               1389                 : /*
                               1390                 :  * Reads 2PC data from xlog. During checkpoint this data will be moved to
                               1391                 :  * twophase files and ReadTwoPhaseFile should be used instead.
                               1392                 :  *
                               1393                 :  * Note clearly that this function can access WAL during normal operation,
                               1394                 :  * similarly to the way WALSender or Logical Decoding would do.
 2636 simon                    1395 ECB             :  */
                               1396                 : static void
 2636 simon                    1397 GIC         423 : XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
                               1398                 : {
                               1399                 :     XLogRecord *record;
                               1400                 :     XLogReaderState *xlogreader;
 2636 simon                    1401 ECB             :     char       *errormsg;
                               1402                 : 
  699 tmunro                   1403 GIC         423 :     xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
                               1404             423 :                                     XL_ROUTINE(.page_read = &read_local_xlog_page,
                               1405                 :                                                .segment_open = &wal_segment_open,
  699 tmunro                   1406 ECB             :                                                .segment_close = &wal_segment_close),
  699 tmunro                   1407 EUB             :                                     NULL);
 2636 simon                    1408 GIC         423 :     if (!xlogreader)
 2636 simon                    1409 UIC           0 :         ereport(ERROR,
                               1410                 :                 (errcode(ERRCODE_OUT_OF_MEMORY),
                               1411                 :                  errmsg("out of memory"),
 2118 tgl                      1412 ECB             :                  errdetail("Failed while allocating a WAL reading processor.")));
 2636 simon                    1413                 : 
 1169 heikki.linnakangas       1414 GIC         423 :     XLogBeginRead(xlogreader, lsn);
  699 tmunro                   1415 CBC         423 :     record = XLogReadRecord(xlogreader, &errormsg);
                               1416                 : 
 2636 simon                    1417 GBC         423 :     if (record == NULL)
  514 noah                     1418 EUB             :     {
  514 noah                     1419 UIC           0 :         if (errormsg)
                               1420               0 :             ereport(ERROR,
                               1421                 :                     (errcode_for_file_access(),
                               1422                 :                      errmsg("could not read two-phase state from WAL at %X/%X: %s",
  514 noah                     1423 EUB             :                             LSN_FORMAT_ARGS(lsn), errormsg)));
                               1424                 :         else
  514 noah                     1425 UIC           0 :             ereport(ERROR,
                               1426                 :                     (errcode_for_file_access(),
                               1427                 :                      errmsg("could not read two-phase state from WAL at %X/%X",
                               1428                 :                             LSN_FORMAT_ARGS(lsn))));
  514 noah                     1429 ECB             :     }
 2636 simon                    1430                 : 
 2636 simon                    1431 GBC         423 :     if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
 2636 simon                    1432 GIC         423 :         (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
 2636 simon                    1433 UIC           0 :         ereport(ERROR,
                               1434                 :                 (errcode_for_file_access(),
                               1435                 :                  errmsg("expected two-phase state data is not present in WAL at %X/%X",
  775 peter                    1436 ECB             :                         LSN_FORMAT_ARGS(lsn))));
 2636 simon                    1437                 : 
 2636 simon                    1438 GIC         423 :     if (len != NULL)
 2636 simon                    1439 CBC          21 :         *len = XLogRecGetDataLen(xlogreader);
 2636 simon                    1440 ECB             : 
 2495 rhaas                    1441 GIC         423 :     *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
 2636 simon                    1442 CBC         423 :     memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
 2636 simon                    1443 ECB             : 
 2636 simon                    1444 GIC         423 :     XLogReaderFree(xlogreader);
                               1445             423 : }
                               1446                 : 
                               1447                 : 
                               1448                 : /*
                               1449                 :  * Confirms an xid is prepared, during recovery
 4859 simon                    1450 ECB             :  */
                               1451                 : bool
 4859 simon                    1452 GIC          17 : StandbyTransactionIdIsPrepared(TransactionId xid)
                               1453                 : {
                               1454                 :     char       *buf;
                               1455                 :     TwoPhaseFileHeader *hdr;
 4859 simon                    1456 ECB             :     bool        result;
                               1457                 : 
 4859 simon                    1458 CBC          17 :     Assert(TransactionIdIsValid(xid));
 4859 simon                    1459 EUB             : 
 4729 tgl                      1460 GIC          17 :     if (max_prepared_xacts <= 0)
 4660 bruce                    1461 UIC           0 :         return false;           /* nothing to do */
 4729 tgl                      1462 ECB             : 
 4859 simon                    1463                 :     /* Read and validate file */
 1675 michael                  1464 CBC          17 :     buf = ReadTwoPhaseFile(xid, true);
 4859 simon                    1465 GIC          17 :     if (buf == NULL)
                               1466              17 :         return false;
 4859 simon                    1467 EUB             : 
                               1468                 :     /* Check header also */
 4859 simon                    1469 UBC           0 :     hdr = (TwoPhaseFileHeader *) buf;
 4859 simon                    1470 UIC           0 :     result = TransactionIdEquals(hdr->xid, xid);
 4859 simon                    1471 UBC           0 :     pfree(buf);
                               1472                 : 
 4859 simon                    1473 UIC           0 :     return result;
                               1474                 : }
                               1475                 : 
                               1476                 : /*
                               1477                 :  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
 6505 tgl                      1478 ECB             :  */
                               1479                 : void
 6504 tgl                      1480 GIC         369 : FinishPreparedTransaction(const char *gid, bool isCommit)
                               1481                 : {
                               1482                 :     GlobalTransaction gxact;
                               1483                 :     PGPROC     *proc;
                               1484                 :     TransactionId xid;
                               1485                 :     char       *buf;
                               1486                 :     char       *bufptr;
                               1487                 :     TwoPhaseFileHeader *hdr;
                               1488                 :     TransactionId latestXid;
                               1489                 :     TransactionId *children;
                               1490                 :     RelFileLocator *commitrels;
                               1491                 :     RelFileLocator *abortrels;
                               1492                 :     RelFileLocator *delrels;
                               1493                 :     int         ndelrels;
                               1494                 :     xl_xact_stats_item *commitstats;
                               1495                 :     xl_xact_stats_item *abortstats;
                               1496                 :     SharedInvalidationMessage *invalmsgs;
                               1497                 : 
                               1498                 :     /*
                               1499                 :      * Validate the GID, and lock the GXACT to ensure that two backends do not
 6385 bruce                    1500 ECB             :      * try to commit the same GID at once.
 6505 tgl                      1501                 :      */
 6505 tgl                      1502 CBC         369 :     gxact = LockGXact(gid, GetUserId());
 4153 rhaas                    1503 GIC         363 :     proc = &ProcGlobal->allProcs[gxact->pgprocno];
  968 andres                   1504             363 :     xid = gxact->xid;
                               1505                 : 
                               1506                 :     /*
                               1507                 :      * Read and validate 2PC state data. State data will typically be stored
                               1508                 :      * in WAL files if the LSN is after the last checkpoint record, or moved
 2495 rhaas                    1509 ECB             :      * to disk if for some reason they have lived for a long time.
 6505 tgl                      1510                 :      */
 2636 simon                    1511 GIC         363 :     if (gxact->ondisk)
 1675 michael                  1512 CBC          26 :         buf = ReadTwoPhaseFile(xid, false);
                               1513                 :     else
 2636 simon                    1514 GIC         337 :         XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
                               1515                 : 
                               1516                 : 
                               1517                 :     /*
 6505 tgl                      1518 ECB             :      * Disassemble the header area
                               1519                 :      */
 6505 tgl                      1520 CBC         363 :     hdr = (TwoPhaseFileHeader *) buf;
                               1521             363 :     Assert(TransactionIdEquals(hdr->xid, xid));
                               1522             363 :     bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
 2586 simon                    1523             363 :     bufptr += MAXALIGN(hdr->gidlen);
 6505 tgl                      1524             363 :     children = (TransactionId *) bufptr;
                               1525             363 :     bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
  277 rhaas                    1526 GNC         363 :     commitrels = (RelFileLocator *) bufptr;
                               1527             363 :     bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
                               1528             363 :     abortrels = (RelFileLocator *) bufptr;
                               1529             363 :     bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
  332 tgl                      1530 CBC         363 :     commitstats = (xl_xact_stats_item *) bufptr;
  368 andres                   1531             363 :     bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
  332 tgl                      1532             363 :     abortstats = (xl_xact_stats_item *) bufptr;
  368 andres                   1533             363 :     bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
 4859 simon                    1534 GIC         363 :     invalmsgs = (SharedInvalidationMessage *) bufptr;
                               1535             363 :     bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 6505 tgl                      1536 ECB             : 
                               1537                 :     /* compute latestXid among all children */
 5692 tgl                      1538 GIC         363 :     latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
 5692 tgl                      1539 ECB             : 
                               1540                 :     /* Prevent cancel/die interrupt while cleaning up */
 1802 teodor                   1541 GIC         363 :     HOLD_INTERRUPTS();
                               1542                 : 
                               1543                 :     /*
                               1544                 :      * The order of operations here is critical: make the XLOG entry for
                               1545                 :      * commit or abort, then mark the transaction committed or aborted in
                               1546                 :      * pg_xact, then remove its PGPROC from the global ProcArray (which means
                               1547                 :      * TransactionIdIsInProgress will stop saying the prepared xact is in
                               1548                 :      * progress), then run the post-commit or post-abort callbacks. The
 6385 bruce                    1549 ECB             :      * callbacks will release the locks the transaction held.
 6505 tgl                      1550                 :      */
 6505 tgl                      1551 GIC         363 :     if (isCommit)
                               1552             325 :         RecordTransactionCommitPrepared(xid,
                               1553                 :                                         hdr->nsubxacts, children,
                               1554                 :                                         hdr->ncommitrels, commitrels,
                               1555                 :                                         hdr->ncommitstats,
  368 andres                   1556 ECB             :                                         commitstats,
                               1557                 :                                         hdr->ninvalmsgs, invalmsgs,
 1838 simon                    1558 CBC         325 :                                         hdr->initfileinval, gid);
                               1559                 :     else
 6505 tgl                      1560 GIC          38 :         RecordTransactionAbortPrepared(xid,
                               1561                 :                                        hdr->nsubxacts, children,
                               1562                 :                                        hdr->nabortrels, abortrels,
                               1563                 :                                        hdr->nabortstats,
                               1564                 :                                        abortstats,
 1838 simon                    1565 ECB             :                                        gid);
                               1566                 : 
 4153 rhaas                    1567 GIC         363 :     ProcArrayRemove(proc, latestXid);
                               1568                 : 
                               1569                 :     /*
                               1570                 :      * In case we fail while running the callbacks, mark the gxact invalid so
                               1571                 :      * no one else will try to commit/rollback, and so it will be recycled if
                               1572                 :      * we fail after this point.  It is still locked by our backend so it
                               1573                 :      * won't go away yet.
                               1574                 :      *
 6503 tgl                      1575 ECB             :      * (We assume it's safe to do this without taking TwoPhaseStateLock.)
                               1576                 :      */
 6505 tgl                      1577 GIC         363 :     gxact->valid = false;
                               1578                 : 
                               1579                 :     /*
                               1580                 :      * We have to remove any files that were supposed to be dropped. For
                               1581                 :      * consistency with the regular xact.c code paths, must do this before
                               1582                 :      * releasing locks, so do it before running the callbacks.
                               1583                 :      *
 6505 tgl                      1584 ECB             :      * NB: this code knows that we couldn't be dropping any temp rels ...
                               1585                 :      */
 6505 tgl                      1586 CBC         363 :     if (isCommit)
 6505 tgl                      1587 ECB             :     {
 5254 heikki.linnakangas       1588 GIC         325 :         delrels = commitrels;
                               1589             325 :         ndelrels = hdr->ncommitrels;
                               1590                 :     }
 6505 tgl                      1591 ECB             :     else
                               1592                 :     {
 5254 heikki.linnakangas       1593 GIC          38 :         delrels = abortrels;
                               1594              38 :         ndelrels = hdr->nabortrels;
                               1595                 :     }
 5254 heikki.linnakangas       1596 ECB             : 
                               1597                 :     /* Make sure files supposed to be dropped are dropped */
 1739 fujii                    1598 CBC         363 :     DropRelationFiles(delrels, ndelrels, false);
 6505 tgl                      1599 ECB             : 
  368 andres                   1600 GIC         363 :     if (isCommit)
  368 andres                   1601 CBC         325 :         pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
                               1602                 :     else
  368 andres                   1603 GIC          38 :         pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);
                               1604                 : 
                               1605                 :     /*
                               1606                 :      * Handle cache invalidation messages.
                               1607                 :      *
                               1608                 :      * Relcache init file invalidation requires processing both before and
                               1609                 :      * after we send the SI messages, only when committing.  See
  605 michael                  1610 ECB             :      * AtEOXact_Inval().
                               1611                 :      */
  605 michael                  1612 CBC         363 :     if (isCommit)
  605 michael                  1613 EUB             :     {
  605 michael                  1614 CBC         325 :         if (hdr->initfileinval)
  605 michael                  1615 LBC           0 :             RelationCacheInitFilePreInvalidate();
  605 michael                  1616 GBC         325 :         SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
  605 michael                  1617 GIC         325 :         if (hdr->initfileinval)
  605 michael                  1618 UIC           0 :             RelationCacheInitFilePostInvalidate();
                               1619                 :     }
                               1620                 : 
                               1621                 :     /*
                               1622                 :      * Acquire the two-phase lock.  We want to work on the two-phase callbacks
                               1623                 :      * while holding it to avoid potential conflicts with other transactions
                               1624                 :      * attempting to use the same GID, so the lock is released once the shared
 1504 michael                  1625 ECB             :      * memory state is cleared.
                               1626                 :      */
 1504 michael                  1627 GIC         363 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 1504 michael                  1628 ECB             : 
 6504 tgl                      1629                 :     /* And now do the callbacks */
 6504 tgl                      1630 GIC         363 :     if (isCommit)
 6504 tgl                      1631 CBC         325 :         ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
                               1632                 :     else
                               1633              38 :         ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
                               1634                 : 
 4444 heikki.linnakangas       1635 GIC         363 :     PredicateLockTwoPhaseFinish(xid, isCommit);
 4444 heikki.linnakangas       1636 ECB             : 
                               1637                 :     /* Clear shared memory state */
 1504 michael                  1638 GIC         363 :     RemoveGXact(gxact);
                               1639                 : 
                               1640                 :     /*
                               1641                 :      * Release the lock as all callbacks are called and shared memory cleanup
 1504 michael                  1642 ECB             :      * is done.
                               1643                 :      */
 1504 michael                  1644 GIC         363 :     LWLockRelease(TwoPhaseStateLock);
 1504 michael                  1645 ECB             : 
                               1646                 :     /* Count the prepared xact as committed or aborted */
 1460 akapila                  1647 GIC         363 :     AtEOXact_PgStat(isCommit, false);
                               1648                 : 
                               1649                 :     /*
 2636 simon                    1650 ECB             :      * And now we can clean up any files we may have left.
 6505 tgl                      1651                 :      */
 2636 simon                    1652 GIC         363 :     if (gxact->ondisk)
 2636 simon                    1653 CBC          26 :         RemoveTwoPhaseFile(xid, true);
                               1654                 : 
 3251 heikki.linnakangas       1655             363 :     MyLockedGxact = NULL;
                               1656                 : 
 1802 teodor                   1657             363 :     RESUME_INTERRUPTS();
 1802 teodor                   1658 ECB             : 
 6505 tgl                      1659 GIC         363 :     pfree(buf);
                               1660             363 : }
                               1661                 : 
                               1662                 : /*
                               1663                 :  * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
 6505 tgl                      1664 ECB             :  */
                               1665                 : static void
 6505 tgl                      1666 GIC         393 : ProcessRecords(char *bufptr, TransactionId xid,
                               1667                 :                const TwoPhaseCallback callbacks[])
 6505 tgl                      1668 ECB             : {
                               1669                 :     for (;;)
 6505 tgl                      1670 GIC        1495 :     {
 6505 tgl                      1671 CBC        1888 :         TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
 6505 tgl                      1672 ECB             : 
 6505 tgl                      1673 CBC        1888 :         Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
 6505 tgl                      1674 GIC        1888 :         if (record->rmid == TWOPHASE_RM_END_ID)
 6505 tgl                      1675 CBC         393 :             break;
                               1676                 : 
                               1677            1495 :         bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
 6505 tgl                      1678 ECB             : 
 6505 tgl                      1679 GIC        1495 :         if (callbacks[record->rmid] != NULL)
 6385 bruce                    1680            1422 :             callbacks[record->rmid] (xid, record->info,
 6385 bruce                    1681 ECB             :                                      (void *) bufptr, record->len);
                               1682                 : 
 6505 tgl                      1683 CBC        1495 :         bufptr += MAXALIGN(record->len);
                               1684                 :     }
 6505 tgl                      1685 GIC         393 : }
                               1686                 : 
                               1687                 : /*
                               1688                 :  * Remove the 2PC file for the specified XID.
                               1689                 :  *
                               1690                 :  * If giveWarning is false, do not complain about file-not-present;
                               1691                 :  * this is an expected case during WAL replay.
 6505 tgl                      1692 ECB             :  */
                               1693                 : static void
 6505 tgl                      1694 GIC          28 : RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
                               1695                 : {
 6385 bruce                    1696 ECB             :     char        path[MAXPGPATH];
 6505 tgl                      1697                 : 
 6505 tgl                      1698 GBC          28 :     TwoPhaseFilePath(path, xid);
                               1699              28 :     if (unlink(path))
 6505 tgl                      1700 UIC           0 :         if (errno != ENOENT || giveWarning)
                               1701               0 :             ereport(WARNING,
 6505 tgl                      1702 ECB             :                     (errcode_for_file_access(),
                               1703                 :                      errmsg("could not remove file \"%s\": %m", path)));
 6505 tgl                      1704 GIC          28 : }
                               1705                 : 
                               1706                 : /*
                               1707                 :  * Recreates a state file. This is used in WAL replay and during
                               1708                 :  * checkpoint creation.
                               1709                 :  *
                               1710                 :  * Note: content and len don't include CRC.
 6505 tgl                      1711 ECB             :  */
                               1712                 : static void
 6505 tgl                      1713 GIC          21 : RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
                               1714                 : {
                               1715                 :     char        path[MAXPGPATH];
                               1716                 :     pg_crc32c   statefile_crc;
                               1717                 :     int         fd;
 6505 tgl                      1718 ECB             : 
                               1719                 :     /* Recompute CRC */
 3078 heikki.linnakangas       1720 CBC          21 :     INIT_CRC32C(statefile_crc);
 3078 heikki.linnakangas       1721 GIC          21 :     COMP_CRC32C(statefile_crc, content, len);
 3078 heikki.linnakangas       1722 CBC          21 :     FIN_CRC32C(statefile_crc);
                               1723                 : 
 6505 tgl                      1724              21 :     TwoPhaseFilePath(path, xid);
                               1725                 : 
 3785 heikki.linnakangas       1726              21 :     fd = OpenTransientFile(path,
 2024 peter_e                  1727 EUB             :                            O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
 6505 tgl                      1728 GIC          21 :     if (fd < 0)
 6505 tgl                      1729 UIC           0 :         ereport(ERROR,
                               1730                 :                 (errcode_for_file_access(),
                               1731                 :                  errmsg("could not recreate file \"%s\": %m", path)));
 6505 tgl                      1732 ECB             : 
                               1733                 :     /* Write content and CRC */
 1708 michael                  1734 CBC          21 :     errno = 0;
 2213 rhaas                    1735 GIC          21 :     pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
 6505 tgl                      1736              21 :     if (write(fd, content, len) != len)
 6505 tgl                      1737 EUB             :     {
 1749 michael                  1738                 :         /* if write didn't set errno, assume problem is no disk space */
 1453 michael                  1739 UBC           0 :         if (errno == 0)
 1453 michael                  1740 UIC           0 :             errno = ENOSPC;
 6505 tgl                      1741               0 :         ereport(ERROR,
                               1742                 :                 (errcode_for_file_access(),
 1726 michael                  1743 ECB             :                  errmsg("could not write file \"%s\": %m", path)));
                               1744                 :     }
 2917 heikki.linnakangas       1745 GIC          21 :     if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
 6505 tgl                      1746 EUB             :     {
 1749 michael                  1747                 :         /* if write didn't set errno, assume problem is no disk space */
 1453 michael                  1748 UBC           0 :         if (errno == 0)
 1453 michael                  1749 UIC           0 :             errno = ENOSPC;
 6505 tgl                      1750               0 :         ereport(ERROR,
                               1751                 :                 (errcode_for_file_access(),
 1726 michael                  1752 ECB             :                  errmsg("could not write file \"%s\": %m", path)));
                               1753                 :     }
 2213 rhaas                    1754 GIC          21 :     pgstat_report_wait_end();
                               1755                 : 
                               1756                 :     /*
                               1757                 :      * We must fsync the file because the end-of-replay checkpoint will not do
 6385 bruce                    1758 ECB             :      * so, there being no GXACT in shared memory yet to tell it to.
 6503 tgl                      1759                 :      */
 2213 rhaas                    1760 GBC          21 :     pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
 6505 tgl                      1761 GIC          21 :     if (pg_fsync(fd) != 0)
 6505 tgl                      1762 UIC           0 :         ereport(ERROR,
 6505 tgl                      1763 ECB             :                 (errcode_for_file_access(),
                               1764                 :                  errmsg("could not fsync file \"%s\": %m", path)));
 2213 rhaas                    1765 CBC          21 :     pgstat_report_wait_end();
 6505 tgl                      1766 EUB             : 
 3785 heikki.linnakangas       1767 GIC          21 :     if (CloseTransientFile(fd) != 0)
 6505 tgl                      1768 UIC           0 :         ereport(ERROR,
 6505 tgl                      1769 ECB             :                 (errcode_for_file_access(),
                               1770                 :                  errmsg("could not close file \"%s\": %m", path)));
 6505 tgl                      1771 GIC          21 : }
                               1772                 : 
                               1773                 : /*
                               1774                 :  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
                               1775                 :  *
                               1776                 :  * We must fsync the state file of any GXACT that is valid or has been
                               1777                 :  * generated during redo and has a PREPARE LSN <= the checkpoint's redo
                               1778                 :  * horizon.  (If the gxact isn't valid yet, has not been generated in
                               1779                 :  * redo, or has a later LSN, this checkpoint is not responsible for
                               1780                 :  * fsyncing it.)
                               1781                 :  *
                               1782                 :  * This is deliberately run as late as possible in the checkpoint sequence,
                               1783                 :  * because GXACTs ordinarily have short lifespans, and so it is quite
                               1784                 :  * possible that GXACTs that were valid at checkpoint start will no longer
                               1785                 :  * exist if we wait a little bit. With typical checkpoint settings this
                               1786                 :  * will be about 3 minutes for an online checkpoint, so as a result we
                               1787                 :  * expect that there will be no GXACTs that need to be copied to disk.
                               1788                 :  *
                               1789                 :  * If a GXACT remains valid across multiple checkpoints, it will already
                               1790                 :  * be on disk so we don't bother to repeat that write.
 6503 tgl                      1791 ECB             :  */
                               1792                 : void
 6503 tgl                      1793 GIC        2363 : CheckPointTwoPhase(XLogRecPtr redo_horizon)
 6503 tgl                      1794 ECB             : {
                               1795                 :     int         i;
 2636 simon                    1796 CBC        2363 :     int         serialized_xacts = 0;
 6503 tgl                      1797 ECB             : 
 6503 tgl                      1798 GIC        2363 :     if (max_prepared_xacts <= 0)
                               1799            2197 :         return;                 /* nothing to do */
                               1800                 : 
                               1801                 :     TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
                               1802                 : 
                               1803                 :     /*
                               1804                 :      * We are expecting there to be zero GXACTs that need to be copied to
                               1805                 :      * disk, so we perform all I/O while holding TwoPhaseStateLock for
                               1806                 :      * simplicity. This prevents any new xacts from preparing while this
                               1807                 :      * occurs, which shouldn't be a problem since the presence of long-lived
                               1808                 :      * prepared xacts indicates the transaction manager isn't active.
                               1809                 :      *
                               1810                 :      * It's also possible to move I/O out of the lock, but on every error we
                               1811                 :      * should check whether somebody committed our transaction in different
                               1812                 :      * backend. Let's leave this optimization for future, if somebody will
                               1813                 :      * spot that this place cause bottleneck.
                               1814                 :      *
                               1815                 :      * Note that it isn't possible for there to be a GXACT with a
                               1816                 :      * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
  366 rhaas                    1817 ECB             :      * because of the efforts with delayChkptFlags.
 2636 simon                    1818                 :      */
 6503 tgl                      1819 GIC         166 :     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
                               1820             193 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                               1821                 :     {
                               1822                 :         /*
                               1823                 :          * Note that we are using gxact not PGPROC so this works in recovery
 2153 bruce                    1824 ECB             :          * also
                               1825                 :          */
 6385 bruce                    1826 CBC          27 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
 6503 tgl                      1827 ECB             : 
 2196 simon                    1828 CBC          27 :         if ((gxact->valid || gxact->inredo) &&
 2636 simon                    1829 GIC          27 :             !gxact->ondisk &&
                               1830              23 :             gxact->prepare_end_lsn <= redo_horizon)
                               1831                 :         {
                               1832                 :             char       *buf;
 2495 rhaas                    1833 ECB             :             int         len;
 6503 tgl                      1834                 : 
 2636 simon                    1835 CBC          21 :             XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
 2196                          1836              21 :             RecreateTwoPhaseFile(gxact->xid, buf, len);
 2636                          1837              21 :             gxact->ondisk = true;
 2196                          1838              21 :             gxact->prepare_start_lsn = InvalidXLogRecPtr;
                               1839              21 :             gxact->prepare_end_lsn = InvalidXLogRecPtr;
 2636 simon                    1840 GIC          21 :             pfree(buf);
                               1841              21 :             serialized_xacts++;
 6503 tgl                      1842 ECB             :         }
                               1843                 :     }
 2636 simon                    1844 GIC         166 :     LWLockRelease(TwoPhaseStateLock);
                               1845                 : 
                               1846                 :     /*
                               1847                 :      * Flush unconditionally the parent directory to make any information
                               1848                 :      * durable on disk.  Two-phase files could have been removed and those
                               1849                 :      * removals need to be made persistent as well as any files newly created
 2204 teodor                   1850 ECB             :      * since the last checkpoint.
                               1851                 :      */
 2204 teodor                   1852 GIC         166 :     fsync_fname(TWOPHASE_DIR, true);
                               1853                 : 
 5364 alvherre                 1854 ECB             :     TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
 2636 simon                    1855                 : 
 2636 simon                    1856 GIC         166 :     if (log_checkpoints && serialized_xacts > 0)
                               1857              17 :         ereport(LOG,
                               1858                 :                 (errmsg_plural("%u two-phase state file was written "
                               1859                 :                                "for a long-running prepared transaction",
                               1860                 :                                "%u two-phase state files were written "
                               1861                 :                                "for long-running prepared transactions",
                               1862                 :                                serialized_xacts,
                               1863                 :                                serialized_xacts)));
                               1864                 : }
                               1865                 : 
                               1866                 : /*
                               1867                 :  * restoreTwoPhaseData
                               1868                 :  *
                               1869                 :  * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
                               1870                 :  * This is called once at the beginning of recovery, saving any extra
                               1871                 :  * lookups in the future.  Two-phase files that are newer than the
                               1872                 :  * minimum XID horizon are discarded on the way.
 2196 simon                    1873 ECB             :  */
                               1874                 : void
 2196 simon                    1875 GIC        1176 : restoreTwoPhaseData(void)
                               1876                 : {
                               1877                 :     DIR        *cldir;
 2153 bruce                    1878 ECB             :     struct dirent *clde;
 2196 simon                    1879                 : 
 2125 alvherre                 1880 CBC        1176 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 1952 tgl                      1881 GIC        1176 :     cldir = AllocateDir(TWOPHASE_DIR);
 2196 simon                    1882 CBC        3539 :     while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
 2196 simon                    1883 ECB             :     {
 2196 simon                    1884 GIC        2363 :         if (strlen(clde->d_name) == 8 &&
                               1885              11 :             strspn(clde->d_name, "0123456789ABCDEF") == 8)
                               1886                 :         {
                               1887                 :             TransactionId xid;
 2196 simon                    1888 ECB             :             char       *buf;
                               1889                 : 
 2196 simon                    1890 CBC          11 :             xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
                               1891                 : 
                               1892              11 :             buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
 2173 simon                    1893 EUB             :                                         true, false, false);
 2196 simon                    1894 GIC          11 :             if (buf == NULL)
 2196 simon                    1895 LBC           0 :                 continue;
                               1896                 : 
 1838 simon                    1897 GIC          11 :             PrepareRedoAdd(buf, InvalidXLogRecPtr,
                               1898                 :                            InvalidXLogRecPtr, InvalidRepOriginId);
 2196 simon                    1899 ECB             :         }
                               1900                 :     }
 2125 alvherre                 1901 CBC        1176 :     LWLockRelease(TwoPhaseStateLock);
 2196 simon                    1902 GIC        1176 :     FreeDir(cldir);
                               1903            1176 : }
                               1904                 : 
                               1905                 : /*
                               1906                 :  * PrescanPreparedTransactions
                               1907                 :  *
                               1908                 :  * Scan the shared memory entries of TwoPhaseState and determine the range
                               1909                 :  * of valid XIDs present.  This is run during database startup, after we
                               1910                 :  * have completed reading WAL.  ShmemVariableCache->nextXid has been set to
                               1911                 :  * one more than the highest XID for which evidence exists in WAL.
                               1912                 :  *
                               1913                 :  * We throw away any prepared xacts with main XID beyond nextXid --- if any
                               1914                 :  * are present, it suggests that the DBA has done a PITR recovery to an
                               1915                 :  * earlier point in time without cleaning out pg_twophase.  We dare not
                               1916                 :  * try to recover such prepared xacts since they likely depend on database
                               1917                 :  * state that doesn't exist now.
                               1918                 :  *
                               1919                 :  * However, we will advance nextXid beyond any subxact XIDs belonging to
                               1920                 :  * valid prepared xacts.  We need to do this since subxact commit doesn't
                               1921                 :  * write a WAL entry, and so there might be no evidence in WAL of those
                               1922                 :  * subxact XIDs.
                               1923                 :  *
                               1924                 :  * On corrupted two-phase files, fail immediately.  Keeping broken entries
                               1925                 :  * around and letting replay continue would harm the system, so a new
                               1926                 :  * backup should be rolled in.
                               1927                 :  *
                               1928                 :  * Our other responsibility is to determine and return the oldest valid XID
                               1929                 :  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
                               1930                 :  * This is needed to synchronize pg_subtrans startup properly.
                               1931                 :  *
                               1932                 :  * If xids_p is not NULL, a pointer to a palloc'd array of all top-level xids
                               1933                 :  * is stored in *xids_p (NULL if there are none) and the number of entries is
                               1934                 :  * stored in *nxids_p, which must then also be a valid pointer.
 6505 tgl                      1935 ECB             :  */
                               1936                 : TransactionId
 4859 simon                    1937 CBC        1177 : PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 6505 tgl                      1938 ECB             : {
  971 andres                   1939 CBC        1177 :     FullTransactionId nextXid = ShmemVariableCache->nextXid;
                               1940            1177 :     TransactionId origNextXid = XidFromFullTransactionId(nextXid);
 6505 tgl                      1941            1177 :     TransactionId result = origNextXid;
 4859 simon                    1942            1177 :     TransactionId *xids = NULL;
 4859 simon                    1943 GIC        1177 :     int         nxids = 0;
                               1944            1177 :     int         allocsize = 0;
 2196 simon                    1945 ECB             :     int         i;
 6505 tgl                      1946                 : 
 2125 alvherre                 1947 GIC        1177 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 2196 simon                    1948            1219 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                               1949                 :     {
 2196 simon                    1950 ECB             :         TransactionId xid;
                               1951                 :         char       *buf;
 2196 simon                    1952 CBC          42 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
                               1953                 : 
                               1954              42 :         Assert(gxact->inredo);
                               1955                 : 
                               1956              42 :         xid = gxact->xid;
                               1957                 : 
                               1958              42 :         buf = ProcessTwoPhaseBuffer(xid,
                               1959                 :                                     gxact->prepare_start_lsn,
 2153 bruce                    1960              42 :                                     gxact->ondisk, false, true);
 6505 tgl                      1961 EUB             : 
 2196 simon                    1962 GIC          42 :         if (buf == NULL)
 2196 simon                    1963 UIC           0 :             continue;
                               1964                 : 
                               1965                 :         /*
                               1966                 :          * OK, we think this file is valid.  Incorporate xid into the
 2182 simon                    1967 ECB             :          * running-minimum result.
                               1968                 :          */
 2182 simon                    1969 GIC          42 :         if (TransactionIdPrecedes(xid, result))
 2182 simon                    1970 CBC          36 :             result = xid;
                               1971                 : 
 2196                          1972              42 :         if (xids_p)
                               1973                 :         {
                               1974              12 :             if (nxids == allocsize)
                               1975                 :             {
                               1976              10 :                 if (nxids == 0)
 6505 tgl                      1977 ECB             :                 {
 2196 simon                    1978 GIC          10 :                     allocsize = 10;
                               1979              10 :                     xids = palloc(allocsize * sizeof(TransactionId));
                               1980                 :                 }
 2196 simon                    1981 EUB             :                 else
 4859                          1982                 :                 {
 2196 simon                    1983 UIC           0 :                     allocsize = allocsize * 2;
                               1984               0 :                     xids = repalloc(xids, allocsize * sizeof(TransactionId));
 4859 simon                    1985 ECB             :                 }
                               1986                 :             }
 2196 simon                    1987 GIC          12 :             xids[nxids++] = xid;
 6505 tgl                      1988 ECB             :         }
                               1989                 : 
 2196 simon                    1990 CBC          42 :         pfree(buf);
                               1991                 :     }
                               1992            1177 :     LWLockRelease(TwoPhaseStateLock);
                               1993                 : 
 4859                          1994            1177 :     if (xids_p)
 4859 simon                    1995 ECB             :     {
 4859 simon                    1996 GIC          35 :         *xids_p = xids;
                               1997              35 :         *nxids_p = nxids;
 4859 simon                    1998 ECB             :     }
                               1999                 : 
 6505 tgl                      2000 GIC        1177 :     return result;
                               2001                 : }
                               2002                 : 
                               2003                 : /*
                               2004                 :  * StandbyRecoverPreparedTransactions
                               2005                 :  *
                               2006                 :  * Scan the shared memory entries of TwoPhaseState and setup all the required
                               2007                 :  * information to allow standby queries to treat prepared transactions as still
                               2008                 :  * active.
                               2009                 :  *
                               2010                 :  * This is never called at the end of recovery - we use
                               2011                 :  * RecoverPreparedTransactions() at that point.
                               2012                 :  *
                               2013                 :  * The lack of calls to SubTransSetParent() here is by design;
                               2014                 :  * those calls are made by RecoverPreparedTransactions() at the end of recovery
                               2015                 :  * for those xacts that need this.
 4744 heikki.linnakangas       2016 ECB             :  */
                               2017                 : void
 2173 simon                    2018 GIC          35 : StandbyRecoverPreparedTransactions(void)
                               2019                 : {
 2196 simon                    2020 ECB             :     int         i;
 4744 heikki.linnakangas       2021                 : 
 2125 alvherre                 2022 GIC          35 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 2196 simon                    2023              47 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                               2024                 :     {
 2196 simon                    2025 ECB             :         TransactionId xid;
                               2026                 :         char       *buf;
 2196 simon                    2027 CBC          12 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
                               2028                 : 
                               2029              12 :         Assert(gxact->inredo);
                               2030                 : 
                               2031              12 :         xid = gxact->xid;
                               2032                 : 
                               2033              12 :         buf = ProcessTwoPhaseBuffer(xid,
 2153 bruce                    2034 ECB             :                                     gxact->prepare_start_lsn,
 2153 bruce                    2035 CBC          12 :                                     gxact->ondisk, false, false);
 2196 simon                    2036 GIC          12 :         if (buf != NULL)
 2404 simon                    2037 CBC          12 :             pfree(buf);
 4744 heikki.linnakangas       2038 ECB             :     }
 2196 simon                    2039 GIC          35 :     LWLockRelease(TwoPhaseStateLock);
 4744 heikki.linnakangas       2040              35 : }
                               2041                 : 
                               2042                 : /*
                               2043                 :  * RecoverPreparedTransactions
                               2044                 :  *
                               2045                 :  * Scan the shared memory entries of TwoPhaseState and reload the state for
                               2046                 :  * each prepared transaction (reacquire locks, etc).
                               2047                 :  *
                               2048                 :  * This is run at the end of recovery, but before we allow backends to write
                               2049                 :  * WAL.
                               2050                 :  *
                               2051                 :  * At the end of recovery the way we take snapshots will change. We now need
                               2052                 :  * to mark all running transactions with their full SubTransSetParent() info
                               2053                 :  * to allow normal snapshots to work correctly if snapshots overflow.
                               2054                 :  * We do this here because by definition prepared transactions are the only
                               2055                 :  * type of write transaction still running, so this is necessary and
                               2056                 :  * complete.
 6505 tgl                      2057 ECB             :  */
                               2058                 : void
 6505 tgl                      2059 GIC        1142 : RecoverPreparedTransactions(void)
                               2060                 : {
 2196 simon                    2061 ECB             :     int         i;
 6505 tgl                      2062                 : 
 2125 alvherre                 2063 GIC        1142 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 2196 simon                    2064            1172 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                               2065                 :     {
 2196 simon                    2066 ECB             :         TransactionId xid;
                               2067                 :         char       *buf;
 2196 simon                    2068 GIC          30 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
                               2069                 :         char       *bufptr;
                               2070                 :         TwoPhaseFileHeader *hdr;
                               2071                 :         TransactionId *subxids;
 2196 simon                    2072 ECB             :         const char *gid;
                               2073                 : 
 2196 simon                    2074 GIC          30 :         xid = gxact->xid;
                               2075                 : 
                               2076                 :         /*
                               2077                 :          * Reconstruct subtrans state for the transaction --- needed because
                               2078                 :          * pg_subtrans is not preserved over a restart.  Note that we are
                               2079                 :          * linking all the subtransactions directly to the top-level XID;
                               2080                 :          * there may originally have been a more complex hierarchy, but
                               2081                 :          * there's no need to restore that exactly. It's possible that
                               2082                 :          * SubTransSetParent has been set before, if the prepared transaction
 2153 bruce                    2083 ECB             :          * generated xid assignment records.
                               2084                 :          */
 2196 simon                    2085 CBC          30 :         buf = ProcessTwoPhaseBuffer(xid,
 2153 bruce                    2086 ECB             :                                     gxact->prepare_start_lsn,
 2153 bruce                    2087 GBC          30 :                                     gxact->ondisk, true, false);
 2196 simon                    2088 GIC          30 :         if (buf == NULL)
 2196 simon                    2089 LBC           0 :             continue;
                               2090                 : 
 2196 simon                    2091 GIC          30 :         ereport(LOG,
 2196 simon                    2092 ECB             :                 (errmsg("recovering prepared transaction %u from shared memory", xid)));
                               2093                 : 
 2196 simon                    2094 CBC          30 :         hdr = (TwoPhaseFileHeader *) buf;
                               2095              30 :         Assert(TransactionIdEquals(hdr->xid, xid));
                               2096              30 :         bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
                               2097              30 :         gid = (const char *) bufptr;
                               2098              30 :         bufptr += MAXALIGN(hdr->gidlen);
                               2099              30 :         subxids = (TransactionId *) bufptr;
                               2100              30 :         bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
  277 rhaas                    2101 GNC          30 :         bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
                               2102              30 :         bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
  368 andres                   2103 CBC          30 :         bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
  368 andres                   2104 GIC          30 :         bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
 2196 simon                    2105              30 :         bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
                               2106                 : 
                               2107                 :         /*
                               2108                 :          * Recreate its GXACT and dummy PGPROC. But, check whether it was
 2153 bruce                    2109 ECB             :          * added in redo and already has a shmem entry for it.
                               2110                 :          */
 2196 simon                    2111 GIC          30 :         MarkAsPreparingGuts(gxact, xid, gid,
                               2112                 :                             hdr->prepared_at,
                               2113                 :                             hdr->owner, hdr->database);
 6505 tgl                      2114 ECB             : 
                               2115                 :         /* recovered, so reset the flag for entries generated by redo */
 2196 simon                    2116 CBC          30 :         gxact->inredo = false;
 6505 tgl                      2117 ECB             : 
 2196 simon                    2118 GIC          30 :         GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
 2125 alvherre                 2119 CBC          30 :         MarkAsPrepared(gxact, true);
                               2120                 : 
 2125 alvherre                 2121 GIC          30 :         LWLockRelease(TwoPhaseStateLock);
                               2122                 : 
                               2123                 :         /*
 2125 alvherre                 2124 ECB             :          * Recover other state (notably locks) using resource managers.
                               2125                 :          */
 2196 simon                    2126 GIC          30 :         ProcessRecords(bufptr, xid, twophase_recover_callbacks);
                               2127                 : 
                               2128                 :         /*
                               2129                 :          * Release locks held by the standby process after we process each
                               2130                 :          * prepared transaction. As a result, we don't need too many
 2196 simon                    2131 ECB             :          * additional locks at any one time.
                               2132                 :          */
 2196 simon                    2133 GIC          30 :         if (InHotStandby)
                               2134               6 :             StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
                               2135                 : 
                               2136                 :         /*
                               2137                 :          * We're done with recovering this transaction. Clear MyLockedGxact,
 2153 bruce                    2138 ECB             :          * like we do in PrepareTransaction() during normal operation.
                               2139                 :          */
 2196 simon                    2140 CBC          30 :         PostPrepare_Twophase();
                               2141                 : 
                               2142              30 :         pfree(buf);
                               2143                 : 
 2125 alvherre                 2144 GIC          30 :         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
 2196 simon                    2145 ECB             :     }
 2125 alvherre                 2146                 : 
 2125 alvherre                 2147 GIC        1142 :     LWLockRelease(TwoPhaseStateLock);
 2196 simon                    2148            1142 : }
                               2149                 : 
                               2150                 : /*
                               2151                 :  * ProcessTwoPhaseBuffer
                               2152                 :  *
                               2153                 :  * Given a transaction id, read it either from disk or read it directly
                               2154                 :  * via shmem xlog record pointer using the provided "prepare_start_lsn".
                               2155                 :  *
                               2156                 :  * If setParent is true, set up subtransaction parent linkages.
                               2157                 :  *
                               2158                 :  * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
                               2159                 :  * value scanned.
 2196 simon                    2160 ECB             :  */
                               2161                 : static char *
 2196 simon                    2162 GIC          95 : ProcessTwoPhaseBuffer(TransactionId xid,
                               2163                 :                       XLogRecPtr prepare_start_lsn,
                               2164                 :                       bool fromdisk,
 2182 simon                    2165 ECB             :                       bool setParent, bool setNextXid)
 2196                          2166                 : {
  971 andres                   2167 GIC          95 :     FullTransactionId nextXid = ShmemVariableCache->nextXid;
                               2168              95 :     TransactionId origNextXid = XidFromFullTransactionId(nextXid);
                               2169                 :     TransactionId *subxids;
                               2170                 :     char       *buf;
                               2171                 :     TwoPhaseFileHeader *hdr;
 2196 simon                    2172 ECB             :     int         i;
                               2173                 : 
 2125 alvherre                 2174 CBC          95 :     Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 2125 alvherre                 2175 ECB             : 
 2196 simon                    2176 GIC          95 :     if (!fromdisk)
                               2177              60 :         Assert(prepare_start_lsn != InvalidXLogRecPtr);
 2196 simon                    2178 ECB             : 
                               2179                 :     /* Already processed? */
 2196 simon                    2180 GBC          95 :     if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
                               2181                 :     {
 2196 simon                    2182 UBC           0 :         if (fromdisk)
                               2183                 :         {
 2196 simon                    2184 UIC           0 :             ereport(WARNING,
 2036 peter_e                  2185 EUB             :                     (errmsg("removing stale two-phase state file for transaction %u",
                               2186                 :                             xid)));
 2196 simon                    2187 UIC           0 :             RemoveTwoPhaseFile(xid, true);
                               2188                 :         }
 2196 simon                    2189 EUB             :         else
                               2190                 :         {
 2196 simon                    2191 UIC           0 :             ereport(WARNING,
 2036 peter_e                  2192 EUB             :                     (errmsg("removing stale two-phase state from memory for transaction %u",
                               2193                 :                             xid)));
 2196 simon                    2194 UBC           0 :             PrepareRedoRemove(xid, true);
                               2195                 :         }
 2196 simon                    2196 UIC           0 :         return NULL;
                               2197                 :     }
 2196 simon                    2198 ECB             : 
                               2199                 :     /* Reject XID if too new */
 2196 simon                    2200 GBC          95 :     if (TransactionIdFollowsOrEquals(xid, origNextXid))
                               2201                 :     {
 2196 simon                    2202 UBC           0 :         if (fromdisk)
                               2203                 :         {
 2196 simon                    2204 UIC           0 :             ereport(WARNING,
 2036 peter_e                  2205 EUB             :                     (errmsg("removing future two-phase state file for transaction %u",
                               2206                 :                             xid)));
 2196 simon                    2207 UIC           0 :             RemoveTwoPhaseFile(xid, true);
                               2208                 :         }
 2196 simon                    2209 EUB             :         else
                               2210                 :         {
 2196 simon                    2211 UIC           0 :             ereport(WARNING,
 2036 peter_e                  2212 EUB             :                     (errmsg("removing future two-phase state from memory for transaction %u",
                               2213                 :                             xid)));
 2196 simon                    2214 UBC           0 :             PrepareRedoRemove(xid, true);
                               2215                 :         }
 2196 simon                    2216 UIC           0 :         return NULL;
 2196 simon                    2217 ECB             :     }
                               2218                 : 
 2196 simon                    2219 GIC          95 :     if (fromdisk)
 2196 simon                    2220 ECB             :     {
                               2221                 :         /* Read and validate file */
 1675 michael                  2222 GIC          35 :         buf = ReadTwoPhaseFile(xid, false);
                               2223                 :     }
                               2224                 :     else
 2196 simon                    2225 ECB             :     {
                               2226                 :         /* Read xlog data */
 2196 simon                    2227 GIC          60 :         XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
                               2228                 :     }
 2196 simon                    2229 ECB             : 
                               2230                 :     /* Deconstruct header */
 2196 simon                    2231 GIC          95 :     hdr = (TwoPhaseFileHeader *) buf;
 2196 simon                    2232 GBC          95 :     if (!TransactionIdEquals(hdr->xid, xid))
 2196 simon                    2233 EUB             :     {
 2196 simon                    2234 UIC           0 :         if (fromdisk)
 1675 michael                  2235               0 :             ereport(ERROR,
                               2236                 :                     (errcode(ERRCODE_DATA_CORRUPTED),
                               2237                 :                      errmsg("corrupted two-phase state file for transaction %u",
 2118 tgl                      2238 EUB             :                             xid)));
                               2239                 :         else
 1675 michael                  2240 UIC           0 :             ereport(ERROR,
                               2241                 :                     (errcode(ERRCODE_DATA_CORRUPTED),
                               2242                 :                      errmsg("corrupted two-phase state in memory for transaction %u",
                               2243                 :                             xid)));
                               2244                 :     }
                               2245                 : 
                               2246                 :     /*
                               2247                 :      * Examine subtransaction XIDs ... they should all follow main XID, and
  971 andres                   2248 ECB             :      * they may force us to advance nextXid.
 2196 simon                    2249                 :      */
 2196 simon                    2250 CBC          95 :     subxids = (TransactionId *) (buf +
                               2251              95 :                                  MAXALIGN(sizeof(TwoPhaseFileHeader)) +
 2196 simon                    2252 GIC          95 :                                  MAXALIGN(hdr->gidlen));
 2196 simon                    2253 CBC        1858 :     for (i = 0; i < hdr->nsubxacts; i++)
                               2254                 :     {
                               2255            1763 :         TransactionId subxid = subxids[i];
                               2256                 : 
 2196 simon                    2257 GIC        1763 :         Assert(TransactionIdFollows(subxid, xid));
 2182 simon                    2258 ECB             : 
  971 andres                   2259                 :         /* update nextXid if needed */
 1473 tmunro                   2260 GIC        1763 :         if (setNextXid)
 1473 tmunro                   2261 CBC         813 :             AdvanceNextFullTransactionIdPastXid(subxid);
 2182 simon                    2262 ECB             : 
 2196 simon                    2263 GIC        1763 :         if (setParent)
 2173                          2264             345 :             SubTransSetParent(subxid, xid);
 2196 simon                    2265 ECB             :     }
                               2266                 : 
 2196 simon                    2267 GIC          95 :     return buf;
                               2268                 : }
                               2269                 : 
                               2270                 : 
                               2271                 : /*
                               2272                 :  *  RecordTransactionCommitPrepared
                               2273                 :  *
                               2274                 :  * This is basically the same as RecordTransactionCommit (q.v. if you change
                               2275                 :  * this function): in particular, we must set DELAY_CHKPT_START to avoid a
                               2276                 :  * race condition.
                               2277                 :  *
                               2278                 :  * We know the transaction made at least one XLOG entry (its PREPARE),
                               2279                 :  * so it is never possible to optimize out the commit record.
 6505 tgl                      2280 ECB             :  */
                               2281                 : static void
 6505 tgl                      2282 GIC         325 : RecordTransactionCommitPrepared(TransactionId xid,
                               2283                 :                                 int nchildren,
                               2284                 :                                 TransactionId *children,
                               2285                 :                                 int nrels,
                               2286                 :                                 RelFileLocator *rels,
                               2287                 :                                 int nstats,
                               2288                 :                                 xl_xact_stats_item *stats,
                               2289                 :                                 int ninvalmsgs,
                               2290                 :                                 SharedInvalidationMessage *invalmsgs,
                               2291                 :                                 bool initfileinval,
                               2292                 :                                 const char *gid)
 6505 tgl                      2293 ECB             : {
                               2294                 :     XLogRecPtr  recptr;
 2749 alvherre                 2295 GIC         325 :     TimestampTz committs = GetCurrentTimestamp();
                               2296                 :     bool        replorigin;
                               2297                 : 
                               2298                 :     /*
                               2299                 :      * Are we using the replication origins feature?  Or, in other words, are
 2749 alvherre                 2300 ECB             :      * we replaying remote actions?
                               2301                 :      */
 2749 alvherre                 2302 GIC         345 :     replorigin = (replorigin_session_origin != InvalidRepOriginId &&
 2749 alvherre                 2303 CBC          20 :                   replorigin_session_origin != DoNotReplicateId);
                               2304                 : 
 6505 tgl                      2305 GIC         325 :     START_CRIT_SECTION();
 6505 tgl                      2306 ECB             : 
                               2307                 :     /* See notes in RecordTransactionCommit */
  366 rhaas                    2308 GIC         325 :     Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
                               2309             325 :     MyProc->delayChkptFlags |= DELAY_CHKPT_START;
                               2310                 : 
                               2311                 :     /*
                               2312                 :      * Emit the XLOG commit record. Note that we mark 2PC commits as
                               2313                 :      * potentially having AccessExclusiveLocks since we don't know whether or
 2153 bruce                    2314 ECB             :      * not they do.
                               2315                 :      */
 2749 alvherre                 2316 GIC         325 :     recptr = XactLogCommitRecord(committs,
                               2317                 :                                  nchildren, children, nrels, rels,
                               2318                 :                                  nstats, stats,
 2947 andres                   2319 ECB             :                                  ninvalmsgs, invalmsgs,
                               2320                 :                                  initfileinval,
 2118 tgl                      2321 GIC         325 :                                  MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
                               2322                 :                                  xid, gid);
 6505 tgl                      2323 ECB             : 
                               2324                 : 
 2749 alvherre                 2325 CBC         325 :     if (replorigin)
                               2326                 :         /* Move LSNs forward for this replication origin */
 2749 alvherre                 2327 GIC          20 :         replorigin_session_advance(replorigin_session_origin_lsn,
                               2328                 :                                    XactLastRecEnd);
                               2329                 : 
                               2330                 :     /*
                               2331                 :      * Record commit timestamp.  The value comes from plain commit timestamp
                               2332                 :      * if replorigin is not enabled, or replorigin already set a value for us
                               2333                 :      * in replorigin_session_origin_timestamp otherwise.
                               2334                 :      *
                               2335                 :      * We don't need to WAL-log anything here, as the commit record written
 2749 alvherre                 2336 ECB             :      * above already contains the data.
                               2337                 :      */
 2749 alvherre                 2338 GIC         325 :     if (!replorigin || replorigin_session_origin_timestamp == 0)
 2749 alvherre                 2339 CBC         305 :         replorigin_session_origin_timestamp = committs;
                               2340                 : 
 2749 alvherre                 2341 GIC         325 :     TransactionTreeSetCommitTsData(xid, nchildren, children,
                               2342                 :                                    replorigin_session_origin_timestamp,
                               2343                 :                                    replorigin_session_origin);
                               2344                 : 
                               2345                 :     /*
                               2346                 :      * We don't currently try to sleep before flush here ... nor is there any
                               2347                 :      * support for async commit of a prepared xact (the very idea is probably
                               2348                 :      * a contradiction)
                               2349                 :      */
 6505 tgl                      2350 ECB             : 
                               2351                 :     /* Flush XLOG to disk */
 6505 tgl                      2352 GIC         325 :     XLogFlush(recptr);
 6505 tgl                      2353 ECB             : 
                               2354                 :     /* Mark the transaction committed in pg_xact */
 5284 alvherre                 2355 GIC         325 :     TransactionIdCommitTree(xid, nchildren, children);
 6505 tgl                      2356 ECB             : 
                               2357                 :     /* Checkpoint can proceed now */
  366 rhaas                    2358 CBC         325 :     MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
                               2359                 : 
 6505 tgl                      2360 GIC         325 :     END_CRIT_SECTION();
                               2361                 : 
                               2362                 :     /*
                               2363                 :      * Wait for synchronous replication, if required.
                               2364                 :      *
                               2365                 :      * Note that at this stage we have marked clog, but still show as running
 4382 bruce                    2366 ECB             :      * in the procarray and continue to hold locks.
 4417 simon                    2367                 :      */
 2567 rhaas                    2368 GIC         325 :     SyncRepWaitForLSN(recptr, true);
 6505 tgl                      2369             325 : }
                               2370                 : 
                               2371                 : /*
                               2372                 :  *  RecordTransactionAbortPrepared
                               2373                 :  *
                               2374                 :  * This is basically the same as RecordTransactionAbort.
                               2375                 :  *
                               2376                 :  * We know the transaction made at least one XLOG entry (its PREPARE),
                               2377                 :  * so it is never possible to optimize out the abort record.
 6505 tgl                      2378 ECB             :  */
                               2379                 : static void
 6505 tgl                      2380 GIC          38 : RecordTransactionAbortPrepared(TransactionId xid,
                               2381                 :                                int nchildren,
                               2382                 :                                TransactionId *children,
                               2383                 :                                int nrels,
                               2384                 :                                RelFileLocator *rels,
                               2385                 :                                int nstats,
                               2386                 :                                xl_xact_stats_item *stats,
                               2387                 :                                const char *gid)
                               2388                 : {
                               2389                 :     XLogRecPtr  recptr;
                               2390                 :     bool        replorigin;
                               2391                 : 
                               2392                 :     /*
                               2393                 :      * Are we using the replication origins feature?  Or, in other words, are
  762 akapila                  2394 ECB             :      * we replaying remote actions?
                               2395                 :      */
  762 akapila                  2396 GIC          44 :     replorigin = (replorigin_session_origin != InvalidRepOriginId &&
                               2397               6 :                   replorigin_session_origin != DoNotReplicateId);
                               2398                 : 
                               2399                 :     /*
                               2400                 :      * Catch the scenario where we aborted partway through
 6505 tgl                      2401 ECB             :      * RecordTransactionCommitPrepared ...
 6505 tgl                      2402 EUB             :      */
 6505 tgl                      2403 GIC          38 :     if (TransactionIdDidCommit(xid))
 6505 tgl                      2404 UIC           0 :         elog(PANIC, "cannot abort transaction %u, it was already committed",
 6505 tgl                      2405 ECB             :              xid);
                               2406                 : 
 6505 tgl                      2407 GIC          38 :     START_CRIT_SECTION();
                               2408                 : 
                               2409                 :     /*
                               2410                 :      * Emit the XLOG abort record. Note that we mark 2PC aborts as
                               2411                 :      * potentially having AccessExclusiveLocks since we don't know whether or
 2153 bruce                    2412 ECB             :      * not they do.
                               2413                 :      */
 2947 andres                   2414 GIC          38 :     recptr = XactLogAbortRecord(GetCurrentTimestamp(),
                               2415                 :                                 nchildren, children,
 2947 andres                   2416 ECB             :                                 nrels, rels,
                               2417                 :                                 nstats, stats,
 2118 tgl                      2418 GIC          38 :                                 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
 1838 simon                    2419 ECB             :                                 xid, gid);
                               2420                 : 
  762 akapila                  2421 CBC          38 :     if (replorigin)
                               2422                 :         /* Move LSNs forward for this replication origin */
  762 akapila                  2423 GIC           6 :         replorigin_session_advance(replorigin_session_origin_lsn,
                               2424                 :                                    XactLastRecEnd);
  762 akapila                  2425 ECB             : 
                               2426                 :     /* Always flush, since we're about to remove the 2PC state file */
 6505 tgl                      2427 GIC          38 :     XLogFlush(recptr);
                               2428                 : 
                               2429                 :     /*
                               2430                 :      * Mark the transaction aborted in clog.  This is not absolutely necessary
 6385 bruce                    2431 ECB             :      * but we may as well do it while we are here.
                               2432                 :      */
 5284 alvherre                 2433 CBC          38 :     TransactionIdAbortTree(xid, nchildren, children);
                               2434                 : 
 6505 tgl                      2435 GIC          38 :     END_CRIT_SECTION();
                               2436                 : 
                               2437                 :     /*
                               2438                 :      * Wait for synchronous replication, if required.
                               2439                 :      *
                               2440                 :      * Note that at this stage we have marked clog, but still show as running
 4382 bruce                    2441 ECB             :      * in the procarray and continue to hold locks.
 4417 simon                    2442                 :      */
 2567 rhaas                    2443 GIC          38 :     SyncRepWaitForLSN(recptr, false);
 6505 tgl                      2444              38 : }
                               2445                 : 
                               2446                 : /*
                               2447                 :  * PrepareRedoAdd
                               2448                 :  *
                               2449                 :  * Store pointers to the start/end of the WAL record along with the xid in
                               2450                 :  * a gxact entry in shared memory TwoPhaseState structure.  If caller
                               2451                 :  * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
                               2452                 :  * data, the entry is marked as located on disk.
 2196 simon                    2453 ECB             :  */
                               2454                 : void
 1838 simon                    2455 GIC          80 : PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 1838 simon                    2456 ECB             :                XLogRecPtr end_lsn, RepOriginId origin_id)
                               2457                 : {
 2196 simon                    2458 GIC          80 :     TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
                               2459                 :     char       *bufptr;
                               2460                 :     const char *gid;
 2196 simon                    2461 ECB             :     GlobalTransaction gxact;
                               2462                 : 
 2125 alvherre                 2463 GIC          80 :     Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 2196 simon                    2464 CBC          80 :     Assert(RecoveryInProgress());
 2196 simon                    2465 ECB             : 
 2196 simon                    2466 GIC          80 :     bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
                               2467              80 :     gid = (const char *) bufptr;
                               2468                 : 
                               2469                 :     /*
                               2470                 :      * Reserve the GID for the given transaction in the redo code path.
                               2471                 :      *
                               2472                 :      * This creates a gxact struct and puts it into the active array.
                               2473                 :      *
                               2474                 :      * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
                               2475                 :      * shared memory. Hence, we only fill up the bare minimum contents here.
                               2476                 :      * The gxact also gets marked with gxact->inredo set to true to indicate
                               2477                 :      * that it got added in the redo phase.
                               2478                 :      */
 2196 simon                    2479 ECB             : 
 2196 simon                    2480 EUB             :     /* Get a free gxact from the freelist */
 2196 simon                    2481 GIC          80 :     if (TwoPhaseState->freeGXacts == NULL)
 2196 simon                    2482 UIC           0 :         ereport(ERROR,
                               2483                 :                 (errcode(ERRCODE_OUT_OF_MEMORY),
                               2484                 :                  errmsg("maximum number of prepared transactions reached"),
 2196 simon                    2485 ECB             :                  errhint("Increase max_prepared_transactions (currently %d).",
                               2486                 :                          max_prepared_xacts)));
 2196 simon                    2487 GIC          80 :     gxact = TwoPhaseState->freeGXacts;
 2196 simon                    2488 CBC          80 :     TwoPhaseState->freeGXacts = gxact->next;
 2196 simon                    2489 ECB             : 
 2196 simon                    2490 CBC          80 :     gxact->prepared_at = hdr->prepared_at;
                               2491              80 :     gxact->prepare_start_lsn = start_lsn;
                               2492              80 :     gxact->prepare_end_lsn = end_lsn;
                               2493              80 :     gxact->xid = hdr->xid;
                               2494              80 :     gxact->owner = hdr->owner;
                               2495              80 :     gxact->locking_backend = InvalidBackendId;
                               2496              80 :     gxact->valid = false;
                               2497              80 :     gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
 2153 bruce                    2498 GIC          80 :     gxact->inredo = true;        /* yes, added in redo */
 2196 simon                    2499              80 :     strcpy(gxact->gid, gid);
 2196 simon                    2500 ECB             : 
                               2501                 :     /* And insert it into the active array */
 2196 simon                    2502 GIC          80 :     Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
 2196 simon                    2503 CBC          80 :     TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
                               2504                 : 
 1838 simon                    2505 GIC          80 :     if (origin_id != InvalidRepOriginId)
 1838 simon                    2506 ECB             :     {
                               2507                 :         /* recover apply progress */
 1838 simon                    2508 GIC          13 :         replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
                               2509                 :                            false /* backward */ , false /* WAL */ );
 1838 simon                    2510 ECB             :     }
                               2511                 : 
 2125 alvherre                 2512 GIC          80 :     elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
 2196 simon                    2513              80 : }
                               2514                 : 
                               2515                 : /*
                               2516                 :  * PrepareRedoRemove
                               2517                 :  *
                               2518                 :  * Remove the corresponding gxact entry from TwoPhaseState. Also remove
                               2519                 :  * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
                               2520                 :  *
                               2521                 :  * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
                               2522                 :  * is updated.
 2196 simon                    2523 ECB             :  */
                               2524                 : void
 2196 simon                    2525 CBC          58 : PrepareRedoRemove(TransactionId xid, bool giveWarning)
                               2526                 : {
                               2527              58 :     GlobalTransaction gxact = NULL;
                               2528                 :     int         i;
 2182                          2529              58 :     bool        found = false;
 2196 simon                    2530 ECB             : 
 2125 alvherre                 2531 GIC          58 :     Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
 2196 simon                    2532 CBC          58 :     Assert(RecoveryInProgress());
                               2533                 : 
                               2534              58 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                               2535                 :     {
                               2536              49 :         gxact = TwoPhaseState->prepXacts[i];
                               2537                 : 
                               2538              49 :         if (gxact->xid == xid)
 2196 simon                    2539 ECB             :         {
 2196 simon                    2540 CBC          49 :             Assert(gxact->inredo);
 2182 simon                    2541 GIC          49 :             found = true;
 2196                          2542              49 :             break;
                               2543                 :         }
                               2544                 :     }
                               2545                 : 
                               2546                 :     /*
 2196 simon                    2547 ECB             :      * Just leave if there is nothing, this is expected during WAL replay.
                               2548                 :      */
 2182 simon                    2549 GIC          58 :     if (!found)
 2196                          2550               9 :         return;
                               2551                 : 
                               2552                 :     /*
 2196 simon                    2553 ECB             :      * And now we can clean up any files we may have left.
                               2554                 :      */
 2125 alvherre                 2555 CBC          49 :     elog(DEBUG2, "removing 2PC data for transaction %u", xid);
 2196 simon                    2556              49 :     if (gxact->ondisk)
 2196 simon                    2557 GIC           2 :         RemoveTwoPhaseFile(xid, giveWarning);
                               2558              49 :     RemoveGXact(gxact);
                               2559                 : }
                               2560                 : 
                               2561                 : /*
                               2562                 :  * LookupGXact
                               2563                 :  *      Check if the prepared transaction with the given GID, lsn and timestamp
                               2564                 :  *      exists.
                               2565                 :  *
                               2566                 :  * Note that we always compare with the LSN where prepare ends because that is
                               2567                 :  * what is stored as origin_lsn in the 2PC file.
                               2568                 :  *
                               2569                 :  * This function is primarily used to check if the prepared transaction
                               2570                 :  * received from the upstream (remote node) already exists. Checking only GID
                               2571                 :  * is not sufficient because a different prepared xact with the same GID can
                               2572                 :  * exist on the same node. So, we are ensuring to match origin_lsn and
                               2573                 :  * origin_timestamp of prepared xact to avoid the possibility of a match of
                               2574                 :  * prepared xact from two different nodes.
  634 akapila                  2575 ECB             :  */
                               2576                 : bool
  634 akapila                  2577 GIC           5 : LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
                               2578                 :             TimestampTz origin_prepare_timestamp)
  634 akapila                  2579 ECB             : {
                               2580                 :     int         i;
  634 akapila                  2581 CBC           5 :     bool        found = false;
  634 akapila                  2582 ECB             : 
  634 akapila                  2583 GIC           5 :     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
  634 akapila                  2584 CBC           5 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
                               2585                 :     {
  634 akapila                  2586 GIC           5 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
  634 akapila                  2587 ECB             : 
                               2588                 :         /* Ignore not-yet-valid GIDs. */
  634 akapila                  2589 GIC           5 :         if (gxact->valid && strcmp(gxact->gid, gid) == 0)
                               2590                 :         {
                               2591                 :             char       *buf;
                               2592                 :             TwoPhaseFileHeader *hdr;
                               2593                 : 
                               2594                 :             /*
                               2595                 :              * We are not expecting collisions of GXACTs (same gid) between
                               2596                 :              * publisher and subscribers, so we perform all I/O while holding
                               2597                 :              * TwoPhaseStateLock for simplicity.
                               2598                 :              *
                               2599                 :              * To move the I/O out of the lock, we need to ensure that no
                               2600                 :              * other backend commits the prepared xact in the meantime. We can
                               2601                 :              * do this optimization if we encounter many collisions in GID
  634 akapila                  2602 ECB             :              * between publisher and subscriber.
  634 akapila                  2603 EUB             :              */
  634 akapila                  2604 GIC           5 :             if (gxact->ondisk)
  634 akapila                  2605 UIC           0 :                 buf = ReadTwoPhaseFile(gxact->xid, false);
  634 akapila                  2606 ECB             :             else
                               2607                 :             {
  634 akapila                  2608 GIC           5 :                 Assert(gxact->prepare_start_lsn);
                               2609               5 :                 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
  634 akapila                  2610 ECB             :             }
                               2611                 : 
  634 akapila                  2612 CBC           5 :             hdr = (TwoPhaseFileHeader *) buf;
  634 akapila                  2613 ECB             : 
  634 akapila                  2614 GIC           5 :             if (hdr->origin_lsn == prepare_end_lsn &&
  634 akapila                  2615 CBC           5 :                 hdr->origin_timestamp == origin_prepare_timestamp)
  634 akapila                  2616 ECB             :             {
  634 akapila                  2617 CBC           5 :                 found = true;
  634 akapila                  2618 GIC           5 :                 pfree(buf);
                               2619               5 :                 break;
  634 akapila                  2620 EUB             :             }
                               2621                 : 
  634 akapila                  2622 UIC           0 :             pfree(buf);
  634 akapila                  2623 ECB             :         }
                               2624                 :     }
  634 akapila                  2625 GIC           5 :     LWLockRelease(TwoPhaseStateLock);
                               2626               5 :     return found;
                               2627                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a