LCOV - differential code coverage report
Current view: top level - src/backend/access/transam - twophase.c (source / functions)
Current:       Differential Code Coverage HEAD vs 15
Current Date:  2023-04-08 15:15:32
Baseline:      15
Baseline Date: 2023-04-08 15:09:40

           Coverage Total   Hit   LBC   UIC   UBC   GBC   GIC   GNC   CBC   EUB   ECB   DCB
Lines:       90.7 %   778   706     5    49    18    16   273    13   404    38   283    14
Functions:  100.0 %    39    39   other counts as reported: 26 2 11 26
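Legend (TLA codes):
  CBC covered baseline code        UBC uncovered baseline code
  GBC gained baseline coverage     LBC lost baseline coverage
  GIC gained included coverage     UIC uncovered included code
  GNC gained new coverage          DCB deleted covered baseline
  ECB excluded covered baseline    EUB excluded uncovered baseline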

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * twophase.c
       4                 :  *      Two-phase commit support functions.
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  * IDENTIFICATION
      10                 :  *      src/backend/access/transam/twophase.c
      11                 :  *
      12                 :  * NOTES
      13                 :  *      Each global transaction is associated with a global transaction
      14                 :  *      identifier (GID). The client assigns a GID to a postgres
      15                 :  *      transaction with the PREPARE TRANSACTION command.
      16                 :  *
      17                 :  *      We keep all active global transactions in a shared memory array.
      18                 :  *      When the PREPARE TRANSACTION command is issued, the GID is
      19                 :  *      reserved for the transaction in the array. This is done before
      20                 :  *      a WAL entry is made, because the reservation checks for duplicate
      21                 :  *      GIDs and aborts the transaction if there already is a global
      22                 :  *      transaction in prepared state with the same GID.
      23                 :  *
      24                 :  *      A global transaction (gxact) also has a dummy PGPROC; this is what keeps
      25                 :  *      the XID considered running by TransactionIdIsInProgress.  It is also
      26                 :  *      convenient as a PGPROC to hook the gxact's locks to.
      27                 :  *
      28                 :  *      Information to recover prepared transactions in case of crash is
      29                 :  *      now stored in WAL for the common case. In some cases there will be
      30                 :  *      an extended period between preparing a GXACT and commit/abort, in
      31                 :  *      which case we need to separately record prepared transaction data
      32                 :  *      in permanent storage. This includes locking information, pending
      33                 :  *      notifications etc. All that state information is written to the
      34                 :  *      per-transaction state file in the pg_twophase directory.
      35                 :  *      All prepared transactions will be written prior to shutdown.
      36                 :  *
      37                 :  *      The life cycle of state data is as follows:
      38                 :  *
      39                 :  *      * On PREPARE TRANSACTION, the backend writes state data only to the WAL
      40                 :  *        and stores a pointer to the start of the WAL record in
      41                 :  *        gxact->prepare_start_lsn.
      42                 :  *      * If COMMIT occurs before a checkpoint, the backend reads the data from
      43                 :  *        WAL using prepare_start_lsn.
      44                 :  *      * On checkpoint, state data is copied to files in the pg_twophase
      45                 :  *        directory and fsynced.
      46                 :  *      * If COMMIT happens after a checkpoint, the backend reads state data
      47                 :  *        from those files.
      48                 :  *
      49                 :  *      During replay and replication, TwoPhaseState also holds information
      50                 :  *      about active prepared transactions that haven't been moved to disk yet.
      51                 :  *
      52                 :  *      Replay of twophase records happens by the following rules:
      53                 :  *
      54                 :  *      * At the beginning of recovery, pg_twophase is scanned once, filling
      55                 :  *        TwoPhaseState with entries marked with gxact->inredo and
      56                 :  *        gxact->ondisk.  Two-phase file data older than the XID horizon of
      57                 :  *        the redo position are discarded.
      58                 :  *      * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
      59                 :  *        gxact->inredo is set to true for such entries.
      60                 :  *      * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
      61                 :  *        that have gxact->inredo set and are behind the redo_horizon. We
      62                 :  *        save them to disk and then switch gxact->ondisk to true.
      63                 :  *      * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
      64                 :  *        If gxact->ondisk is true, the corresponding entry from the disk
      65                 :  *        is additionally deleted.
      66                 :  *      * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
      67                 :  *        and PrescanPreparedTransactions() have been modified to go through
      68                 :  *        gxact->inredo entries that have not made it to disk.
      69                 :  *
      70                 :  *-------------------------------------------------------------------------
      71                 :  */
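The state-data life cycle and checkpoint rules described in the header comment can be sketched as a tiny state model. This is a hypothetical simplification, not PostgreSQL code: `sim_gxact`, `sim_prepare()`, `sim_checkpoint()` and `sim_commit_source()` are illustrative names, the state-file write and fsync are reduced to flipping `ondisk`, and reading "behind the redo horizon" as `prepare_end_lsn <= redo_horizon` is an assumption of this sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified model; these are not PostgreSQL's real types. */
typedef uint64_t sim_lsn;

typedef enum { SOURCE_WAL, SOURCE_FILE } sim_state_source;

typedef struct
{
    sim_lsn prepare_start_lsn;  /* where the PREPARE record starts in WAL */
    sim_lsn prepare_end_lsn;    /* where the PREPARE record ends in WAL */
    bool    ondisk;             /* true once state is in a pg_twophase file */
} sim_gxact;

/* PREPARE TRANSACTION: state data is written only to WAL. */
static void sim_prepare(sim_gxact *g, sim_lsn start, sim_lsn end)
{
    assert(start < end);
    g->prepare_start_lsn = start;
    g->prepare_end_lsn = end;
    g->ondisk = false;
}

/*
 * Checkpoint: an entry not yet on disk whose PREPARE record ends at or
 * before the redo horizon is copied to a state file (write + fsync in the
 * real system; here only a flag flip).
 */
static void sim_checkpoint(sim_gxact *g, sim_lsn redo_horizon)
{
    if (!g->ondisk && g->prepare_end_lsn <= redo_horizon)
        g->ondisk = true;
}

/* COMMIT/ROLLBACK PREPARED: decide where to read the state data from. */
static sim_state_source sim_commit_source(const sim_gxact *g)
{
    return g->ondisk ? SOURCE_FILE : SOURCE_WAL;
}
```

A checkpoint whose redo horizon precedes the end of the PREPARE record leaves the entry WAL-only; a later checkpoint moves it to disk, after which the commit path reads the file instead of WAL.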
      72                 : #include "postgres.h"
      73                 : 
      74                 : #include <fcntl.h>
      75                 : #include <sys/stat.h>
      76                 : #include <time.h>
      77                 : #include <unistd.h>
      78                 : 
      79                 : #include "access/commit_ts.h"
      80                 : #include "access/htup_details.h"
      81                 : #include "access/subtrans.h"
      82                 : #include "access/transam.h"
      83                 : #include "access/twophase.h"
      84                 : #include "access/twophase_rmgr.h"
      85                 : #include "access/xact.h"
      86                 : #include "access/xlog.h"
      87                 : #include "access/xloginsert.h"
      88                 : #include "access/xlogreader.h"
      89                 : #include "access/xlogutils.h"
      90                 : #include "catalog/pg_type.h"
      91                 : #include "catalog/storage.h"
      92                 : #include "funcapi.h"
      93                 : #include "miscadmin.h"
      94                 : #include "pg_trace.h"
      95                 : #include "pgstat.h"
      96                 : #include "replication/origin.h"
      97                 : #include "replication/syncrep.h"
      98                 : #include "replication/walsender.h"
      99                 : #include "storage/fd.h"
     100                 : #include "storage/ipc.h"
     101                 : #include "storage/md.h"
     102                 : #include "storage/predicate.h"
     103                 : #include "storage/proc.h"
     104                 : #include "storage/procarray.h"
     105                 : #include "storage/sinvaladt.h"
     106                 : #include "storage/smgr.h"
     107                 : #include "utils/builtins.h"
     108                 : #include "utils/memutils.h"
     109                 : #include "utils/timestamp.h"
     110                 : 
     111                 : /*
     112                 :  * Directory where Two-phase commit files reside within PGDATA
     113                 :  */
     114                 : #define TWOPHASE_DIR "pg_twophase"
     115                 : 
     116                 : /* GUC variable, can't be changed after startup */
     117                 : int         max_prepared_xacts = 0;
     118                 : 
     119                 : /*
     120                 :  * This struct describes one global transaction that is in prepared state
     121                 :  * or attempting to become prepared.
     122                 :  *
     123                 :  * The lifecycle of a global transaction is:
     124                 :  *
     125                 :  * 1. After checking that the requested GID is not in use, set up an entry in
     126                 :  * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
     127                 :  * and mark it as locked by my backend.
     128                 :  *
     129                 :  * 2. After successfully completing prepare, set valid = true and enter the
     130                 :  * referenced PGPROC into the global ProcArray.
     131                 :  *
     132                 :  * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
     133                 :  * valid and not locked, then mark the entry as locked by storing my current
     134                 :  * backend ID into locking_backend.  This prevents concurrent attempts to
     135                 :  * commit or rollback the same prepared xact.
     136                 :  *
     137                 :  * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
     138                 :  * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
     139                 :  * the freelist.
     140                 :  *
     141                 :  * Note that if the preparing transaction fails between steps 1 and 2, the
     142                 :  * entry must be removed so that the GID and the GlobalTransaction struct
     143                 :  * can be reused.  See AtAbort_Twophase().
     144                 :  *
     145                 :  * typedef struct GlobalTransactionData *GlobalTransaction appears in
     146                 :  * twophase.h
     147                 :  */
     148                 : 
     149                 : typedef struct GlobalTransactionData
     150                 : {
     151                 :     GlobalTransaction next;     /* list link for free list */
     152                 :     int         pgprocno;       /* ID of associated dummy PGPROC */
     153                 :     BackendId   dummyBackendId; /* similar to backend id for backends */
     154                 :     TimestampTz prepared_at;    /* time of preparation */
     155                 : 
     156                 :     /*
     157                 :      * Note that we need to keep track of two LSNs for each GXACT. We keep
     158                 :      * track of the start LSN because this is the address we must use to read
     159                 :      * state data back from WAL when committing a prepared GXACT. We keep
     160                 :      * track of the end LSN because that is the LSN we need to wait for prior
     161                 :      * to commit.
     162                 :      */
     163                 :     XLogRecPtr  prepare_start_lsn;  /* XLOG offset of prepare record start */
     164                 :     XLogRecPtr  prepare_end_lsn;    /* XLOG offset of prepare record end */
     165                 :     TransactionId xid;          /* The GXACT id */
     166                 : 
     167                 :     Oid         owner;          /* ID of user that executed the xact */
     168                 :     BackendId   locking_backend;    /* backend currently working on the xact */
     169                 :     bool        valid;          /* true if PGPROC entry is in proc array */
     170                 :     bool        ondisk;         /* true if prepare state file is on disk */
     171                 :     bool        inredo;         /* true if entry was added via xlog_redo */
     172                 :     char        gid[GIDSIZE];   /* The GID assigned to the prepared xact */
     173                 : }           GlobalTransactionData;
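The four lifecycle steps in the comment above the struct can be sketched as transitions on the `valid` and `locking_backend` fields. This is a minimal sketch under stated assumptions: `sim_gxact`, the `sim_*` functions, the `in_array` flag (standing in for membership in `prepXacts`) and `SIM_INVALID_BACKEND` are hypothetical, and the TwoPhaseStateLock protection that the real code relies on is omitted.

```c
#include <assert.h>
#include <stdbool.h>

#define SIM_INVALID_BACKEND (-1)   /* hypothetical stand-in for InvalidBackendId */

typedef struct
{
    bool in_array;          /* stand-in for membership in prepXacts */
    bool valid;             /* true once the dummy PGPROC is in the ProcArray */
    int  locking_backend;   /* backend currently working on the xact */
} sim_gxact;

/* Step 1: GID checked and reserved; entry locked by the preparing backend. */
static void sim_reserve(sim_gxact *g, int my_backend)
{
    g->in_array = true;
    g->valid = false;
    g->locking_backend = my_backend;
}

/* Step 2: prepare completed; the entry becomes valid and is then unlocked
 * (the unlock corresponds to what PostPrepare_Twophase() does). */
static void sim_finish_prepare(sim_gxact *g)
{
    assert(g->in_array && !g->valid);
    g->valid = true;
    g->locking_backend = SIM_INVALID_BACKEND;
}

/* Step 3: COMMIT/ROLLBACK PREPARED may lock only a valid, unlocked entry. */
static bool sim_lock_for_finish(sim_gxact *g, int my_backend)
{
    if (!g->in_array || !g->valid || g->locking_backend != SIM_INVALID_BACKEND)
        return false;
    g->locking_backend = my_backend;
    return true;
}

/* Step 4: the entry leaves the array (and would return to the free list). */
static void sim_remove(sim_gxact *g)
{
    g->in_array = false;
    g->valid = false;
    g->locking_backend = SIM_INVALID_BACKEND;
}
```

The second call to `sim_lock_for_finish()` failing is the property step 3 is after: two sessions cannot both start finishing the same prepared transaction.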
     174                 : 
     175                 : /*
     176                 :  * Two Phase Commit shared state.  Access to this struct is protected
     177                 :  * by TwoPhaseStateLock.
     178                 :  */
     179                 : typedef struct TwoPhaseStateData
     180                 : {
     181                 :     /* Head of linked list of free GlobalTransactionData structs */
     182                 :     GlobalTransaction freeGXacts;
     183                 : 
     184                 :     /* Number of valid prepXacts entries. */
     185                 :     int         numPrepXacts;
     186                 : 
     187                 :     /* There are max_prepared_xacts items in this array */
     188                 :     GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
     189                 : } TwoPhaseStateData;
     190                 : 
     191                 : static TwoPhaseStateData *TwoPhaseState;
     192                 : 
     193                 : /*
     194                 :  * Global transaction entry currently locked by us, if any.  Note that any
     195                 :  * access to the entry pointed to by this variable must be protected by
     196                 :  * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
     197                 :  * (since it's just local memory).
     198                 :  */
     199                 : static GlobalTransaction MyLockedGxact = NULL;
     200                 : 
     201                 : static bool twophaseExitRegistered = false;
     202                 : 
     203                 : static void RecordTransactionCommitPrepared(TransactionId xid,
     204                 :                                             int nchildren,
     205                 :                                             TransactionId *children,
     206                 :                                             int nrels,
     207                 :                                             RelFileLocator *rels,
     208                 :                                             int nstats,
     209                 :                                             xl_xact_stats_item *stats,
     210                 :                                             int ninvalmsgs,
     211                 :                                             SharedInvalidationMessage *invalmsgs,
     212                 :                                             bool initfileinval,
     213                 :                                             const char *gid);
     214                 : static void RecordTransactionAbortPrepared(TransactionId xid,
     215                 :                                            int nchildren,
     216                 :                                            TransactionId *children,
     217                 :                                            int nrels,
     218                 :                                            RelFileLocator *rels,
     219                 :                                            int nstats,
     220                 :                                            xl_xact_stats_item *stats,
     221                 :                                            const char *gid);
     222                 : static void ProcessRecords(char *bufptr, TransactionId xid,
     223                 :                            const TwoPhaseCallback callbacks[]);
     224                 : static void RemoveGXact(GlobalTransaction gxact);
     225                 : 
     226                 : static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
     227                 : static char *ProcessTwoPhaseBuffer(TransactionId xid,
     228                 :                                    XLogRecPtr prepare_start_lsn,
     229                 :                                    bool fromdisk, bool setParent, bool setNextXid);
     230                 : static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
     231                 :                                 const char *gid, TimestampTz prepared_at, Oid owner,
     232                 :                                 Oid databaseid);
     233                 : static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
     234                 : static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
     235                 : 
     236                 : /*
     237                 :  * Initialization of shared memory
     238                 :  */
     239                 : Size
     240 CBC        4564 : TwoPhaseShmemSize(void)
     241                 : {
     242                 :     Size        size;
     243                 : 
     244                 :     /* Need the fixed struct, the array of pointers, and the GTD structs */
     245            4564 :     size = offsetof(TwoPhaseStateData, prepXacts);
     246            4564 :     size = add_size(size, mul_size(max_prepared_xacts,
     247                 :                                    sizeof(GlobalTransaction)));
     248            4564 :     size = MAXALIGN(size);
     249            4564 :     size = add_size(size, mul_size(max_prepared_xacts,
     250                 :                                    sizeof(GlobalTransactionData)));
     251                 : 
     252            4564 :     return size;
     253                 : }
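The size computed by `TwoPhaseShmemSize()` has three parts: the fixed header up to the flexible `prepXacts` array, the array of pointers, and (after rounding up with `MAXALIGN`) the `GlobalTransactionData` structs themselves. The sketch below reproduces that arithmetic with hypothetical stand-in types (`SimGXact`, `SimTwoPhaseState`, `SIM_MAXALIGN`); it assumes 8-byte maximum alignment and skips the overflow checking that `add_size()` and `mul_size()` provide.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumption: 8-byte maximum alignment; PostgreSQL's MAXALIGN uses the
 * platform's MAXIMUM_ALIGNOF rather than a hard-coded 8. */
#define SIM_MAXALIGN(len) ((size_t) (((uintptr_t) (len) + 7) & ~(uintptr_t) 7))

typedef struct SimGXact
{
    struct SimGXact *next;
    int              pgprocno;
} SimGXact;

typedef struct
{
    SimGXact *freeGXacts;
    int       numPrepXacts;
    SimGXact *prepXacts[];      /* flexible array of pointers */
} SimTwoPhaseState;

/* Same three-part layout as TwoPhaseShmemSize(), without overflow checks. */
static size_t sim_shmem_size(size_t max_prepared)
{
    size_t size = offsetof(SimTwoPhaseState, prepXacts);

    size += max_prepared * sizeof(SimGXact *);   /* pointer array */
    size = SIM_MAXALIGN(size);                   /* align the struct array */
    size += max_prepared * sizeof(SimGXact);     /* the structs themselves */
    return size;
}
```

The alignment step matters because `TwoPhaseShmemInit()` locates the struct array at exactly that `MAXALIGN`ed offset from the start of the shared block.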
     254                 : 
     255                 : void
     256            1826 : TwoPhaseShmemInit(void)
     257                 : {
     258                 :     bool        found;
     259                 : 
     260            1826 :     TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
     261                 :                                     TwoPhaseShmemSize(),
     262                 :                                     &found);
     263            1826 :     if (!IsUnderPostmaster)
     264                 :     {
     265                 :         GlobalTransaction gxacts;
     266                 :         int         i;
     267                 : 
     268            1826 :         Assert(!found);
     269            1826 :         TwoPhaseState->freeGXacts = NULL;
     270            1826 :         TwoPhaseState->numPrepXacts = 0;
     271                 : 
     272                 :         /*
     273                 :          * Initialize the linked list of free GlobalTransactionData structs
     274                 :          */
     275            1826 :         gxacts = (GlobalTransaction)
     276            1826 :             ((char *) TwoPhaseState +
     277            1826 :              MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
     278                 :                       sizeof(GlobalTransaction) * max_prepared_xacts));
     279            2591 :         for (i = 0; i < max_prepared_xacts; i++)
     280                 :         {
     281                 :             /* insert into linked list */
     282             765 :             gxacts[i].next = TwoPhaseState->freeGXacts;
     283             765 :             TwoPhaseState->freeGXacts = &gxacts[i];
     284                 : 
     285                 :             /* associate it with a PGPROC assigned by InitProcGlobal */
     286             765 :             gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
     287                 : 
     288                 :             /*
     289                 :              * Assign a unique ID for each dummy proc, so that the range of
     290                 :              * dummy backend IDs immediately follows the range of normal
     291                 :              * backend IDs. We don't dare to assign a real backend ID to dummy
     292                 :              * procs, because prepared transactions don't take part in cache
     293                 :              * invalidation like a real backend ID would imply, but having a
     294                 :              * unique ID for them is nevertheless handy. This arrangement
     295                 :              * allows you to allocate an array of size (MaxBackends +
     296                 :              * max_prepared_xacts + 1), and have a slot for every backend and
     297                 :              * prepared transaction. Currently multixact.c uses that
     298                 :              * technique.
     299                 :              */
     300             765 :             gxacts[i].dummyBackendId = MaxBackends + 1 + i;
     301                 :         }
     302                 :     }
     303                 :     else
     304 UBC           0 :         Assert(found);
     305 CBC        1826 : }
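The initialization loop pushes each struct onto the head of the free list, so the list ends up in reverse array order, and gives entry `i` the dummy backend ID `MaxBackends + 1 + i`. A hypothetical sketch of just that logic follows (`SimGXact` and `sim_init_freelist()` are illustrative names, and the `pgprocno` assignment is left out).

```c
#include <assert.h>
#include <stddef.h>

typedef struct SimGXact
{
    struct SimGXact *next;     /* free-list link */
    int              dummyBackendId;
} SimGXact;

/* Push every element onto the head of a singly linked free list and assign
 * dummy IDs that follow the range of normal backend IDs. */
static SimGXact *sim_init_freelist(SimGXact *gxacts, int n, int max_backends)
{
    SimGXact *free_head = NULL;

    for (int i = 0; i < n; i++)
    {
        gxacts[i].next = free_head;
        free_head = &gxacts[i];
        gxacts[i].dummyBackendId = max_backends + 1 + i;
    }
    return free_head;
}
```

With 100 normal backends and 3 prepared-transaction slots, the dummy IDs are 101, 102 and 103, which all fit in an array of MaxBackends + max_prepared_xacts + 1 = 104 slots indexed by backend ID.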
     306                 : 
     307                 : /*
     308                 :  * Exit hook to unlock the global transaction entry we're working on.
     309                 :  */
     310                 : static void
     311             125 : AtProcExit_Twophase(int code, Datum arg)
     312                 : {
     313                 :     /* same logic as abort */
     314             125 :     AtAbort_Twophase();
     315             125 : }
     316                 : 
     317                 : /*
     318                 :  * Abort hook to unlock the global transaction entry we're working on.
     319                 :  */
     320                 : void
     321           20223 : AtAbort_Twophase(void)
     322                 : {
     323           20223 :     if (MyLockedGxact == NULL)
     324           20221 :         return;
     325                 : 
     326                 :     /*
     327                 :      * What to do with the locked global transaction entry?  If we were in the
     328                 :      * process of preparing the transaction, but haven't written the WAL
     329                 :      * record and state file yet, the transaction must not be considered as
     330                 :      * prepared.  Likewise, if we are in the process of finishing an
     331                 :      * already-prepared transaction, and fail after having already written the
     332                 :      * 2nd phase commit or rollback record to the WAL, the transaction should
     333                 :      * not be considered as prepared anymore.  In those cases, just remove the
     334                 :      * entry from shared memory.
     335                 :      *
     336                 :      * Otherwise, the entry must be left in place so that the transaction can
     337                 :      * be finished later, so just unlock it.
     338                 :      *
     339                 :      * If we abort during prepare, after having written the WAL record, we
     340                 :      * might not have transferred all locks and other state to the prepared
     341                 :      * transaction yet.  Likewise, if we abort during commit or rollback,
     342                 :      * after having written the WAL record, we might not have released all the
     343                 :      * resources held by the transaction yet.  In those cases, the in-memory
     344                 :      * state can be wrong, but it's too late to back out.
     345                 :      */
     346               2 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
     347               2 :     if (!MyLockedGxact->valid)
     348               2 :         RemoveGXact(MyLockedGxact);
     349                 :     else
     350 UBC           0 :         MyLockedGxact->locking_backend = InvalidBackendId;
     351 CBC           2 :     LWLockRelease(TwoPhaseStateLock);
     352                 : 
     353               2 :     MyLockedGxact = NULL;
     354                 : }
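The decision in `AtAbort_Twophase()` reduces to: if nothing is locked, return; otherwise remove an entry that never became valid, and merely unlock a valid one. In this report the early return accounts for 20221 of the 20223 calls. The sketch below is a hypothetical model (`SimGXact`, `my_locked_gxact`, and the `removed` flag standing in for `RemoveGXact()` are illustrative) and omits TwoPhaseStateLock.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SIM_INVALID_BACKEND (-1)   /* hypothetical stand-in for InvalidBackendId */

typedef struct
{
    bool valid;
    int  locking_backend;
    bool removed;                  /* stands in for RemoveGXact() having run */
} SimGXact;

static SimGXact *my_locked_gxact = NULL;

static void sim_at_abort(void)
{
    if (my_locked_gxact == NULL)
        return;                                 /* common case: nothing locked */

    /* (the real code takes TwoPhaseStateLock exclusively here) */
    if (!my_locked_gxact->valid)
        my_locked_gxact->removed = true;        /* never prepared: drop it */
    else
        my_locked_gxact->locking_backend = SIM_INVALID_BACKEND;  /* keep, unlock */

    my_locked_gxact = NULL;
}
```

Because `AtProcExit_Twophase()` simply calls the abort hook, the same two outcomes apply when a backend exits while holding an entry.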
     355                 : 
     356                 : /*
     357                 :  * This is called after we have finished transferring state to the prepared
     358                 :  * PGPROC entry.
     359                 :  */
     360                 : void
     361             384 : PostPrepare_Twophase(void)
     362                 : {
     363             384 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
     364             384 :     MyLockedGxact->locking_backend = InvalidBackendId;
     365             384 :     LWLockRelease(TwoPhaseStateLock);
     366                 : 
     367             384 :     MyLockedGxact = NULL;
     368             384 : }
     369                 : 
     370                 : 
     371                 : /*
     372                 :  * MarkAsPreparing
     373                 :  *      Reserve the GID for the given transaction.
     374                 :  */
     375                 : GlobalTransaction
     376             367 : MarkAsPreparing(TransactionId xid, const char *gid,
     377                 :                 TimestampTz prepared_at, Oid owner, Oid databaseid)
     378                 : {
     379                 :     GlobalTransaction gxact;
     380                 :     int         i;
     381                 : 
     382             367 :     if (strlen(gid) >= GIDSIZE)
     383 UBC           0 :         ereport(ERROR,
     384                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     385                 :                  errmsg("transaction identifier \"%s\" is too long",
     386                 :                         gid)));
     387                 : 
     388                 :     /* fail immediately if feature is disabled */
     389 CBC         367 :     if (max_prepared_xacts == 0)
     390               9 :         ereport(ERROR,
     391                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     392                 :                  errmsg("prepared transactions are disabled"),
     393                 :                  errhint("Set max_prepared_transactions to a nonzero value.")));
     394                 : 
     395                 :     /* on first call, register the exit hook */
     396             358 :     if (!twophaseExitRegistered)
     397                 :     {
     398              65 :         before_shmem_exit(AtProcExit_Twophase, 0);
     399              65 :         twophaseExitRegistered = true;
     400                 :     }
     401                 : 
     402             358 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
     403                 : 
     404                 :     /* Check for conflicting GID */
     405             731 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
     406                 :     {
     407             375 :         gxact = TwoPhaseState->prepXacts[i];
     408             375 :         if (strcmp(gxact->gid, gid) == 0)
     409                 :         {
     410               2 :             ereport(ERROR,
     411                 :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     412                 :                      errmsg("transaction identifier \"%s\" is already in use",
     413                 :                             gid)));
     414                 :         }
     415                 :     }
     416                 : 
     417                 :     /* Get a free gxact from the freelist */
     418             356 :     if (TwoPhaseState->freeGXacts == NULL)
     419 UBC           0 :         ereport(ERROR,
     420                 :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     421                 :                  errmsg("maximum number of prepared transactions reached"),
     422                 :                  errhint("Increase max_prepared_transactions (currently %d).",
     423                 :                          max_prepared_xacts)));
     424 CBC         356 :     gxact = TwoPhaseState->freeGXacts;
     425             356 :     TwoPhaseState->freeGXacts = gxact->next;
     426                 : 
     427             356 :     MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
     428                 : 
     429             356 :     gxact->ondisk = false;
     430                 : 
     431                 :     /* And insert it into the active array */
     432             356 :     Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
     433             356 :     TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
     434                 : 
     435             356 :     LWLockRelease(TwoPhaseStateLock);
     436                 : 
     437             356 :     return gxact;
     438                 : }
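`MarkAsPreparing()` performs its checks in a fixed order (GID length, feature enabled, duplicate GID, free slot available) and only then pops a struct from the free list and appends it to `prepXacts`. The sketch below mirrors that order with error codes instead of `ereport(ERROR)`. All names are hypothetical, `SIM_GIDSIZE` is deliberately much smaller than the real `GIDSIZE`, and the exclusive TwoPhaseStateLock that the real function holds around the duplicate check and insertion is not modeled.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SIM_GIDSIZE 16   /* hypothetical; smaller than the real GIDSIZE */

typedef enum { RESERVE_OK, ERR_TOO_LONG, ERR_DISABLED, ERR_DUPLICATE, ERR_FULL } reserve_result;

typedef struct SimGXact
{
    struct SimGXact *next;
    char             gid[SIM_GIDSIZE];
} SimGXact;

typedef struct
{
    SimGXact  *freeGXacts;    /* head of the free list */
    int        numPrepXacts;  /* used entries in prepXacts */
    int        max_prepared;  /* capacity; 0 means the feature is disabled */
    SimGXact **prepXacts;
} SimState;

static reserve_result sim_mark_as_preparing(SimState *st, const char *gid, SimGXact **out)
{
    if (strlen(gid) >= SIM_GIDSIZE)
        return ERR_TOO_LONG;
    if (st->max_prepared == 0)
        return ERR_DISABLED;
    for (int i = 0; i < st->numPrepXacts; i++)
        if (strcmp(st->prepXacts[i]->gid, gid) == 0)
            return ERR_DUPLICATE;
    if (st->freeGXacts == NULL)
        return ERR_FULL;

    SimGXact *g = st->freeGXacts;        /* pop from the free list */
    st->freeGXacts = g->next;
    strcpy(g->gid, gid);                 /* length already checked above */

    assert(st->numPrepXacts < st->max_prepared);
    st->prepXacts[st->numPrepXacts++] = g;   /* append to the active array */
    *out = g;
    return RESERVE_OK;
}
```

Checking for a duplicate before taking a free slot means a rejected GID never consumes one of the `max_prepared_transactions` entries.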
     439                 : 
     440                 : /*
     441                 :  * MarkAsPreparingGuts
     442                 :  *
     443                 :  * This fills in a gxact struct; the caller puts it into the active array.
     444                 :  * NOTE: this is also used when reloading a gxact after a crash; so avoid
     445                 :  * assuming that we can use very much backend context.
     446                 :  *
     447                 :  * Note: This function should be called with appropriate locks held.
     448                 :  */
     449                 : static void
     450             386 : MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
     451                 :                     TimestampTz prepared_at, Oid owner, Oid databaseid)
     452                 : {
     453                 :     PGPROC     *proc;
     454                 :     int         i;
     455                 : 
     456             386 :     Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
     457                 : 
     458             386 :     Assert(gxact != NULL);
     459             386 :     proc = &ProcGlobal->allProcs[gxact->pgprocno];
     460                 : 
     461                 :     /* Initialize the PGPROC entry */
     462           42846 :     MemSet(proc, 0, sizeof(PGPROC));
     463             386 :     proc->pgprocno = gxact->pgprocno;
     464 GNC         386 :     dlist_node_init(&proc->links);
     465 CBC         386 :     proc->waitStatus = PROC_WAIT_STATUS_OK;
     466             386 :     if (LocalTransactionIdIsValid(MyProc->lxid))
     467                 :     {
     468                 :         /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
     469             356 :         proc->lxid = MyProc->lxid;
     470             356 :         proc->backendId = MyBackendId;
     471                 :     }
     472                 :     else
     473                 :     {
     474              30 :         Assert(AmStartupProcess() || !IsPostmasterEnvironment);
     475                 :         /* GetLockConflicts() uses this to specify a wait on the XID */
     476              30 :         proc->lxid = xid;
     477              30 :         proc->backendId = InvalidBackendId;
     478                 :     }
     479             386 :     proc->xid = xid;
     480             386 :     Assert(proc->xmin == InvalidTransactionId);
     481             386 :     proc->delayChkptFlags = 0;
     482             386 :     proc->statusFlags = 0;
     483             386 :     proc->pid = 0;
     484             386 :     proc->databaseId = databaseid;
     485             386 :     proc->roleId = owner;
     486             386 :     proc->tempNamespaceId = InvalidOid;
     487             386 :     proc->isBackgroundWorker = false;
     488 GNC         386 :     proc->lwWaiting = LW_WS_NOT_WAITING;
     489 CBC         386 :     proc->lwWaitMode = 0;
     490             386 :     proc->waitLock = NULL;
     491             386 :     proc->waitProcLock = NULL;
     492             386 :     pg_atomic_init_u64(&proc->waitStart, 0);
     493            6562 :     for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
     494 GNC        6176 :         dlist_init(&proc->myProcLocks[i]);
     495                 :     /* subxid data must be filled later by GXactLoadSubxactData */
     496 CBC         386 :     proc->subxidStatus.overflowed = false;
     497             386 :     proc->subxidStatus.count = 0;
     498                 : 
     499             386 :     gxact->prepared_at = prepared_at;
     500             386 :     gxact->xid = xid;
     501             386 :     gxact->owner = owner;
     502             386 :     gxact->locking_backend = MyBackendId;
     503             386 :     gxact->valid = false;
     504             386 :     gxact->inredo = false;
     505             386 :     strcpy(gxact->gid, gid);
     506                 : 
     507                 :     /*
     508                 :      * Remember that we have this GlobalTransaction entry locked for us. If we
     509                 :      * abort after this, we must release it.
     510                 :      */
     511             386 :     MyLockedGxact = gxact;
     512             386 : }
     513                 : 
     514                 : /*
     515                 :  * GXactLoadSubxactData
     516                 :  *
     517                 :  * If the transaction being persisted had any subtransactions, this must
     518                 :  * be called before MarkAsPrepared() to load information into the dummy
     519                 :  * PGPROC.
     520                 :  */
     521                 : static void
     522             173 : GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
     523                 :                      TransactionId *children)
     524                 : {
     525             173 :     PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
     526                 : 
     527                 :     /* We need no extra lock since the GXACT isn't valid yet */
     528             173 :     if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
     529                 :     {
     530               4 :         proc->subxidStatus.overflowed = true;
     531               4 :         nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
     532                 :     }
     533             173 :     if (nsubxacts > 0)
     534                 :     {
     535             158 :         memcpy(proc->subxids.xids, children,
     536                 :                nsubxacts * sizeof(TransactionId));
     537             158 :         proc->subxidStatus.count = nsubxacts;
     538                 :     }
     539             173 : }
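GXactLoadSubxactData() caches at most PGPROC_MAX_CACHED_SUBXIDS child XIDs in the dummy PGPROC and sets the overflow flag when there are more. The standalone sketch below reproduces only that clamping rule. `DummyProc`, `load_subxids` and `MAX_CACHED_SUBXIDS` are hypothetical names, and the value 64 is an assumption about PGPROC_MAX_CACHED_SUBXIDS, which this listing does not show. The original relies on the preceding function having already zeroed `count` and `overflowed`; the sketch resets them itself so it can be called on its own.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Assumption: stock builds define PGPROC_MAX_CACHED_SUBXIDS as 64. */
#define MAX_CACHED_SUBXIDS 64

typedef uint32_t TransactionId;

/* Hypothetical stand-in for the subxid fields of a dummy PGPROC. */
typedef struct DummyProc
{
    TransactionId xids[MAX_CACHED_SUBXIDS];
    int           count;
    bool          overflowed;
} DummyProc;

/* Cache at most MAX_CACHED_SUBXIDS children; flag overflow when truncated. */
static void
load_subxids(DummyProc *proc, int nsubxacts, const TransactionId *children)
{
    proc->overflowed = false;
    proc->count = 0;

    if (nsubxacts > MAX_CACHED_SUBXIDS)
    {
        proc->overflowed = true;        /* cached list is now incomplete */
        nsubxacts = MAX_CACHED_SUBXIDS; /* keep only the first 64 */
    }
    if (nsubxacts > 0)
    {
        memcpy(proc->xids, children, nsubxacts * sizeof(TransactionId));
        proc->count = nsubxacts;
    }
}
```

With 70 children, the sketch keeps the first 64 and sets `overflowed`; with 3 children it copies all of them and leaves the flag clear.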
     540                 : 
     541                 : /*
     542                 :  * MarkAsPrepared
     543                 :  *      Mark the GXACT as fully valid, and enter it into the global ProcArray.
     544                 :  *
     545                 :  * lock_held indicates whether caller already holds TwoPhaseStateLock.
     546                 :  */
     547                 : static void
     548             384 : MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
     549                 : {
     550                 :     /* Lock here may be overkill, but I'm not convinced of that ... */
     551             384 :     if (!lock_held)
     552             354 :         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
     553             384 :     Assert(!gxact->valid);
     554             384 :     gxact->valid = true;
     555             384 :     if (!lock_held)
     556             354 :         LWLockRelease(TwoPhaseStateLock);
     557                 : 
     558                 :     /*
     559                 :      * Put it into the global ProcArray so TransactionIdIsInProgress considers
     560                 :      * the XID as still running.
     561                 :      */
     562             384 :     ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
     563             384 : }
     564                 : 
     565                 : /*
     566                 :  * LockGXact
     567                 :  *      Locate the prepared transaction and mark it busy for COMMIT PREPARED or ROLLBACK PREPARED.
     568                 :  */
     569                 : static GlobalTransaction
     570             369 : LockGXact(const char *gid, Oid user)
     571                 : {
     572                 :     int         i;
     573                 : 
     574                 :     /* on first call, register the exit hook */
     575             369 :     if (!twophaseExitRegistered)
     576                 :     {
     577              60 :         before_shmem_exit(AtProcExit_Twophase, 0);
     578              60 :         twophaseExitRegistered = true;
     579                 :     }
     580                 : 
     581             369 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
     582                 : 
     583             658 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
     584                 :     {
     585             652 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
     586             652 :         PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
     587                 : 
     588                 :         /* Ignore not-yet-valid GIDs */
     589             652 :         if (!gxact->valid)
     590 UBC           0 :             continue;
     591 CBC         652 :         if (strcmp(gxact->gid, gid) != 0)
     592             289 :             continue;
     593                 : 
     594                 :         /* Found it, but has someone else got it locked? */
     595             363 :         if (gxact->locking_backend != InvalidBackendId)
     596 UBC           0 :             ereport(ERROR,
     597                 :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     598                 :                      errmsg("prepared transaction with identifier \"%s\" is busy",
     599                 :                             gid)));
     600                 : 
     601 CBC         363 :         if (user != gxact->owner && !superuser_arg(user))
     602 UBC           0 :             ereport(ERROR,
     603                 :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     604                 :                      errmsg("permission denied to finish prepared transaction"),
     605                 :                      errhint("Must be superuser or the user that prepared the transaction.")));
     606                 : 
     607                 :         /*
     608                 :          * Note: it probably would be possible to allow committing from
     609                 :          * another database; but at the moment NOTIFY is known not to work and
     610                 :          * there may be some other issues as well.  Hence disallow until
     611                 :          * someone gets motivated to make it work.
     612                 :          */
     613 CBC         363 :         if (MyDatabaseId != proc->databaseId)
     614 UBC           0 :             ereport(ERROR,
     615                 :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     616                 :                      errmsg("prepared transaction belongs to another database"),
     617                 :                      errhint("Connect to the database where the transaction was prepared to finish it.")));
     618                 : 
     619                 :         /* OK for me to lock it */
     620 CBC         363 :         gxact->locking_backend = MyBackendId;
     621             363 :         MyLockedGxact = gxact;
     622                 : 
     623             363 :         LWLockRelease(TwoPhaseStateLock);
     624                 : 
     625             363 :         return gxact;
     626                 :     }
     627                 : 
     628               6 :     LWLockRelease(TwoPhaseStateLock);
     629                 : 
     630               6 :     ereport(ERROR,
     631                 :             (errcode(ERRCODE_UNDEFINED_OBJECT),
     632                 :              errmsg("prepared transaction with identifier \"%s\" does not exist",
     633                 :                     gid)));
     634                 : 
     635                 :     /* NOTREACHED */
     636                 :     return NULL;
     637                 : }
     638                 : 
     639                 : /*
     640                 :  * RemoveGXact
     641                 :  *      Remove the prepared transaction from the shared memory array.
     642                 :  *
     643                 :  * NB: caller should have already removed it from ProcArray
     644                 :  */
     645                 : static void
     646             414 : RemoveGXact(GlobalTransaction gxact)
     647                 : {
     648                 :     int         i;
     649                 : 
     650             414 :     Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
     651                 : 
     652             701 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
     653                 :     {
     654             701 :         if (gxact == TwoPhaseState->prepXacts[i])
     655                 :         {
     656                 :             /* remove from the active array */
     657             414 :             TwoPhaseState->numPrepXacts--;
     658             414 :             TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
     659                 : 
     660                 :             /* and put it back in the freelist */
     661             414 :             gxact->next = TwoPhaseState->freeGXacts;
     662             414 :             TwoPhaseState->freeGXacts = gxact;
     663                 : 
     664             414 :             return;
     665                 :         }
     666                 :     }
     667                 : 
     668 UBC           0 :     elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
     669                 : }
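RemoveGXact() deletes from the unordered prepXacts array by moving the last element into the vacated slot, an O(1) removal that does not preserve order, and then pushes the entry onto the freelist. A minimal sketch of that pattern follows, with hypothetical types `Gxact` and `TwoPhaseTable` standing in for the shared-memory structures. It omits locking (the real function requires TwoPhaseStateLock in exclusive mode) and returns 0 where the real code raises an ERROR.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for GlobalTransactionData. */
typedef struct Gxact
{
    int           id;
    struct Gxact *next;         /* freelist link */
} Gxact;

/* Hypothetical stand-in for the array and freelist in TwoPhaseState. */
typedef struct TwoPhaseTable
{
    Gxact *active[8];
    int    nactive;
    Gxact *freelist;
} TwoPhaseTable;

/* Returns 1 if g was removed, 0 if it was not in the active array. */
static int
remove_gxact(TwoPhaseTable *t, Gxact *g)
{
    for (int i = 0; i < t->nactive; i++)
    {
        if (t->active[i] == g)
        {
            /* O(1) delete: move the last element into the hole */
            t->nactive--;
            t->active[i] = t->active[t->nactive];

            /* push the removed entry onto the freelist */
            g->next = t->freelist;
            t->freelist = g;
            return 1;
        }
    }
    return 0;
}
```

Removing the first of three entries leaves the former last entry in slot 0. Because array order carries no meaning, lookups such as LockGXact() and GetPreparedTransactionList() simply scan all numPrepXacts slots.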
     670                 : 
     671                 : /*
     672                 :  * Returns an array of all prepared transactions for the user-level
     673                 :  * function pg_prepared_xact.
     674                 :  *
     675                 :  * The returned array and all its elements are copies of internal data
     676                 :  * structures, to minimize the time we need to hold the TwoPhaseStateLock.
     677                 :  *
     678                 :  * WARNING -- we return even those transactions that are not fully prepared
     679                 :  * yet.  The caller should filter them out if it doesn't want them.
     680                 :  *
     681                 :  * The returned array is palloc'd.
     682                 :  */
     683                 : static int
     684 CBC          89 : GetPreparedTransactionList(GlobalTransaction *gxacts)
     685                 : {
     686                 :     GlobalTransaction array;
     687                 :     int         num;
     688                 :     int         i;
     689                 : 
     690              89 :     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
     691                 : 
     692              89 :     if (TwoPhaseState->numPrepXacts == 0)
     693                 :     {
     694              48 :         LWLockRelease(TwoPhaseStateLock);
     695                 : 
     696              48 :         *gxacts = NULL;
     697              48 :         return 0;
     698                 :     }
     699                 : 
     700              41 :     num = TwoPhaseState->numPrepXacts;
     701              41 :     array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
     702              41 :     *gxacts = array;
     703              87 :     for (i = 0; i < num; i++)
     704              46 :         memcpy(array + i, TwoPhaseState->prepXacts[i],
     705                 :                sizeof(GlobalTransactionData));
     706                 : 
     707              41 :     LWLockRelease(TwoPhaseStateLock);
     708                 : 
     709              41 :     return num;
     710                 : }
     711                 : 
     712                 : 
     713                 : /* Working status for pg_prepared_xact */
     714                 : typedef struct
     715                 : {
     716                 :     GlobalTransaction array;
     717                 :     int         ngxacts;
     718                 :     int         currIdx;
     719                 : } Working_State;
     720                 : 
     721                 : /*
     722                 :  * pg_prepared_xact
     723                 :  *      Produce a view with one row per prepared transaction.
     724                 :  *
     725                 :  * This function is here so we don't have to export the
     726                 :  * GlobalTransactionData struct definition.
     727                 :  */
     728                 : Datum
     729             135 : pg_prepared_xact(PG_FUNCTION_ARGS)
     730                 : {
     731                 :     FuncCallContext *funcctx;
     732                 :     Working_State *status;
     733                 : 
     734             135 :     if (SRF_IS_FIRSTCALL())
     735                 :     {
     736                 :         TupleDesc   tupdesc;
     737                 :         MemoryContext oldcontext;
     738                 : 
     739                 :         /* create a function context for cross-call persistence */
     740              89 :         funcctx = SRF_FIRSTCALL_INIT();
     741                 : 
     742                 :         /*
     743                 :          * Switch to memory context appropriate for multiple function calls
     744                 :          */
     745              89 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
     746                 : 
     747                 :         /* build tupdesc for result tuples */
     748                 :         /* this had better match pg_prepared_xacts view in system_views.sql */
     749              89 :         tupdesc = CreateTemplateTupleDesc(5);
     750              89 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
     751                 :                            XIDOID, -1, 0);
     752              89 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
     753                 :                            TEXTOID, -1, 0);
     754              89 :         TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
     755                 :                            TIMESTAMPTZOID, -1, 0);
     756              89 :         TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
     757                 :                            OIDOID, -1, 0);
     758              89 :         TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
     759                 :                            OIDOID, -1, 0);
     760                 : 
     761              89 :         funcctx->tuple_desc = BlessTupleDesc(tupdesc);
     762                 : 
     763                 :         /*
     764                 :          * Collect all the 2PC status information that we will format and send
     765                 :          * out as a result set.
     766                 :          */
     767              89 :         status = (Working_State *) palloc(sizeof(Working_State));
     768              89 :         funcctx->user_fctx = (void *) status;
     769                 : 
     770              89 :         status->ngxacts = GetPreparedTransactionList(&status->array);
     771              89 :         status->currIdx = 0;
     772                 : 
     773              89 :         MemoryContextSwitchTo(oldcontext);
     774                 :     }
     775                 : 
     776             135 :     funcctx = SRF_PERCALL_SETUP();
     777             135 :     status = (Working_State *) funcctx->user_fctx;
     778                 : 
     779             135 :     while (status->array != NULL && status->currIdx < status->ngxacts)
     780                 :     {
     781              46 :         GlobalTransaction gxact = &status->array[status->currIdx++];
     782              46 :         PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
     783 GNC          46 :         Datum       values[5] = {0};
     784              46 :         bool        nulls[5] = {0};
     785                 :         HeapTuple   tuple;
     786                 :         Datum       result;
     787                 : 
     788 CBC          46 :         if (!gxact->valid)
     789 UBC           0 :             continue;
     790                 : 
     791                 :         /*
     792                 :          * Form tuple with appropriate data.
     793                 :          */
     794 ECB             : 
     795 CBC          46 :         values[0] = TransactionIdGetDatum(proc->xid);
     796              46 :         values[1] = CStringGetTextDatum(gxact->gid);
     797              46 :         values[2] = TimestampTzGetDatum(gxact->prepared_at);
     798 GIC          46 :         values[3] = ObjectIdGetDatum(gxact->owner);
     799 CBC          46 :         values[4] = ObjectIdGetDatum(proc->databaseId);
     800 ECB             : 
     801 CBC          46 :         tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
     802 GIC          46 :         result = HeapTupleGetDatum(tuple);
     803              46 :         SRF_RETURN_NEXT(funcctx, result);
     804 ECB             :     }
     805                 : 
     806 GIC          89 :     SRF_RETURN_DONE(funcctx);
     807                 : }
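pg_prepared_xact() follows the usual set-returning-function pattern: the first call copies the prepared transactions into memory that survives across calls, and each later call advances a cursor, skipping entries that are not yet valid, until it reports completion. A plain-C sketch of that cursor logic, assuming a hypothetical `Snapshot` record and `next_row` function, where returning NULL stands in for SRF_RETURN_DONE:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical copy of one prepared transaction's visible fields. */
typedef struct Snapshot
{
    bool valid;
    int  id;
} Snapshot;

/* Mirrors Working_State: a private copy plus a cursor kept across calls. */
typedef struct WorkingState
{
    const Snapshot *array;
    int             ngxacts;
    int             currIdx;
} WorkingState;

/* One call yields one row; NULL plays the role of SRF_RETURN_DONE. */
static const Snapshot *
next_row(WorkingState *st)
{
    while (st->array != NULL && st->currIdx < st->ngxacts)
    {
        const Snapshot *g = &st->array[st->currIdx++];

        if (!g->valid)
            continue;           /* not fully prepared yet: skip it */
        return g;
    }
    return NULL;
}
```

Because the cursor is advanced before the validity check, a skipped entry is never revisited, and an empty snapshot (NULL array) finishes on the first call.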
     808                 : 
     809                 : /*
     810                 :  * TwoPhaseGetGXact
     811                 :  *      Get the GlobalTransaction struct for a prepared transaction
     812                 :  *      specified by XID
     813                 :  *
     814                 :  * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
     815                 :  * caller had better hold it.
     816 ECB             :  */
     817                 : static GlobalTransaction
     818 CBC        1386 : TwoPhaseGetGXact(TransactionId xid, bool lock_held)
     819                 : {
     820 GIC        1386 :     GlobalTransaction result = NULL;
     821                 :     int         i;
     822                 : 
     823                 :     static TransactionId cached_xid = InvalidTransactionId;
     824 ECB             :     static GlobalTransaction cached_gxact = NULL;
     825                 : 
     826 GIC        1386 :     Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
     827                 : 
     828                 :     /*
     829                 :      * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
     830 ECB             :      * repeatedly for the same XID.  We can save work with a simple cache.
     831                 :      */
     832 GIC        1386 :     if (xid == cached_xid)
     833 CBC         933 :         return cached_gxact;
     834 ECB             : 
     835 GIC         453 :     if (!lock_held)
     836 CBC         384 :         LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
     837                 : 
     838             830 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
     839                 :     {
     840             830 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
     841                 : 
     842             830 :         if (gxact->xid == xid)
     843 ECB             :         {
     844 GIC         453 :             result = gxact;
     845             453 :             break;
     846                 :         }
     847 ECB             :     }
     848                 : 
     849 GIC         453 :     if (!lock_held)
     850 CBC         384 :         LWLockRelease(TwoPhaseStateLock);
     851 EUB             : 
     852 GIC         453 :     if (result == NULL)         /* should not happen */
     853 LBC           0 :         elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
     854 ECB             : 
     855 GIC         453 :     cached_xid = xid;
     856 CBC         453 :     cached_gxact = result;
     857                 : 
     858 GIC         453 :     return result;
     859                 : }
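TwoPhaseGetGXact() keeps a one-entry, backend-local cache in two function-level static variables, because recovery, COMMIT PREPARED and ABORT PREPARED look up the same XID repeatedly. The sketch isolates that memoization. `Entry`, `lookup_with_cache` and the `scan_count` counter are hypothetical, and the shared lock the real code takes on a cache miss is omitted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define INVALID_XID ((TransactionId) 0)

/* Hypothetical stand-in for a prepared-transaction entry. */
typedef struct Entry
{
    TransactionId xid;
} Entry;

static int scan_count = 0;      /* counts full scans, for demonstration */

static Entry *
lookup_with_cache(Entry *entries, int n, TransactionId xid)
{
    static TransactionId cached_xid = INVALID_XID;
    static Entry *cached_entry = NULL;

    if (xid == cached_xid)
        return cached_entry;    /* hit: no scan needed */

    scan_count++;
    for (int i = 0; i < n; i++)
    {
        if (entries[i].xid == xid)
        {
            cached_xid = xid;   /* remember only successful lookups */
            cached_entry = &entries[i];
            return cached_entry;
        }
    }
    return NULL;                /* the real code raises an ERROR here */
}
```

The second lookup of the same XID returns without scanning. Neither the sketch nor the listed function invalidates the cached pair when an entry is removed from the array.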
     860                 : 
     861                 : /*
     862                 :  * TwoPhaseGetXidByVirtualXID
     863                 :  *      Lookup VXID among xacts prepared since last startup.
     864                 :  *
     865                 :  * (This won't find recovered xacts.)  If more than one matches, return any
     866                 :  * and set "have_more" to true.  To witness multiple matches, a single
     867                 :  * BackendId must consume 2^32 LXIDs, with no intervening database restart.
     868 ECB             :  */
     869                 : TransactionId
     870 GIC          98 : TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
     871                 :                            bool *have_more)
     872 ECB             : {
     873                 :     int         i;
     874 CBC          98 :     TransactionId result = InvalidTransactionId;
     875 ECB             : 
     876 GIC          98 :     Assert(VirtualTransactionIdIsValid(vxid));
     877 CBC          98 :     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
     878                 : 
     879             150 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
     880                 :     {
     881 GIC          52 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
     882                 :         PGPROC     *proc;
     883 ECB             :         VirtualTransactionId proc_vxid;
     884                 : 
     885 CBC          52 :         if (!gxact->valid)
     886               2 :             continue;
     887              50 :         proc = &ProcGlobal->allProcs[gxact->pgprocno];
     888 GIC          50 :         GET_VXID_FROM_PGPROC(proc_vxid, *proc);
     889              50 :         if (VirtualTransactionIdEquals(vxid, proc_vxid))
     890 ECB             :         {
     891                 :             /* Startup process sets proc->backendId to InvalidBackendId. */
     892 CBC           9 :             Assert(!gxact->inredo);
     893                 : 
     894 GBC           9 :             if (result != InvalidTransactionId)
     895 EUB             :             {
     896 UIC           0 :                 *have_more = true;
     897 LBC           0 :                 break;
     898                 :             }
     899 GIC           9 :             result = gxact->xid;
     900                 :         }
     901 ECB             :     }
     902                 : 
     903 CBC          98 :     LWLockRelease(TwoPhaseStateLock);
     904                 : 
     905 GIC          98 :     return result;
     906                 : }
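TwoPhaseGetXidByVirtualXID() returns the first matching XID and stops scanning as soon as it sees a second match, setting `*have_more`. It never writes `false` to `*have_more`, so the caller must initialize it. A simplified sketch, assuming a flattened `PreparedEntry` record (hypothetical) in place of the gxact/PGPROC pair and a plain (backend ID, local XID) pair in place of VirtualTransactionId:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define INVALID_XID ((TransactionId) 0)

/* Hypothetical flattened view of one prepared transaction's dummy PGPROC. */
typedef struct PreparedEntry
{
    bool          valid;
    int           backend_id;   /* first half of the VXID */
    uint32_t      lxid;         /* second half of the VXID */
    TransactionId xid;
} PreparedEntry;

static TransactionId
xid_by_vxid(const PreparedEntry *e, int n, int backend_id, uint32_t lxid,
            bool *have_more)
{
    TransactionId result = INVALID_XID;

    for (int i = 0; i < n; i++)
    {
        if (!e[i].valid)
            continue;           /* skip not-yet-valid entries */
        if (e[i].backend_id == backend_id && e[i].lxid == lxid)
        {
            if (result != INVALID_XID)
            {
                *have_more = true;  /* second match: report and stop */
                break;
            }
            result = e[i].xid;
        }
    }
    return result;
}
```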
     907                 : 
     908                 : /*
     909                 :  * TwoPhaseGetDummyBackendId
     910                 :  *      Get the dummy backend ID for prepared transaction specified by XID
     911                 :  *
     912                 :  * Dummy backend IDs are similar to real backend IDs of real backends.
     913                 :  * They start at MaxBackends + 1, and are unique across all currently active
     914                 :  * real backends and prepared transactions.  If lock_held is set to true,
     915                 :  * TwoPhaseStateLock will not be taken, so the caller had better hold it.
     916 ECB             :  */
     917                 : BackendId
     918 CBC         107 : TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held)
     919                 : {
     920             107 :     GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
     921                 : 
     922 GIC         107 :     return gxact->dummyBackendId;
     923                 : }
     924                 : 
     925                 : /*
     926                 :  * TwoPhaseGetDummyProc
     927                 :  *      Get the PGPROC that represents a prepared transaction specified by XID
     928                 :  *
     929                 :  * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
     930                 :  * caller had better hold it.
     931 ECB             :  */
     932                 : PGPROC *
     933 CBC        1279 : TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
     934                 : {
     935            1279 :     GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
     936                 : 
     937 GIC        1279 :     return &ProcGlobal->allProcs[gxact->pgprocno];
     938                 : }
     939                 : 
     940                 : /************************************************************************/
     941                 : /* State file support                                                   */
     942                 : /************************************************************************/
     943                 : 
     944                 : #define TwoPhaseFilePath(path, xid) \
     945                 :     snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", xid)
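TwoPhaseFilePath names each state file after its XID, written as eight uppercase hexadecimal digits. The sketch reproduces the macro so the naming can be checked in isolation. The values of TWOPHASE_DIR ("pg_twophase") and MAXPGPATH (1024) are assumptions taken from stock PostgreSQL builds, since neither definition appears in this listing, and the explicit `(unsigned int)` cast is an addition that keeps the argument matched to `%X`.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Assumptions: values used by stock PostgreSQL builds. */
#define TWOPHASE_DIR "pg_twophase"
#define MAXPGPATH 1024

/* XID rendered as zero-padded, 8-digit uppercase hex. */
#define TwoPhaseFilePath(path, xid) \
    snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X", (unsigned int) (xid))
```

For XID 1234 (0x4D2) this produces `pg_twophase/000004D2`; the fixed width means every 32-bit XID maps to a name of the same length.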
     946                 : 
     947                 : /*
     948                 :  * 2PC state file format:
     949                 :  *
     950                 :  *  1. TwoPhaseFileHeader
     951                 :  *  2. TransactionId[] (subtransactions)
     952                 :  *  3. RelFileLocator[] (files to be deleted at commit)
     953                 :  *  4. RelFileLocator[] (files to be deleted at abort)
     954                 :  *  5. SharedInvalidationMessage[] (inval messages to be sent at commit)
     955                 :  *  6. TwoPhaseRecordOnDisk
     956                 :  *  7. ...
     957                 :  *  8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
     958                 :  *  9. checksum (CRC-32C)
     959                 :  *
     960                 :  * Each segment except the final checksum is MAXALIGN'd.
     961                 :  */
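Because every segment except the trailing CRC-32C is MAXALIGN'd, the size of a state file is the sum of each segment's length rounded up to the maximum alignment, plus the checksum. A minimal sketch of that arithmetic, assuming an 8-byte MAXIMUM_ALIGNOF (typical for 64-bit platforms) and a 4-byte checksum. `aligned_body_len` and `state_file_size` are hypothetical helpers, and the segment lengths are arbitrary inputs rather than real structure sizes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumption: 8-byte maximum alignment, typical of 64-bit platforms. */
#define MAXIMUM_ALIGNOF 8
#define MAXALIGN(len) \
    (((uintptr_t) (len) + (MAXIMUM_ALIGNOF - 1)) & \
     ~((uintptr_t) (MAXIMUM_ALIGNOF - 1)))

/* Sum of all segments, each padded to a MAXALIGN boundary. */
static size_t
aligned_body_len(const size_t *seg_lens, int nsegs)
{
    size_t total = 0;

    for (int i = 0; i < nsegs; i++)
        total += MAXALIGN(seg_lens[i]);
    return total;
}

/* Padded body plus the unpadded 4-byte CRC-32C at the end. */
static size_t
state_file_size(const size_t *seg_lens, int nsegs)
{
    return aligned_body_len(seg_lens, nsegs) + sizeof(uint32_t);
}
```

Segments of 13, 8, 0 and 17 bytes occupy 16 + 8 + 0 + 24 = 48 bytes, so the file is 52 bytes with the checksum.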
     962                 : 
     963                 : /*
     964                 :  * Header for a 2PC state file
     965                 :  */
     966                 : #define TWOPHASE_MAGIC  0x57F94534  /* format identifier */
     967                 : 
     968                 : typedef xl_xact_prepare TwoPhaseFileHeader;
     969                 : 
     970                 : /*
     971                 :  * Header for each record in a state file
     972                 :  *
     973                 :  * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
     974                 :  * The rmgr data will be stored starting on a MAXALIGN boundary.
     975                 :  */
     976                 : typedef struct TwoPhaseRecordOnDisk
     977                 : {
     978                 :     uint32      len;            /* length of rmgr data */
     979                 :     TwoPhaseRmgrId rmid;        /* resource manager for this record */
     980                 :     uint16      info;           /* flag bits for use by rmgr */
     981                 : } TwoPhaseRecordOnDisk;
     982                 : 
     983                 : /*
     984                 :  * During prepare, the state file is assembled in memory before writing it
     985                 :  * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
     986                 :  * for that.
     987                 :  */
     988                 : typedef struct StateFileChunk
     989                 : {
     990                 :     char       *data;
     991                 :     uint32      len;
     992                 :     struct StateFileChunk *next;
     993                 : } StateFileChunk;
     994                 : 
     995                 : static struct xllist
     996                 : {
     997                 :     StateFileChunk *head;       /* first data block in the chain */
     998                 :     StateFileChunk *tail;       /* last block in chain */
     999                 :     uint32      num_chunks;
    1000                 :     uint32      bytes_free;     /* free bytes left in tail block */
    1001                 :     uint32      total_len;      /* total data bytes in chain */
    1002                 : }           records;
    1003                 : 
    1004                 : 
    1005                 : /*
    1006                 :  * Append a block of data to the records data structure.
    1007                 :  *
    1008                 :  * NB: each block is padded to a MAXALIGN multiple.  This must be
    1009                 :  * accounted for when the file is later read!
    1010                 :  *
    1011                 :  * The data is copied, so the caller is free to modify it afterwards.
    1012 ECB             :  */
    1013                 : static void
    1014 CBC        3889 : save_state_data(const void *data, uint32 len)
    1015                 : {
    1016            3889 :     uint32      padlen = MAXALIGN(len);
    1017                 : 
    1018            3889 :     if (padlen > records.bytes_free)
    1019 ECB             :     {
    1020 CBC          40 :         records.tail->next = palloc0(sizeof(StateFileChunk));
    1021              40 :         records.tail = records.tail->next;
    1022              40 :         records.tail->len = 0;
    1023 GIC          40 :         records.tail->next = NULL;
    1024 CBC          40 :         records.num_chunks++;
    1025 ECB             : 
    1026 GIC          40 :         records.bytes_free = Max(padlen, 512);
    1027              40 :         records.tail->data = palloc(records.bytes_free);
    1028 ECB             :     }
    1029                 : 
    1030 CBC        3889 :     memcpy(((char *) records.tail->data) + records.tail->len, data, len);
    1031            3889 :     records.tail->len += padlen;
    1032            3889 :     records.bytes_free -= padlen;
    1033 GIC        3889 :     records.total_len += padlen;
    1034            3889 : }
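save_state_data() appends to a singly linked chain of buffers: each block is padded to a MAXALIGN multiple, and a new chunk of at least 512 bytes is allocated whenever the padded length does not fit in the tail. The sketch mirrors that logic with malloc/calloc in place of palloc/palloc0. `Chunk`, `ChunkList`, `chunks_init`, `chunks_append` and `chunks_free` are hypothetical names, the 8-byte alignment is an assumption, and allocation failures are not checked (palloc raises an error instead of returning NULL).

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Assumption: 8-byte maximum alignment. */
#define MAXIMUM_ALIGNOF 8
#define MAXALIGN(len) \
    (((uintptr_t) (len) + (MAXIMUM_ALIGNOF - 1)) & \
     ~((uintptr_t) (MAXIMUM_ALIGNOF - 1)))
#define Max(a, b) ((a) > (b) ? (a) : (b))

typedef struct Chunk
{
    char         *data;
    uint32_t      len;          /* padded bytes used in this chunk */
    struct Chunk *next;
} Chunk;

typedef struct ChunkList
{
    Chunk    *head;
    Chunk    *tail;
    uint32_t  num_chunks;
    uint32_t  bytes_free;       /* free bytes left in tail chunk */
    uint32_t  total_len;        /* total padded bytes in chain */
} ChunkList;

static void
chunks_init(ChunkList *l, uint32_t first_size)
{
    l->head = calloc(1, sizeof(Chunk));     /* len = 0, next = NULL */
    l->bytes_free = Max(first_size, 512);
    l->head->data = malloc(l->bytes_free);
    l->tail = l->head;
    l->num_chunks = 1;
    l->total_len = 0;
}

static void
chunks_append(ChunkList *l, const void *data, uint32_t len)
{
    uint32_t padlen = MAXALIGN(len);

    if (padlen > l->bytes_free)
    {
        /* start a new tail chunk big enough for this block */
        l->tail->next = calloc(1, sizeof(Chunk));
        l->tail = l->tail->next;
        l->num_chunks++;
        l->bytes_free = Max(padlen, 512);
        l->tail->data = malloc(l->bytes_free);
    }
    /* copy len bytes; the padding bytes after them are left unset */
    memcpy(l->tail->data + l->tail->len, data, len);
    l->tail->len += padlen;
    l->bytes_free -= padlen;
    l->total_len += padlen;
}

static void
chunks_free(ChunkList *l)
{
    Chunk *c = l->head;

    while (c != NULL)
    {
        Chunk *next = c->next;

        free(c->data);
        free(c);
        c = next;
    }
}
```

When a new chunk is started, the unused space at the end of the previous tail is abandoned; `total_len` counts only padded data bytes, not buffer capacity.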
    1035                 : 
    1036                 : /*
    1037                 :  * Start preparing a state file.
    1038                 :  *
    1039                 :  * Initializes data structure and inserts the 2PC file header record.
    1040 ECB             :  */
    1041                 : void
    1042 CBC         356 : StartPrepare(GlobalTransaction gxact)
    1043 ECB             : {
    1044 GIC         356 :     PGPROC     *proc = &ProcGlobal->allProcs[gxact->pgprocno];
    1045             356 :     TransactionId xid = gxact->xid;
    1046                 :     TwoPhaseFileHeader hdr;
    1047                 :     TransactionId *children;
    1048                 :     RelFileLocator *commitrels;
    1049                 :     RelFileLocator *abortrels;
    1050             356 :     xl_xact_stats_item *abortstats = NULL;
    1051             356 :     xl_xact_stats_item *commitstats = NULL;
    1052                 :     SharedInvalidationMessage *invalmsgs;
    1053 ECB             : 
    1054                 :     /* Initialize linked list */
    1055 CBC         356 :     records.head = palloc0(sizeof(StateFileChunk));
    1056 GIC         356 :     records.head->len = 0;
    1057 CBC         356 :     records.head->next = NULL;
    1058 ECB             : 
    1059 GIC         356 :     records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
    1060 CBC         356 :     records.head->data = palloc(records.bytes_free);
    1061 ECB             : 
    1062 GIC         356 :     records.tail = records.head;
    1063 CBC         356 :     records.num_chunks = 1;
    1064                 : 
    1065 GIC         356 :     records.total_len = 0;
    1066 ECB             : 
    1067                 :     /* Create header */
    1068 CBC         356 :     hdr.magic = TWOPHASE_MAGIC;
    1069             356 :     hdr.total_len = 0;          /* EndPrepare will fill this in */
    1070             356 :     hdr.xid = xid;
    1071             356 :     hdr.database = proc->databaseId;
    1072             356 :     hdr.prepared_at = gxact->prepared_at;
    1073             356 :     hdr.owner = gxact->owner;
    1074             356 :     hdr.nsubxacts = xactGetCommittedChildren(&children);
    1075             356 :     hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
    1076             356 :     hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
    1077             356 :     hdr.ncommitstats =
    1078             356 :         pgstat_get_transactional_drops(true, &commitstats);
    1079             356 :     hdr.nabortstats =
    1080 GIC         356 :         pgstat_get_transactional_drops(false, &abortstats);
    1081 CBC         356 :     hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
    1082                 :                                                           &hdr.initfileinval);
    1083             356 :     hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
    1084 ECB             :     /* EndPrepare will fill the origin data, if necessary */
    1085 GIC         356 :     hdr.origin_lsn = InvalidXLogRecPtr;
    1086 CBC         356 :     hdr.origin_timestamp = 0;
    1087 ECB             : 
    1088 GIC         356 :     save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
    1089             356 :     save_state_data(gxact->gid, hdr.gidlen);
    1090                 : 
    1091                 :     /*
    1092                 :      * Add the additional info about subxacts, deletable files and cache
    1093 ECB             :      * invalidation messages.
    1094                 :      */
    1095 CBC         356 :     if (hdr.nsubxacts > 0)
    1096                 :     {
    1097             143 :         save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
    1098                 :         /* While we have the child-xact data, stuff it in the gxact too */
    1099             143 :         GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
    1100                 :     }
    1101             356 :     if (hdr.ncommitrels > 0)
    1102 ECB             :     {
    1103 GNC           9 :         save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
    1104 CBC           9 :         pfree(commitrels);
    1105                 :     }
    1106             356 :     if (hdr.nabortrels > 0)
    1107 ECB             :     {
    1108 GNC          16 :         save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
    1109 CBC          16 :         pfree(abortrels);
    1110                 :     }
    1111             356 :     if (hdr.ncommitstats > 0)
    1112 ECB             :     {
    1113 CBC           9 :         save_state_data(commitstats,
    1114 GIC           9 :                         hdr.ncommitstats * sizeof(xl_xact_stats_item));
    1115 CBC           9 :         pfree(commitstats);
    1116                 :     }
    1117             356 :     if (hdr.nabortstats > 0)
    1118 ECB             :     {
    1119 CBC          12 :         save_state_data(abortstats,
    1120 GIC          12 :                         hdr.nabortstats * sizeof(xl_xact_stats_item));
    1121 CBC          12 :         pfree(abortstats);
    1122                 :     }
    1123             356 :     if (hdr.ninvalmsgs > 0)
    1124 ECB             :     {
    1125 CBC          22 :         save_state_data(invalmsgs,
    1126 GIC          22 :                         hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
    1127 CBC          22 :         pfree(invalmsgs);
    1128                 :     }
    1129 GIC         356 : }
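StartPrepare lays out the state data as the header, followed by the GID and then the optional arrays, each padded by save_state_data. FinishPreparedTransaction later walks the buffer in the same order using bufptr += MAXALIGN(...). The sketch below shows that offset arithmetic and assumes 8-byte alignment. The section enum and section_offsets are hypothetical helpers. The header size used in the usage check is an arbitrary stand-in for sizeof(TwoPhaseFileHeader).

```c
#include <stddef.h>
#include <stdint.h>

#define ALIGNOF_MAX 8   /* assumption: 8-byte MAXALIGN */
#define MAX_ALIGN(len) (((size_t) (len) + (ALIGNOF_MAX - 1)) & ~((size_t) (ALIGNOF_MAX - 1)))

/* Order in which StartPrepare appends sections after the header. */
enum
{
    SEC_GID, SEC_SUBXACTS, SEC_COMMITRELS, SEC_ABORTRELS,
    SEC_COMMITSTATS, SEC_ABORTSTATS, SEC_INVALMSGS, SEC_COUNT
};

/*
 * Compute where each section starts, given the header size and each
 * section's byte length (count * element size; the GID length includes
 * the trailing '\0').  Returns the offset just past the last section,
 * which is where the resource-manager records begin.
 */
static size_t
section_offsets(size_t header_size, const size_t sec_len[SEC_COUNT],
                size_t offsets[SEC_COUNT])
{
    size_t off = MAX_ALIGN(header_size);

    for (int i = 0; i < SEC_COUNT; i++)
    {
        offsets[i] = off;
        off += MAX_ALIGN(sec_len[i]);
    }
    return off;
}
```

StartPrepare never writes a section whose count is zero. Because MAXALIGN(0) is 0, the reader's running offset still matches what the writer produced.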
    1130                 : 
    1131                 : /*
    1132                 :  * Finish preparing state data and writing it to WAL.
    1133 ECB             :  */
    1134                 : void
    1135 GIC         354 : EndPrepare(GlobalTransaction gxact)
    1136                 : {
    1137                 :     TwoPhaseFileHeader *hdr;
    1138                 :     StateFileChunk *record;
    1139                 :     bool        replorigin;
    1140 ECB             : 
    1141                 :     /* Add the end sentinel to the list of 2PC records */
    1142 GIC         354 :     RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
    1143                 :                            NULL, 0);
    1144 ECB             : 
    1145                 :     /* Go back and fill in total_len in the file header record */
    1146 CBC         354 :     hdr = (TwoPhaseFileHeader *) records.head->data;
    1147 GIC         354 :     Assert(hdr->magic == TWOPHASE_MAGIC);
    1148 CBC         354 :     hdr->total_len = records.total_len + sizeof(pg_crc32c);
    1149 ECB             : 
    1150 GIC         377 :     replorigin = (replorigin_session_origin != InvalidRepOriginId &&
    1151 CBC          23 :                   replorigin_session_origin != DoNotReplicateId);
    1152                 : 
    1153             354 :     if (replorigin)
    1154 ECB             :     {
    1155 GIC          23 :         hdr->origin_lsn = replorigin_session_origin_lsn;
    1156              23 :         hdr->origin_timestamp = replorigin_session_origin_timestamp;
    1157                 :     }
    1158                 : 
    1159                 :     /*
    1160                 :      * If the data size exceeds MaxAllocSize, we won't be able to read it in
    1161                 :      * ReadTwoPhaseFile. Check for that now, rather than fail in the case
    1162 ECB             :      * where we write data to file and then re-read at commit time.
    1163 EUB             :      */
    1164 GIC         354 :     if (hdr->total_len > MaxAllocSize)
    1165 UIC           0 :         ereport(ERROR,
    1166                 :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
    1167                 :                  errmsg("two-phase state file maximum length exceeded")));
    1168                 : 
    1169                 :     /*
    1170                 :      * Now writing 2PC state data to WAL. We let the WAL's CRC protection
    1171                 :      * cover us, so no need to calculate a separate CRC.
    1172                 :      *
    1173                 :      * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
    1174                 :      * starting immediately after the WAL record is inserted could complete
    1175                 :      * without fsync'ing our state file.  (This is essentially the same kind
    1176                 :      * of race condition as the COMMIT-to-clog-write case that
    1177                 :      * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
    1178                 :      *
    1179                 :      * We save the PREPARE record's location in the gxact for later use by
    1180 ECB             :      * CheckPointTwoPhase.
    1181                 :      */
    1182 CBC         354 :     XLogEnsureRecordSpace(0, records.num_chunks);
    1183                 : 
    1184             354 :     START_CRIT_SECTION();
    1185 ECB             : 
    1186 GIC         354 :     Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
    1187 CBC         354 :     MyProc->delayChkptFlags |= DELAY_CHKPT_START;
    1188 ECB             : 
    1189 CBC         354 :     XLogBeginInsert();
    1190 GIC         748 :     for (record = records.head; record != NULL; record = record->next)
    1191 CBC         394 :         XLogRegisterData(record->data, record->len);
    1192                 : 
    1193             354 :     XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
    1194                 : 
    1195             354 :     gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
    1196                 : 
    1197 GIC         354 :     if (replorigin)
    1198 ECB             :     {
    1199                 :         /* Move LSNs forward for this replication origin */
    1200 GIC          23 :         replorigin_session_advance(replorigin_session_origin_lsn,
    1201                 :                                    gxact->prepare_end_lsn);
    1202 ECB             :     }
    1203                 : 
    1204 GIC         354 :     XLogFlush(gxact->prepare_end_lsn);
    1205                 : 
    1206                 :     /* If we crash now, we have prepared: WAL replay will fix things */
    1207 ECB             : 
    1208                 :     /* Store record's start location to read that later on Commit */
    1209 GIC         354 :     gxact->prepare_start_lsn = ProcLastRecPtr;
    1210                 : 
    1211                 :     /*
    1212                 :      * Mark the prepared transaction as valid.  As soon as xact.c marks MyProc
    1213                 :      * as not running our XID (which it will do immediately after this
    1214                 :      * function returns), others can commit/rollback the xact.
    1215                 :      *
    1216                 :      * NB: a side effect of this is to make a dummy ProcArray entry for the
    1217                 :      * prepared XID.  This must happen before we clear the XID from MyProc /
    1218                 :      * ProcGlobal->xids[], else there is a window where the XID is not running
    1219                 :      * according to TransactionIdIsInProgress, and onlookers would be entitled
    1220                 :      * to assume the xact crashed.  Instead we have a window where the same
    1221 ECB             :      * XID appears twice in ProcArray, which is OK.
    1222                 :      */
    1223 GIC         354 :     MarkAsPrepared(gxact, false);
    1224                 : 
    1225                 :     /*
    1226                 :      * Now we can mark ourselves as out of the commit critical section: a
    1227                 :      * checkpoint starting after this will certainly see the gxact as a
    1228 ECB             :      * candidate for fsyncing.
    1229                 :      */
    1230 GIC         354 :     MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
    1231                 : 
    1232                 :     /*
    1233                 :      * Remember that we have this GlobalTransaction entry locked for us.  If
    1234                 :      * we crash after this point, it's too late to abort, but we must unlock
    1235 ECB             :      * it so that the prepared transaction can be committed or rolled back.
    1236                 :      */
    1237 CBC         354 :     MyLockedGxact = gxact;
    1238                 : 
    1239 GIC         354 :     END_CRIT_SECTION();
    1240                 : 
    1241                 :     /*
    1242                 :      * Wait for synchronous replication, if required.
    1243                 :      *
    1244                 :      * Note that at this stage we have marked the prepare, but still show as
    1245 ECB             :      * running in the procarray (twice!) and continue to hold locks.
    1246                 :      */
    1247 CBC         354 :     SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
    1248 ECB             : 
    1249 CBC         354 :     records.tail = records.head = NULL;
    1250 GIC         354 :     records.num_chunks = 0;
    1251             354 : }
    1252                 : 
    1253                 : /*
    1254                 :  * Register a 2PC record to be written to state file.
    1255 ECB             :  */
    1256                 : void
    1257 GIC        1660 : RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
    1258                 :                        const void *data, uint32 len)
    1259                 : {
    1260 ECB             :     TwoPhaseRecordOnDisk record;
    1261                 : 
    1262 CBC        1660 :     record.rmid = rmid;
    1263            1660 :     record.info = info;
    1264            1660 :     record.len = len;
    1265            1660 :     save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
    1266            1660 :     if (len > 0)
    1267 GIC        1306 :         save_state_data(data, len);
    1268            1660 : }
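RegisterTwoPhaseRecord frames each resource-manager record as a fixed header (rmid, info, len) followed by an optional payload, both padded by save_state_data. EndPrepare terminates the stream with a zero-length TWOPHASE_RM_END_ID record. The sketch below writes such a stream into a flat buffer and then walks it. The header layout and the sentinel value 0 are assumptions standing in for TwoPhaseRecordOnDisk and TWOPHASE_RM_END_ID. put_record and count_records are hypothetical helpers.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define ALIGNOF_MAX 8
#define MAX_ALIGN(len) (((size_t) (len) + (ALIGNOF_MAX - 1)) & ~((size_t) (ALIGNOF_MAX - 1)))
#define RM_END_ID 0     /* assumption: stands in for TWOPHASE_RM_END_ID */

/* Assumed shape of the per-record header: payload length, rmgr id, info. */
typedef struct
{
    uint32_t len;
    uint8_t  rmid;
    uint16_t info;
} RecordHdr;

/* Append header + payload, each padded to MAX_ALIGN; returns the new offset. */
static size_t
put_record(char *buf, size_t off, uint8_t rmid, uint16_t info,
           const void *data, uint32_t len)
{
    RecordHdr hdr = {len, rmid, info};

    memcpy(buf + off, &hdr, sizeof(hdr));
    off += MAX_ALIGN(sizeof(hdr));
    if (len > 0)
    {
        memcpy(buf + off, data, len);
        off += MAX_ALIGN(len);
    }
    return off;
}

/* Walk records from off until the end sentinel; returns how many were seen. */
static int
count_records(const char *buf, size_t off)
{
    int n = 0;

    for (;;)
    {
        RecordHdr hdr;

        memcpy(&hdr, buf + off, sizeof(hdr));
        if (hdr.rmid == RM_END_ID)
            return n;
        n++;
        off += MAX_ALIGN(sizeof(hdr)) + MAX_ALIGN(hdr.len);
    }
}
```

A zero-length sentinel lets a reader stop without a separate record count in the header. Each record's own len field is enough to skip to the next one.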
    1269                 : 
    1270                 : 
    1271                 : /*
    1272                 :  * Read and validate the state file for xid.
    1273                 :  *
    1274                 :  * If it looks OK (has a valid magic number and CRC), return the palloc'd
    1275                 :  * contents of the file, issuing an error when finding corrupted data.  If
    1276                 :  * missing_ok is true, which indicates that missing files can be safely
    1277                 :  * ignored, then return NULL.  This state can be reached when doing recovery.
    1278 ECB             :  */
    1279                 : static char *
    1280 GIC          78 : ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
    1281                 : {
    1282                 :     char        path[MAXPGPATH];
    1283                 :     char       *buf;
    1284                 :     TwoPhaseFileHeader *hdr;
    1285                 :     int         fd;
    1286                 :     struct stat stat;
    1287                 :     uint32      crc_offset;
    1288                 :     pg_crc32c   calc_crc,
    1289                 :                 file_crc;
    1290 ECB             :     int         r;
    1291                 : 
    1292 CBC          78 :     TwoPhaseFilePath(path, xid);
    1293 ECB             : 
    1294 GIC          78 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
    1295 CBC          78 :     if (fd < 0)
    1296 ECB             :     {
    1297 GIC          17 :         if (missing_ok && errno == ENOENT)
    1298 GBC          17 :             return NULL;
    1299                 : 
    1300 UIC           0 :         ereport(ERROR,
    1301                 :                 (errcode_for_file_access(),
    1302                 :                  errmsg("could not open file \"%s\": %m", path)));
    1303                 :     }
    1304                 : 
    1305                 :     /*
    1306                 :      * Check file length.  We can determine a lower bound pretty easily. We
    1307                 :      * set an upper bound to avoid palloc() failure on a corrupt file, though
    1308                 :      * we can't guarantee that we won't get an out of memory error anyway,
    1309 ECB             :      * even on a valid file.
    1310 EUB             :      */
    1311 GIC          61 :     if (fstat(fd, &stat))
    1312 UIC           0 :         ereport(ERROR,
    1313                 :                 (errcode_for_file_access(),
    1314 ECB             :                  errmsg("could not stat file \"%s\": %m", path)));
    1315                 : 
    1316 CBC          61 :     if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
    1317 ECB             :                         MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
    1318 GBC          61 :                         sizeof(pg_crc32c)) ||
    1319 GIC          61 :         stat.st_size > MaxAllocSize)
    1320 UIC           0 :         ereport(ERROR,
    1321                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1322                 :                  errmsg_plural("incorrect size of file \"%s\": %lld byte",
    1323                 :                                "incorrect size of file \"%s\": %lld bytes",
    1324                 :                                (long long int) stat.st_size, path,
    1325 ECB             :                                (long long int) stat.st_size)));
    1326                 : 
    1327 GBC          61 :     crc_offset = stat.st_size - sizeof(pg_crc32c);
    1328 GIC          61 :     if (crc_offset != MAXALIGN(crc_offset))
    1329 UIC           0 :         ereport(ERROR,
    1330                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1331                 :                  errmsg("incorrect alignment of CRC offset for file \"%s\"",
    1332                 :                         path)));
    1333                 : 
    1334                 :     /*
    1335 ECB             :      * OK, slurp in the file.
    1336                 :      */
    1337 CBC          61 :     buf = (char *) palloc(stat.st_size);
    1338 ECB             : 
    1339 CBC          61 :     pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
    1340 GIC          61 :     r = read(fd, buf, stat.st_size);
    1341 GBC          61 :     if (r != stat.st_size)
    1342 EUB             :     {
    1343 UIC           0 :         if (r < 0)
    1344               0 :             ereport(ERROR,
    1345                 :                     (errcode_for_file_access(),
    1346 EUB             :                      errmsg("could not read file \"%s\": %m", path)));
    1347                 :         else
    1348 UIC           0 :             ereport(ERROR,
    1349                 :                     (errmsg("could not read file \"%s\": read %d of %lld",
    1350                 :                             path, r, (long long int) stat.st_size)));
    1351 ECB             :     }
    1352                 : 
    1353 CBC          61 :     pgstat_report_wait_end();
    1354 EUB             : 
    1355 GIC          61 :     if (CloseTransientFile(fd) != 0)
    1356 UIC           0 :         ereport(ERROR,
    1357                 :                 (errcode_for_file_access(),
    1358 ECB             :                  errmsg("could not close file \"%s\": %m", path)));
    1359                 : 
    1360 GBC          61 :     hdr = (TwoPhaseFileHeader *) buf;
    1361 GIC          61 :     if (hdr->magic != TWOPHASE_MAGIC)
    1362 UIC           0 :         ereport(ERROR,
    1363                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1364                 :                  errmsg("invalid magic number stored in file \"%s\"",
    1365 ECB             :                         path)));
    1366 EUB             : 
    1367 GIC          61 :     if (hdr->total_len != stat.st_size)
    1368 UIC           0 :         ereport(ERROR,
    1369                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1370                 :                  errmsg("invalid size stored in file \"%s\"",
    1371 ECB             :                         path)));
    1372                 : 
    1373 CBC          61 :     INIT_CRC32C(calc_crc);
    1374 GIC          61 :     COMP_CRC32C(calc_crc, buf, crc_offset);
    1375 CBC          61 :     FIN_CRC32C(calc_crc);
    1376                 : 
    1377              61 :     file_crc = *((pg_crc32c *) (buf + crc_offset));
    1378 EUB             : 
    1379 GIC          61 :     if (!EQ_CRC32C(calc_crc, file_crc))
    1380 UIC           0 :         ereport(ERROR,
    1381                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1382                 :                  errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
    1383 ECB             :                         path)));
    1384                 : 
    1385 GIC          61 :     return buf;
    1386                 : }
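ReadTwoPhaseFile rejects a file by running its checks in a fixed order. It checks the size bounds, then the CRC offset alignment, the magic number, the header's total_len against the file size, and finally the CRC-32C over everything before the trailing checksum. The sketch below runs the same sequence on an in-memory image. It assumes a two-field stand-in header (magic, total_len), an 8-byte record header, and a hypothetical magic value. A bitwise CRC-32C using the reflected Castagnoli polynomial 0x82F63B78 replaces PostgreSQL's INIT/COMP/FIN_CRC32C macros; it is slow but simple. MAX_ALLOC mirrors MaxAllocSize (0x3fffffff).

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define ALIGNOF_MAX 8
#define MAX_ALIGN(len) (((size_t) (len) + (ALIGNOF_MAX - 1)) & ~((size_t) (ALIGNOF_MAX - 1)))
#define MAX_ALLOC ((size_t) 0x3fffffff)
#define DEMO_MAGIC 0x12345678u          /* hypothetical format identifier */
#define DEMO_RECORD_HDR_SIZE 8          /* assumed size of one record header */

typedef struct
{
    uint32_t magic;
    uint32_t total_len;
} DemoHeader;

/* Smallest plausible file: header + one (end) record header + CRC. */
#define DEMO_MIN_SIZE \
    (MAX_ALIGN(sizeof(DemoHeader)) + MAX_ALIGN(DEMO_RECORD_HDR_SIZE) + sizeof(uint32_t))

typedef enum { FILE_OK, BAD_SIZE, BAD_ALIGN, BAD_MAGIC, BAD_TOTAL_LEN, BAD_CRC } FileCheck;

/* Bitwise CRC-32C: init 0xFFFFFFFF, reflected polynomial, final XOR. */
static uint32_t
crc32c(const void *data, size_t len)
{
    const unsigned char *p = data;
    uint32_t crc = 0xFFFFFFFFu;

    while (len--)
    {
        crc ^= *p++;
        for (int k = 0; k < 8; k++)
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
    return crc ^ 0xFFFFFFFFu;
}

static FileCheck
check_state_image(const char *buf, size_t size)
{
    DemoHeader hdr;
    size_t     crc_offset;
    uint32_t   stored;

    if (size < DEMO_MIN_SIZE || size > MAX_ALLOC)
        return BAD_SIZE;
    crc_offset = size - sizeof(uint32_t);
    if (crc_offset != MAX_ALIGN(crc_offset))
        return BAD_ALIGN;
    memcpy(&hdr, buf, sizeof(hdr));
    if (hdr.magic != DEMO_MAGIC)
        return BAD_MAGIC;
    if (hdr.total_len != size)
        return BAD_TOTAL_LEN;
    memcpy(&stored, buf + crc_offset, sizeof(stored));
    if (crc32c(buf, crc_offset) != stored)
        return BAD_CRC;
    return FILE_OK;
}
```

The cheap structural checks run first. As a result, a truncated or misaligned file is reported as a size or alignment problem rather than as a CRC mismatch, and the buffer is never read past a length that was not validated.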
    1387                 : 
    1388                 : 
    1389                 : /*
    1390                 :  * Reads 2PC data from xlog. During checkpoint this data will be moved to
    1391                 :  * twophase files and ReadTwoPhaseFile should be used instead.
    1392                 :  *
    1393                 :  * Note clearly that this function can access WAL during normal operation,
    1394                 :  * similarly to the way WALSender or Logical Decoding would do.
    1395 ECB             :  */
    1396                 : static void
    1397 GIC         423 : XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
    1398                 : {
    1399                 :     XLogRecord *record;
    1400                 :     XLogReaderState *xlogreader;
    1401 ECB             :     char       *errormsg;
    1402                 : 
    1403 GIC         423 :     xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
    1404             423 :                                     XL_ROUTINE(.page_read = &read_local_xlog_page,
    1405                 :                                                .segment_open = &wal_segment_open,
    1406 ECB             :                                                .segment_close = &wal_segment_close),
    1407 EUB             :                                     NULL);
    1408 GIC         423 :     if (!xlogreader)
    1409 UIC           0 :         ereport(ERROR,
    1410                 :                 (errcode(ERRCODE_OUT_OF_MEMORY),
    1411                 :                  errmsg("out of memory"),
    1412 ECB             :                  errdetail("Failed while allocating a WAL reading processor.")));
    1413                 : 
    1414 GIC         423 :     XLogBeginRead(xlogreader, lsn);
    1415 CBC         423 :     record = XLogReadRecord(xlogreader, &errormsg);
    1416                 : 
    1417 GBC         423 :     if (record == NULL)
    1418 EUB             :     {
    1419 UIC           0 :         if (errormsg)
    1420               0 :             ereport(ERROR,
    1421                 :                     (errcode_for_file_access(),
    1422                 :                      errmsg("could not read two-phase state from WAL at %X/%X: %s",
    1423 EUB             :                             LSN_FORMAT_ARGS(lsn), errormsg)));
    1424                 :         else
    1425 UIC           0 :             ereport(ERROR,
    1426                 :                     (errcode_for_file_access(),
    1427                 :                      errmsg("could not read two-phase state from WAL at %X/%X",
    1428                 :                             LSN_FORMAT_ARGS(lsn))));
    1429 ECB             :     }
    1430                 : 
    1431 GBC         423 :     if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
    1432 GIC         423 :         (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
    1433 UIC           0 :         ereport(ERROR,
    1434                 :                 (errcode_for_file_access(),
    1435                 :                  errmsg("expected two-phase state data is not present in WAL at %X/%X",
    1436 ECB             :                         LSN_FORMAT_ARGS(lsn))));
    1437                 : 
    1438 GIC         423 :     if (len != NULL)
    1439 CBC          21 :         *len = XLogRecGetDataLen(xlogreader);
    1440 ECB             : 
    1441 GIC         423 :     *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
    1442 CBC         423 :     memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
    1443 ECB             : 
    1444 GIC         423 :     XLogReaderFree(xlogreader);
    1445             423 : }
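XlogReadTwoPhaseData accepts a record only if it comes from the transaction resource manager and its info byte, masked with XLOG_XACT_OPMASK, equals XLOG_XACT_PREPARE. The mask is needed because other bits of the info byte carry flags, such as XLOG_XACT_HAS_INFO, rather than the operation code. The constants below are assumed to mirror PostgreSQL's headers. They carry DEMO_ names to make clear that they are not taken from the real headers.

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed values mirroring PostgreSQL's xact.h and rmgrlist.h. */
#define DEMO_RM_XACT_ID          1
#define DEMO_XLOG_XACT_OPMASK    0x70
#define DEMO_XLOG_XACT_PREPARE   0x10
#define DEMO_XLOG_XACT_HAS_INFO  0x80

/* True if the record header identifies a PREPARE record from the xact rmgr. */
static bool
is_prepare_record(uint8_t rmid, uint8_t info)
{
    return rmid == DEMO_RM_XACT_ID &&
        (info & DEMO_XLOG_XACT_OPMASK) == DEMO_XLOG_XACT_PREPARE;
}
```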
    1446                 : 
    1447                 : 
    1448                 : /*
    1449                 :  * Confirms an xid is prepared, during recovery
    1450 ECB             :  */
    1451                 : bool
    1452 GIC          17 : StandbyTransactionIdIsPrepared(TransactionId xid)
    1453                 : {
    1454                 :     char       *buf;
    1455                 :     TwoPhaseFileHeader *hdr;
    1456 ECB             :     bool        result;
    1457                 : 
    1458 CBC          17 :     Assert(TransactionIdIsValid(xid));
    1459 EUB             : 
    1460 GIC          17 :     if (max_prepared_xacts <= 0)
    1461 UIC           0 :         return false;           /* nothing to do */
    1462 ECB             : 
    1463                 :     /* Read and validate file */
    1464 CBC          17 :     buf = ReadTwoPhaseFile(xid, true);
    1465 GIC          17 :     if (buf == NULL)
    1466              17 :         return false;
    1467 EUB             : 
    1468                 :     /* Check header also */
    1469 UBC           0 :     hdr = (TwoPhaseFileHeader *) buf;
    1470 UIC           0 :     result = TransactionIdEquals(hdr->xid, xid);
    1471 UBC           0 :     pfree(buf);
    1472                 : 
    1473 UIC           0 :     return result;
    1474                 : }
    1475                 : 
    1476                 : /*
    1477                 :  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
    1478 ECB             :  */
    1479                 : void
    1480 GIC         369 : FinishPreparedTransaction(const char *gid, bool isCommit)
    1481                 : {
    1482                 :     GlobalTransaction gxact;
    1483                 :     PGPROC     *proc;
    1484                 :     TransactionId xid;
    1485                 :     char       *buf;
    1486                 :     char       *bufptr;
    1487                 :     TwoPhaseFileHeader *hdr;
    1488                 :     TransactionId latestXid;
    1489                 :     TransactionId *children;
    1490                 :     RelFileLocator *commitrels;
    1491                 :     RelFileLocator *abortrels;
    1492                 :     RelFileLocator *delrels;
    1493                 :     int         ndelrels;
    1494                 :     xl_xact_stats_item *commitstats;
    1495                 :     xl_xact_stats_item *abortstats;
    1496                 :     SharedInvalidationMessage *invalmsgs;
    1497                 : 
    1498                 :     /*
    1499                 :      * Validate the GID, and lock the GXACT to ensure that two backends do not
    1500 ECB             :      * try to commit the same GID at once.
    1501                 :      */
    1502 CBC         369 :     gxact = LockGXact(gid, GetUserId());
    1503 GIC         363 :     proc = &ProcGlobal->allProcs[gxact->pgprocno];
    1504             363 :     xid = gxact->xid;
    1505                 : 
    1506                 :     /*
    1507                 :      * Read and validate 2PC state data. State data will typically be stored
    1508                 :      * in WAL files if the LSN is after the last checkpoint record, or moved
    1509 ECB             :      * to disk if for some reason they have lived for a long time.
    1510                 :      */
    1511 GIC         363 :     if (gxact->ondisk)
    1512 CBC          26 :         buf = ReadTwoPhaseFile(xid, false);
    1513                 :     else
    1514 GIC         337 :         XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
    1515                 : 
    1516                 : 
    1517                 :     /*
    1518 ECB             :      * Disassemble the header area
    1519                 :      */
    1520 CBC         363 :     hdr = (TwoPhaseFileHeader *) buf;
    1521             363 :     Assert(TransactionIdEquals(hdr->xid, xid));
    1522             363 :     bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
    1523             363 :     bufptr += MAXALIGN(hdr->gidlen);
    1524             363 :     children = (TransactionId *) bufptr;
    1525             363 :     bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
    1526 GNC         363 :     commitrels = (RelFileLocator *) bufptr;
    1527             363 :     bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
    1528             363 :     abortrels = (RelFileLocator *) bufptr;
    1529             363 :     bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
    1530 CBC         363 :     commitstats = (xl_xact_stats_item *) bufptr;
    1531             363 :     bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
    1532             363 :     abortstats = (xl_xact_stats_item *) bufptr;
    1533             363 :     bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
    1534 GIC         363 :     invalmsgs = (SharedInvalidationMessage *) bufptr;
    1535             363 :     bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
    1536 ECB             : 
    1537                 :     /* compute latestXid among all children */
    1538 GIC         363 :     latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
    1539 ECB             : 
    1540                 :     /* Prevent cancel/die interrupt while cleaning up */
    1541 GIC         363 :     HOLD_INTERRUPTS();
    1542                 : 
    1543                 :     /*
    1544                 :      * The order of operations here is critical: make the XLOG entry for
    1545                 :      * commit or abort, then mark the transaction committed or aborted in
    1546                 :      * pg_xact, then remove its PGPROC from the global ProcArray (which means
    1547                 :      * TransactionIdIsInProgress will stop saying the prepared xact is in
    1548                 :      * progress), then run the post-commit or post-abort callbacks. The
    1549 ECB             :      * callbacks will release the locks the transaction held.
    1550                 :      */
    1551 GIC         363 :     if (isCommit)
    1552             325 :         RecordTransactionCommitPrepared(xid,
    1553                 :                                         hdr->nsubxacts, children,
    1554                 :                                         hdr->ncommitrels, commitrels,
    1555                 :                                         hdr->ncommitstats,
    1556 ECB             :                                         commitstats,
    1557                 :                                         hdr->ninvalmsgs, invalmsgs,
    1558 CBC         325 :                                         hdr->initfileinval, gid);
    1559                 :     else
    1560 GIC          38 :         RecordTransactionAbortPrepared(xid,
    1561                 :                                        hdr->nsubxacts, children,
    1562                 :                                        hdr->nabortrels, abortrels,
    1563                 :                                        hdr->nabortstats,
    1564                 :                                        abortstats,
    1565 ECB             :                                        gid);
    1566                 : 
    1567 GIC         363 :     ProcArrayRemove(proc, latestXid);
    1568                 : 
    1569                 :     /*
    1570                 :      * In case we fail while running the callbacks, mark the gxact invalid so
    1571                 :      * no one else will try to commit/rollback, and so it will be recycled if
    1572                 :      * we fail after this point.  It is still locked by our backend so it
    1573                 :      * won't go away yet.
    1574                 :      *
    1575 ECB             :      * (We assume it's safe to do this without taking TwoPhaseStateLock.)
    1576                 :      */
    1577 GIC         363 :     gxact->valid = false;
    1578                 : 
    1579                 :     /*
    1580                 :      * We have to remove any files that were supposed to be dropped. For
    1581                 :      * consistency with the regular xact.c code paths, must do this before
    1582                 :      * releasing locks, so do it before running the callbacks.
    1583                 :      *
    1584 ECB             :      * NB: this code knows that we couldn't be dropping any temp rels ...
    1585                 :      */
    1586 CBC         363 :     if (isCommit)
    1587 ECB             :     {
    1588 GIC         325 :         delrels = commitrels;
    1589             325 :         ndelrels = hdr->ncommitrels;
    1590                 :     }
    1591 ECB             :     else
    1592                 :     {
    1593 GIC          38 :         delrels = abortrels;
    1594              38 :         ndelrels = hdr->nabortrels;
    1595                 :     }
    1596 ECB             : 
    1597                 :     /* Make sure files supposed to be dropped are dropped */
    1598 CBC         363 :     DropRelationFiles(delrels, ndelrels, false);
    1599 ECB             : 
    1600 GIC         363 :     if (isCommit)
    1601 CBC         325 :         pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
    1602                 :     else
    1603 GIC          38 :         pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);
    1604                 : 
    1605                 :     /*
    1606                 :      * Handle cache invalidation messages.
    1607                 :      *
    1608                 :      * When committing, relcache init file invalidation requires processing
    1609                 :      * both before and after we send the SI messages.  See
    1610 ECB             :      * AtEOXact_Inval().
    1611                 :      */
    1612 CBC         363 :     if (isCommit)
    1613 EUB             :     {
    1614 CBC         325 :         if (hdr->initfileinval)
    1615 LBC           0 :             RelationCacheInitFilePreInvalidate();
    1616 GBC         325 :         SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
    1617 GIC         325 :         if (hdr->initfileinval)
    1618 UIC           0 :             RelationCacheInitFilePostInvalidate();
    1619                 :     }
    1620                 : 
    1621                 :     /*
    1622                 :      * Acquire the two-phase lock.  We want to work on the two-phase callbacks
    1623                 :      * while holding it to avoid potential conflicts with other transactions
    1624                 :      * attempting to use the same GID, so the lock is released once the shared
    1625 ECB             :      * memory state is cleared.
    1626                 :      */
    1627 GIC         363 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
    1628 ECB             : 
    1629                 :     /* And now do the callbacks */
    1630 GIC         363 :     if (isCommit)
    1631 CBC         325 :         ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
    1632                 :     else
    1633              38 :         ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
    1634                 : 
    1635 GIC         363 :     PredicateLockTwoPhaseFinish(xid, isCommit);
    1636 ECB             : 
    1637                 :     /* Clear shared memory state */
    1638 GIC         363 :     RemoveGXact(gxact);
    1639                 : 
    1640                 :     /*
    1641                 :      * Release the lock now that all callbacks have been called and shared
    1642 ECB             :      * memory cleanup is done.
    1643                 :      */
    1644 GIC         363 :     LWLockRelease(TwoPhaseStateLock);
    1645 ECB             : 
    1646                 :     /* Count the prepared xact as committed or aborted */
    1647 GIC         363 :     AtEOXact_PgStat(isCommit, false);
    1648                 : 
    1649                 :     /*
    1650 ECB             :      * And now we can clean up any files we may have left.
    1651                 :      */
    1652 GIC         363 :     if (gxact->ondisk)
    1653 CBC          26 :         RemoveTwoPhaseFile(xid, true);
    1654                 : 
    1655             363 :     MyLockedGxact = NULL;
    1656                 : 
    1657             363 :     RESUME_INTERRUPTS();
    1658 ECB             : 
    1659 GIC         363 :     pfree(buf);
    1660             363 : }
    1661                 : 
    1662                 : /*
    1663                 :  * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
    1664 ECB             :  */
    1665                 : static void
    1666 GIC         393 : ProcessRecords(char *bufptr, TransactionId xid,
    1667                 :                const TwoPhaseCallback callbacks[])
    1668 ECB             : {
    1669                 :     for (;;)
    1670 GIC        1495 :     {
    1671 CBC        1888 :         TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
    1672 ECB             : 
    1673 CBC        1888 :         Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
    1674 GIC        1888 :         if (record->rmid == TWOPHASE_RM_END_ID)
    1675 CBC         393 :             break;
    1676                 : 
    1677            1495 :         bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
    1678 ECB             : 
    1679 GIC        1495 :         if (callbacks[record->rmid] != NULL)
    1680            1422 :             callbacks[record->rmid] (xid, record->info,
    1681 ECB             :                                      (void *) bufptr, record->len);
    1682                 : 
    1683 CBC        1495 :         bufptr += MAXALIGN(record->len);
    1684                 :     }
    1685 GIC         393 : }
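ProcessRecords() treats the state data as a sequence of records. Each record is a MAXALIGN'd TwoPhaseRecordOnDisk header followed by a MAXALIGN'd payload, and the walk stops at the TWOPHASE_RM_END_ID record. Every payload is handed to the callback registered for its rmid, if there is one. The code below is a minimal standalone sketch of that loop and is not PostgreSQL code. The SketchRecord layout, the 8-byte alignment, and the rmid values are assumptions chosen for illustration. The header is also copied out with memcpy() rather than cast in place.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Assumption: 8-byte maximum alignment (PostgreSQL's MAXALIGN is platform-dependent). */
#define SKETCH_MAXALIGN(len) (((size_t) (len) + 7) & ~(size_t) 7)

/* Hypothetical resource-manager ids; 0 terminates the record list. */
#define SKETCH_RM_END_ID 0
#define SKETCH_RM_MAX_ID 3

/* Simplified stand-in for TwoPhaseRecordOnDisk (hypothetical layout). */
typedef struct
{
    uint32_t    len;            /* payload length, header excluded */
    uint8_t     rmid;           /* resource manager id */
    uint16_t    info;           /* rmgr-specific flag bits */
} SketchRecord;

typedef void (*SketchCallback) (uint32_t xid, uint16_t info,
                                void *data, uint32_t len);

/* Append one record (header + padded payload) at offset off; return the new offset. */
static size_t
sketch_append_record(char *buf, size_t off, uint8_t rmid, uint16_t info,
                     const void *data, uint32_t len)
{
    SketchRecord rec = {len, rmid, info};

    memcpy(buf + off, &rec, sizeof(rec));
    off += SKETCH_MAXALIGN(sizeof(rec));
    if (len > 0)
        memcpy(buf + off, data, len);
    return off + SKETCH_MAXALIGN(len);
}

/* Walk records until the end marker, invoking non-NULL callbacks; return records seen. */
static int
sketch_process_records(char *bufptr, uint32_t xid,
                       const SketchCallback callbacks[])
{
    int         seen = 0;

    for (;;)
    {
        SketchRecord record;

        memcpy(&record, bufptr, sizeof(record));
        assert(record.rmid <= SKETCH_RM_MAX_ID);
        if (record.rmid == SKETCH_RM_END_ID)
            break;

        bufptr += SKETCH_MAXALIGN(sizeof(SketchRecord));
        if (callbacks[record.rmid] != NULL)
            callbacks[record.rmid] (xid, record.info, bufptr, record.len);
        bufptr += SKETCH_MAXALIGN(record.len);
        seen++;
    }
    return seen;
}

/* Example callback: total up the payload bytes it is handed. */
static uint32_t sketch_bytes_seen = 0;

static void
sketch_count_bytes(uint32_t xid, uint16_t info, void *data, uint32_t len)
{
    (void) xid;
    (void) info;
    (void) data;
    sketch_bytes_seen += len;
}
```

Each payload is padded to the alignment boundary, so the walker can step over any record by its length alone. For that reason, records whose rmid has a NULL callback are skipped without the loop needing to understand their contents.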
    1686                 : 
    1687                 : /*
    1688                 :  * Remove the 2PC file for the specified XID.
    1689                 :  *
    1690                 :  * If giveWarning is false, do not complain about file-not-present;
    1691                 :  * this is an expected case during WAL replay.
    1692 ECB             :  */
    1693                 : static void
    1694 GIC          28 : RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
    1695                 : {
    1696 ECB             :     char        path[MAXPGPATH];
    1697                 : 
    1698 GBC          28 :     TwoPhaseFilePath(path, xid);
    1699              28 :     if (unlink(path))
    1700 UIC           0 :         if (errno != ENOENT || giveWarning)
    1701               0 :             ereport(WARNING,
    1702 ECB             :                     (errcode_for_file_access(),
    1703                 :                      errmsg("could not remove file \"%s\": %m", path)));
    1704 GIC          28 : }
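RemoveTwoPhaseFile() accepts a missing file silently when giveWarning is false, because during WAL replay the file may already be gone. Any other unlink() failure is always reported. The following is a minimal POSIX sketch of that rule. sketch_remove_state_file() is a hypothetical name. It reports problems on stderr instead of through ereport(WARNING). It also returns the errno it complained about, or 0 otherwise, so the behavior can be checked.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>             /* mkstemp() for the usage example */
#include <string.h>
#include <unistd.h>

/*
 * Remove path.  A missing file (ENOENT) is silently accepted unless
 * give_warning is true; any other failure is always reported.
 * Returns 0 if nothing was reported, else the errno that was reported.
 */
static int
sketch_remove_state_file(const char *path, bool give_warning)
{
    if (unlink(path) == 0)
        return 0;

    int         err = errno;    /* save before any other library call */

    if (err != ENOENT || give_warning)
    {
        fprintf(stderr, "WARNING: could not remove file \"%s\": %s\n",
                path, strerror(err));
        return err;
    }
    return 0;                   /* expected during WAL replay */
}
```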
    1705                 : 
    1706                 : /*
    1707                 :  * Recreates a state file. This is used in WAL replay and during
    1708                 :  * checkpoint creation.
    1709                 :  *
    1710                 :  * Note: content and len don't include CRC.
    1711 ECB             :  */
    1712                 : static void
    1713 GIC          21 : RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
    1714                 : {
    1715                 :     char        path[MAXPGPATH];
    1716                 :     pg_crc32c   statefile_crc;
    1717                 :     int         fd;
    1718 ECB             : 
    1719                 :     /* Recompute CRC */
    1720 CBC          21 :     INIT_CRC32C(statefile_crc);
    1721 GIC          21 :     COMP_CRC32C(statefile_crc, content, len);
    1722 CBC          21 :     FIN_CRC32C(statefile_crc);
    1723                 : 
    1724              21 :     TwoPhaseFilePath(path, xid);
    1725                 : 
    1726              21 :     fd = OpenTransientFile(path,
    1727 EUB             :                            O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
    1728 GIC          21 :     if (fd < 0)
    1729 UIC           0 :         ereport(ERROR,
    1730                 :                 (errcode_for_file_access(),
    1731                 :                  errmsg("could not recreate file \"%s\": %m", path)));
    1732 ECB             : 
    1733                 :     /* Write content and CRC */
    1734 CBC          21 :     errno = 0;
    1735 GIC          21 :     pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
    1736              21 :     if (write(fd, content, len) != len)
    1737 EUB             :     {
    1738                 :         /* if write didn't set errno, assume problem is no disk space */
    1739 UBC           0 :         if (errno == 0)
    1740 UIC           0 :             errno = ENOSPC;
    1741               0 :         ereport(ERROR,
    1742                 :                 (errcode_for_file_access(),
    1743 ECB             :                  errmsg("could not write file \"%s\": %m", path)));
    1744                 :     }
    1745 GIC          21 :     if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
    1746 EUB             :     {
    1747                 :         /* if write didn't set errno, assume problem is no disk space */
    1748 UBC           0 :         if (errno == 0)
    1749 UIC           0 :             errno = ENOSPC;
    1750               0 :         ereport(ERROR,
    1751                 :                 (errcode_for_file_access(),
    1752 ECB             :                  errmsg("could not write file \"%s\": %m", path)));
    1753                 :     }
    1754 GIC          21 :     pgstat_report_wait_end();
    1755                 : 
    1756                 :     /*
    1757                 :      * We must fsync the file because the end-of-replay checkpoint will not do
    1758 ECB             :      * so, there being no GXACT in shared memory yet to tell it to.
    1759                 :      */
    1760 GBC          21 :     pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
    1761 GIC          21 :     if (pg_fsync(fd) != 0)
    1762 UIC           0 :         ereport(ERROR,
    1763 ECB             :                 (errcode_for_file_access(),
    1764                 :                  errmsg("could not fsync file \"%s\": %m", path)));
    1765 CBC          21 :     pgstat_report_wait_end();
    1766 EUB             : 
    1767 GIC          21 :     if (CloseTransientFile(fd) != 0)
    1768 UIC           0 :         ereport(ERROR,
    1769 ECB             :                 (errcode_for_file_access(),
    1770                 :                  errmsg("could not close file \"%s\": %m", path)));
    1771 GIC          21 : }
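RecreateTwoPhaseFile() writes the state data and then a CRC-32C of that data. The file on disk is therefore the content followed by a 4-byte checksum trailer, which is why content and len exclude the CRC. The sketch below reproduces that layout in memory using a portable bitwise CRC-32C (Castagnoli, reflected polynomial 0x82F63B78). Like the INIT_CRC32C/FIN_CRC32C pair, it starts from 0xFFFFFFFF and inverts the result at the end. The helper names are hypothetical. PostgreSQL itself uses table-driven or hardware-accelerated CRC code. The trailer is stored in host byte order, as with the write() of statefile_crc.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Bitwise CRC-32C: init to all ones, reflected poly 0x82F63B78, final inversion. */
static uint32_t
sketch_crc32c(const void *data, size_t len)
{
    const unsigned char *p = data;
    uint32_t    crc = 0xFFFFFFFFu;

    for (size_t i = 0; i < len; i++)
    {
        crc ^= p[i];
        for (int k = 0; k < 8; k++)
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
    return crc ^ 0xFFFFFFFFu;
}

/* Build "content || crc" in out (which needs len + 4 bytes); return total size. */
static size_t
sketch_build_state_image(const void *content, size_t len, unsigned char *out)
{
    uint32_t    crc = sketch_crc32c(content, len);

    memcpy(out, content, len);
    memcpy(out + len, &crc, sizeof(crc));
    return len + sizeof(crc);
}

/* Check that the trailer matches a CRC recomputed over the preceding bytes. */
static bool
sketch_verify_state_image(const unsigned char *image, size_t total)
{
    uint32_t    stored;

    if (total < sizeof(stored))
        return false;
    memcpy(&stored, image + total - sizeof(stored), sizeof(stored));
    return stored == sketch_crc32c(image, total - sizeof(stored));
}
```

CRC-32C detects every single-bit error. Flipping any one bit of the content therefore makes verification fail, which is the property a reader of the state file relies on.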
    1772                 : 
    1773                 : /*
    1774                 :  * CheckPointTwoPhase -- handle 2PC component of checkpointing.
    1775                 :  *
    1776                 :  * We must fsync the state file of any GXACT that is valid or has been
    1777                 :  * generated during redo and has a PREPARE LSN <= the checkpoint's redo
    1778                 :  * horizon.  (If the gxact isn't valid yet, has not been generated in
    1779                 :  * redo, or has a later LSN, this checkpoint is not responsible for
    1780                 :  * fsyncing it.)
    1781                 :  *
    1782                 :  * This is deliberately run as late as possible in the checkpoint sequence,
    1783                 :  * because GXACTs ordinarily have short lifespans, and so it is quite
    1784                 :  * possible that GXACTs that were valid at checkpoint start will no longer
    1785                 :  * exist if we wait a little bit. With typical checkpoint settings this
    1786                 :  * will be about 3 minutes for an online checkpoint, so as a result we
    1787                 :  * expect that there will be no GXACTs that need to be copied to disk.
    1788                 :  *
    1789                 :  * If a GXACT remains valid across multiple checkpoints, it will already
    1790                 :  * be on disk so we don't bother to repeat that write.
    1791 ECB             :  */
    1792                 : void
    1793 GIC        2363 : CheckPointTwoPhase(XLogRecPtr redo_horizon)
    1794 ECB             : {
    1795                 :     int         i;
    1796 CBC        2363 :     int         serialized_xacts = 0;
    1797 ECB             : 
    1798 GIC        2363 :     if (max_prepared_xacts <= 0)
    1799            2197 :         return;                 /* nothing to do */
    1800                 : 
    1801                 :     TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
    1802                 : 
    1803                 :     /*
    1804                 :      * We are expecting there to be zero GXACTs that need to be copied to
    1805                 :      * disk, so we perform all I/O while holding TwoPhaseStateLock for
    1806                 :      * simplicity. This prevents any new xacts from preparing while this
    1807                 :      * occurs, which shouldn't be a problem since the presence of long-lived
    1808                 :      * prepared xacts indicates the transaction manager isn't active.
    1809                 :      *
    1810                 :      * It's also possible to move I/O out of the lock, but on every error we
    1811                 :      * should check whether somebody committed our transaction in a different
    1812                 :      * backend. Let's leave this optimization for the future, in case somebody
    1813                 :      * finds that this place causes a bottleneck.
    1814                 :      *
    1815                 :      * Note that it isn't possible for there to be a GXACT with a
    1816                 :      * prepare_end_lsn set prior to the last checkpoint that is nonetheless
    1817 ECB             :      * marked invalid, because of the efforts with delayChkptFlags.
    1818                 :      */
    1819 GIC         166 :     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
    1820             193 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    1821                 :     {
    1822                 :         /*
    1823                 :          * Note that we are using gxact not PGPROC so this works in recovery
    1824 ECB             :          * also
    1825                 :          */
    1826 CBC          27 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
    1827 ECB             : 
    1828 CBC          27 :         if ((gxact->valid || gxact->inredo) &&
    1829 GIC          27 :             !gxact->ondisk &&
    1830              23 :             gxact->prepare_end_lsn <= redo_horizon)
    1831                 :         {
    1832                 :             char       *buf;
    1833 ECB             :             int         len;
    1834                 : 
    1835 CBC          21 :             XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
    1836              21 :             RecreateTwoPhaseFile(gxact->xid, buf, len);
    1837              21 :             gxact->ondisk = true;
    1838              21 :             gxact->prepare_start_lsn = InvalidXLogRecPtr;
    1839              21 :             gxact->prepare_end_lsn = InvalidXLogRecPtr;
    1840 GIC          21 :             pfree(buf);
    1841              21 :             serialized_xacts++;
    1842 ECB             :         }
    1843                 :     }
    1844 GIC         166 :     LWLockRelease(TwoPhaseStateLock);
    1845                 : 
    1846                 :     /*
    1847                 :      * Unconditionally flush the parent directory to make any information
    1848                 :      * durable on disk.  Two-phase files could have been removed, and those
    1849                 :      * removals need to be made persistent, as do any files newly created
    1850 ECB             :      * since the last checkpoint.
    1851                 :      */
    1852 GIC         166 :     fsync_fname(TWOPHASE_DIR, true);
    1853                 : 
    1854 ECB             :     TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
    1855                 : 
    1856 GIC         166 :     if (log_checkpoints && serialized_xacts > 0)
    1857              17 :         ereport(LOG,
    1858                 :                 (errmsg_plural("%u two-phase state file was written "
    1859                 :                                "for a long-running prepared transaction",
    1860                 :                                "%u two-phase state files were written "
    1861                 :                                "for long-running prepared transactions",
    1862                 :                                serialized_xacts,
    1863                 :                                serialized_xacts)));
    1864                 : }
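The header comment's rule for which GXACTs a checkpoint must copy to disk reduces to the predicate tested inside the loop. An entry qualifies when it is valid or was created during redo, is not already on disk, and has a prepare record that ended at or before the redo horizon. The following is a small sketch of just that selection. It uses a hypothetical SketchGXact struct with plain uint64_t LSNs in place of GlobalTransaction and XLogRecPtr.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t SketchLSN;     /* stand-in for XLogRecPtr */

/* Hypothetical subset of the GXACT fields the predicate reads. */
typedef struct
{
    bool        valid;          /* fully prepared */
    bool        inredo;         /* added while replaying WAL */
    bool        ondisk;         /* state file already written */
    SketchLSN   prepare_end_lsn;
} SketchGXact;

/* Does a checkpoint at redo_horizon have to write this entry's state file? */
static bool
sketch_needs_state_file(const SketchGXact *g, SketchLSN redo_horizon)
{
    return (g->valid || g->inredo) &&
        !g->ondisk &&
        g->prepare_end_lsn <= redo_horizon;
}

/* Count the entries a checkpoint at redo_horizon would serialize. */
static int
sketch_count_to_serialize(const SketchGXact *gx, int n, SketchLSN redo_horizon)
{
    int         count = 0;

    for (int i = 0; i < n; i++)
        if (sketch_needs_state_file(&gx[i], redo_horizon))
            count++;
    return count;
}
```

An entry prepared after the horizon is deliberately left alone. Its PREPARE record is still in WAL that a crash recovery would replay, so a later checkpoint can write the file if the transaction is still around.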
    1865                 : 
    1866                 : /*
    1867                 :  * restoreTwoPhaseData
    1868                 :  *
    1869                 :  * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
    1870                 :  * This is called once at the beginning of recovery, saving any extra
    1871                 :  * lookups in the future.  Two-phase files that are newer than the
    1872                 :  * minimum XID horizon are discarded on the way.
    1873 ECB             :  */
    1874                 : void
    1875 GIC        1176 : restoreTwoPhaseData(void)
    1876                 : {
    1877                 :     DIR        *cldir;
    1878 ECB             :     struct dirent *clde;
    1879                 : 
    1880 CBC        1176 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
    1881 GIC        1176 :     cldir = AllocateDir(TWOPHASE_DIR);
    1882 CBC        3539 :     while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
    1883 ECB             :     {
    1884 GIC        2363 :         if (strlen(clde->d_name) == 8 &&
    1885              11 :             strspn(clde->d_name, "0123456789ABCDEF") == 8)
    1886                 :         {
    1887                 :             TransactionId xid;
    1888 ECB             :             char       *buf;
    1889                 : 
    1890 CBC          11 :             xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
    1891                 : 
    1892              11 :             buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
    1893 EUB             :                                         true, false, false);
    1894 GIC          11 :             if (buf == NULL)
    1895 LBC           0 :                 continue;
    1896                 : 
    1897 GIC          11 :             PrepareRedoAdd(buf, InvalidXLogRecPtr,
    1898                 :                            InvalidXLogRecPtr, InvalidRepOriginId);
    1899 ECB             :         }
    1900                 :     }
    1901 CBC        1176 :     LWLockRelease(TwoPhaseStateLock);
    1902 GIC        1176 :     FreeDir(cldir);
    1903            1176 : }
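restoreTwoPhaseData() only considers directory entries whose names are exactly eight uppercase hex digits, and it parses those names as the XID. The function below is a standalone sketch of that filter. The name sketch_parse_state_file_name is hypothetical. The companion formatter assumes, as the filter implies, that state files are named with the XID printed as %08X. Lowercase hex is rejected because the accepted character set is "0123456789ABCDEF".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Accept only names of exactly 8 uppercase hex digits; store the parsed XID. */
static bool
sketch_parse_state_file_name(const char *name, uint32_t *xid)
{
    if (strlen(name) != 8 || strspn(name, "0123456789ABCDEF") != 8)
        return false;
    *xid = (uint32_t) strtoul(name, NULL, 16);
    return true;
}

/* Inverse direction (assumed naming): format an XID as %08X. */
static void
sketch_state_file_name(uint32_t xid, char out[9])
{
    snprintf(out, 9, "%08X", (unsigned int) xid);
}
```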
    1904                 : 
    1905                 : /*
    1906                 :  * PrescanPreparedTransactions
    1907                 :  *
    1908                 :  * Scan the shared memory entries of TwoPhaseState and determine the range
    1909                 :  * of valid XIDs present.  This is run during database startup, after we
    1910                 :  * have completed reading WAL.  ShmemVariableCache->nextXid has been set to
    1911                 :  * one more than the highest XID for which evidence exists in WAL.
    1912                 :  *
    1913                 :  * We throw away any prepared xacts with main XID beyond nextXid --- if any
    1914                 :  * are present, it suggests that the DBA has done a PITR recovery to an
    1915                 :  * earlier point in time without cleaning out pg_twophase.  We dare not
    1916                 :  * try to recover such prepared xacts since they likely depend on database
    1917                 :  * state that doesn't exist now.
    1918                 :  *
    1919                 :  * However, we will advance nextXid beyond any subxact XIDs belonging to
    1920                 :  * valid prepared xacts.  We need to do this since subxact commit doesn't
    1921                 :  * write a WAL entry, and so there might be no evidence in WAL of those
    1922                 :  * subxact XIDs.
    1923                 :  *
    1924                 :  * On corrupted two-phase files, fail immediately.  Keeping broken entries
    1925                 :  * around and letting replay continue would harm the system; a new backup
    1926                 :  * should be restored instead.
    1927                 :  *
    1928                 :  * Our other responsibility is to determine and return the oldest valid XID
    1929                 :  * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
    1930                 :  * This is needed to synchronize pg_subtrans startup properly.
    1931                 :  *
    1932                 :  * If xids_p and nxids_p are not NULL, a pointer to a palloc'd array of all
    1933                 :  * top-level xids is stored in *xids_p. The number of entries in the array
    1934                 :  * is returned in *nxids_p.
    1935 ECB             :  */
    1936                 : TransactionId
    1937 CBC        1177 : PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
    1938 ECB             : {
    1939 CBC        1177 :     FullTransactionId nextXid = ShmemVariableCache->nextXid;
    1940            1177 :     TransactionId origNextXid = XidFromFullTransactionId(nextXid);
    1941            1177 :     TransactionId result = origNextXid;
    1942            1177 :     TransactionId *xids = NULL;
    1943 GIC        1177 :     int         nxids = 0;
    1944            1177 :     int         allocsize = 0;
    1945 ECB             :     int         i;
    1946                 : 
    1947 GIC        1177 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
    1948            1219 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    1949                 :     {
    1950 ECB             :         TransactionId xid;
    1951                 :         char       *buf;
    1952 CBC          42 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
    1953                 : 
    1954              42 :         Assert(gxact->inredo);
    1955                 : 
    1956              42 :         xid = gxact->xid;
    1957                 : 
    1958              42 :         buf = ProcessTwoPhaseBuffer(xid,
    1959                 :                                     gxact->prepare_start_lsn,
    1960              42 :                                     gxact->ondisk, false, true);
    1961 EUB             : 
    1962 GIC          42 :         if (buf == NULL)
    1963 UIC           0 :             continue;
    1964                 : 
    1965                 :         /*
    1966                 :          * OK, we think this file is valid.  Incorporate xid into the
    1967 ECB             :          * running-minimum result.
    1968                 :          */
    1969 GIC          42 :         if (TransactionIdPrecedes(xid, result))
    1970 CBC          36 :             result = xid;
    1971                 : 
    1972              42 :         if (xids_p)
    1973                 :         {
    1974              12 :             if (nxids == allocsize)
    1975                 :             {
    1976              10 :                 if (nxids == 0)
    1977 ECB             :                 {
    1978 GIC          10 :                     allocsize = 10;
    1979              10 :                     xids = palloc(allocsize * sizeof(TransactionId));
    1980                 :                 }
    1981 EUB             :                 else
    1982                 :                 {
    1983 UIC           0 :                     allocsize = allocsize * 2;
    1984               0 :                     xids = repalloc(xids, allocsize * sizeof(TransactionId));
    1985 ECB             :                 }
    1986                 :             }
    1987 GIC          12 :             xids[nxids++] = xid;
    1988 ECB             :         }
    1989                 : 
    1990 CBC          42 :         pfree(buf);
    1991                 :     }
    1992            1177 :     LWLockRelease(TwoPhaseStateLock);
    1993                 : 
    1994            1177 :     if (xids_p)
    1995 ECB             :     {
    1996 GIC          35 :         *xids_p = xids;
    1997              35 :         *nxids_p = nxids;
    1998 ECB             :     }
    1999                 : 
    2000 GIC        1177 :     return result;
    2001                 : }
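PrescanPreparedTransactions() starts from nextXid and keeps a running minimum with TransactionIdPrecedes(). That function compares XIDs modulo 2^32, so the result stays correct across XID wraparound. The scan also collects the XIDs into an array that starts with 10 slots and doubles when full. The sketch below reimplements both ideas standalone, and all names in it are hypothetical. The comparison is assumed to follow PostgreSQL's rule: compare plainly if either XID is a special value below FirstNormalTransactionId (3), and otherwise use the sign of the 32-bit difference. realloc() stands in for repalloc().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#define SKETCH_FIRST_NORMAL_XID 3u

/* Wraparound-aware "a is older than b" for 32-bit XIDs. */
static bool
sketch_xid_precedes(uint32_t a, uint32_t b)
{
    if (a < SKETCH_FIRST_NORMAL_XID || b < SKETCH_FIRST_NORMAL_XID)
        return a < b;           /* special XIDs compare numerically */
    return (int32_t) (a - b) < 0;   /* assumes two's-complement conversion */
}

/*
 * Return the oldest of xids[0..n-1], or next_xid if none is older.
 * If out is non-NULL, also copy the XIDs into a malloc'd array that
 * starts at 10 slots and doubles when full (caller frees *out).
 */
static uint32_t
sketch_oldest_xid(const uint32_t *xids, int n, uint32_t next_xid,
                  uint32_t **out, int *nout)
{
    uint32_t    result = next_xid;
    uint32_t   *arr = NULL;
    int         count = 0;
    int         allocsize = 0;

    for (int i = 0; i < n; i++)
    {
        if (sketch_xid_precedes(xids[i], result))
            result = xids[i];

        if (out)
        {
            if (count == allocsize)
            {
                allocsize = (allocsize == 0) ? 10 : allocsize * 2;
                uint32_t   *grown = realloc(arr, allocsize * sizeof(uint32_t));

                if (grown == NULL)
                    abort();
                arr = grown;
            }
            arr[count++] = xids[i];
        }
    }
    if (out)
    {
        *out = arr;
        *nout = count;
    }
    return result;
}
```

Suppose nextXid has just wrapped around to 10. Then an XID prepared shortly before wraparound (4294967290) is correctly reported as older than XID 5, whereas a plain unsigned minimum would pick 5.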
    2002                 : 
    2003                 : /*
    2004                 :  * StandbyRecoverPreparedTransactions
    2005                 :  *
    2006                 :  * Scan the shared memory entries of TwoPhaseState and set up all the required
    2007                 :  * information to allow standby queries to treat prepared transactions as still
    2008                 :  * active.
    2009                 :  *
    2010                 :  * This is never called at the end of recovery - we use
    2011                 :  * RecoverPreparedTransactions() at that point.
    2012                 :  *
    2013                 :  * The lack of calls to SubTransSetParent() here is by design;
    2014                 :  * those calls are made by RecoverPreparedTransactions() at the end of recovery
    2015                 :  * for those xacts that need this.
    2016 ECB             :  */
    2017                 : void
    2018 GIC          35 : StandbyRecoverPreparedTransactions(void)
    2019                 : {
    2020 ECB             :     int         i;
    2021                 : 
    2022 GIC          35 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
    2023              47 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    2024                 :     {
    2025 ECB             :         TransactionId xid;
    2026                 :         char       *buf;
    2027 CBC          12 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
    2028                 : 
    2029              12 :         Assert(gxact->inredo);
    2030                 : 
    2031              12 :         xid = gxact->xid;
    2032                 : 
    2033              12 :         buf = ProcessTwoPhaseBuffer(xid,
    2034 ECB             :                                     gxact->prepare_start_lsn,
    2035 CBC          12 :                                     gxact->ondisk, false, false);
    2036 GIC          12 :         if (buf != NULL)
    2037 CBC          12 :             pfree(buf);
    2038 ECB             :     }
    2039 GIC          35 :     LWLockRelease(TwoPhaseStateLock);
    2040              35 : }
    2041                 : 
    2042                 : /*
    2043                 :  * RecoverPreparedTransactions
    2044                 :  *
    2045                 :  * Scan the shared memory entries of TwoPhaseState and reload the state for
    2046                 :  * each prepared transaction (reacquire locks, etc).
    2047                 :  *
    2048                 :  * This is run at the end of recovery, but before we allow backends to write
    2049                 :  * WAL.
    2050                 :  *
    2051                 :  * At the end of recovery the way we take snapshots will change. We now need
    2052                 :  * to mark all running transactions with their full SubTransSetParent() info
    2053                 :  * to allow normal snapshots to work correctly if snapshots overflow.
    2054                 :  * We do this here because by definition prepared transactions are the only
    2055                 :  * type of write transaction still running, so this is necessary and
    2056                 :  * complete.
    2057 ECB             :  */
    2058                 : void
    2059 GIC        1142 : RecoverPreparedTransactions(void)
    2060                 : {
    2061 ECB             :     int         i;
    2062                 : 
    2063 GIC        1142 :     LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
    2064            1172 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    2065                 :     {
    2066 ECB             :         TransactionId xid;
    2067                 :         char       *buf;
    2068 GIC          30 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
    2069                 :         char       *bufptr;
    2070                 :         TwoPhaseFileHeader *hdr;
    2071                 :         TransactionId *subxids;
    2072 ECB             :         const char *gid;
    2073                 : 
    2074 GIC          30 :         xid = gxact->xid;
    2075                 : 
    2076                 :         /*
    2077                 :          * Reconstruct subtrans state for the transaction --- needed because
    2078                 :          * pg_subtrans is not preserved over a restart.  Note that we are
    2079                 :          * linking all the subtransactions directly to the top-level XID;
    2080                 :          * there may originally have been a more complex hierarchy, but
    2081                 :          * there's no need to restore that exactly. It's possible that
    2082                 :          * SubTransSetParent has been set before, if the prepared transaction
    2083 ECB             :          * generated xid assignment records.
    2084                 :          */
    2085 CBC          30 :         buf = ProcessTwoPhaseBuffer(xid,
    2086 ECB             :                                     gxact->prepare_start_lsn,
    2087 GBC          30 :                                     gxact->ondisk, true, false);
    2088 GIC          30 :         if (buf == NULL)
    2089 LBC           0 :             continue;
    2090                 : 
    2091 GIC          30 :         ereport(LOG,
    2092 ECB             :                 (errmsg("recovering prepared transaction %u from shared memory", xid)));
    2093                 : 
    2094 CBC          30 :         hdr = (TwoPhaseFileHeader *) buf;
    2095              30 :         Assert(TransactionIdEquals(hdr->xid, xid));
    2096              30 :         bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
    2097              30 :         gid = (const char *) bufptr;
    2098              30 :         bufptr += MAXALIGN(hdr->gidlen);
    2099              30 :         subxids = (TransactionId *) bufptr;
    2100              30 :         bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
    2101 GNC          30 :         bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
    2102              30 :         bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
    2103 CBC          30 :         bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
    2104 GIC          30 :         bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
    2105              30 :         bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
    2106                 : 
    2107                 :         /*
    2108                 :          * Recreate its GXACT and dummy PGPROC. But, check whether it was
    2109 ECB             :          * added in redo and already has a shmem entry for it.
    2110                 :          */
    2111 GIC          30 :         MarkAsPreparingGuts(gxact, xid, gid,
    2112                 :                             hdr->prepared_at,
    2113                 :                             hdr->owner, hdr->database);
    2114 ECB             : 
    2115                 :         /* recovered, so reset the flag for entries generated by redo */
    2116 CBC          30 :         gxact->inredo = false;
    2117 ECB             : 
    2118 GIC          30 :         GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
    2119 CBC          30 :         MarkAsPrepared(gxact, true);
    2120                 : 
    2121 GIC          30 :         LWLockRelease(TwoPhaseStateLock);
    2122                 : 
    2123                 :         /*
    2124 ECB             :          * Recover other state (notably locks) using resource managers.
    2125                 :          */
    2126 GIC          30 :         ProcessRecords(bufptr, xid, twophase_recover_callbacks);
    2127                 : 
    2128                 :         /*
    2129                 :          * Release locks held by the standby process after we process each
    2130                 :          * prepared transaction. As a result, we don't need too many
    2131 ECB             :          * additional locks at any one time.
    2132                 :          */
    2133 GIC          30 :         if (InHotStandby)
    2134               6 :             StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
    2135                 : 
    2136                 :         /*
    2137                 :          * We're done with recovering this transaction. Clear MyLockedGxact,
    2138 ECB             :          * like we do in PrepareTransaction() during normal operation.
    2139                 :          */
    2140 CBC          30 :         PostPrepare_Twophase();
    2141                 : 
    2142              30 :         pfree(buf);
    2143                 : 
    2144 GIC          30 :         LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
    2145 ECB             :     }
    2146                 : 
    2147 GIC        1142 :     LWLockRelease(TwoPhaseStateLock);
    2148            1142 : }
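The recovery loop above walks the two-phase state buffer by skipping a fixed sequence of sections: the header, the GID, the subtransaction XIDs, the commit and abort relations, the commit and abort stats items, and the invalidation messages. Each section is padded with MAXALIGN. The following is a minimal sketch of that offset arithmetic. It assumes an 8-byte maximum alignment, and it uses a hypothetical, trimmed-down header (`SketchTwoPhaseHeader`) and hypothetical element sizes instead of the real `TwoPhaseFileHeader`, `RelFileLocator`, `xl_xact_stats_item` and `SharedInvalidationMessage` types.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumption: MAXIMUM_ALIGNOF is 8, as on common 64-bit platforms. */
#define SKETCH_MAXIMUM_ALIGNOF ((size_t) 8)

/* Round len up to a multiple of SKETCH_MAXIMUM_ALIGNOF, like MAXALIGN(). */
#define SKETCH_MAXALIGN(len) \
    (((size_t) (len) + SKETCH_MAXIMUM_ALIGNOF - 1) & ~(SKETCH_MAXIMUM_ALIGNOF - 1))

/* Hypothetical stand-in for TwoPhaseFileHeader: only the fields used here. */
typedef struct SketchTwoPhaseHeader
{
    uint32_t    xid;
    uint16_t    gidlen;         /* assumed to include the trailing NUL */
    int32_t     nsubxacts;
    int32_t     ncommitrels;
    int32_t     nabortrels;
    int32_t     ncommitstats;
    int32_t     nabortstats;
    int32_t     ninvalmsgs;
} SketchTwoPhaseHeader;

/* Hypothetical per-element sizes for each variable-length section. */
typedef struct SketchElemSizes
{
    size_t      subxid;
    size_t      rel;
    size_t      stats;
    size_t      invalmsg;
} SketchElemSizes;

/* Offset of the subxact array: the header, then the GID, each MAXALIGN'd. */
size_t
sketch_subxids_offset(const SketchTwoPhaseHeader *hdr)
{
    return SKETCH_MAXALIGN(sizeof(SketchTwoPhaseHeader)) +
        SKETCH_MAXALIGN(hdr->gidlen);
}

/* Offset where the resource-manager records start (bufptr before ProcessRecords). */
size_t
sketch_records_offset(const SketchTwoPhaseHeader *hdr, const SketchElemSizes *sz)
{
    size_t      off = sketch_subxids_offset(hdr);

    assert(hdr->nsubxacts >= 0 && hdr->ncommitrels >= 0 &&
           hdr->nabortrels >= 0 && hdr->ncommitstats >= 0 &&
           hdr->nabortstats >= 0 && hdr->ninvalmsgs >= 0);

    off += SKETCH_MAXALIGN((size_t) hdr->nsubxacts * sz->subxid);
    off += SKETCH_MAXALIGN((size_t) hdr->ncommitrels * sz->rel);
    off += SKETCH_MAXALIGN((size_t) hdr->nabortrels * sz->rel);
    off += SKETCH_MAXALIGN((size_t) hdr->ncommitstats * sz->stats);
    off += SKETCH_MAXALIGN((size_t) hdr->nabortstats * sz->stats);
    off += SKETCH_MAXALIGN((size_t) hdr->ninvalmsgs * sz->invalmsg);
    return off;
}
```

With a 5-byte GID and three 4-byte subxact XIDs, the GID occupies 8 padded bytes and the XIDs occupy 16. Because of this padding, each section can be cast directly to its element type, as the `(TransactionId *) bufptr` cast above does.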
    2149                 : 
    2150                 : /*
    2151                 :  * ProcessTwoPhaseBuffer
    2152                 :  *
    2153                 :  * Given a transaction id, read its two-phase state either from disk or
    2154                 :  * directly from WAL, via the provided xlog record pointer "prepare_start_lsn".
    2155                 :  *
    2156                 :  * If setParent is true, set up subtransaction parent linkages.
    2157                 :  *
    2158                 :  * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
    2159                 :  * value scanned.
    2160 ECB             :  */
    2161                 : static char *
    2162 GIC          95 : ProcessTwoPhaseBuffer(TransactionId xid,
    2163                 :                       XLogRecPtr prepare_start_lsn,
    2164                 :                       bool fromdisk,
    2165 ECB             :                       bool setParent, bool setNextXid)
    2166                 : {
    2167 GIC          95 :     FullTransactionId nextXid = ShmemVariableCache->nextXid;
    2168              95 :     TransactionId origNextXid = XidFromFullTransactionId(nextXid);
    2169                 :     TransactionId *subxids;
    2170                 :     char       *buf;
    2171                 :     TwoPhaseFileHeader *hdr;
    2172 ECB             :     int         i;
    2173                 : 
    2174 CBC          95 :     Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
    2175 ECB             : 
    2176 GIC          95 :     if (!fromdisk)
    2177              60 :         Assert(prepare_start_lsn != InvalidXLogRecPtr);
    2178 ECB             : 
    2179                 :     /* Already processed? */
    2180 GBC          95 :     if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
    2181                 :     {
    2182 UBC           0 :         if (fromdisk)
    2183                 :         {
    2184 UIC           0 :             ereport(WARNING,
    2185 EUB             :                     (errmsg("removing stale two-phase state file for transaction %u",
    2186                 :                             xid)));
    2187 UIC           0 :             RemoveTwoPhaseFile(xid, true);
    2188                 :         }
    2189 EUB             :         else
    2190                 :         {
    2191 UIC           0 :             ereport(WARNING,
    2192 EUB             :                     (errmsg("removing stale two-phase state from memory for transaction %u",
    2193                 :                             xid)));
    2194 UBC           0 :             PrepareRedoRemove(xid, true);
    2195                 :         }
    2196 UIC           0 :         return NULL;
    2197                 :     }
    2198 ECB             : 
    2199                 :     /* Reject XID if too new */
    2200 GBC          95 :     if (TransactionIdFollowsOrEquals(xid, origNextXid))
    2201                 :     {
    2202 UBC           0 :         if (fromdisk)
    2203                 :         {
    2204 UIC           0 :             ereport(WARNING,
    2205 EUB             :                     (errmsg("removing future two-phase state file for transaction %u",
    2206                 :                             xid)));
    2207 UIC           0 :             RemoveTwoPhaseFile(xid, true);
    2208                 :         }
    2209 EUB             :         else
    2210                 :         {
    2211 UIC           0 :             ereport(WARNING,
    2212 EUB             :                     (errmsg("removing future two-phase state from memory for transaction %u",
    2213                 :                             xid)));
    2214 UBC           0 :             PrepareRedoRemove(xid, true);
    2215                 :         }
    2216 UIC           0 :         return NULL;
    2217 ECB             :     }
    2218                 : 
    2219 GIC          95 :     if (fromdisk)
    2220 ECB             :     {
    2221                 :         /* Read and validate file */
    2222 GIC          35 :         buf = ReadTwoPhaseFile(xid, false);
    2223                 :     }
    2224                 :     else
    2225 ECB             :     {
    2226                 :         /* Read xlog data */
    2227 GIC          60 :         XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
    2228                 :     }
    2229 ECB             : 
    2230                 :     /* Deconstruct header */
    2231 GIC          95 :     hdr = (TwoPhaseFileHeader *) buf;
    2232 GBC          95 :     if (!TransactionIdEquals(hdr->xid, xid))
    2233 EUB             :     {
    2234 UIC           0 :         if (fromdisk)
    2235               0 :             ereport(ERROR,
    2236                 :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2237                 :                      errmsg("corrupted two-phase state file for transaction %u",
    2238 EUB             :                             xid)));
    2239                 :         else
    2240 UIC           0 :             ereport(ERROR,
    2241                 :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2242                 :                      errmsg("corrupted two-phase state in memory for transaction %u",
    2243                 :                             xid)));
    2244                 :     }
    2245                 : 
    2246                 :     /*
    2247                 :      * Examine subtransaction XIDs ... they should all follow main XID, and
    2248 ECB             :      * they may force us to advance nextXid.
    2249                 :      */
    2250 CBC          95 :     subxids = (TransactionId *) (buf +
    2251              95 :                                  MAXALIGN(sizeof(TwoPhaseFileHeader)) +
    2252 GIC          95 :                                  MAXALIGN(hdr->gidlen));
    2253 CBC        1858 :     for (i = 0; i < hdr->nsubxacts; i++)
    2254                 :     {
    2255            1763 :         TransactionId subxid = subxids[i];
    2256                 : 
    2257 GIC        1763 :         Assert(TransactionIdFollows(subxid, xid));
    2258 ECB             : 
    2259                 :         /* update nextXid if needed */
    2260 GIC        1763 :         if (setNextXid)
    2261 CBC         813 :             AdvanceNextFullTransactionIdPastXid(subxid);
    2262 ECB             : 
    2263 GIC        1763 :         if (setParent)
    2264             345 :             SubTransSetParent(subxid, xid);
    2265 ECB             :     }
    2266                 : 
    2267 GIC          95 :     return buf;
    2268                 : }
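ProcessTwoPhaseBuffer rejects a prepared transaction in three situations. If the XID already committed or aborted, its state is stale and gets removed. If the XID is at or beyond nextXid, it is a "future" XID and also gets removed. If the header names a different XID than expected, the state is treated as corrupt and raises an ERROR. The "too new" test depends on PostgreSQL's wraparound-aware XID ordering. The sketch below models that decision order. It assumes the modulo-2^32 rule used by `TransactionIdFollowsOrEquals()`, under which XIDs below FirstNormalTransactionId (3) compare as plain integers. The `sketch_*` names are hypothetical, and the clog lookups are passed in as booleans.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t SketchXid;

/* Mirrors FirstNormalTransactionId: XIDs 0..2 are special and never wrap. */
#define SKETCH_FIRST_NORMAL_XID ((SketchXid) 3)

/* Wraparound-aware "id1 >= id2", modeled on TransactionIdFollowsOrEquals(). */
bool
sketch_xid_follows_or_equals(SketchXid id1, SketchXid id2)
{
    if (id1 < SKETCH_FIRST_NORMAL_XID || id2 < SKETCH_FIRST_NORMAL_XID)
        return id1 >= id2;
    /* Read the modulo-2^32 difference as signed: >= 0 means id1 is not behind id2. */
    return (int32_t) (id1 - id2) >= 0;
}

typedef enum SketchVerdict
{
    SKETCH_USE,                 /* buffer can be used for recovery */
    SKETCH_STALE,               /* already committed/aborted: state is removed */
    SKETCH_FUTURE,              /* xid >= nextXid: state is removed */
    SKETCH_CORRUPT              /* header xid mismatch: ERROR in the real code */
} SketchVerdict;

/* Same check order as ProcessTwoPhaseBuffer(): stale, too new, corrupt header. */
SketchVerdict
sketch_check_two_phase(SketchXid xid, bool did_commit, bool did_abort,
                       SketchXid next_xid, SketchXid hdr_xid)
{
    if (did_commit || did_abort)
        return SKETCH_STALE;
    if (sketch_xid_follows_or_equals(xid, next_xid))
        return SKETCH_FUTURE;
    if (hdr_xid != xid)
        return SKETCH_CORRUPT;
    return SKETCH_USE;
}
```

The signed-difference trick makes XID 5 count as "after" XID 0xFFFFFFF0, because the counter has wrapped. A plain unsigned comparison would get this case wrong.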
    2269                 : 
    2270                 : 
    2271                 : /*
    2272                 :  *  RecordTransactionCommitPrepared
    2273                 :  *
    2274                 :  * This is basically the same as RecordTransactionCommit (q.v. if you change
    2275                 :  * this function): in particular, we must set DELAY_CHKPT_START to avoid a
    2276                 :  * race condition.
    2277                 :  *
    2278                 :  * We know the transaction made at least one XLOG entry (its PREPARE),
    2279                 :  * so it is never possible to optimize out the commit record.
    2280 ECB             :  */
    2281                 : static void
    2282 GIC         325 : RecordTransactionCommitPrepared(TransactionId xid,
    2283                 :                                 int nchildren,
    2284                 :                                 TransactionId *children,
    2285                 :                                 int nrels,
    2286                 :                                 RelFileLocator *rels,
    2287                 :                                 int nstats,
    2288                 :                                 xl_xact_stats_item *stats,
    2289                 :                                 int ninvalmsgs,
    2290                 :                                 SharedInvalidationMessage *invalmsgs,
    2291                 :                                 bool initfileinval,
    2292                 :                                 const char *gid)
    2293 ECB             : {
    2294                 :     XLogRecPtr  recptr;
    2295 GIC         325 :     TimestampTz committs = GetCurrentTimestamp();
    2296                 :     bool        replorigin;
    2297                 : 
    2298                 :     /*
    2299                 :      * Are we using the replication origins feature?  Or, in other words, are
    2300 ECB             :      * we replaying remote actions?
    2301                 :      */
    2302 GIC         345 :     replorigin = (replorigin_session_origin != InvalidRepOriginId &&
    2303 CBC          20 :                   replorigin_session_origin != DoNotReplicateId);
    2304                 : 
    2305 GIC         325 :     START_CRIT_SECTION();
    2306 ECB             : 
    2307                 :     /* See notes in RecordTransactionCommit */
    2308 GIC         325 :     Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
    2309             325 :     MyProc->delayChkptFlags |= DELAY_CHKPT_START;
    2310                 : 
    2311                 :     /*
    2312                 :      * Emit the XLOG commit record. Note that we mark 2PC commits as
    2313                 :      * potentially having AccessExclusiveLocks since we don't know whether or
    2314 ECB             :      * not they do.
    2315                 :      */
    2316 GIC         325 :     recptr = XactLogCommitRecord(committs,
    2317                 :                                  nchildren, children, nrels, rels,
    2318                 :                                  nstats, stats,
    2319 ECB             :                                  ninvalmsgs, invalmsgs,
    2320                 :                                  initfileinval,
    2321 GIC         325 :                                  MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
    2322                 :                                  xid, gid);
    2323 ECB             : 
    2324                 : 
    2325 CBC         325 :     if (replorigin)
    2326                 :         /* Move LSNs forward for this replication origin */
    2327 GIC          20 :         replorigin_session_advance(replorigin_session_origin_lsn,
    2328                 :                                    XactLastRecEnd);
    2329                 : 
    2330                 :     /*
    2331                 :      * Record commit timestamp.  The value comes from plain commit timestamp
    2332                 :      * if replorigin is not enabled, or replorigin already set a value for us
    2333                 :      * in replorigin_session_origin_timestamp otherwise.
    2334                 :      *
    2335                 :      * We don't need to WAL-log anything here, as the commit record written
    2336 ECB             :      * above already contains the data.
    2337                 :      */
    2338 GIC         325 :     if (!replorigin || replorigin_session_origin_timestamp == 0)
    2339 CBC         305 :         replorigin_session_origin_timestamp = committs;
    2340                 : 
    2341 GIC         325 :     TransactionTreeSetCommitTsData(xid, nchildren, children,
    2342                 :                                    replorigin_session_origin_timestamp,
    2343                 :                                    replorigin_session_origin);
    2344                 : 
    2345                 :     /*
    2346                 :      * We don't currently try to sleep before flush here ... nor is there any
    2347                 :      * support for async commit of a prepared xact (the very idea is probably
    2348                 :      * a contradiction)
    2349                 :      */
    2350 ECB             : 
    2351                 :     /* Flush XLOG to disk */
    2352 GIC         325 :     XLogFlush(recptr);
    2353 ECB             : 
    2354                 :     /* Mark the transaction committed in pg_xact */
    2355 GIC         325 :     TransactionIdCommitTree(xid, nchildren, children);
    2356 ECB             : 
    2357                 :     /* Checkpoint can proceed now */
    2358 CBC         325 :     MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
    2359                 : 
    2360 GIC         325 :     END_CRIT_SECTION();
    2361                 : 
    2362                 :     /*
    2363                 :      * Wait for synchronous replication, if required.
    2364                 :      *
    2365                 :      * Note that at this stage we have marked clog, but still show as running
    2366 ECB             :      * in the procarray and continue to hold locks.
    2367                 :      */
    2368 GIC         325 :     SyncRepWaitForLSN(recptr, true);
    2369             325 : }
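The commit-timestamp comment in RecordTransactionCommitPrepared is compact, so here is a minimal model of its rule. The session counts as "replaying remote actions" only when its origin is neither invalid nor DoNotReplicateId. In that case, a nonzero origin timestamp wins. Otherwise the locally taken `committs` is used. The sketch assumes that `InvalidRepOriginId` is 0 and `DoNotReplicateId` is the maximum 16-bit value. The `sketch_*` names are hypothetical. Unlike the real code, which writes the chosen value back into `replorigin_session_origin_timestamp`, the sketch simply returns it.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint16_t SketchOriginId;    /* RepOriginId is a 16-bit id */
typedef int64_t SketchTimestamp;    /* TimestampTz is a 64-bit integer */

/* Assumed to mirror InvalidRepOriginId and DoNotReplicateId. */
#define SKETCH_INVALID_ORIGIN   ((SketchOriginId) 0)
#define SKETCH_DO_NOT_REPLICATE ((SketchOriginId) UINT16_MAX)

/* "Are we replaying remote actions?" */
bool
sketch_replaying_remote(SketchOriginId origin)
{
    return origin != SKETCH_INVALID_ORIGIN && origin != SKETCH_DO_NOT_REPLICATE;
}

/* Timestamp that would be passed to TransactionTreeSetCommitTsData(). */
SketchTimestamp
sketch_commit_ts(SketchOriginId origin, SketchTimestamp origin_ts,
                 SketchTimestamp local_committs)
{
    if (!sketch_replaying_remote(origin) || origin_ts == 0)
        return local_committs;
    return origin_ts;
}
```

No extra WAL is needed for this value, because the commit record emitted earlier already carries it.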
    2370                 : 
    2371                 : /*
    2372                 :  *  RecordTransactionAbortPrepared
    2373                 :  *
    2374                 :  * This is basically the same as RecordTransactionAbort.
    2375                 :  *
    2376                 :  * We know the transaction made at least one XLOG entry (its PREPARE),
    2377                 :  * so it is never possible to optimize out the abort record.
    2378 ECB             :  */
    2379                 : static void
    2380 GIC          38 : RecordTransactionAbortPrepared(TransactionId xid,
    2381                 :                                int nchildren,
    2382                 :                                TransactionId *children,
    2383                 :                                int nrels,
    2384                 :                                RelFileLocator *rels,
    2385                 :                                int nstats,
    2386                 :                                xl_xact_stats_item *stats,
    2387                 :                                const char *gid)
    2388                 : {
    2389                 :     XLogRecPtr  recptr;
    2390                 :     bool        replorigin;
    2391                 : 
    2392                 :     /*
    2393                 :      * Are we using the replication origins feature?  Or, in other words, are
    2394 ECB             :      * we replaying remote actions?
    2395                 :      */
    2396 GIC          44 :     replorigin = (replorigin_session_origin != InvalidRepOriginId &&
    2397               6 :                   replorigin_session_origin != DoNotReplicateId);
    2398                 : 
    2399                 :     /*
    2400                 :      * Catch the scenario where we aborted partway through
    2401 ECB             :      * RecordTransactionCommitPrepared ...
    2402 EUB             :      */
    2403 GIC          38 :     if (TransactionIdDidCommit(xid))
    2404 UIC           0 :         elog(PANIC, "cannot abort transaction %u, it was already committed",
    2405 ECB             :              xid);
    2406                 : 
    2407 GIC          38 :     START_CRIT_SECTION();
    2408                 : 
    2409                 :     /*
    2410                 :      * Emit the XLOG abort record. Note that we mark 2PC aborts as
    2411                 :      * potentially having AccessExclusiveLocks since we don't know whether or
    2412 ECB             :      * not they do.
    2413                 :      */
    2414 GIC          38 :     recptr = XactLogAbortRecord(GetCurrentTimestamp(),
    2415                 :                                 nchildren, children,
    2416 ECB             :                                 nrels, rels,
    2417                 :                                 nstats, stats,
    2418 GIC          38 :                                 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
    2419 ECB             :                                 xid, gid);
    2420                 : 
    2421 CBC          38 :     if (replorigin)
    2422                 :         /* Move LSNs forward for this replication origin */
    2423 GIC           6 :         replorigin_session_advance(replorigin_session_origin_lsn,
    2424                 :                                    XactLastRecEnd);
    2425 ECB             : 
    2426                 :     /* Always flush, since we're about to remove the 2PC state file */
    2427 GIC          38 :     XLogFlush(recptr);
    2428                 : 
    2429                 :     /*
    2430                 :      * Mark the transaction aborted in clog.  This is not absolutely necessary
    2431 ECB             :      * but we may as well do it while we are here.
    2432                 :      */
    2433 CBC          38 :     TransactionIdAbortTree(xid, nchildren, children);
    2434                 : 
    2435 GIC          38 :     END_CRIT_SECTION();
    2436                 : 
    2437                 :     /*
    2438                 :      * Wait for synchronous replication, if required.
    2439                 :      *
    2440                 :      * Note that at this stage we have marked clog, but still show as running
    2441 ECB             :      * in the procarray and continue to hold locks.
    2442                 :      */
    2443 GIC          38 :     SyncRepWaitForLSN(recptr, false);
    2444              38 : }
    2445                 : 
    2446                 : /*
    2447                 :  * PrepareRedoAdd
    2448                 :  *
    2449                 :  * Store pointers to the start/end of the WAL record along with the xid in
    2450                 :  * a gxact entry in shared memory TwoPhaseState structure.  If caller
    2451                 :  * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
    2452                 :  * data, the entry is marked as located on disk.
    2453 ECB             :  */
    2454                 : void
    2455 GIC          80 : PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
    2456 ECB             :                XLogRecPtr end_lsn, RepOriginId origin_id)
    2457                 : {
    2458 GIC          80 :     TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
    2459                 :     char       *bufptr;
    2460                 :     const char *gid;
    2461 ECB             :     GlobalTransaction gxact;
    2462                 : 
    2463 GIC          80 :     Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
    2464 CBC          80 :     Assert(RecoveryInProgress());
    2465 ECB             : 
    2466 GIC          80 :     bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
    2467              80 :     gid = (const char *) bufptr;
    2468                 : 
    2469                 :     /*
    2470                 :      * Reserve the GID for the given transaction in the redo code path.
    2471                 :      *
    2472                 :      * This creates a gxact struct and puts it into the active array.
    2473                 :      *
    2474                 :      * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
    2475                 :      * shared memory. Hence, we only fill up the bare minimum contents here.
    2476                 :      * The gxact also gets marked with gxact->inredo set to true to indicate
    2477                 :      * that it got added in the redo phase.
    2478                 :      */
    2479 ECB             : 
    2480 EUB             :     /* Get a free gxact from the freelist */
    2481 GIC          80 :     if (TwoPhaseState->freeGXacts == NULL)
    2482 UIC           0 :         ereport(ERROR,
    2483                 :                 (errcode(ERRCODE_OUT_OF_MEMORY),
    2484                 :                  errmsg("maximum number of prepared transactions reached"),
    2485 ECB             :                  errhint("Increase max_prepared_transactions (currently %d).",
    2486                 :                          max_prepared_xacts)));
    2487 GIC          80 :     gxact = TwoPhaseState->freeGXacts;
    2488 CBC          80 :     TwoPhaseState->freeGXacts = gxact->next;
    2489 ECB             : 
    2490 CBC          80 :     gxact->prepared_at = hdr->prepared_at;
    2491              80 :     gxact->prepare_start_lsn = start_lsn;
    2492              80 :     gxact->prepare_end_lsn = end_lsn;
    2493              80 :     gxact->xid = hdr->xid;
    2494              80 :     gxact->owner = hdr->owner;
    2495              80 :     gxact->locking_backend = InvalidBackendId;
    2496              80 :     gxact->valid = false;
    2497              80 :     gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
    2498 GIC          80 :     gxact->inredo = true;        /* yes, added in redo */
    2499              80 :     strcpy(gxact->gid, gid);
    2500 ECB             : 
    2501                 :     /* And insert it into the active array */
    2502 GIC          80 :     Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
    2503 CBC          80 :     TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
    2504                 : 
    2505 GIC          80 :     if (origin_id != InvalidRepOriginId)
    2506 ECB             :     {
    2507                 :         /* recover apply progress */
    2508 GIC          13 :         replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
    2509                 :                            false /* backward */ , false /* WAL */ );
    2510 ECB             :     }
    2511                 : 
    2512 GIC          80 :     elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
    2513              80 : }
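PrepareRedoAdd draws a slot from a fixed pool in two steps. It first pops the head of the `freeGXacts` list, failing once the list is empty. It then appends the slot to the dense `prepXacts` array. The entry is flagged as on disk when no WAL start location is supplied. The sketch below shows that pattern with a hypothetical four-slot pool standing in for `max_prepared_transactions`. It assumes that `GIDSIZE` is 200 and treats LSN 0 as `InvalidXLogRecPtr`. All `sketch_*` names are illustrative, and where the real code raises an ERROR, the sketch returns NULL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_MAX_PREPARED 4   /* stand-in for max_prepared_transactions */
#define SKETCH_GIDSIZE      200 /* assumed to match GIDSIZE */

typedef struct SketchGXact
{
    struct SketchGXact *next;   /* freelist link */
    uint32_t    xid;
    uint64_t    prepare_start_lsn;
    bool        ondisk;
    bool        inredo;
    char        gid[SKETCH_GIDSIZE];
} SketchGXact;

typedef struct SketchTwoPhaseState
{
    SketchGXact *free_gxacts;
    int         num_prep_xacts;
    SketchGXact *prep_xacts[SKETCH_MAX_PREPARED];
    SketchGXact slots[SKETCH_MAX_PREPARED];
} SketchTwoPhaseState;

/* Thread every slot onto the freelist, slot 0 first. */
void
sketch_state_init(SketchTwoPhaseState *st)
{
    st->free_gxacts = NULL;
    st->num_prep_xacts = 0;
    for (int i = SKETCH_MAX_PREPARED - 1; i >= 0; i--)
    {
        st->slots[i].next = st->free_gxacts;
        st->free_gxacts = &st->slots[i];
    }
}

/* Returns NULL where PrepareRedoAdd() would raise its ERROR. */
SketchGXact *
sketch_redo_add(SketchTwoPhaseState *st, uint32_t xid, const char *gid,
                uint64_t start_lsn)
{
    SketchGXact *gxact = st->free_gxacts;

    if (gxact == NULL)
        return NULL;
    st->free_gxacts = gxact->next;      /* pop from the freelist */

    gxact->xid = xid;
    gxact->prepare_start_lsn = start_lsn;
    gxact->ondisk = (start_lsn == 0);   /* 0 plays InvalidXLogRecPtr */
    gxact->inredo = true;
    strncpy(gxact->gid, gid, SKETCH_GIDSIZE - 1);
    gxact->gid[SKETCH_GIDSIZE - 1] = '\0';

    /* Append to the active array. */
    assert(st->num_prep_xacts < SKETCH_MAX_PREPARED);
    st->prep_xacts[st->num_prep_xacts++] = gxact;
    return gxact;
}
```

Because the pool is preallocated, redo never allocates memory. Running out of slots is reported as a configuration limit rather than as an allocation failure.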
    2514                 : 
    2515                 : /*
    2516                 :  * PrepareRedoRemove
    2517                 :  *
    2518                 :  * Remove the corresponding gxact entry from TwoPhaseState. Also remove
    2519                 :  * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
    2520                 :  *
    2521                 :  * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
    2522                 :  * is updated.
    2523 ECB             :  */
    2524                 : void
    2525 CBC          58 : PrepareRedoRemove(TransactionId xid, bool giveWarning)
    2526                 : {
    2527              58 :     GlobalTransaction gxact = NULL;
    2528                 :     int         i;
    2529              58 :     bool        found = false;
    2530 ECB             : 
    2531 GIC          58 :     Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
    2532 CBC          58 :     Assert(RecoveryInProgress());
    2533                 : 
    2534              58 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    2535                 :     {
    2536              49 :         gxact = TwoPhaseState->prepXacts[i];
    2537                 : 
    2538              49 :         if (gxact->xid == xid)
    2539 ECB             :         {
    2540 CBC          49 :             Assert(gxact->inredo);
    2541 GIC          49 :             found = true;
    2542              49 :             break;
    2543                 :         }
    2544                 :     }
    2545                 : 
    2546                 :     /*
    2547 ECB             :      * Just leave if there is nothing; this is expected during WAL replay.
    2548                 :      */
    2549 GIC          58 :     if (!found)
    2550               9 :         return;
    2551                 : 
    2552                 :     /*
    2553 ECB             :      * And now we can clean up any files we may have left.
    2554                 :      */
    2555 CBC          49 :     elog(DEBUG2, "removing 2PC data for transaction %u", xid);
    2556              49 :     if (gxact->ondisk)
    2557 GIC           2 :         RemoveTwoPhaseFile(xid, giveWarning);
    2558              49 :     RemoveGXact(gxact);
    2559                 : }
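PrepareRedoRemove finds the entry by a linear scan on `xid` and returns quietly if nothing matches. If the entry was saved to disk by a checkpoint, it also removes the state file. It then hands the entry to RemoveGXact, which is not shown in this excerpt. The sketch below assumes that RemoveGXact keeps the active array dense by moving the last entry into the vacated slot and pushes the freed entry back onto the freelist. The types and names are hypothetical, and file removal is modeled only by reporting `ondisk` back to the caller.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_MAX_PREPARED 4

typedef struct SketchGXact
{
    struct SketchGXact *next;   /* freelist link */
    uint32_t    xid;
    bool        ondisk;
} SketchGXact;

typedef struct SketchTwoPhaseState
{
    SketchGXact *free_gxacts;
    int         num_prep_xacts;
    SketchGXact *prep_xacts[SKETCH_MAX_PREPARED];
} SketchTwoPhaseState;

/*
 * Remove the entry for xid, if any.  *had_file reports whether the real code
 * would also call RemoveTwoPhaseFile().  Returns false when nothing matched.
 */
bool
sketch_redo_remove(SketchTwoPhaseState *st, uint32_t xid, bool *had_file)
{
    for (int i = 0; i < st->num_prep_xacts; i++)
    {
        SketchGXact *gxact = st->prep_xacts[i];

        if (gxact->xid != xid)
            continue;

        *had_file = gxact->ondisk;

        /* Keep the array dense: move the last entry into this slot. */
        st->prep_xacts[i] = st->prep_xacts[--st->num_prep_xacts];

        /* Push the freed entry back onto the freelist. */
        gxact->next = st->free_gxacts;
        st->free_gxacts = gxact;
        return true;
    }

    *had_file = false;
    return false;               /* nothing to remove; expected during replay */
}
```

Swapping with the last element makes removal O(1) once the entry is found, at the cost of not preserving array order. Nothing in this section relies on that order.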
    2560                 : 
    2561                 : /*
    2562                 :  * LookupGXact
    2563                 :  *      Check if the prepared transaction with the given GID, lsn and timestamp
    2564                 :  *      exists.
    2565                 :  *
    2566                 :  * Note that we always compare with the LSN where prepare ends because that is
    2567                 :  * what is stored as origin_lsn in the 2PC file.
    2568                 :  *
    2569                 :  * This function is primarily used to check if the prepared transaction
    2570                 :  * received from the upstream (remote node) already exists. Checking only GID
    2571                 :  * is not sufficient because a different prepared xact with the same GID can
    2572                 :  * exist on the same node. So, we also match the origin_lsn and
    2573                 :  * origin_timestamp of the prepared xact, to rule out a match between
    2574                 :  * prepared xacts that came from two different nodes.
    2575 ECB             :  */
    2576                 : bool
    2577 GIC           5 : LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
    2578                 :             TimestampTz origin_prepare_timestamp)
    2579 ECB             : {
    2580                 :     int         i;
    2581 CBC           5 :     bool        found = false;
    2582 ECB             : 
    2583 GIC           5 :     LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
    2584 CBC           5 :     for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
    2585                 :     {
    2586 GIC           5 :         GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
    2587 ECB             : 
    2588                 :         /* Ignore not-yet-valid GIDs. */
    2589 GIC           5 :         if (gxact->valid && strcmp(gxact->gid, gid) == 0)
    2590                 :         {
    2591                 :             char       *buf;
    2592                 :             TwoPhaseFileHeader *hdr;
    2593                 : 
    2594                 :             /*
    2595                 :              * We are not expecting collisions of GXACTs (same gid) between
    2596                 :              * publisher and subscribers, so we perform all I/O while holding
    2597                 :              * TwoPhaseStateLock for simplicity.
    2598                 :              *
    2599                 :              * To move the I/O out of the lock, we need to ensure that no
    2600                 :              * other backend commits the prepared xact in the meantime. We can
    2601                 :              * do this optimization if we encounter many collisions in GID
    2602 ECB             :              * between publisher and subscriber.
    2603 EUB             :              */
    2604 GIC           5 :             if (gxact->ondisk)
    2605 UIC           0 :                 buf = ReadTwoPhaseFile(gxact->xid, false);
    2606 ECB             :             else
    2607                 :             {
    2608 GIC           5 :                 Assert(gxact->prepare_start_lsn);
    2609               5 :                 XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
    2610 ECB             :             }
    2611                 : 
    2612 CBC           5 :             hdr = (TwoPhaseFileHeader *) buf;
    2613 ECB             : 
    2614 GIC           5 :             if (hdr->origin_lsn == prepare_end_lsn &&
    2615 CBC           5 :                 hdr->origin_timestamp == origin_prepare_timestamp)
    2616 ECB             :             {
    2617 CBC           5 :                 found = true;
    2618 GIC           5 :                 pfree(buf);
    2619               5 :                 break;
    2620 EUB             :             }
    2621                 : 
    2622 UIC           0 :             pfree(buf);
    2623 ECB             :         }
    2624                 :     }
    2625 GIC           5 :     LWLockRelease(TwoPhaseStateLock);
    2626               5 :     return found;
    2627                 : }
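LookupGXact treats a prepared transaction as a match only if three conditions hold: it is valid, its GID is equal, and both its origin_lsn and origin_timestamp agree with the values received from upstream. A matching GID alone could belong to an unrelated local xact. In the real function, origin_lsn and origin_timestamp come from the 2PC header, read either from the state file or from WAL while TwoPhaseStateLock is held. The minimal sketch below stores them inline in a hypothetical `SketchPrepared` entry to isolate the matching rule.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical flattened view of a gxact plus the header fields used here. */
typedef struct SketchPrepared
{
    bool        valid;
    const char *gid;
    uint64_t    origin_lsn;
    int64_t     origin_timestamp;
} SketchPrepared;

bool
sketch_lookup_gxact(const SketchPrepared *entries, size_t n, const char *gid,
                    uint64_t prepare_end_lsn, int64_t origin_prepare_timestamp)
{
    for (size_t i = 0; i < n; i++)
    {
        const SketchPrepared *e = &entries[i];

        /* Ignore not-yet-valid entries and different GIDs. */
        if (!e->valid || strcmp(e->gid, gid) != 0)
            continue;

        /* Same GID is not enough: the upstream origin must match too. */
        if (e->origin_lsn == prepare_end_lsn &&
            e->origin_timestamp == origin_prepare_timestamp)
            return true;
    }
    return false;
}
```

As in the original, a GID hit whose origin data does not match does not end the search. The scan moves on to the remaining entries.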
        

Generated by: LCOV version v1.16-55-g56c0a2a