Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * twophase.c
4 : : * Two-phase commit support functions.
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/access/transam/twophase.c
11 : : *
12 : : * NOTES
13 : : * Each global transaction is associated with a global transaction
14 : : * identifier (GID). The client assigns a GID to a postgres
15 : : * transaction with the PREPARE TRANSACTION command.
16 : : *
17 : : * We keep all active global transactions in a shared memory array.
18 : : * When the PREPARE TRANSACTION command is issued, the GID is
19 : : * reserved for the transaction in the array. This is done before
20 : : * a WAL entry is made, because the reservation checks for duplicate
21 : : * GIDs and aborts the transaction if there already is a global
22 : : * transaction in prepared state with the same GID.
23 : : *
24 : : * A global transaction (gxact) also has dummy PGPROC; this is what keeps
25 : : * the XID considered running by TransactionIdIsInProgress. It is also
26 : : * convenient as a PGPROC to hook the gxact's locks to.
27 : : *
28 : : * Information to recover prepared transactions in case of crash is
29 : : * now stored in WAL for the common case. In some cases there will be
30 : : * an extended period between preparing a GXACT and commit/abort, in
31 : : * which case we need to separately record prepared transaction data
32 : : * in permanent storage. This includes locking information, pending
33 : : * notifications etc. All that state information is written to the
34 : : * per-transaction state file in the pg_twophase directory.
35 : : * All prepared transactions will be written prior to shutdown.
36 : : *
37 : : * Life track of state data is following:
38 : : *
39 : : * * On PREPARE TRANSACTION backend writes state data only to the WAL and
40 : : * stores pointer to the start of the WAL record in
41 : : * gxact->prepare_start_lsn.
42 : : * * If COMMIT occurs before checkpoint then backend reads data from WAL
43 : : * using prepare_start_lsn.
44 : : * * On checkpoint state data copied to files in pg_twophase directory and
45 : : * fsynced
46 : : * * If COMMIT happens after checkpoint then backend reads state data from
47 : : * files
48 : : *
49 : : * During replay and replication, TwoPhaseState also holds information
50 : : * about active prepared transactions that haven't been moved to disk yet.
51 : : *
52 : : * Replay of twophase records happens by the following rules:
53 : : *
54 : : * * At the beginning of recovery, pg_twophase is scanned once, filling
55 : : * TwoPhaseState with entries marked with gxact->inredo and
56 : : * gxact->ondisk. Two-phase file data older than the XID horizon of
57 : : * the redo position are discarded.
58 : : * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 : : * gxact->inredo is set to true for such entries.
60 : : * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 : : * that have gxact->inredo set and are behind the redo_horizon. We
62 : : * save them to disk and then switch gxact->ondisk to true.
63 : : * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 : : * If gxact->ondisk is true, the corresponding entry from the disk
65 : : * is additionally deleted.
66 : : * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 : : * and PrescanPreparedTransactions() have been modified to go through
68 : : * gxact->inredo entries that have not made it to disk.
69 : : *
70 : : *-------------------------------------------------------------------------
71 : : */
72 : : #include "postgres.h"
73 : :
74 : : #include <fcntl.h>
75 : : #include <sys/stat.h>
76 : : #include <time.h>
77 : : #include <unistd.h>
78 : :
79 : : #include "access/commit_ts.h"
80 : : #include "access/htup_details.h"
81 : : #include "access/subtrans.h"
82 : : #include "access/transam.h"
83 : : #include "access/twophase.h"
84 : : #include "access/twophase_rmgr.h"
85 : : #include "access/xact.h"
86 : : #include "access/xlog.h"
87 : : #include "access/xloginsert.h"
88 : : #include "access/xlogreader.h"
89 : : #include "access/xlogrecovery.h"
90 : : #include "access/xlogutils.h"
91 : : #include "catalog/pg_type.h"
92 : : #include "catalog/storage.h"
93 : : #include "funcapi.h"
94 : : #include "miscadmin.h"
95 : : #include "pg_trace.h"
96 : : #include "pgstat.h"
97 : : #include "replication/origin.h"
98 : : #include "replication/syncrep.h"
99 : : #include "storage/fd.h"
100 : : #include "storage/ipc.h"
101 : : #include "storage/md.h"
102 : : #include "storage/predicate.h"
103 : : #include "storage/proc.h"
104 : : #include "storage/procarray.h"
105 : : #include "utils/builtins.h"
106 : : #include "utils/memutils.h"
107 : : #include "utils/timestamp.h"
108 : :
109 : : /*
110 : : * Directory where Two-phase commit files reside within PGDATA
111 : : */
112 : : #define TWOPHASE_DIR "pg_twophase"
113 : :
114 : : /* GUC variable, can't be changed after startup */
115 : : int max_prepared_xacts = 0;
116 : :
117 : : /*
118 : : * This struct describes one global transaction that is in prepared state
119 : : * or attempting to become prepared.
120 : : *
121 : : * The lifecycle of a global transaction is:
122 : : *
123 : : * 1. After checking that the requested GID is not in use, set up an entry in
124 : : * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
125 : : * and mark it as locked by my backend.
126 : : *
127 : : * 2. After successfully completing prepare, set valid = true and enter the
128 : : * referenced PGPROC into the global ProcArray.
129 : : *
130 : : * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
131 : : * valid and not locked, then mark the entry as locked by storing my current
132 : : * proc number into locking_backend. This prevents concurrent attempts to
133 : : * commit or rollback the same prepared xact.
134 : : *
135 : : * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
136 : : * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
137 : : * the freelist.
138 : : *
139 : : * Note that if the preparing transaction fails between steps 1 and 2, the
140 : : * entry must be removed so that the GID and the GlobalTransaction struct
141 : : * can be reused. See AtAbort_Twophase().
142 : : *
143 : : * typedef struct GlobalTransactionData *GlobalTransaction appears in
144 : : * twophase.h
145 : : */
146 : :
147 : : typedef struct GlobalTransactionData
148 : : {
149 : : GlobalTransaction next; /* list link for free list */
150 : : int pgprocno; /* ID of associated dummy PGPROC */
151 : : TimestampTz prepared_at; /* time of preparation */
152 : :
153 : : /*
154 : : * Note that we need to keep track of two LSNs for each GXACT. We keep
155 : : * track of the start LSN because this is the address we must use to read
156 : : * state data back from WAL when committing a prepared GXACT. We keep
157 : : * track of the end LSN because that is the LSN we need to wait for prior
158 : : * to commit.
159 : : */
160 : : XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
161 : : XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
162 : : TransactionId xid; /* The GXACT id */
163 : :
164 : : Oid owner; /* ID of user that executed the xact */
165 : : ProcNumber locking_backend; /* backend currently working on the xact */
166 : : bool valid; /* true if PGPROC entry is in proc array */
167 : : bool ondisk; /* true if prepare state file is on disk */
168 : : bool inredo; /* true if entry was added via xlog_redo */
169 : : char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
170 : : } GlobalTransactionData;
171 : :
172 : : /*
173 : : * Two Phase Commit shared state. Access to this struct is protected
174 : : * by TwoPhaseStateLock.
175 : : */
176 : : typedef struct TwoPhaseStateData
177 : : {
178 : : /* Head of linked list of free GlobalTransactionData structs */
179 : : GlobalTransaction freeGXacts;
180 : :
181 : : /* Number of valid prepXacts entries. */
182 : : int numPrepXacts;
183 : :
184 : : /* There are max_prepared_xacts items in this array */
185 : : GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
186 : : } TwoPhaseStateData;
187 : :
188 : : static TwoPhaseStateData *TwoPhaseState;
189 : :
190 : : /*
191 : : * Global transaction entry currently locked by us, if any. Note that any
192 : : * access to the entry pointed to by this variable must be protected by
193 : : * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
194 : : * (since it's just local memory).
195 : : */
196 : : static GlobalTransaction MyLockedGxact = NULL;
197 : :
198 : : static bool twophaseExitRegistered = false;
199 : :
200 : : static void RecordTransactionCommitPrepared(TransactionId xid,
201 : : int nchildren,
202 : : TransactionId *children,
203 : : int nrels,
204 : : RelFileLocator *rels,
205 : : int nstats,
206 : : xl_xact_stats_item *stats,
207 : : int ninvalmsgs,
208 : : SharedInvalidationMessage *invalmsgs,
209 : : bool initfileinval,
210 : : const char *gid);
211 : : static void RecordTransactionAbortPrepared(TransactionId xid,
212 : : int nchildren,
213 : : TransactionId *children,
214 : : int nrels,
215 : : RelFileLocator *rels,
216 : : int nstats,
217 : : xl_xact_stats_item *stats,
218 : : const char *gid);
219 : : static void ProcessRecords(char *bufptr, TransactionId xid,
220 : : const TwoPhaseCallback callbacks[]);
221 : : static void RemoveGXact(GlobalTransaction gxact);
222 : :
223 : : static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
224 : : static char *ProcessTwoPhaseBuffer(TransactionId xid,
225 : : XLogRecPtr prepare_start_lsn,
226 : : bool fromdisk, bool setParent, bool setNextXid);
227 : : static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
228 : : const char *gid, TimestampTz prepared_at, Oid owner,
229 : : Oid databaseid);
230 : : static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
231 : : static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
232 : :
233 : : /*
234 : : * Initialization of shared memory
235 : : */
236 : : Size
6876 tgl@sss.pgh.pa.us 237 :CBC 2577 : TwoPhaseShmemSize(void)
238 : : {
239 : : Size size;
240 : :
241 : : /* Need the fixed struct, the array of pointers, and the GTD structs */
6812 242 : 2577 : size = offsetof(TwoPhaseStateData, prepXacts);
243 : 2577 : size = add_size(size, mul_size(max_prepared_xacts,
244 : : sizeof(GlobalTransaction)));
245 : 2577 : size = MAXALIGN(size);
246 : 2577 : size = add_size(size, mul_size(max_prepared_xacts,
247 : : sizeof(GlobalTransactionData)));
248 : :
249 : 2577 : return size;
250 : : }
251 : :
252 : : void
6876 253 : 898 : TwoPhaseShmemInit(void)
254 : : {
255 : : bool found;
256 : :
257 : 898 : TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
258 : : TwoPhaseShmemSize(),
259 : : &found);
260 [ + - ]: 898 : if (!IsUnderPostmaster)
261 : : {
262 : : GlobalTransaction gxacts;
263 : : int i;
264 : :
265 [ - + ]: 898 : Assert(!found);
5642 266 : 898 : TwoPhaseState->freeGXacts = NULL;
6876 267 : 898 : TwoPhaseState->numPrepXacts = 0;
268 : :
269 : : /*
270 : : * Initialize the linked list of free GlobalTransactionData structs
271 : : */
272 : 898 : gxacts = (GlobalTransaction)
273 : 898 : ((char *) TwoPhaseState +
6756 bruce@momjian.us 274 : 898 : MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
275 : : sizeof(GlobalTransaction) * max_prepared_xacts));
6876 tgl@sss.pgh.pa.us 276 [ + + ]: 1687 : for (i = 0; i < max_prepared_xacts; i++)
277 : : {
278 : : /* insert into linked list */
4524 rhaas@postgresql.org 279 : 789 : gxacts[i].next = TwoPhaseState->freeGXacts;
5642 tgl@sss.pgh.pa.us 280 : 789 : TwoPhaseState->freeGXacts = &gxacts[i];
281 : :
282 : : /* associate it with a PGPROC assigned by InitProcGlobal */
52 heikki.linnakangas@i 283 :GNC 789 : gxacts[i].pgprocno = GetNumberFromPGProc(&PreparedXactProcs[i]);
284 : : }
285 : : }
286 : : else
6876 tgl@sss.pgh.pa.us 287 [ # # ]:UBC 0 : Assert(found);
6876 tgl@sss.pgh.pa.us 288 :CBC 898 : }
289 : :
290 : : /*
291 : : * Exit hook to unlock the global transaction entry we're working on.
292 : : */
293 : : static void
3622 heikki.linnakangas@i 294 : 127 : AtProcExit_Twophase(int code, Datum arg)
295 : : {
296 : : /* same logic as abort */
297 : 127 : AtAbort_Twophase();
298 : 127 : }
299 : :
300 : : /*
301 : : * Abort hook to unlock the global transaction entry we're working on.
302 : : */
303 : : void
304 : 22891 : AtAbort_Twophase(void)
305 : : {
306 [ + + ]: 22891 : if (MyLockedGxact == NULL)
307 : 22889 : return;
308 : :
309 : : /*
310 : : * What to do with the locked global transaction entry? If we were in the
311 : : * process of preparing the transaction, but haven't written the WAL
312 : : * record and state file yet, the transaction must not be considered as
313 : : * prepared. Likewise, if we are in the process of finishing an
314 : : * already-prepared transaction, and fail after having already written the
315 : : * 2nd phase commit or rollback record to the WAL, the transaction should
316 : : * not be considered as prepared anymore. In those cases, just remove the
317 : : * entry from shared memory.
318 : : *
319 : : * Otherwise, the entry must be left in place so that the transaction can
320 : : * be finished later, so just unlock it.
321 : : *
322 : : * If we abort during prepare, after having written the WAL record, we
323 : : * might not have transferred all locks and other state to the prepared
324 : : * transaction yet. Likewise, if we abort during commit or rollback,
325 : : * after having written the WAL record, we might not have released all the
326 : : * resources held by the transaction yet. In those cases, the in-memory
327 : : * state can be wrong, but it's too late to back out.
328 : : */
2496 alvherre@alvh.no-ip. 329 : 2 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
3622 heikki.linnakangas@i 330 [ + - ]: 2 : if (!MyLockedGxact->valid)
331 : 2 : RemoveGXact(MyLockedGxact);
332 : : else
42 heikki.linnakangas@i 333 :UNC 0 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
2496 alvherre@alvh.no-ip. 334 :CBC 2 : LWLockRelease(TwoPhaseStateLock);
335 : :
3622 heikki.linnakangas@i 336 : 2 : MyLockedGxact = NULL;
337 : : }
338 : :
339 : : /*
340 : : * This is called after we have finished transferring state to the prepared
341 : : * PGPROC entry.
342 : : */
343 : : void
3165 andres@anarazel.de 344 : 421 : PostPrepare_Twophase(void)
345 : : {
3622 heikki.linnakangas@i 346 : 421 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
42 heikki.linnakangas@i 347 :GNC 421 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
3622 heikki.linnakangas@i 348 :CBC 421 : LWLockRelease(TwoPhaseStateLock);
349 : :
350 : 421 : MyLockedGxact = NULL;
351 : 421 : }
352 : :
353 : :
354 : : /*
355 : : * MarkAsPreparing
356 : : * Reserve the GID for the given transaction.
357 : : */
358 : : GlobalTransaction
6875 tgl@sss.pgh.pa.us 359 : 404 : MarkAsPreparing(TransactionId xid, const char *gid,
360 : : TimestampTz prepared_at, Oid owner, Oid databaseid)
361 : : {
362 : : GlobalTransaction gxact;
363 : : int i;
364 : :
6876 365 [ - + ]: 404 : if (strlen(gid) >= GIDSIZE)
6876 tgl@sss.pgh.pa.us 366 [ # # ]:UBC 0 : ereport(ERROR,
367 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
368 : : errmsg("transaction identifier \"%s\" is too long",
369 : : gid)));
370 : :
371 : : /* fail immediately if feature is disabled */
5470 tgl@sss.pgh.pa.us 372 [ + + ]:CBC 404 : if (max_prepared_xacts == 0)
373 [ + - ]: 9 : ereport(ERROR,
374 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
375 : : errmsg("prepared transactions are disabled"),
376 : : errhint("Set max_prepared_transactions to a nonzero value.")));
377 : :
378 : : /* on first call, register the exit hook */
3622 heikki.linnakangas@i 379 [ + + ]: 395 : if (!twophaseExitRegistered)
380 : : {
381 : 72 : before_shmem_exit(AtProcExit_Twophase, 0);
382 : 72 : twophaseExitRegistered = true;
383 : : }
384 : :
385 : 395 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
386 : :
387 : : /* Check for conflicting GID */
6876 tgl@sss.pgh.pa.us 388 [ + + ]: 780 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
389 : : {
390 : 387 : gxact = TwoPhaseState->prepXacts[i];
391 [ + + ]: 387 : if (strcmp(gxact->gid, gid) == 0)
392 : : {
393 [ + - ]: 2 : ereport(ERROR,
394 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
395 : : errmsg("transaction identifier \"%s\" is already in use",
396 : : gid)));
397 : : }
398 : : }
399 : :
400 : : /* Get a free gxact from the freelist */
5642 401 [ - + ]: 393 : if (TwoPhaseState->freeGXacts == NULL)
6876 tgl@sss.pgh.pa.us 402 [ # # ]:UBC 0 : ereport(ERROR,
403 : : (errcode(ERRCODE_OUT_OF_MEMORY),
404 : : errmsg("maximum number of prepared transactions reached"),
405 : : errhint("Increase max_prepared_transactions (currently %d).",
406 : : max_prepared_xacts)));
5642 tgl@sss.pgh.pa.us 407 :CBC 393 : gxact = TwoPhaseState->freeGXacts;
4267 408 : 393 : TwoPhaseState->freeGXacts = gxact->next;
409 : :
2567 simon@2ndQuadrant.co 410 : 393 : MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
411 : :
412 : 393 : gxact->ondisk = false;
413 : :
414 : : /* And insert it into the active array */
415 [ - + ]: 393 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
416 : 393 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
417 : :
418 : 393 : LWLockRelease(TwoPhaseStateLock);
419 : :
420 : 393 : return gxact;
421 : : }
422 : :
423 : : /*
424 : : * MarkAsPreparingGuts
425 : : *
426 : : * This uses a gxact struct and puts it into the active array.
427 : : * NOTE: this is also used when reloading a gxact after a crash; so avoid
428 : : * assuming that we can use very much backend context.
429 : : *
430 : : * Note: This function should be called with appropriate locks held.
431 : : */
432 : : static void
433 : 423 : MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
434 : : TimestampTz prepared_at, Oid owner, Oid databaseid)
435 : : {
436 : : PGPROC *proc;
437 : : int i;
438 : :
2496 alvherre@alvh.no-ip. 439 [ - + ]: 423 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
440 : :
2567 simon@2ndQuadrant.co 441 [ - + ]: 423 : Assert(gxact != NULL);
42 heikki.linnakangas@i 442 :GNC 423 : proc = GetPGProcByNumber(gxact->pgprocno);
443 : :
444 : : /* Initialize the PGPROC entry */
4524 rhaas@postgresql.org 445 [ + - + - :CBC 47376 : MemSet(proc, 0, sizeof(PGPROC));
+ - + - +
+ ]
452 andres@anarazel.de 446 : 423 : dlist_node_init(&proc->links);
1397 peter@eisentraut.org 447 : 423 : proc->waitStatus = PROC_WAIT_STATUS_OK;
42 heikki.linnakangas@i 448 [ + + ]:GNC 423 : if (LocalTransactionIdIsValid(MyProc->vxid.lxid))
449 : : {
450 : : /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
451 : 393 : proc->vxid.lxid = MyProc->vxid.lxid;
452 : 393 : proc->vxid.procNumber = MyProcNumber;
453 : : }
454 : : else
455 : : {
904 noah@leadboat.com 456 [ - + - - ]:CBC 30 : Assert(AmStartupProcess() || !IsPostmasterEnvironment);
457 : : /* GetLockConflicts() uses this to specify a wait on the XID */
42 heikki.linnakangas@i 458 :GNC 30 : proc->vxid.lxid = xid;
459 : 30 : proc->vxid.procNumber = INVALID_PROC_NUMBER;
460 : : }
1339 andres@anarazel.de 461 :CBC 423 : proc->xid = xid;
1340 462 [ - + ]: 423 : Assert(proc->xmin == InvalidTransactionId);
737 rhaas@postgresql.org 463 : 423 : proc->delayChkptFlags = 0;
1245 alvherre@alvh.no-ip. 464 : 423 : proc->statusFlags = 0;
4524 rhaas@postgresql.org 465 : 423 : proc->pid = 0;
466 : 423 : proc->databaseId = databaseid;
467 : 423 : proc->roleId = owner;
2071 michael@paquier.xyz 468 : 423 : proc->tempNamespaceId = InvalidOid;
2629 andrew@dunslane.net 469 : 423 : proc->isBackgroundWorker = false;
511 andres@anarazel.de 470 : 423 : proc->lwWaiting = LW_WS_NOT_WAITING;
4458 heikki.linnakangas@i 471 : 423 : proc->lwWaitMode = 0;
4524 rhaas@postgresql.org 472 : 423 : proc->waitLock = NULL;
473 : 423 : proc->waitProcLock = NULL;
1147 fujii@postgresql.org 474 : 423 : pg_atomic_init_u64(&proc->waitStart, 0);
6699 tgl@sss.pgh.pa.us 475 [ + + ]: 7191 : for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
452 andres@anarazel.de 476 : 6768 : dlist_init(&proc->myProcLocks[i]);
477 : : /* subxid data must be filled later by GXactLoadSubxactData */
1339 478 : 423 : proc->subxidStatus.overflowed = false;
479 : 423 : proc->subxidStatus.count = 0;
480 : :
6875 tgl@sss.pgh.pa.us 481 : 423 : gxact->prepared_at = prepared_at;
2567 simon@2ndQuadrant.co 482 : 423 : gxact->xid = xid;
6876 tgl@sss.pgh.pa.us 483 : 423 : gxact->owner = owner;
42 heikki.linnakangas@i 484 :GNC 423 : gxact->locking_backend = MyProcNumber;
6876 tgl@sss.pgh.pa.us 485 :CBC 423 : gxact->valid = false;
2567 simon@2ndQuadrant.co 486 : 423 : gxact->inredo = false;
6876 tgl@sss.pgh.pa.us 487 : 423 : strcpy(gxact->gid, gid);
488 : :
489 : : /*
490 : : * Remember that we have this GlobalTransaction entry locked for us. If we
491 : : * abort after this, we must release it.
492 : : */
3622 heikki.linnakangas@i 493 : 423 : MyLockedGxact = gxact;
6876 tgl@sss.pgh.pa.us 494 : 423 : }
495 : :
496 : : /*
497 : : * GXactLoadSubxactData
498 : : *
499 : : * If the transaction being persisted had any subtransactions, this must
500 : : * be called before MarkAsPrepared() to load information into the dummy
501 : : * PGPROC.
502 : : */
503 : : static void
504 : 180 : GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
505 : : TransactionId *children)
506 : : {
42 heikki.linnakangas@i 507 :GNC 180 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
508 : :
509 : : /* We need no extra lock since the GXACT isn't valid yet */
6876 tgl@sss.pgh.pa.us 510 [ + + ]:CBC 180 : if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
511 : : {
1339 andres@anarazel.de 512 : 4 : proc->subxidStatus.overflowed = true;
6876 tgl@sss.pgh.pa.us 513 : 4 : nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
514 : : }
515 [ + + ]: 180 : if (nsubxacts > 0)
516 : : {
4524 rhaas@postgresql.org 517 : 165 : memcpy(proc->subxids.xids, children,
518 : : nsubxacts * sizeof(TransactionId));
1339 andres@anarazel.de 519 : 165 : proc->subxidStatus.count = nsubxacts;
520 : : }
6876 tgl@sss.pgh.pa.us 521 : 180 : }
522 : :
523 : : /*
524 : : * MarkAsPrepared
525 : : * Mark the GXACT as fully valid, and enter it into the global ProcArray.
526 : : *
527 : : * lock_held indicates whether caller already holds TwoPhaseStateLock.
528 : : */
529 : : static void
2496 alvherre@alvh.no-ip. 530 : 421 : MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
531 : : {
532 : : /* Lock here may be overkill, but I'm not convinced of that ... */
533 [ + + ]: 421 : if (!lock_held)
534 : 391 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
6876 tgl@sss.pgh.pa.us 535 [ - + ]: 421 : Assert(!gxact->valid);
536 : 421 : gxact->valid = true;
2496 alvherre@alvh.no-ip. 537 [ + + ]: 421 : if (!lock_held)
538 : 391 : LWLockRelease(TwoPhaseStateLock);
539 : :
540 : : /*
541 : : * Put it into the global ProcArray so TransactionIdIsInProgress considers
542 : : * the XID as still running.
543 : : */
42 heikki.linnakangas@i 544 :GNC 421 : ProcArrayAdd(GetPGProcByNumber(gxact->pgprocno));
6876 tgl@sss.pgh.pa.us 545 :CBC 421 : }
546 : :
547 : : /*
548 : : * LockGXact
549 : : * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
550 : : */
551 : : static GlobalTransaction
6865 552 : 400 : LockGXact(const char *gid, Oid user)
553 : : {
554 : : int i;
555 : :
556 : : /* on first call, register the exit hook */
3622 heikki.linnakangas@i 557 [ + + ]: 400 : if (!twophaseExitRegistered)
558 : : {
559 : 61 : before_shmem_exit(AtProcExit_Twophase, 0);
560 : 61 : twophaseExitRegistered = true;
561 : : }
562 : :
6876 tgl@sss.pgh.pa.us 563 : 400 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
564 : :
565 [ + + ]: 707 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
566 : : {
6756 bruce@momjian.us 567 : 701 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
42 heikki.linnakangas@i 568 :GNC 701 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
569 : :
570 : : /* Ignore not-yet-valid GIDs */
6876 tgl@sss.pgh.pa.us 571 [ + + ]:CBC 701 : if (!gxact->valid)
572 : 1 : continue;
573 [ + + ]: 700 : if (strcmp(gxact->gid, gid) != 0)
574 : 306 : continue;
575 : :
576 : : /* Found it, but has someone else got it locked? */
42 heikki.linnakangas@i 577 [ - + ]:GNC 394 : if (gxact->locking_backend != INVALID_PROC_NUMBER)
3622 heikki.linnakangas@i 578 [ # # ]:UBC 0 : ereport(ERROR,
579 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
580 : : errmsg("prepared transaction with identifier \"%s\" is busy",
581 : : gid)));
582 : :
6876 tgl@sss.pgh.pa.us 583 [ - + - - ]:CBC 394 : if (user != gxact->owner && !superuser_arg(user))
6876 tgl@sss.pgh.pa.us 584 [ # # ]:UBC 0 : ereport(ERROR,
585 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
586 : : errmsg("permission denied to finish prepared transaction"),
587 : : errhint("Must be superuser or the user that prepared the transaction.")));
588 : :
589 : : /*
590 : : * Note: it probably would be possible to allow committing from
591 : : * another database; but at the moment NOTIFY is known not to work and
592 : : * there may be some other issues as well. Hence disallow until
593 : : * someone gets motivated to make it work.
594 : : */
4524 rhaas@postgresql.org 595 [ - + ]:CBC 394 : if (MyDatabaseId != proc->databaseId)
6270 tgl@sss.pgh.pa.us 596 [ # # ]:UBC 0 : ereport(ERROR,
597 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
598 : : errmsg("prepared transaction belongs to another database"),
599 : : errhint("Connect to the database where the transaction was prepared to finish it.")));
600 : :
601 : : /* OK for me to lock it */
42 heikki.linnakangas@i 602 :GNC 394 : gxact->locking_backend = MyProcNumber;
3622 heikki.linnakangas@i 603 :CBC 394 : MyLockedGxact = gxact;
604 : :
6876 tgl@sss.pgh.pa.us 605 : 394 : LWLockRelease(TwoPhaseStateLock);
606 : :
607 : 394 : return gxact;
608 : : }
609 : :
610 : 6 : LWLockRelease(TwoPhaseStateLock);
611 : :
612 [ + - ]: 6 : ereport(ERROR,
613 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
614 : : errmsg("prepared transaction with identifier \"%s\" does not exist",
615 : : gid)));
616 : :
617 : : /* NOTREACHED */
618 : : return NULL;
619 : : }
620 : :
621 : : /*
622 : : * RemoveGXact
623 : : * Remove the prepared transaction from the shared memory array.
624 : : *
625 : : * NB: caller should have already removed it from ProcArray
626 : : */
627 : : static void
628 : 454 : RemoveGXact(GlobalTransaction gxact)
629 : : {
630 : : int i;
631 : :
2496 alvherre@alvh.no-ip. 632 [ - + ]: 454 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
633 : :
6876 tgl@sss.pgh.pa.us 634 [ + - ]: 753 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
635 : : {
636 [ + + ]: 753 : if (gxact == TwoPhaseState->prepXacts[i])
637 : : {
638 : : /* remove from the active array */
639 : 454 : TwoPhaseState->numPrepXacts--;
640 : 454 : TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
641 : :
642 : : /* and put it back in the freelist */
4524 rhaas@postgresql.org 643 : 454 : gxact->next = TwoPhaseState->freeGXacts;
5642 tgl@sss.pgh.pa.us 644 : 454 : TwoPhaseState->freeGXacts = gxact;
645 : :
6876 646 : 454 : return;
647 : : }
648 : : }
649 : :
6876 tgl@sss.pgh.pa.us 650 [ # # ]:UBC 0 : elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
651 : : }
652 : :
653 : : /*
654 : : * Returns an array of all prepared transactions for the user-level
655 : : * function pg_prepared_xact.
656 : : *
657 : : * The returned array and all its elements are copies of internal data
658 : : * structures, to minimize the time we need to hold the TwoPhaseStateLock.
659 : : *
660 : : * WARNING -- we return even those transactions that are not fully prepared
661 : : * yet. The caller should filter them out if he doesn't want them.
662 : : *
663 : : * The returned array is palloc'd.
664 : : */
665 : : static int
6876 tgl@sss.pgh.pa.us 666 :CBC 99 : GetPreparedTransactionList(GlobalTransaction *gxacts)
667 : : {
668 : : GlobalTransaction array;
669 : : int num;
670 : : int i;
671 : :
672 : 99 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
673 : :
674 [ + + ]: 99 : if (TwoPhaseState->numPrepXacts == 0)
675 : : {
676 : 58 : LWLockRelease(TwoPhaseStateLock);
677 : :
678 : 58 : *gxacts = NULL;
679 : 58 : return 0;
680 : : }
681 : :
682 : 41 : num = TwoPhaseState->numPrepXacts;
683 : 41 : array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
684 : 41 : *gxacts = array;
685 [ + + ]: 87 : for (i = 0; i < num; i++)
686 : 46 : memcpy(array + i, TwoPhaseState->prepXacts[i],
687 : : sizeof(GlobalTransactionData));
688 : :
689 : 41 : LWLockRelease(TwoPhaseStateLock);
690 : :
691 : 41 : return num;
692 : : }
693 : :
694 : :
695 : : /* Working status for pg_prepared_xact */
696 : : typedef struct
697 : : {
698 : : GlobalTransaction array;
699 : : int ngxacts;
700 : : int currIdx;
701 : : } Working_State;
702 : :
703 : : /*
704 : : * pg_prepared_xact
705 : : * Produce a view with one row per prepared transaction.
706 : : *
707 : : * This function is here so we don't have to export the
708 : : * GlobalTransactionData struct definition.
709 : : */
710 : : Datum
711 : 145 : pg_prepared_xact(PG_FUNCTION_ARGS)
712 : : {
713 : : FuncCallContext *funcctx;
714 : : Working_State *status;
715 : :
716 [ + + ]: 145 : if (SRF_IS_FIRSTCALL())
717 : : {
718 : : TupleDesc tupdesc;
719 : : MemoryContext oldcontext;
720 : :
721 : : /* create a function context for cross-call persistence */
722 : 99 : funcctx = SRF_FIRSTCALL_INIT();
723 : :
724 : : /*
725 : : * Switch to memory context appropriate for multiple function calls
726 : : */
727 : 99 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
728 : :
729 : : /* build tupdesc for result tuples */
730 : : /* this had better match pg_prepared_xacts view in system_views.sql */
1972 andres@anarazel.de 731 : 99 : tupdesc = CreateTemplateTupleDesc(5);
6876 tgl@sss.pgh.pa.us 732 : 99 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
733 : : XIDOID, -1, 0);
734 : 99 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
735 : : TEXTOID, -1, 0);
6875 736 : 99 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
737 : : TIMESTAMPTZOID, -1, 0);
738 : 99 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
739 : : OIDOID, -1, 0);
740 : 99 : TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
741 : : OIDOID, -1, 0);
742 : :
6876 743 : 99 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
744 : :
745 : : /*
746 : : * Collect all the 2PC status information that we will format and send
747 : : * out as a result set.
748 : : */
749 : 99 : status = (Working_State *) palloc(sizeof(Working_State));
750 : 99 : funcctx->user_fctx = (void *) status;
751 : :
752 : 99 : status->ngxacts = GetPreparedTransactionList(&status->array);
753 : 99 : status->currIdx = 0;
754 : :
755 : 99 : MemoryContextSwitchTo(oldcontext);
756 : : }
757 : :
758 : 145 : funcctx = SRF_PERCALL_SETUP();
759 : 145 : status = (Working_State *) funcctx->user_fctx;
760 : :
761 [ + + + + ]: 145 : while (status->array != NULL && status->currIdx < status->ngxacts)
762 : : {
763 : 46 : GlobalTransaction gxact = &status->array[status->currIdx++];
52 heikki.linnakangas@i 764 :GNC 46 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
638 peter@eisentraut.org 765 :CBC 46 : Datum values[5] = {0};
766 : 46 : bool nulls[5] = {0};
767 : : HeapTuple tuple;
768 : : Datum result;
769 : :
6876 tgl@sss.pgh.pa.us 770 [ - + ]: 46 : if (!gxact->valid)
6876 tgl@sss.pgh.pa.us 771 :UBC 0 : continue;
772 : :
773 : : /*
774 : : * Form tuple with appropriate data.
775 : : */
776 : :
1339 andres@anarazel.de 777 :CBC 46 : values[0] = TransactionIdGetDatum(proc->xid);
5864 tgl@sss.pgh.pa.us 778 : 46 : values[1] = CStringGetTextDatum(gxact->gid);
6875 779 : 46 : values[2] = TimestampTzGetDatum(gxact->prepared_at);
6865 780 : 46 : values[3] = ObjectIdGetDatum(gxact->owner);
4524 rhaas@postgresql.org 781 : 46 : values[4] = ObjectIdGetDatum(proc->databaseId);
782 : :
6876 tgl@sss.pgh.pa.us 783 : 46 : tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
784 : 46 : result = HeapTupleGetDatum(tuple);
785 : 46 : SRF_RETURN_NEXT(funcctx, result);
786 : : }
787 : :
788 : 99 : SRF_RETURN_DONE(funcctx);
789 : : }
790 : :
791 : : /*
792 : : * TwoPhaseGetGXact
793 : : * Get the GlobalTransaction struct for a prepared transaction
794 : : * specified by XID
795 : : *
796 : : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
797 : : * caller had better hold it.
798 : : */
799 : : static GlobalTransaction
1875 michael@paquier.xyz 800 : 1497 : TwoPhaseGetGXact(TransactionId xid, bool lock_held)
801 : : {
4267 tgl@sss.pgh.pa.us 802 : 1497 : GlobalTransaction result = NULL;
803 : : int i;
804 : :
805 : : static TransactionId cached_xid = InvalidTransactionId;
806 : : static GlobalTransaction cached_gxact = NULL;
807 : :
1875 michael@paquier.xyz 808 [ + + - + ]: 1497 : Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
809 : :
810 : : /*
811 : : * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
812 : : * repeatedly for the same XID. We can save work with a simple cache.
813 : : */
6876 tgl@sss.pgh.pa.us 814 [ + + ]: 1497 : if (xid == cached_xid)
4267 815 : 1005 : return cached_gxact;
816 : :
1875 michael@paquier.xyz 817 [ + + ]: 492 : if (!lock_held)
818 : 421 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
819 : :
6876 tgl@sss.pgh.pa.us 820 [ + - ]: 876 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
821 : : {
6756 bruce@momjian.us 822 : 876 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
823 : :
1339 andres@anarazel.de 824 [ + + ]: 876 : if (gxact->xid == xid)
825 : : {
4267 tgl@sss.pgh.pa.us 826 : 492 : result = gxact;
6876 827 : 492 : break;
828 : : }
829 : : }
830 : :
1875 michael@paquier.xyz 831 [ + + ]: 492 : if (!lock_held)
832 : 421 : LWLockRelease(TwoPhaseStateLock);
833 : :
6876 tgl@sss.pgh.pa.us 834 [ - + ]: 492 : if (result == NULL) /* should not happen */
4267 tgl@sss.pgh.pa.us 835 [ # # ]:UBC 0 : elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
836 : :
6876 tgl@sss.pgh.pa.us 837 :CBC 492 : cached_xid = xid;
4267 838 : 492 : cached_gxact = result;
839 : :
6876 840 : 492 : return result;
841 : : }
842 : :
843 : : /*
844 : : * TwoPhaseGetXidByVirtualXID
845 : : * Lookup VXID among xacts prepared since last startup.
846 : : *
847 : : * (This won't find recovered xacts.) If more than one matches, return any
848 : : * and set "have_more" to true. To witness multiple matches, a single
849 : : * proc number must consume 2^32 LXIDs, with no intervening database restart.
850 : : */
851 : : TransactionId
904 noah@leadboat.com 852 : 110 : TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
853 : : bool *have_more)
854 : : {
855 : : int i;
856 : 110 : TransactionId result = InvalidTransactionId;
857 : :
858 [ - + ]: 110 : Assert(VirtualTransactionIdIsValid(vxid));
859 : 110 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
860 : :
861 [ + + ]: 183 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
862 : : {
863 : 73 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
864 : : PGPROC *proc;
865 : : VirtualTransactionId proc_vxid;
866 : :
867 [ - + ]: 73 : if (!gxact->valid)
904 noah@leadboat.com 868 :UBC 0 : continue;
42 heikki.linnakangas@i 869 :GNC 73 : proc = GetPGProcByNumber(gxact->pgprocno);
904 noah@leadboat.com 870 :CBC 73 : GET_VXID_FROM_PGPROC(proc_vxid, *proc);
871 [ + + + + ]: 73 : if (VirtualTransactionIdEquals(vxid, proc_vxid))
872 : : {
873 : : /*
874 : : * Startup process sets proc->vxid.procNumber to
875 : : * INVALID_PROC_NUMBER.
876 : : */
877 [ - + ]: 19 : Assert(!gxact->inredo);
878 : :
879 [ - + ]: 19 : if (result != InvalidTransactionId)
880 : : {
904 noah@leadboat.com 881 :UBC 0 : *have_more = true;
882 : 0 : break;
883 : : }
904 noah@leadboat.com 884 :CBC 19 : result = gxact->xid;
885 : : }
886 : : }
887 : :
888 : 110 : LWLockRelease(TwoPhaseStateLock);
889 : :
890 : 110 : return result;
891 : : }
892 : :
893 : : /*
894 : : * TwoPhaseGetDummyProcNumber
895 : : * Get the dummy proc number for prepared transaction specified by XID
896 : : *
897 : : * Dummy proc numbers are similar to proc numbers of real backends. They
898 : : * start at MaxBackends, and are unique across all currently active real
899 : : * backends and prepared transactions. If lock_held is set to true,
900 : : * TwoPhaseStateLock will not be taken, so the caller had better hold it.
901 : : */
902 : : ProcNumber
42 heikki.linnakangas@i 903 :GNC 117 : TwoPhaseGetDummyProcNumber(TransactionId xid, bool lock_held)
904 : : {
1875 michael@paquier.xyz 905 :CBC 117 : GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
906 : :
42 heikki.linnakangas@i 907 :GNC 117 : return gxact->pgprocno;
908 : : }
909 : :
910 : : /*
911 : : * TwoPhaseGetDummyProc
912 : : * Get the PGPROC that represents a prepared transaction specified by XID
913 : : *
914 : : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
915 : : * caller had better hold it.
916 : : */
917 : : PGPROC *
1875 michael@paquier.xyz 918 :CBC 1380 : TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
919 : : {
920 : 1380 : GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
921 : :
52 heikki.linnakangas@i 922 :GNC 1380 : return GetPGProcByNumber(gxact->pgprocno);
923 : : }
924 : :
925 : : /************************************************************************/
926 : : /* State file support */
927 : : /************************************************************************/
928 : :
929 : : /*
930 : : * Compute the FullTransactionId for the given TransactionId.
931 : : *
932 : : * The wrap logic is safe here because the span of active xids cannot exceed one
933 : : * epoch at any given time.
934 : : */
935 : : static inline FullTransactionId
137 akorotkov@postgresql 936 : 236 : AdjustToFullTransactionId(TransactionId xid)
937 : : {
938 : : FullTransactionId nextFullXid;
939 : : TransactionId nextXid;
940 : : uint32 epoch;
941 : :
942 [ - + ]: 236 : Assert(TransactionIdIsValid(xid));
943 : :
944 : 236 : LWLockAcquire(XidGenLock, LW_SHARED);
128 heikki.linnakangas@i 945 : 236 : nextFullXid = TransamVariables->nextXid;
137 akorotkov@postgresql 946 : 236 : LWLockRelease(XidGenLock);
947 : :
948 : 236 : nextXid = XidFromFullTransactionId(nextFullXid);
949 : 236 : epoch = EpochFromFullTransactionId(nextFullXid);
950 [ - + ]: 236 : if (unlikely(xid > nextXid))
951 : : {
952 : : /* Wraparound occurred, must be from a prev epoch. */
137 akorotkov@postgresql 953 [ # # ]:UNC 0 : Assert(epoch > 0);
954 : 0 : epoch--;
955 : : }
956 : :
137 akorotkov@postgresql 957 :GNC 236 : return FullTransactionIdFromEpochAndXid(epoch, xid);
958 : : }
959 : :
960 : : static inline int
961 : 236 : TwoPhaseFilePath(char *path, TransactionId xid)
962 : : {
963 : 236 : FullTransactionId fxid = AdjustToFullTransactionId(xid);
964 : :
965 : 472 : return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
966 : 236 : EpochFromFullTransactionId(fxid),
967 : 236 : XidFromFullTransactionId(fxid));
968 : : }
969 : :
970 : : /*
971 : : * 2PC state file format:
972 : : *
973 : : * 1. TwoPhaseFileHeader
974 : : * 2. TransactionId[] (subtransactions)
975 : : * 3. RelFileLocator[] (files to be deleted at commit)
976 : : * 4. RelFileLocator[] (files to be deleted at abort)
977 : : * 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
978 : : * 6. TwoPhaseRecordOnDisk
979 : : * 7. ...
980 : : * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
981 : : * 9. checksum (CRC-32C)
982 : : *
983 : : * Each segment except the final checksum is MAXALIGN'd.
984 : : */
985 : :
986 : : /*
987 : : * Header for a 2PC state file
988 : : */
989 : : #define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
990 : :
991 : : typedef xl_xact_prepare TwoPhaseFileHeader;
992 : :
993 : : /*
994 : : * Header for each record in a state file
995 : : *
996 : : * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
997 : : * The rmgr data will be stored starting on a MAXALIGN boundary.
998 : : */
999 : : typedef struct TwoPhaseRecordOnDisk
1000 : : {
1001 : : uint32 len; /* length of rmgr data */
1002 : : TwoPhaseRmgrId rmid; /* resource manager for this record */
1003 : : uint16 info; /* flag bits for use by rmgr */
1004 : : } TwoPhaseRecordOnDisk;
1005 : :
1006 : : /*
1007 : : * During prepare, the state file is assembled in memory before writing it
1008 : : * to WAL and the actual state file. We use a chain of StateFileChunk blocks
1009 : : * for that.
1010 : : */
1011 : : typedef struct StateFileChunk
1012 : : {
1013 : : char *data;
1014 : : uint32 len;
1015 : : struct StateFileChunk *next;
1016 : : } StateFileChunk;
1017 : :
1018 : : static struct xllist
1019 : : {
1020 : : StateFileChunk *head; /* first data block in the chain */
1021 : : StateFileChunk *tail; /* last block in chain */
1022 : : uint32 num_chunks;
1023 : : uint32 bytes_free; /* free bytes left in tail block */
1024 : : uint32 total_len; /* total data bytes in chain */
1025 : : } records;
1026 : :
1027 : :
1028 : : /*
1029 : : * Append a block of data to records data structure.
1030 : : *
1031 : : * NB: each block is padded to a MAXALIGN multiple. This must be
1032 : : * accounted for when the file is later read!
1033 : : *
1034 : : * The data is copied, so the caller is free to modify it afterwards.
1035 : : */
1036 : : static void
6876 tgl@sss.pgh.pa.us 1037 :CBC 4260 : save_state_data(const void *data, uint32 len)
1038 : : {
6756 bruce@momjian.us 1039 : 4260 : uint32 padlen = MAXALIGN(len);
1040 : :
6876 tgl@sss.pgh.pa.us 1041 [ + + ]: 4260 : if (padlen > records.bytes_free)
1042 : : {
3433 heikki.linnakangas@i 1043 : 41 : records.tail->next = palloc0(sizeof(StateFileChunk));
6876 tgl@sss.pgh.pa.us 1044 : 41 : records.tail = records.tail->next;
1045 : 41 : records.tail->len = 0;
1046 : 41 : records.tail->next = NULL;
3433 heikki.linnakangas@i 1047 : 41 : records.num_chunks++;
1048 : :
6876 tgl@sss.pgh.pa.us 1049 : 41 : records.bytes_free = Max(padlen, 512);
1050 : 41 : records.tail->data = palloc(records.bytes_free);
1051 : : }
1052 : :
1053 : 4260 : memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1054 : 4260 : records.tail->len += padlen;
1055 : 4260 : records.bytes_free -= padlen;
1056 : 4260 : records.total_len += padlen;
1057 : 4260 : }
1058 : :
1059 : : /*
1060 : : * Start preparing a state file.
1061 : : *
1062 : : * Initializes data structure and inserts the 2PC file header record.
1063 : : */
1064 : : void
1065 : 393 : StartPrepare(GlobalTransaction gxact)
1066 : : {
52 heikki.linnakangas@i 1067 :GNC 393 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
1339 andres@anarazel.de 1068 :CBC 393 : TransactionId xid = gxact->xid;
1069 : : TwoPhaseFileHeader hdr;
1070 : : TransactionId *children;
1071 : : RelFileLocator *commitrels;
1072 : : RelFileLocator *abortrels;
739 1073 : 393 : xl_xact_stats_item *abortstats = NULL;
1074 : 393 : xl_xact_stats_item *commitstats = NULL;
1075 : : SharedInvalidationMessage *invalmsgs;
1076 : :
1077 : : /* Initialize linked list */
3433 heikki.linnakangas@i 1078 : 393 : records.head = palloc0(sizeof(StateFileChunk));
6876 tgl@sss.pgh.pa.us 1079 : 393 : records.head->len = 0;
1080 : 393 : records.head->next = NULL;
1081 : :
1082 : 393 : records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1083 : 393 : records.head->data = palloc(records.bytes_free);
1084 : :
1085 : 393 : records.tail = records.head;
3433 heikki.linnakangas@i 1086 : 393 : records.num_chunks = 1;
1087 : :
6876 tgl@sss.pgh.pa.us 1088 : 393 : records.total_len = 0;
1089 : :
1090 : : /* Create header */
1091 : 393 : hdr.magic = TWOPHASE_MAGIC;
1092 : 393 : hdr.total_len = 0; /* EndPrepare will fill this in */
1093 : 393 : hdr.xid = xid;
4524 rhaas@postgresql.org 1094 : 393 : hdr.database = proc->databaseId;
6875 tgl@sss.pgh.pa.us 1095 : 393 : hdr.prepared_at = gxact->prepared_at;
1096 : 393 : hdr.owner = gxact->owner;
6876 1097 : 393 : hdr.nsubxacts = xactGetCommittedChildren(&children);
4993 rhaas@postgresql.org 1098 : 393 : hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1099 : 393 : hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
739 andres@anarazel.de 1100 : 393 : hdr.ncommitstats =
1101 : 393 : pgstat_get_transactional_drops(true, &commitstats);
1102 : 393 : hdr.nabortstats =
1103 : 393 : pgstat_get_transactional_drops(false, &abortstats);
5230 simon@2ndQuadrant.co 1104 : 393 : hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1105 : : &hdr.initfileinval);
2489 tgl@sss.pgh.pa.us 1106 : 393 : hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1107 : : /* EndPrepare will fill the origin data, if necessary */
790 michael@paquier.xyz 1108 : 393 : hdr.origin_lsn = InvalidXLogRecPtr;
1109 : 393 : hdr.origin_timestamp = 0;
1110 : :
6876 tgl@sss.pgh.pa.us 1111 : 393 : save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
2957 simon@2ndQuadrant.co 1112 : 393 : save_state_data(gxact->gid, hdr.gidlen);
1113 : :
1114 : : /*
1115 : : * Add the additional info about subxacts, deletable files and cache
1116 : : * invalidation messages.
1117 : : */
6876 tgl@sss.pgh.pa.us 1118 [ + + ]: 393 : if (hdr.nsubxacts > 0)
1119 : : {
1120 : 150 : save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1121 : : /* While we have the child-xact data, stuff it in the gxact too */
1122 : 150 : GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1123 : : }
1124 [ + + ]: 393 : if (hdr.ncommitrels > 0)
1125 : : {
648 rhaas@postgresql.org 1126 : 9 : save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
6876 tgl@sss.pgh.pa.us 1127 : 9 : pfree(commitrels);
1128 : : }
1129 [ + + ]: 393 : if (hdr.nabortrels > 0)
1130 : : {
648 rhaas@postgresql.org 1131 : 17 : save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
6876 tgl@sss.pgh.pa.us 1132 : 17 : pfree(abortrels);
1133 : : }
739 andres@anarazel.de 1134 [ + + ]: 393 : if (hdr.ncommitstats > 0)
1135 : : {
1136 : 9 : save_state_data(commitstats,
1137 : 9 : hdr.ncommitstats * sizeof(xl_xact_stats_item));
1138 : 9 : pfree(commitstats);
1139 : : }
1140 [ + + ]: 393 : if (hdr.nabortstats > 0)
1141 : : {
1142 : 13 : save_state_data(abortstats,
703 tgl@sss.pgh.pa.us 1143 : 13 : hdr.nabortstats * sizeof(xl_xact_stats_item));
739 andres@anarazel.de 1144 : 13 : pfree(abortstats);
1145 : : }
5230 simon@2ndQuadrant.co 1146 [ + + ]: 393 : if (hdr.ninvalmsgs > 0)
1147 : : {
1148 : 23 : save_state_data(invalmsgs,
1149 : 23 : hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1150 : 23 : pfree(invalmsgs);
1151 : : }
6876 tgl@sss.pgh.pa.us 1152 : 393 : }
1153 : :
1154 : : /*
1155 : : * Finish preparing state data and writing it to WAL.
1156 : : */
1157 : : void
1158 : 391 : EndPrepare(GlobalTransaction gxact)
1159 : : {
1160 : : TwoPhaseFileHeader *hdr;
1161 : : StateFileChunk *record;
1162 : : bool replorigin;
1163 : :
1164 : : /* Add the end sentinel to the list of 2PC records */
1165 : 391 : RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
1166 : : NULL, 0);
1167 : :
1168 : : /* Go back and fill in total_len in the file header record */
1169 : 391 : hdr = (TwoPhaseFileHeader *) records.head->data;
1170 [ - + ]: 391 : Assert(hdr->magic == TWOPHASE_MAGIC);
3288 heikki.linnakangas@i 1171 : 391 : hdr->total_len = records.total_len + sizeof(pg_crc32c);
1172 : :
2209 simon@2ndQuadrant.co 1173 [ + + ]: 425 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
1174 [ + - ]: 34 : replorigin_session_origin != DoNotReplicateId);
1175 : :
1176 [ + + ]: 391 : if (replorigin)
1177 : : {
1178 : 34 : hdr->origin_lsn = replorigin_session_origin_lsn;
1179 : 34 : hdr->origin_timestamp = replorigin_session_origin_timestamp;
1180 : : }
1181 : :
1182 : : /*
1183 : : * If the data size exceeds MaxAllocSize, we won't be able to read it in
1184 : : * ReadTwoPhaseFile. Check for that now, rather than fail in the case
1185 : : * where we write data to file and then re-read at commit time.
1186 : : */
5809 heikki.linnakangas@i 1187 [ - + ]: 391 : if (hdr->total_len > MaxAllocSize)
5809 heikki.linnakangas@i 1188 [ # # ]:UBC 0 : ereport(ERROR,
1189 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1190 : : errmsg("two-phase state file maximum length exceeded")));
1191 : :
1192 : : /*
1193 : : * Now writing 2PC state data to WAL. We let the WAL's CRC protection
1194 : : * cover us, so no need to calculate a separate CRC.
1195 : : *
1196 : : * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
1197 : : * starting immediately after the WAL record is inserted could complete
1198 : : * without fsync'ing our state file. (This is essentially the same kind
1199 : : * of race condition as the COMMIT-to-clog-write case that
1200 : : * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
1201 : : *
1202 : : * We save the PREPARE record's location in the gxact for later use by
1203 : : * CheckPointTwoPhase.
1204 : : */
3433 heikki.linnakangas@i 1205 :CBC 391 : XLogEnsureRecordSpace(0, records.num_chunks);
1206 : :
6876 tgl@sss.pgh.pa.us 1207 : 391 : START_CRIT_SECTION();
1208 : :
737 rhaas@postgresql.org 1209 [ - + ]: 391 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
1210 : 391 : MyProc->delayChkptFlags |= DELAY_CHKPT_START;
1211 : :
3433 heikki.linnakangas@i 1212 : 391 : XLogBeginInsert();
1213 [ + + ]: 823 : for (record = records.head; record != NULL; record = record->next)
1214 : 432 : XLogRegisterData(record->data, record->len);
1215 : :
2209 simon@2ndQuadrant.co 1216 : 391 : XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
1217 : :
3007 1218 : 391 : gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
1219 : :
2209 1220 [ + + ]: 391 : if (replorigin)
1221 : : {
1222 : : /* Move LSNs forward for this replication origin */
1223 : 34 : replorigin_session_advance(replorigin_session_origin_lsn,
1224 : : gxact->prepare_end_lsn);
1225 : : }
1226 : :
3007 1227 : 391 : XLogFlush(gxact->prepare_end_lsn);
1228 : :
1229 : : /* If we crash now, we have prepared: WAL replay will fix things */
1230 : :
1231 : : /* Store record's start location to read that later on Commit */
1232 : 391 : gxact->prepare_start_lsn = ProcLastRecPtr;
1233 : :
1234 : : /*
1235 : : * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
1236 : : * as not running our XID (which it will do immediately after this
1237 : : * function returns), others can commit/rollback the xact.
1238 : : *
1239 : : * NB: a side effect of this is to make a dummy ProcArray entry for the
1240 : : * prepared XID. This must happen before we clear the XID from MyProc /
1241 : : * ProcGlobal->xids[], else there is a window where the XID is not running
1242 : : * according to TransactionIdIsInProgress, and onlookers would be entitled
1243 : : * to assume the xact crashed. Instead we have a window where the same
1244 : : * XID appears twice in ProcArray, which is OK.
1245 : : */
2496 alvherre@alvh.no-ip. 1246 : 391 : MarkAsPrepared(gxact, false);
1247 : :
1248 : : /*
1249 : : * Now we can mark ourselves as out of the commit critical section: a
1250 : : * checkpoint starting after this will certainly see the gxact as a
1251 : : * candidate for fsyncing.
1252 : : */
737 rhaas@postgresql.org 1253 : 391 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
1254 : :
1255 : : /*
1256 : : * Remember that we have this GlobalTransaction entry locked for us. If
1257 : : * we crash after this point, it's too late to abort, but we must unlock
1258 : : * it so that the prepared transaction can be committed or rolled back.
1259 : : */
3622 heikki.linnakangas@i 1260 : 391 : MyLockedGxact = gxact;
1261 : :
6876 tgl@sss.pgh.pa.us 1262 [ - + ]: 391 : END_CRIT_SECTION();
1263 : :
1264 : : /*
1265 : : * Wait for synchronous replication, if required.
1266 : : *
1267 : : * Note that at this stage we have marked the prepare, but still show as
1268 : : * running in the procarray (twice!) and continue to hold locks.
1269 : : */
2938 rhaas@postgresql.org 1270 : 391 : SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
1271 : :
6876 tgl@sss.pgh.pa.us 1272 : 391 : records.tail = records.head = NULL;
3433 heikki.linnakangas@i 1273 : 391 : records.num_chunks = 0;
6876 tgl@sss.pgh.pa.us 1274 : 391 : }
1275 : :
1276 : : /*
1277 : : * Register a 2PC record to be written to state file.
1278 : : */
1279 : : void
1280 : 1822 : RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1281 : : const void *data, uint32 len)
1282 : : {
1283 : : TwoPhaseRecordOnDisk record;
1284 : :
1285 : 1822 : record.rmid = rmid;
1286 : 1822 : record.info = info;
1287 : 1822 : record.len = len;
1288 : 1822 : save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1289 [ + + ]: 1822 : if (len > 0)
1290 : 1431 : save_state_data(data, len);
1291 : 1822 : }
1292 : :
1293 : :
1294 : : /*
1295 : : * Read and validate the state file for xid.
1296 : : *
1297 : : * If it looks OK (has a valid magic number and CRC), return the palloc'd
1298 : : * contents of the file, issuing an error when finding corrupted data. If
1299 : : * missing_ok is true, which indicates that missing files can be safely
1300 : : * ignored, then return NULL. This state can be reached when doing recovery.
1301 : : */
1302 : : static char *
2046 michael@paquier.xyz 1303 : 96 : ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
1304 : : {
1305 : : char path[MAXPGPATH];
1306 : : char *buf;
1307 : : TwoPhaseFileHeader *hdr;
1308 : : int fd;
1309 : : struct stat stat;
1310 : : uint32 crc_offset;
1311 : : pg_crc32c calc_crc,
1312 : : file_crc;
1313 : : int r;
1314 : :
6876 tgl@sss.pgh.pa.us 1315 : 96 : TwoPhaseFilePath(path, xid);
1316 : :
2395 peter_e@gmx.net 1317 : 96 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
6876 tgl@sss.pgh.pa.us 1318 [ + + ]: 96 : if (fd < 0)
1319 : : {
2046 michael@paquier.xyz 1320 [ + - + - ]: 23 : if (missing_ok && errno == ENOENT)
1321 : 23 : return NULL;
1322 : :
2046 michael@paquier.xyz 1323 [ # # ]:UBC 0 : ereport(ERROR,
1324 : : (errcode_for_file_access(),
1325 : : errmsg("could not open file \"%s\": %m", path)));
1326 : : }
1327 : :
1328 : : /*
1329 : : * Check file length. We can determine a lower bound pretty easily. We
1330 : : * set an upper bound to avoid palloc() failure on a corrupt file, though
1331 : : * we can't guarantee that we won't get an out of memory error anyway,
1332 : : * even on a valid file.
1333 : : */
6876 tgl@sss.pgh.pa.us 1334 [ - + ]:CBC 73 : if (fstat(fd, &stat))
2046 michael@paquier.xyz 1335 [ # # ]:UBC 0 : ereport(ERROR,
1336 : : (errcode_for_file_access(),
1337 : : errmsg("could not stat file \"%s\": %m", path)));
1338 : :
6876 tgl@sss.pgh.pa.us 1339 [ + - ]:CBC 73 : if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1340 : : MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
3288 heikki.linnakangas@i 1341 : 73 : sizeof(pg_crc32c)) ||
5809 1342 [ - + ]: 73 : stat.st_size > MaxAllocSize)
2046 michael@paquier.xyz 1343 [ # # ]:UBC 0 : ereport(ERROR,
1344 : : (errcode(ERRCODE_DATA_CORRUPTED),
1345 : : errmsg_plural("incorrect size of file \"%s\": %lld byte",
1346 : : "incorrect size of file \"%s\": %lld bytes",
1347 : : (long long int) stat.st_size, path,
1348 : : (long long int) stat.st_size)));
1349 : :
3288 heikki.linnakangas@i 1350 :CBC 73 : crc_offset = stat.st_size - sizeof(pg_crc32c);
6876 tgl@sss.pgh.pa.us 1351 [ - + ]: 73 : if (crc_offset != MAXALIGN(crc_offset))
2046 michael@paquier.xyz 1352 [ # # ]:UBC 0 : ereport(ERROR,
1353 : : (errcode(ERRCODE_DATA_CORRUPTED),
1354 : : errmsg("incorrect alignment of CRC offset for file \"%s\"",
1355 : : path)));
1356 : :
1357 : : /*
1358 : : * OK, slurp in the file.
1359 : : */
6876 tgl@sss.pgh.pa.us 1360 :CBC 73 : buf = (char *) palloc(stat.st_size);
1361 : :
2584 rhaas@postgresql.org 1362 : 73 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
2097 michael@paquier.xyz 1363 : 73 : r = read(fd, buf, stat.st_size);
1364 [ - + ]: 73 : if (r != stat.st_size)
1365 : : {
2046 michael@paquier.xyz 1366 [ # # ]:UBC 0 : if (r < 0)
1367 [ # # ]: 0 : ereport(ERROR,
1368 : : (errcode_for_file_access(),
1369 : : errmsg("could not read file \"%s\": %m", path)));
1370 : : else
1371 [ # # ]: 0 : ereport(ERROR,
1372 : : (errmsg("could not read file \"%s\": read %d of %lld",
1373 : : path, r, (long long int) stat.st_size)));
1374 : : }
1375 : :
2584 rhaas@postgresql.org 1376 :CBC 73 : pgstat_report_wait_end();
1377 : :
1744 peter@eisentraut.org 1378 [ - + ]: 73 : if (CloseTransientFile(fd) != 0)
1863 michael@paquier.xyz 1379 [ # # ]:UBC 0 : ereport(ERROR,
1380 : : (errcode_for_file_access(),
1381 : : errmsg("could not close file \"%s\": %m", path)));
1382 : :
6876 tgl@sss.pgh.pa.us 1383 :CBC 73 : hdr = (TwoPhaseFileHeader *) buf;
2046 michael@paquier.xyz 1384 [ - + ]: 73 : if (hdr->magic != TWOPHASE_MAGIC)
2046 michael@paquier.xyz 1385 [ # # ]:UBC 0 : ereport(ERROR,
1386 : : (errcode(ERRCODE_DATA_CORRUPTED),
1387 : : errmsg("invalid magic number stored in file \"%s\"",
1388 : : path)));
1389 : :
2046 michael@paquier.xyz 1390 [ - + ]:CBC 73 : if (hdr->total_len != stat.st_size)
2046 michael@paquier.xyz 1391 [ # # ]:UBC 0 : ereport(ERROR,
1392 : : (errcode(ERRCODE_DATA_CORRUPTED),
1393 : : errmsg("invalid size stored in file \"%s\"",
1394 : : path)));
1395 : :
3449 heikki.linnakangas@i 1396 :CBC 73 : INIT_CRC32C(calc_crc);
1397 : 73 : COMP_CRC32C(calc_crc, buf, crc_offset);
1398 : 73 : FIN_CRC32C(calc_crc);
1399 : :
3288 1400 : 73 : file_crc = *((pg_crc32c *) (buf + crc_offset));
1401 : :
3449 1402 [ - + ]: 73 : if (!EQ_CRC32C(calc_crc, file_crc))
2046 michael@paquier.xyz 1403 [ # # ]:UBC 0 : ereport(ERROR,
1404 : : (errcode(ERRCODE_DATA_CORRUPTED),
1405 : : errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1406 : : path)));
1407 : :
6876 tgl@sss.pgh.pa.us 1408 :CBC 73 : return buf;
1409 : : }
1410 : :
1411 : :
1412 : : /*
1413 : : * Reads 2PC data from xlog. During checkpoint this data will be moved to
1414 : : * twophase files and ReadTwoPhaseFile should be used instead.
1415 : : *
1416 : : * Note clearly that this function can access WAL during normal operation,
1417 : : * similarly to the way WALSender or Logical Decoding would do.
1418 : : */
1419 : : static void
3007 simon@2ndQuadrant.co 1420 : 480 : XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1421 : : {
1422 : : XLogRecord *record;
1423 : : XLogReaderState *xlogreader;
1424 : : char *errormsg;
1425 : :
1070 tmunro@postgresql.or 1426 : 480 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1427 : 480 : XL_ROUTINE(.page_read = &read_local_xlog_page,
1428 : : .segment_open = &wal_segment_open,
1429 : : .segment_close = &wal_segment_close),
1430 : : NULL);
3007 simon@2ndQuadrant.co 1431 [ - + ]: 480 : if (!xlogreader)
3007 simon@2ndQuadrant.co 1432 [ # # ]:UBC 0 : ereport(ERROR,
1433 : : (errcode(ERRCODE_OUT_OF_MEMORY),
1434 : : errmsg("out of memory"),
1435 : : errdetail("Failed while allocating a WAL reading processor.")));
1436 : :
1540 heikki.linnakangas@i 1437 :CBC 480 : XLogBeginRead(xlogreader, lsn);
1070 tmunro@postgresql.or 1438 : 480 : record = XLogReadRecord(xlogreader, &errormsg);
1439 : :
3007 simon@2ndQuadrant.co 1440 [ - + ]: 480 : if (record == NULL)
1441 : : {
885 noah@leadboat.com 1442 [ # # ]:UBC 0 : if (errormsg)
1443 [ # # ]: 0 : ereport(ERROR,
1444 : : (errcode_for_file_access(),
1445 : : errmsg("could not read two-phase state from WAL at %X/%X: %s",
1446 : : LSN_FORMAT_ARGS(lsn), errormsg)));
1447 : : else
1448 [ # # ]: 0 : ereport(ERROR,
1449 : : (errcode_for_file_access(),
1450 : : errmsg("could not read two-phase state from WAL at %X/%X",
1451 : : LSN_FORMAT_ARGS(lsn))));
1452 : : }
1453 : :
3007 simon@2ndQuadrant.co 1454 [ + - ]:CBC 480 : if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1455 [ - + ]: 480 : (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
3007 simon@2ndQuadrant.co 1456 [ # # ]:UBC 0 : ereport(ERROR,
1457 : : (errcode_for_file_access(),
1458 : : errmsg("expected two-phase state data is not present in WAL at %X/%X",
1459 : : LSN_FORMAT_ARGS(lsn))));
1460 : :
3007 simon@2ndQuadrant.co 1461 [ + + ]:CBC 480 : if (len != NULL)
1462 : 35 : *len = XLogRecGetDataLen(xlogreader);
1463 : :
2866 rhaas@postgresql.org 1464 : 480 : *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
3007 simon@2ndQuadrant.co 1465 : 480 : memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1466 : :
1467 : 480 : XLogReaderFree(xlogreader);
1468 : 480 : }
1469 : :
1470 : :
1471 : : /*
1472 : : * Confirms an xid is prepared, during recovery
1473 : : */
1474 : : bool
5230 1475 : 23 : StandbyTransactionIdIsPrepared(TransactionId xid)
1476 : : {
1477 : : char *buf;
1478 : : TwoPhaseFileHeader *hdr;
1479 : : bool result;
1480 : :
1481 [ - + ]: 23 : Assert(TransactionIdIsValid(xid));
1482 : :
5100 tgl@sss.pgh.pa.us 1483 [ - + ]: 23 : if (max_prepared_xacts <= 0)
5031 bruce@momjian.us 1484 :LBC (1) : return false; /* nothing to do */
1485 : :
1486 : : /* Read and validate file */
2046 michael@paquier.xyz 1487 :CBC 23 : buf = ReadTwoPhaseFile(xid, true);
5230 simon@2ndQuadrant.co 1488 [ + - ]: 23 : if (buf == NULL)
1489 : 23 : return false;
1490 : :
1491 : : /* Check header also */
5230 simon@2ndQuadrant.co 1492 :UBC 0 : hdr = (TwoPhaseFileHeader *) buf;
1493 : 0 : result = TransactionIdEquals(hdr->xid, xid);
1494 : 0 : pfree(buf);
1495 : :
1496 : 0 : return result;
1497 : : }
1498 : :
1499 : : /*
1500 : : * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
1501 : : */
1502 : : void
6875 tgl@sss.pgh.pa.us 1503 :CBC 400 : FinishPreparedTransaction(const char *gid, bool isCommit)
1504 : : {
1505 : : GlobalTransaction gxact;
1506 : : PGPROC *proc;
1507 : : TransactionId xid;
1508 : : char *buf;
1509 : : char *bufptr;
1510 : : TwoPhaseFileHeader *hdr;
1511 : : TransactionId latestXid;
1512 : : TransactionId *children;
1513 : : RelFileLocator *commitrels;
1514 : : RelFileLocator *abortrels;
1515 : : RelFileLocator *delrels;
1516 : : int ndelrels;
1517 : : xl_xact_stats_item *commitstats;
1518 : : xl_xact_stats_item *abortstats;
1519 : : SharedInvalidationMessage *invalmsgs;
1520 : :
1521 : : /*
1522 : : * Validate the GID, and lock the GXACT to ensure that two backends do not
1523 : : * try to commit the same GID at once.
1524 : : */
6876 1525 : 400 : gxact = LockGXact(gid, GetUserId());
52 heikki.linnakangas@i 1526 :GNC 394 : proc = GetPGProcByNumber(gxact->pgprocno);
1339 andres@anarazel.de 1527 :CBC 394 : xid = gxact->xid;
1528 : :
1529 : : /*
1530 : : * Read and validate 2PC state data. State data will typically be stored
1531 : : * in WAL files if the LSN is after the last checkpoint record, or moved
1532 : : * to disk if for some reason they have lived for a long time.
1533 : : */
3007 simon@2ndQuadrant.co 1534 [ + + ]: 394 : if (gxact->ondisk)
2046 michael@paquier.xyz 1535 : 25 : buf = ReadTwoPhaseFile(xid, false);
1536 : : else
3007 simon@2ndQuadrant.co 1537 : 369 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1538 : :
1539 : :
1540 : : /*
1541 : : * Disassemble the header area
1542 : : */
6876 tgl@sss.pgh.pa.us 1543 : 394 : hdr = (TwoPhaseFileHeader *) buf;
1544 [ - + ]: 394 : Assert(TransactionIdEquals(hdr->xid, xid));
1545 : 394 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2957 simon@2ndQuadrant.co 1546 : 394 : bufptr += MAXALIGN(hdr->gidlen);
6876 tgl@sss.pgh.pa.us 1547 : 394 : children = (TransactionId *) bufptr;
1548 : 394 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
648 rhaas@postgresql.org 1549 : 394 : commitrels = (RelFileLocator *) bufptr;
1550 : 394 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
1551 : 394 : abortrels = (RelFileLocator *) bufptr;
1552 : 394 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
703 tgl@sss.pgh.pa.us 1553 : 394 : commitstats = (xl_xact_stats_item *) bufptr;
739 andres@anarazel.de 1554 : 394 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
703 tgl@sss.pgh.pa.us 1555 : 394 : abortstats = (xl_xact_stats_item *) bufptr;
739 andres@anarazel.de 1556 : 394 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
5230 simon@2ndQuadrant.co 1557 : 394 : invalmsgs = (SharedInvalidationMessage *) bufptr;
1558 : 394 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1559 : :
1560 : : /* compute latestXid among all children */
6063 tgl@sss.pgh.pa.us 1561 : 394 : latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
1562 : :
1563 : : /* Prevent cancel/die interrupt while cleaning up */
2173 teodor@sigaev.ru 1564 : 394 : HOLD_INTERRUPTS();
1565 : :
1566 : : /*
1567 : : * The order of operations here is critical: make the XLOG entry for
1568 : : * commit or abort, then mark the transaction committed or aborted in
1569 : : * pg_xact, then remove its PGPROC from the global ProcArray (which means
1570 : : * TransactionIdIsInProgress will stop saying the prepared xact is in
1571 : : * progress), then run the post-commit or post-abort callbacks. The
1572 : : * callbacks will release the locks the transaction held.
1573 : : */
6876 tgl@sss.pgh.pa.us 1574 [ + + ]: 394 : if (isCommit)
1575 : 352 : RecordTransactionCommitPrepared(xid,
1576 : : hdr->nsubxacts, children,
1577 : : hdr->ncommitrels, commitrels,
1578 : : hdr->ncommitstats,
1579 : : commitstats,
1580 : : hdr->ninvalmsgs, invalmsgs,
2209 simon@2ndQuadrant.co 1581 : 352 : hdr->initfileinval, gid);
1582 : : else
6876 tgl@sss.pgh.pa.us 1583 : 42 : RecordTransactionAbortPrepared(xid,
1584 : : hdr->nsubxacts, children,
1585 : : hdr->nabortrels, abortrels,
1586 : : hdr->nabortstats,
1587 : : abortstats,
1588 : : gid);
1589 : :
4524 rhaas@postgresql.org 1590 : 394 : ProcArrayRemove(proc, latestXid);
1591 : :
1592 : : /*
1593 : : * In case we fail while running the callbacks, mark the gxact invalid so
1594 : : * no one else will try to commit/rollback, and so it will be recycled if
1595 : : * we fail after this point. It is still locked by our backend so it
1596 : : * won't go away yet.
1597 : : *
1598 : : * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1599 : : */
6876 tgl@sss.pgh.pa.us 1600 : 394 : gxact->valid = false;
1601 : :
1602 : : /*
1603 : : * We have to remove any files that were supposed to be dropped. For
1604 : : * consistency with the regular xact.c code paths, must do this before
1605 : : * releasing locks, so do it before running the callbacks.
1606 : : *
1607 : : * NB: this code knows that we couldn't be dropping any temp rels ...
1608 : : */
1609 [ + + ]: 394 : if (isCommit)
1610 : : {
5625 heikki.linnakangas@i 1611 : 352 : delrels = commitrels;
1612 : 352 : ndelrels = hdr->ncommitrels;
1613 : : }
1614 : : else
1615 : : {
1616 : 42 : delrels = abortrels;
1617 : 42 : ndelrels = hdr->nabortrels;
1618 : : }
1619 : :
1620 : : /* Make sure files supposed to be dropped are dropped */
2110 fujii@postgresql.org 1621 : 394 : DropRelationFiles(delrels, ndelrels, false);
1622 : :
739 andres@anarazel.de 1623 [ + + ]: 394 : if (isCommit)
1624 : 352 : pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
1625 : : else
1626 : 42 : pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);
1627 : :
1628 : : /*
1629 : : * Handle cache invalidation messages.
1630 : : *
1631 : : * Relcache init file invalidation requires processing both before and
1632 : : * after we send the SI messages, only when committing. See
1633 : : * AtEOXact_Inval().
1634 : : */
976 michael@paquier.xyz 1635 [ + + ]: 394 : if (isCommit)
1636 : : {
1637 [ - + ]: 352 : if (hdr->initfileinval)
976 michael@paquier.xyz 1638 :UBC 0 : RelationCacheInitFilePreInvalidate();
976 michael@paquier.xyz 1639 :CBC 352 : SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
1640 [ - + ]: 352 : if (hdr->initfileinval)
976 michael@paquier.xyz 1641 :UBC 0 : RelationCacheInitFilePostInvalidate();
1642 : : }
1643 : :
1644 : : /*
1645 : : * Acquire the two-phase lock. We want to work on the two-phase callbacks
1646 : : * while holding it to avoid potential conflicts with other transactions
1647 : : * attempting to use the same GID, so the lock is released once the shared
1648 : : * memory state is cleared.
1649 : : */
1875 michael@paquier.xyz 1650 :CBC 394 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1651 : :
1652 : : /* And now do the callbacks */
6875 tgl@sss.pgh.pa.us 1653 [ + + ]: 394 : if (isCommit)
1654 : 352 : ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1655 : : else
1656 : 42 : ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1657 : :
4815 heikki.linnakangas@i 1658 : 394 : PredicateLockTwoPhaseFinish(xid, isCommit);
1659 : :
1660 : : /* Clear shared memory state */
1875 michael@paquier.xyz 1661 : 394 : RemoveGXact(gxact);
1662 : :
1663 : : /*
1664 : : * Release the lock as all callbacks are called and shared memory cleanup
1665 : : * is done.
1666 : : */
1667 : 394 : LWLockRelease(TwoPhaseStateLock);
1668 : :
1669 : : /* Count the prepared xact as committed or aborted */
1831 akapila@postgresql.o 1670 : 394 : AtEOXact_PgStat(isCommit, false);
1671 : :
1672 : : /*
1673 : : * And now we can clean up any files we may have left.
1674 : : */
3007 simon@2ndQuadrant.co 1675 [ + + ]: 394 : if (gxact->ondisk)
1676 : 25 : RemoveTwoPhaseFile(xid, true);
1677 : :
3622 heikki.linnakangas@i 1678 : 394 : MyLockedGxact = NULL;
1679 : :
2173 teodor@sigaev.ru 1680 [ - + ]: 394 : RESUME_INTERRUPTS();
1681 : :
6876 tgl@sss.pgh.pa.us 1682 : 394 : pfree(buf);
1683 : 394 : }
1684 : :
1685 : : /*
1686 : : * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1687 : : */
1688 : : static void
1689 : 424 : ProcessRecords(char *bufptr, TransactionId xid,
1690 : : const TwoPhaseCallback callbacks[])
1691 : : {
1692 : : for (;;)
1693 : 1594 : {
1694 : 2018 : TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1695 : :
1696 [ - + ]: 2018 : Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1697 [ + + ]: 2018 : if (record->rmid == TWOPHASE_RM_END_ID)
1698 : 424 : break;
1699 : :
1700 : 1594 : bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1701 : :
1702 [ + + ]: 1594 : if (callbacks[record->rmid] != NULL)
6756 bruce@momjian.us 1703 : 1518 : callbacks[record->rmid] (xid, record->info,
1704 : : (void *) bufptr, record->len);
1705 : :
6876 tgl@sss.pgh.pa.us 1706 : 1594 : bufptr += MAXALIGN(record->len);
1707 : : }
1708 : 424 : }
1709 : :
1710 : : /*
1711 : : * Remove the 2PC file for the specified XID.
1712 : : *
1713 : : * If giveWarning is false, do not complain about file-not-present;
1714 : : * this is an expected case during WAL replay.
1715 : : */
1716 : : static void
1717 : 30 : RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1718 : : {
1719 : : char path[MAXPGPATH];
1720 : :
1721 : 30 : TwoPhaseFilePath(path, xid);
1722 [ - + ]: 30 : if (unlink(path))
6876 tgl@sss.pgh.pa.us 1723 [ # # # # ]:UBC 0 : if (errno != ENOENT || giveWarning)
1724 [ # # ]: 0 : ereport(WARNING,
1725 : : (errcode_for_file_access(),
1726 : : errmsg("could not remove file \"%s\": %m", path)));
6876 tgl@sss.pgh.pa.us 1727 :CBC 30 : }
1728 : :
1729 : : /*
1730 : : * Recreates a state file. This is used in WAL replay and during
1731 : : * checkpoint creation.
1732 : : *
1733 : : * Note: content and len don't include CRC.
1734 : : */
1735 : : static void
1736 : 35 : RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1737 : : {
1738 : : char path[MAXPGPATH];
1739 : : pg_crc32c statefile_crc;
1740 : : int fd;
1741 : :
1742 : : /* Recompute CRC */
3449 heikki.linnakangas@i 1743 : 35 : INIT_CRC32C(statefile_crc);
1744 : 35 : COMP_CRC32C(statefile_crc, content, len);
1745 : 35 : FIN_CRC32C(statefile_crc);
1746 : :
6876 tgl@sss.pgh.pa.us 1747 : 35 : TwoPhaseFilePath(path, xid);
1748 : :
4156 heikki.linnakangas@i 1749 : 35 : fd = OpenTransientFile(path,
1750 : : O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
6876 tgl@sss.pgh.pa.us 1751 [ - + ]: 35 : if (fd < 0)
6876 tgl@sss.pgh.pa.us 1752 [ # # ]:UBC 0 : ereport(ERROR,
1753 : : (errcode_for_file_access(),
1754 : : errmsg("could not recreate file \"%s\": %m", path)));
1755 : :
1756 : : /* Write content and CRC */
2079 michael@paquier.xyz 1757 :CBC 35 : errno = 0;
2584 rhaas@postgresql.org 1758 : 35 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
6876 tgl@sss.pgh.pa.us 1759 [ - + ]: 35 : if (write(fd, content, len) != len)
1760 : : {
1761 : : /* if write didn't set errno, assume problem is no disk space */
1824 michael@paquier.xyz 1762 [ # # ]:UBC 0 : if (errno == 0)
1763 : 0 : errno = ENOSPC;
6876 tgl@sss.pgh.pa.us 1764 [ # # ]: 0 : ereport(ERROR,
1765 : : (errcode_for_file_access(),
1766 : : errmsg("could not write file \"%s\": %m", path)));
1767 : : }
3288 heikki.linnakangas@i 1768 [ - + ]:CBC 35 : if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1769 : : {
1770 : : /* if write didn't set errno, assume problem is no disk space */
1824 michael@paquier.xyz 1771 [ # # ]:UBC 0 : if (errno == 0)
1772 : 0 : errno = ENOSPC;
6876 tgl@sss.pgh.pa.us 1773 [ # # ]: 0 : ereport(ERROR,
1774 : : (errcode_for_file_access(),
1775 : : errmsg("could not write file \"%s\": %m", path)));
1776 : : }
2584 rhaas@postgresql.org 1777 :CBC 35 : pgstat_report_wait_end();
1778 : :
1779 : : /*
1780 : : * We must fsync the file because the end-of-replay checkpoint will not do
1781 : : * so, there being no GXACT in shared memory yet to tell it to.
1782 : : */
1783 : 35 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
6876 tgl@sss.pgh.pa.us 1784 [ - + ]: 35 : if (pg_fsync(fd) != 0)
6876 tgl@sss.pgh.pa.us 1785 [ # # ]:UBC 0 : ereport(ERROR,
1786 : : (errcode_for_file_access(),
1787 : : errmsg("could not fsync file \"%s\": %m", path)));
2584 rhaas@postgresql.org 1788 :CBC 35 : pgstat_report_wait_end();
1789 : :
4156 heikki.linnakangas@i 1790 [ - + ]: 35 : if (CloseTransientFile(fd) != 0)
6876 tgl@sss.pgh.pa.us 1791 [ # # ]:UBC 0 : ereport(ERROR,
1792 : : (errcode_for_file_access(),
1793 : : errmsg("could not close file \"%s\": %m", path)));
6876 tgl@sss.pgh.pa.us 1794 :CBC 35 : }
1795 : :
1796 : : /*
1797 : : * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1798 : : *
1799 : : * We must fsync the state file of any GXACT that is valid or has been
1800 : : * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1801 : : * horizon. (If the gxact isn't valid yet, has not been generated in
1802 : : * redo, or has a later LSN, this checkpoint is not responsible for
1803 : : * fsyncing it.)
1804 : : *
1805 : : * This is deliberately run as late as possible in the checkpoint sequence,
1806 : : * because GXACTs ordinarily have short lifespans, and so it is quite
1807 : : * possible that GXACTs that were valid at checkpoint start will no longer
1808 : : * exist if we wait a little bit. With typical checkpoint settings this
1809 : : * will be about 3 minutes for an online checkpoint, so as a result we
1810 : : * expect that there will be no GXACTs that need to be copied to disk.
1811 : : *
1812 : : * If a GXACT remains valid across multiple checkpoints, it will already
1813 : : * be on disk so we don't bother to repeat that write.
1814 : : */
1815 : : void
6874 1816 : 1148 : CheckPointTwoPhase(XLogRecPtr redo_horizon)
1817 : : {
1818 : : int i;
3007 simon@2ndQuadrant.co 1819 : 1148 : int serialized_xacts = 0;
1820 : :
6874 tgl@sss.pgh.pa.us 1821 [ + + ]: 1148 : if (max_prepared_xacts <= 0)
1822 : 908 : return; /* nothing to do */
1823 : :
1824 : : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1825 : :
1826 : : /*
1827 : : * We are expecting there to be zero GXACTs that need to be copied to
1828 : : * disk, so we perform all I/O while holding TwoPhaseStateLock for
1829 : : * simplicity. This prevents any new xacts from preparing while this
1830 : : * occurs, which shouldn't be a problem since the presence of long-lived
1831 : : * prepared xacts indicates the transaction manager isn't active.
1832 : : *
1833 : : * It's also possible to move I/O out of the lock, but on every error we
1834 : : * should check whether somebody committed our transaction in different
1835 : : * backend. Let's leave this optimization for future, if somebody will
1836 : : * spot that this place cause bottleneck.
1837 : : *
1838 : : * Note that it isn't possible for there to be a GXACT with a
1839 : : * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1840 : : * because of the efforts with delayChkptFlags.
1841 : : */
1842 : 240 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1843 [ + + ]: 282 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1844 : : {
1845 : : /*
1846 : : * Note that we are using gxact not PGPROC so this works in recovery
1847 : : * also
1848 : : */
6756 bruce@momjian.us 1849 : 42 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1850 : :
2567 simon@2ndQuadrant.co 1851 [ + + + - ]: 42 : if ((gxact->valid || gxact->inredo) &&
3007 1852 [ + + ]: 42 : !gxact->ondisk &&
1853 [ + + ]: 38 : gxact->prepare_end_lsn <= redo_horizon)
1854 : : {
1855 : : char *buf;
1856 : : int len;
1857 : :
1858 : 35 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
2567 1859 : 35 : RecreateTwoPhaseFile(gxact->xid, buf, len);
3007 1860 : 35 : gxact->ondisk = true;
2567 1861 : 35 : gxact->prepare_start_lsn = InvalidXLogRecPtr;
1862 : 35 : gxact->prepare_end_lsn = InvalidXLogRecPtr;
3007 1863 : 35 : pfree(buf);
1864 : 35 : serialized_xacts++;
1865 : : }
1866 : : }
1867 : 240 : LWLockRelease(TwoPhaseStateLock);
1868 : :
1869 : : /*
1870 : : * Flush unconditionally the parent directory to make any information
1871 : : * durable on disk. Two-phase files could have been removed and those
1872 : : * removals need to be made persistent as well as any files newly created
1873 : : * previously since the last checkpoint.
1874 : : */
2575 teodor@sigaev.ru 1875 : 240 : fsync_fname(TWOPHASE_DIR, true);
1876 : :
1877 : : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1878 : :
3007 simon@2ndQuadrant.co 1879 [ + - + + ]: 240 : if (log_checkpoints && serialized_xacts > 0)
1880 [ + - ]: 30 : ereport(LOG,
1881 : : (errmsg_plural("%u two-phase state file was written "
1882 : : "for a long-running prepared transaction",
1883 : : "%u two-phase state files were written "
1884 : : "for long-running prepared transactions",
1885 : : serialized_xacts,
1886 : : serialized_xacts)));
1887 : : }
1888 : :
1889 : : /*
1890 : : * restoreTwoPhaseData
1891 : : *
1892 : : * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1893 : : * This is called once at the beginning of recovery, saving any extra
1894 : : * lookups in the future. Two-phase files that are newer than the
1895 : : * minimum XID horizon are discarded on the way.
1896 : : */
1897 : : void
2567 1898 : 823 : restoreTwoPhaseData(void)
1899 : : {
1900 : : DIR *cldir;
1901 : : struct dirent *clde;
1902 : :
2496 alvherre@alvh.no-ip. 1903 : 823 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2323 tgl@sss.pgh.pa.us 1904 : 823 : cldir = AllocateDir(TWOPHASE_DIR);
2567 simon@2ndQuadrant.co 1905 [ + + ]: 2484 : while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1906 : : {
137 akorotkov@postgresql 1907 [ + + ]:GNC 1661 : if (strlen(clde->d_name) == 16 &&
1908 [ + - ]: 15 : strspn(clde->d_name, "0123456789ABCDEF") == 16)
1909 : : {
1910 : : TransactionId xid;
1911 : : FullTransactionId fxid;
1912 : : char *buf;
1913 : :
1914 : 15 : fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
1915 : 15 : xid = XidFromFullTransactionId(fxid);
1916 : :
2567 simon@2ndQuadrant.co 1917 :CBC 15 : buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1918 : : true, false, false);
1919 [ - + ]: 15 : if (buf == NULL)
2567 simon@2ndQuadrant.co 1920 :UBC 0 : continue;
1921 : :
2209 simon@2ndQuadrant.co 1922 :CBC 15 : PrepareRedoAdd(buf, InvalidXLogRecPtr,
1923 : : InvalidXLogRecPtr, InvalidRepOriginId);
1924 : : }
1925 : : }
2496 alvherre@alvh.no-ip. 1926 : 823 : LWLockRelease(TwoPhaseStateLock);
2567 simon@2ndQuadrant.co 1927 : 823 : FreeDir(cldir);
1928 : 823 : }
1929 : :
1930 : : /*
1931 : : * PrescanPreparedTransactions
1932 : : *
1933 : : * Scan the shared memory entries of TwoPhaseState and determine the range
1934 : : * of valid XIDs present. This is run during database startup, after we
1935 : : * have completed reading WAL. TransamVariables->nextXid has been set to
1936 : : * one more than the highest XID for which evidence exists in WAL.
1937 : : *
1938 : : * We throw away any prepared xacts with main XID beyond nextXid --- if any
1939 : : * are present, it suggests that the DBA has done a PITR recovery to an
1940 : : * earlier point in time without cleaning out pg_twophase. We dare not
1941 : : * try to recover such prepared xacts since they likely depend on database
1942 : : * state that doesn't exist now.
1943 : : *
1944 : : * However, we will advance nextXid beyond any subxact XIDs belonging to
1945 : : * valid prepared xacts. We need to do this since subxact commit doesn't
1946 : : * write a WAL entry, and so there might be no evidence in WAL of those
1947 : : * subxact XIDs.
1948 : : *
1949 : : * On corrupted two-phase files, fail immediately. Keeping around broken
1950 : : * entries and let replay continue causes harm on the system, and a new
1951 : : * backup should be rolled in.
1952 : : *
1953 : : * Our other responsibility is to determine and return the oldest valid XID
1954 : : * among the prepared xacts (if none, return TransamVariables->nextXid).
1955 : : * This is needed to synchronize pg_subtrans startup properly.
1956 : : *
1957 : : * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1958 : : * top-level xids is stored in *xids_p. The number of entries in the array
1959 : : * is returned in *nxids_p.
1960 : : */
1961 : : TransactionId
5230 1962 : 797 : PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1963 : : {
128 heikki.linnakangas@i 1964 :GNC 797 : FullTransactionId nextXid = TransamVariables->nextXid;
1342 andres@anarazel.de 1965 :CBC 797 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
6876 tgl@sss.pgh.pa.us 1966 : 797 : TransactionId result = origNextXid;
5230 simon@2ndQuadrant.co 1967 : 797 : TransactionId *xids = NULL;
1968 : 797 : int nxids = 0;
1969 : 797 : int allocsize = 0;
1970 : : int i;
1971 : :
2496 alvherre@alvh.no-ip. 1972 : 797 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2567 simon@2ndQuadrant.co 1973 [ + + ]: 847 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1974 : : {
1975 : : TransactionId xid;
1976 : : char *buf;
1977 : 50 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1978 : :
1979 [ - + ]: 50 : Assert(gxact->inredo);
1980 : :
1981 : 50 : xid = gxact->xid;
1982 : :
1983 : 50 : buf = ProcessTwoPhaseBuffer(xid,
1984 : : gxact->prepare_start_lsn,
2524 bruce@momjian.us 1985 : 50 : gxact->ondisk, false, true);
1986 : :
2567 simon@2ndQuadrant.co 1987 [ - + ]: 50 : if (buf == NULL)
2567 simon@2ndQuadrant.co 1988 :UBC 0 : continue;
1989 : :
1990 : : /*
1991 : : * OK, we think this file is valid. Incorporate xid into the
1992 : : * running-minimum result.
1993 : : */
2553 simon@2ndQuadrant.co 1994 [ + + ]:CBC 50 : if (TransactionIdPrecedes(xid, result))
1995 : 41 : result = xid;
1996 : :
2567 1997 [ + + ]: 50 : if (xids_p)
1998 : : {
1999 [ + + ]: 20 : if (nxids == allocsize)
2000 : : {
2001 [ + - ]: 16 : if (nxids == 0)
2002 : : {
2003 : 16 : allocsize = 10;
2004 : 16 : xids = palloc(allocsize * sizeof(TransactionId));
2005 : : }
2006 : : else
2007 : : {
2567 simon@2ndQuadrant.co 2008 :UBC 0 : allocsize = allocsize * 2;
2009 : 0 : xids = repalloc(xids, allocsize * sizeof(TransactionId));
2010 : : }
2011 : : }
2567 simon@2ndQuadrant.co 2012 :CBC 20 : xids[nxids++] = xid;
2013 : : }
2014 : :
2015 : 50 : pfree(buf);
2016 : : }
2017 : 797 : LWLockRelease(TwoPhaseStateLock);
2018 : :
5230 2019 [ + + ]: 797 : if (xids_p)
2020 : : {
2021 : 68 : *xids_p = xids;
2022 : 68 : *nxids_p = nxids;
2023 : : }
2024 : :
6876 tgl@sss.pgh.pa.us 2025 : 797 : return result;
2026 : : }
2027 : :
2028 : : /*
2029 : : * StandbyRecoverPreparedTransactions
2030 : : *
2031 : : * Scan the shared memory entries of TwoPhaseState and setup all the required
2032 : : * information to allow standby queries to treat prepared transactions as still
2033 : : * active.
2034 : : *
2035 : : * This is never called at the end of recovery - we use
2036 : : * RecoverPreparedTransactions() at that point.
2037 : : *
2038 : : * The lack of calls to SubTransSetParent() calls here is by design;
2039 : : * those calls are made by RecoverPreparedTransactions() at the end of recovery
2040 : : * for those xacts that need this.
2041 : : */
2042 : : void
2544 simon@2ndQuadrant.co 2043 : 68 : StandbyRecoverPreparedTransactions(void)
2044 : : {
2045 : : int i;
2046 : :
2496 alvherre@alvh.no-ip. 2047 : 68 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2567 simon@2ndQuadrant.co 2048 [ + + ]: 88 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2049 : : {
2050 : : TransactionId xid;
2051 : : char *buf;
2052 : 20 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2053 : :
2054 [ - + ]: 20 : Assert(gxact->inredo);
2055 : :
2056 : 20 : xid = gxact->xid;
2057 : :
2058 : 20 : buf = ProcessTwoPhaseBuffer(xid,
2059 : : gxact->prepare_start_lsn,
2524 bruce@momjian.us 2060 : 20 : gxact->ondisk, false, false);
2567 simon@2ndQuadrant.co 2061 [ + - ]: 20 : if (buf != NULL)
2775 2062 : 20 : pfree(buf);
2063 : : }
2567 2064 : 68 : LWLockRelease(TwoPhaseStateLock);
5115 heikki.linnakangas@i 2065 : 68 : }
2066 : :
2067 : : /*
2068 : : * RecoverPreparedTransactions
2069 : : *
2070 : : * Scan the shared memory entries of TwoPhaseState and reload the state for
2071 : : * each prepared transaction (reacquire locks, etc).
2072 : : *
2073 : : * This is run at the end of recovery, but before we allow backends to write
2074 : : * WAL.
2075 : : *
2076 : : * At the end of recovery the way we take snapshots will change. We now need
2077 : : * to mark all running transactions with their full SubTransSetParent() info
2078 : : * to allow normal snapshots to work correctly if snapshots overflow.
2079 : : * We do this here because by definition prepared transactions are the only
2080 : : * type of write transaction still running, so this is necessary and
2081 : : * complete.
2082 : : */
2083 : : void
6876 tgl@sss.pgh.pa.us 2084 : 729 : RecoverPreparedTransactions(void)
2085 : : {
2086 : : int i;
2087 : :
2496 alvherre@alvh.no-ip. 2088 : 729 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2567 simon@2ndQuadrant.co 2089 [ + + ]: 759 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2090 : : {
2091 : : TransactionId xid;
2092 : : char *buf;
2093 : 30 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2094 : : char *bufptr;
2095 : : TwoPhaseFileHeader *hdr;
2096 : : TransactionId *subxids;
2097 : : const char *gid;
2098 : :
2099 : 30 : xid = gxact->xid;
2100 : :
2101 : : /*
2102 : : * Reconstruct subtrans state for the transaction --- needed because
2103 : : * pg_subtrans is not preserved over a restart. Note that we are
2104 : : * linking all the subtransactions directly to the top-level XID;
2105 : : * there may originally have been a more complex hierarchy, but
2106 : : * there's no need to restore that exactly. It's possible that
2107 : : * SubTransSetParent has been set before, if the prepared transaction
2108 : : * generated xid assignment records.
2109 : : */
2110 : 30 : buf = ProcessTwoPhaseBuffer(xid,
2111 : : gxact->prepare_start_lsn,
2524 bruce@momjian.us 2112 : 30 : gxact->ondisk, true, false);
2567 simon@2ndQuadrant.co 2113 [ - + ]: 30 : if (buf == NULL)
2567 simon@2ndQuadrant.co 2114 :UBC 0 : continue;
2115 : :
2567 simon@2ndQuadrant.co 2116 [ + - ]:CBC 30 : ereport(LOG,
2117 : : (errmsg("recovering prepared transaction %u from shared memory", xid)));
2118 : :
2119 : 30 : hdr = (TwoPhaseFileHeader *) buf;
2120 [ - + ]: 30 : Assert(TransactionIdEquals(hdr->xid, xid));
2121 : 30 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2122 : 30 : gid = (const char *) bufptr;
2123 : 30 : bufptr += MAXALIGN(hdr->gidlen);
2124 : 30 : subxids = (TransactionId *) bufptr;
2125 : 30 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
648 rhaas@postgresql.org 2126 : 30 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
2127 : 30 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
739 andres@anarazel.de 2128 : 30 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
2129 : 30 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
2567 simon@2ndQuadrant.co 2130 : 30 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2131 : :
2132 : : /*
2133 : : * Recreate its GXACT and dummy PGPROC. But, check whether it was
2134 : : * added in redo and already has a shmem entry for it.
2135 : : */
2136 : 30 : MarkAsPreparingGuts(gxact, xid, gid,
2137 : : hdr->prepared_at,
2138 : : hdr->owner, hdr->database);
2139 : :
2140 : : /* recovered, so reset the flag for entries generated by redo */
2141 : 30 : gxact->inredo = false;
2142 : :
2143 : 30 : GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2496 alvherre@alvh.no-ip. 2144 : 30 : MarkAsPrepared(gxact, true);
2145 : :
2146 : 30 : LWLockRelease(TwoPhaseStateLock);
2147 : :
2148 : : /*
2149 : : * Recover other state (notably locks) using resource managers.
2150 : : */
2567 simon@2ndQuadrant.co 2151 : 30 : ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2152 : :
2153 : : /*
2154 : : * Release locks held by the standby process after we process each
2155 : : * prepared transaction. As a result, we don't need too many
2156 : : * additional locks at any one time.
2157 : : */
2158 [ + + ]: 30 : if (InHotStandby)
2159 : 6 : StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2160 : :
2161 : : /*
2162 : : * We're done with recovering this transaction. Clear MyLockedGxact,
2163 : : * like we do in PrepareTransaction() during normal operation.
2164 : : */
2165 : 30 : PostPrepare_Twophase();
2166 : :
2167 : 30 : pfree(buf);
2168 : :
2496 alvherre@alvh.no-ip. 2169 : 30 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2170 : : }
2171 : :
2172 : 729 : LWLockRelease(TwoPhaseStateLock);
2567 simon@2ndQuadrant.co 2173 : 729 : }
2174 : :
2175 : : /*
2176 : : * ProcessTwoPhaseBuffer
2177 : : *
2178 : : * Given a transaction id, read it either from disk or read it directly
2179 : : * via shmem xlog record pointer using the provided "prepare_start_lsn".
2180 : : *
2181 : : * If setParent is true, set up subtransaction parent linkages.
2182 : : *
2183 : : * If setNextXid is true, set TransamVariables->nextXid to the newest
2184 : : * value scanned.
2185 : : */
2186 : : static char *
2187 : 115 : ProcessTwoPhaseBuffer(TransactionId xid,
2188 : : XLogRecPtr prepare_start_lsn,
2189 : : bool fromdisk,
2190 : : bool setParent, bool setNextXid)
2191 : : {
128 heikki.linnakangas@i 2192 :GNC 115 : FullTransactionId nextXid = TransamVariables->nextXid;
1342 andres@anarazel.de 2193 :CBC 115 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
2194 : : TransactionId *subxids;
2195 : : char *buf;
2196 : : TwoPhaseFileHeader *hdr;
2197 : : int i;
2198 : :
2496 alvherre@alvh.no-ip. 2199 [ - + ]: 115 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2200 : :
2567 simon@2ndQuadrant.co 2201 [ + + ]: 115 : if (!fromdisk)
2202 [ - + ]: 68 : Assert(prepare_start_lsn != InvalidXLogRecPtr);
2203 : :
2204 : : /* Already processed? */
2205 [ + - - + ]: 115 : if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2206 : : {
2567 simon@2ndQuadrant.co 2207 [ # # ]:UBC 0 : if (fromdisk)
2208 : : {
2209 [ # # ]: 0 : ereport(WARNING,
2210 : : (errmsg("removing stale two-phase state file for transaction %u",
2211 : : xid)));
2212 : 0 : RemoveTwoPhaseFile(xid, true);
2213 : : }
2214 : : else
2215 : : {
2216 [ # # ]: 0 : ereport(WARNING,
2217 : : (errmsg("removing stale two-phase state from memory for transaction %u",
2218 : : xid)));
2219 : 0 : PrepareRedoRemove(xid, true);
2220 : : }
2221 : 0 : return NULL;
2222 : : }
2223 : :
2224 : : /* Reject XID if too new */
2567 simon@2ndQuadrant.co 2225 [ - + ]:CBC 115 : if (TransactionIdFollowsOrEquals(xid, origNextXid))
2226 : : {
2567 simon@2ndQuadrant.co 2227 [ # # ]:UBC 0 : if (fromdisk)
2228 : : {
2229 [ # # ]: 0 : ereport(WARNING,
2230 : : (errmsg("removing future two-phase state file for transaction %u",
2231 : : xid)));
2232 : 0 : RemoveTwoPhaseFile(xid, true);
2233 : : }
2234 : : else
2235 : : {
2236 [ # # ]: 0 : ereport(WARNING,
2237 : : (errmsg("removing future two-phase state from memory for transaction %u",
2238 : : xid)));
2239 : 0 : PrepareRedoRemove(xid, true);
2240 : : }
2241 : 0 : return NULL;
2242 : : }
2243 : :
2567 simon@2ndQuadrant.co 2244 [ + + ]:CBC 115 : if (fromdisk)
2245 : : {
2246 : : /* Read and validate file */
2046 michael@paquier.xyz 2247 : 47 : buf = ReadTwoPhaseFile(xid, false);
2248 : : }
2249 : : else
2250 : : {
2251 : : /* Read xlog data */
2567 simon@2ndQuadrant.co 2252 : 68 : XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2253 : : }
2254 : :
2255 : : /* Deconstruct header */
2256 : 115 : hdr = (TwoPhaseFileHeader *) buf;
2257 [ - + ]: 115 : if (!TransactionIdEquals(hdr->xid, xid))
2258 : : {
2567 simon@2ndQuadrant.co 2259 [ # # ]:UBC 0 : if (fromdisk)
2046 michael@paquier.xyz 2260 [ # # ]: 0 : ereport(ERROR,
2261 : : (errcode(ERRCODE_DATA_CORRUPTED),
2262 : : errmsg("corrupted two-phase state file for transaction %u",
2263 : : xid)));
2264 : : else
2265 [ # # ]: 0 : ereport(ERROR,
2266 : : (errcode(ERRCODE_DATA_CORRUPTED),
2267 : : errmsg("corrupted two-phase state in memory for transaction %u",
2268 : : xid)));
2269 : : }
2270 : :
2271 : : /*
2272 : : * Examine subtransaction XIDs ... they should all follow main XID, and
2273 : : * they may force us to advance nextXid.
2274 : : */
2567 simon@2ndQuadrant.co 2275 :CBC 115 : subxids = (TransactionId *) (buf +
2276 : 115 : MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2277 : 115 : MAXALIGN(hdr->gidlen));
2278 [ + + ]: 2497 : for (i = 0; i < hdr->nsubxacts; i++)
2279 : : {
2280 : 2382 : TransactionId subxid = subxids[i];
2281 : :
2282 [ - + ]: 2382 : Assert(TransactionIdFollows(subxid, xid));
2283 : :
2284 : : /* update nextXid if needed */
1844 tmunro@postgresql.or 2285 [ + + ]: 2382 : if (setNextXid)
2286 : 1021 : AdvanceNextFullTransactionIdPastXid(subxid);
2287 : :
2567 simon@2ndQuadrant.co 2288 [ + + ]: 2382 : if (setParent)
2544 2289 : 345 : SubTransSetParent(subxid, xid);
2290 : : }
2291 : :
2567 2292 : 115 : return buf;
2293 : : }
2294 : :
2295 : :
2296 : : /*
2297 : : * RecordTransactionCommitPrepared
2298 : : *
2299 : : * This is basically the same as RecordTransactionCommit (q.v. if you change
2300 : : * this function): in particular, we must set DELAY_CHKPT_START to avoid a
2301 : : * race condition.
2302 : : *
2303 : : * We know the transaction made at least one XLOG entry (its PREPARE),
2304 : : * so it is never possible to optimize out the commit record.
2305 : : */
2306 : : static void
6876 tgl@sss.pgh.pa.us 2307 : 352 : RecordTransactionCommitPrepared(TransactionId xid,
2308 : : int nchildren,
2309 : : TransactionId *children,
2310 : : int nrels,
2311 : : RelFileLocator *rels,
2312 : : int nstats,
2313 : : xl_xact_stats_item *stats,
2314 : : int ninvalmsgs,
2315 : : SharedInvalidationMessage *invalmsgs,
2316 : : bool initfileinval,
2317 : : const char *gid)
2318 : : {
2319 : : XLogRecPtr recptr;
3120 alvherre@alvh.no-ip. 2320 : 352 : TimestampTz committs = GetCurrentTimestamp();
2321 : : bool replorigin;
2322 : :
2323 : : /*
2324 : : * Are we using the replication origins feature? Or, in other words, are
2325 : : * we replaying remote actions?
2326 : : */
2327 [ + + ]: 376 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2328 [ + - ]: 24 : replorigin_session_origin != DoNotReplicateId);
2329 : :
6876 tgl@sss.pgh.pa.us 2330 : 352 : START_CRIT_SECTION();
2331 : :
2332 : : /* See notes in RecordTransactionCommit */
737 rhaas@postgresql.org 2333 [ - + ]: 352 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
2334 : 352 : MyProc->delayChkptFlags |= DELAY_CHKPT_START;
2335 : :
2336 : : /*
2337 : : * Emit the XLOG commit record. Note that we mark 2PC commits as
2338 : : * potentially having AccessExclusiveLocks since we don't know whether or
2339 : : * not they do.
2340 : : */
3120 alvherre@alvh.no-ip. 2341 : 352 : recptr = XactLogCommitRecord(committs,
2342 : : nchildren, children, nrels, rels,
2343 : : nstats, stats,
2344 : : ninvalmsgs, invalmsgs,
2345 : : initfileinval,
2489 tgl@sss.pgh.pa.us 2346 : 352 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2347 : : xid, gid);
2348 : :
2349 : :
3120 alvherre@alvh.no-ip. 2350 [ + + ]: 352 : if (replorigin)
2351 : : /* Move LSNs forward for this replication origin */
2352 : 24 : replorigin_session_advance(replorigin_session_origin_lsn,
2353 : : XactLastRecEnd);
2354 : :
2355 : : /*
2356 : : * Record commit timestamp. The value comes from plain commit timestamp
2357 : : * if replorigin is not enabled, or replorigin already set a value for us
2358 : : * in replorigin_session_origin_timestamp otherwise.
2359 : : *
2360 : : * We don't need to WAL-log anything here, as the commit record written
2361 : : * above already contains the data.
2362 : : */
2363 [ + + - + ]: 352 : if (!replorigin || replorigin_session_origin_timestamp == 0)
2364 : 328 : replorigin_session_origin_timestamp = committs;
2365 : :
2366 : 352 : TransactionTreeSetCommitTsData(xid, nchildren, children,
2367 : : replorigin_session_origin_timestamp,
2368 : : replorigin_session_origin);
2369 : :
2370 : : /*
2371 : : * We don't currently try to sleep before flush here ... nor is there any
2372 : : * support for async commit of a prepared xact (the very idea is probably
2373 : : * a contradiction)
2374 : : */
2375 : :
2376 : : /* Flush XLOG to disk */
6876 tgl@sss.pgh.pa.us 2377 : 352 : XLogFlush(recptr);
2378 : :
2379 : : /* Mark the transaction committed in pg_xact */
5655 alvherre@alvh.no-ip. 2380 : 352 : TransactionIdCommitTree(xid, nchildren, children);
2381 : :
2382 : : /* Checkpoint can proceed now */
737 rhaas@postgresql.org 2383 : 352 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
2384 : :
6876 tgl@sss.pgh.pa.us 2385 [ - + ]: 352 : END_CRIT_SECTION();
2386 : :
2387 : : /*
2388 : : * Wait for synchronous replication, if required.
2389 : : *
2390 : : * Note that at this stage we have marked clog, but still show as running
2391 : : * in the procarray and continue to hold locks.
2392 : : */
2938 rhaas@postgresql.org 2393 : 352 : SyncRepWaitForLSN(recptr, true);
6876 tgl@sss.pgh.pa.us 2394 : 352 : }
2395 : :
2396 : : /*
2397 : : * RecordTransactionAbortPrepared
2398 : : *
2399 : : * This is basically the same as RecordTransactionAbort.
2400 : : *
2401 : : * We know the transaction made at least one XLOG entry (its PREPARE),
2402 : : * so it is never possible to optimize out the abort record.
2403 : : */
2404 : : static void
2405 : 42 : RecordTransactionAbortPrepared(TransactionId xid,
2406 : : int nchildren,
2407 : : TransactionId *children,
2408 : : int nrels,
2409 : : RelFileLocator *rels,
2410 : : int nstats,
2411 : : xl_xact_stats_item *stats,
2412 : : const char *gid)
2413 : : {
2414 : : XLogRecPtr recptr;
2415 : : bool replorigin;
2416 : :
2417 : : /*
2418 : : * Are we using the replication origins feature? Or, in other words, are
2419 : : * we replaying remote actions?
2420 : : */
1133 akapila@postgresql.o 2421 [ + + ]: 52 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2422 [ + - ]: 10 : replorigin_session_origin != DoNotReplicateId);
2423 : :
2424 : : /*
2425 : : * Catch the scenario where we aborted partway through
2426 : : * RecordTransactionCommitPrepared ...
2427 : : */
6876 tgl@sss.pgh.pa.us 2428 [ - + ]: 42 : if (TransactionIdDidCommit(xid))
6876 tgl@sss.pgh.pa.us 2429 [ # # ]:UBC 0 : elog(PANIC, "cannot abort transaction %u, it was already committed",
2430 : : xid);
2431 : :
6876 tgl@sss.pgh.pa.us 2432 :CBC 42 : START_CRIT_SECTION();
2433 : :
2434 : : /*
2435 : : * Emit the XLOG commit record. Note that we mark 2PC aborts as
2436 : : * potentially having AccessExclusiveLocks since we don't know whether or
2437 : : * not they do.
2438 : : */
3318 andres@anarazel.de 2439 : 42 : recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2440 : : nchildren, children,
2441 : : nrels, rels,
2442 : : nstats, stats,
2489 tgl@sss.pgh.pa.us 2443 : 42 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2444 : : xid, gid);
2445 : :
1133 akapila@postgresql.o 2446 [ + + ]: 42 : if (replorigin)
2447 : : /* Move LSNs forward for this replication origin */
2448 : 10 : replorigin_session_advance(replorigin_session_origin_lsn,
2449 : : XactLastRecEnd);
2450 : :
2451 : : /* Always flush, since we're about to remove the 2PC state file */
6876 tgl@sss.pgh.pa.us 2452 : 42 : XLogFlush(recptr);
2453 : :
2454 : : /*
2455 : : * Mark the transaction aborted in clog. This is not absolutely necessary
2456 : : * but we may as well do it while we are here.
2457 : : */
5655 alvherre@alvh.no-ip. 2458 : 42 : TransactionIdAbortTree(xid, nchildren, children);
2459 : :
6876 tgl@sss.pgh.pa.us 2460 [ - + ]: 42 : END_CRIT_SECTION();
2461 : :
2462 : : /*
2463 : : * Wait for synchronous replication, if required.
2464 : : *
2465 : : * Note that at this stage we have marked clog, but still show as running
2466 : : * in the procarray and continue to hold locks.
2467 : : */
2938 rhaas@postgresql.org 2468 : 42 : SyncRepWaitForLSN(recptr, false);
6876 tgl@sss.pgh.pa.us 2469 : 42 : }
2470 : :
2471 : : /*
2472 : : * PrepareRedoAdd
2473 : : *
2474 : : * Store pointers to the start/end of the WAL record along with the xid in
2475 : : * a gxact entry in shared memory TwoPhaseState structure. If caller
2476 : : * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2477 : : * data, the entry is marked as located on disk.
2478 : : */
2479 : : void
2209 simon@2ndQuadrant.co 2480 : 90 : PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2481 : : XLogRecPtr end_lsn, RepOriginId origin_id)
2482 : : {
2567 2483 : 90 : TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2484 : : char *bufptr;
2485 : : const char *gid;
2486 : : GlobalTransaction gxact;
2487 : :
2496 alvherre@alvh.no-ip. 2488 [ - + ]: 90 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2567 simon@2ndQuadrant.co 2489 [ - + ]: 90 : Assert(RecoveryInProgress());
2490 : :
2491 : 90 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2492 : 90 : gid = (const char *) bufptr;
2493 : :
2494 : : /*
2495 : : * Reserve the GID for the given transaction in the redo code path.
2496 : : *
2497 : : * This creates a gxact struct and puts it into the active array.
2498 : : *
2499 : : * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2500 : : * shared memory. Hence, we only fill up the bare minimum contents here.
2501 : : * The gxact also gets marked with gxact->inredo set to true to indicate
2502 : : * that it got added in the redo phase
2503 : : */
2504 : :
2505 : : /*
2506 : : * In the event of a crash while a checkpoint was running, it may be
2507 : : * possible that some two-phase data found its way to disk while its
2508 : : * corresponding record needs to be replayed in the follow-up recovery. As
2509 : : * the 2PC data was on disk, it has already been restored at the beginning
2510 : : * of recovery with restoreTwoPhaseData(), so skip this record to avoid
2511 : : * duplicates in TwoPhaseState. If a consistent state has been reached,
2512 : : * the record is added to TwoPhaseState and it should have no
2513 : : * corresponding file in pg_twophase.
2514 : : */
271 michael@paquier.xyz 2515 [ + + ]: 90 : if (!XLogRecPtrIsInvalid(start_lsn))
2516 : : {
2517 : : char path[MAXPGPATH];
2518 : :
2519 : 75 : TwoPhaseFilePath(path, hdr->xid);
2520 : :
2521 [ - + ]: 75 : if (access(path, F_OK) == 0)
2522 : : {
271 michael@paquier.xyz 2523 [ # # # # ]:UBC 0 : ereport(reachedConsistency ? ERROR : WARNING,
2524 : : (errmsg("could not recover two-phase state file for transaction %u",
2525 : : hdr->xid),
2526 : : errdetail("Two-phase state file has been found in WAL record %X/%X, but this transaction has already been restored from disk.",
2527 : : LSN_FORMAT_ARGS(start_lsn))));
2528 : 0 : return;
2529 : : }
2530 : :
271 michael@paquier.xyz 2531 [ - + ]:CBC 75 : if (errno != ENOENT)
271 michael@paquier.xyz 2532 [ # # ]:UBC 0 : ereport(ERROR,
2533 : : (errcode_for_file_access(),
2534 : : errmsg("could not access file \"%s\": %m", path)));
2535 : : }
2536 : :
2537 : : /* Get a free gxact from the freelist */
2567 simon@2ndQuadrant.co 2538 [ - + ]:CBC 90 : if (TwoPhaseState->freeGXacts == NULL)
2567 simon@2ndQuadrant.co 2539 [ # # ]:UBC 0 : ereport(ERROR,
2540 : : (errcode(ERRCODE_OUT_OF_MEMORY),
2541 : : errmsg("maximum number of prepared transactions reached"),
2542 : : errhint("Increase max_prepared_transactions (currently %d).",
2543 : : max_prepared_xacts)));
2567 simon@2ndQuadrant.co 2544 :CBC 90 : gxact = TwoPhaseState->freeGXacts;
2545 : 90 : TwoPhaseState->freeGXacts = gxact->next;
2546 : :
2547 : 90 : gxact->prepared_at = hdr->prepared_at;
2548 : 90 : gxact->prepare_start_lsn = start_lsn;
2549 : 90 : gxact->prepare_end_lsn = end_lsn;
2550 : 90 : gxact->xid = hdr->xid;
2551 : 90 : gxact->owner = hdr->owner;
42 heikki.linnakangas@i 2552 :GNC 90 : gxact->locking_backend = INVALID_PROC_NUMBER;
2567 simon@2ndQuadrant.co 2553 :CBC 90 : gxact->valid = false;
2554 : 90 : gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2524 bruce@momjian.us 2555 : 90 : gxact->inredo = true; /* yes, added in redo */
2567 simon@2ndQuadrant.co 2556 : 90 : strcpy(gxact->gid, gid);
2557 : :
2558 : : /* And insert it into the active array */
2559 [ - + ]: 90 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2560 : 90 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2561 : :
2209 2562 [ + + ]: 90 : if (origin_id != InvalidRepOriginId)
2563 : : {
2564 : : /* recover apply progress */
2565 : 13 : replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2566 : : false /* backward */ , false /* WAL */ );
2567 : : }
2568 : :
2496 alvherre@alvh.no-ip. 2569 [ - + ]: 90 : elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2570 : : }
2571 : :
2572 : : /*
2573 : : * PrepareRedoRemove
2574 : : *
2575 : : * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2576 : : * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2577 : : *
2578 : : * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2579 : : * is updated.
2580 : : */
2581 : : void
2567 simon@2ndQuadrant.co 2582 : 66 : PrepareRedoRemove(TransactionId xid, bool giveWarning)
2583 : : {
2584 : 66 : GlobalTransaction gxact = NULL;
2585 : : int i;
2553 2586 : 66 : bool found = false;
2587 : :
2496 alvherre@alvh.no-ip. 2588 [ - + ]: 66 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2567 simon@2ndQuadrant.co 2589 [ - + ]: 66 : Assert(RecoveryInProgress());
2590 : :
2591 [ + + ]: 66 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2592 : : {
2593 : 58 : gxact = TwoPhaseState->prepXacts[i];
2594 : :
2595 [ + - ]: 58 : if (gxact->xid == xid)
2596 : : {
2597 [ - + ]: 58 : Assert(gxact->inredo);
2553 2598 : 58 : found = true;
2567 2599 : 58 : break;
2600 : : }
2601 : : }
2602 : :
2603 : : /*
2604 : : * Just leave if there is nothing, this is expected during WAL replay.
2605 : : */
2553 2606 [ + + ]: 66 : if (!found)
2567 2607 : 8 : return;
2608 : :
2609 : : /*
2610 : : * And now we can clean up any files we may have left.
2611 : : */
2496 alvherre@alvh.no-ip. 2612 [ - + ]: 58 : elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2567 simon@2ndQuadrant.co 2613 [ + + ]: 58 : if (gxact->ondisk)
2614 : 5 : RemoveTwoPhaseFile(xid, giveWarning);
2615 : 58 : RemoveGXact(gxact);
2616 : : }
2617 : :
2618 : : /*
2619 : : * LookupGXact
2620 : : * Check if the prepared transaction with the given GID, lsn and timestamp
2621 : : * exists.
2622 : : *
2623 : : * Note that we always compare with the LSN where prepare ends because that is
2624 : : * what is stored as origin_lsn in the 2PC file.
2625 : : *
2626 : : * This function is primarily used to check if the prepared transaction
2627 : : * received from the upstream (remote node) already exists. Checking only GID
2628 : : * is not sufficient because a different prepared xact with the same GID can
2629 : : * exist on the same node. So, we are ensuring to match origin_lsn and
2630 : : * origin_timestamp of prepared xact to avoid the possibility of a match of
2631 : : * prepared xact from two different nodes.
2632 : : */
2633 : : bool
1005 akapila@postgresql.o 2634 : 9 : LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
2635 : : TimestampTz origin_prepare_timestamp)
2636 : : {
2637 : : int i;
2638 : 9 : bool found = false;
2639 : :
2640 : 9 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2641 [ + - ]: 9 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2642 : : {
2643 : 9 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2644 : :
2645 : : /* Ignore not-yet-valid GIDs. */
2646 [ + - + - ]: 9 : if (gxact->valid && strcmp(gxact->gid, gid) == 0)
2647 : : {
2648 : : char *buf;
2649 : : TwoPhaseFileHeader *hdr;
2650 : :
2651 : : /*
2652 : : * We are not expecting collisions of GXACTs (same gid) between
2653 : : * publisher and subscribers, so we perform all I/O while holding
2654 : : * TwoPhaseStateLock for simplicity.
2655 : : *
2656 : : * To move the I/O out of the lock, we need to ensure that no
2657 : : * other backend commits the prepared xact in the meantime. We can
2658 : : * do this optimization if we encounter many collisions in GID
2659 : : * between publisher and subscriber.
2660 : : */
2661 [ + + ]: 9 : if (gxact->ondisk)
2662 : 1 : buf = ReadTwoPhaseFile(gxact->xid, false);
2663 : : else
2664 : : {
2665 [ - + ]: 8 : Assert(gxact->prepare_start_lsn);
2666 : 8 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
2667 : : }
2668 : :
2669 : 9 : hdr = (TwoPhaseFileHeader *) buf;
2670 : :
2671 [ + - ]: 9 : if (hdr->origin_lsn == prepare_end_lsn &&
2672 [ + - ]: 9 : hdr->origin_timestamp == origin_prepare_timestamp)
2673 : : {
2674 : 9 : found = true;
2675 : 9 : pfree(buf);
2676 : 9 : break;
2677 : : }
2678 : :
1005 akapila@postgresql.o 2679 :UBC 0 : pfree(buf);
2680 : : }
2681 : : }
1005 akapila@postgresql.o 2682 :CBC 9 : LWLockRelease(TwoPhaseStateLock);
2683 : 9 : return found;
2684 : : }
|