Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * twophase.c
4 : * Two-phase commit support functions.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/access/transam/twophase.c
11 : *
12 : * NOTES
13 : * Each global transaction is associated with a global transaction
14 : * identifier (GID). The client assigns a GID to a postgres
15 : * transaction with the PREPARE TRANSACTION command.
16 : *
17 : * We keep all active global transactions in a shared memory array.
18 : * When the PREPARE TRANSACTION command is issued, the GID is
19 : * reserved for the transaction in the array. This is done before
20 : * a WAL entry is made, because the reservation checks for duplicate
21 : * GIDs and aborts the transaction if there already is a global
22 : * transaction in prepared state with the same GID.
23 : *
24 : * A global transaction (gxact) also has dummy PGPROC; this is what keeps
25 : * the XID considered running by TransactionIdIsInProgress. It is also
26 : * convenient as a PGPROC to hook the gxact's locks to.
27 : *
28 : * Information to recover prepared transactions in case of crash is
29 : * now stored in WAL for the common case. In some cases there will be
30 : * an extended period between preparing a GXACT and commit/abort, in
31 : * which case we need to separately record prepared transaction data
32 : * in permanent storage. This includes locking information, pending
33 : * notifications etc. All that state information is written to the
34 : * per-transaction state file in the pg_twophase directory.
35 : * All prepared transactions will be written prior to shutdown.
36 : *
37 : * Life track of state data is following:
38 : *
39 : * * On PREPARE TRANSACTION backend writes state data only to the WAL and
40 : * stores pointer to the start of the WAL record in
41 : * gxact->prepare_start_lsn.
42 : * * If COMMIT occurs before checkpoint then backend reads data from WAL
43 : * using prepare_start_lsn.
44 : * * On checkpoint state data copied to files in pg_twophase directory and
45 : * fsynced
46 : * * If COMMIT happens after checkpoint then backend reads state data from
47 : * files
48 : *
49 : * During replay and replication, TwoPhaseState also holds information
50 : * about active prepared transactions that haven't been moved to disk yet.
51 : *
52 : * Replay of twophase records happens by the following rules:
53 : *
54 : * * At the beginning of recovery, pg_twophase is scanned once, filling
55 : * TwoPhaseState with entries marked with gxact->inredo and
56 : * gxact->ondisk. Two-phase file data older than the XID horizon of
57 : * the redo position are discarded.
58 : * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 : * gxact->inredo is set to true for such entries.
60 : * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 : * that have gxact->inredo set and are behind the redo_horizon. We
62 : * save them to disk and then switch gxact->ondisk to true.
63 : * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 : * If gxact->ondisk is true, the corresponding entry from the disk
65 : * is additionally deleted.
66 : * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 : * and PrescanPreparedTransactions() have been modified to go through
68 : * gxact->inredo entries that have not made it to disk.
69 : *
70 : *-------------------------------------------------------------------------
71 : */
72 : #include "postgres.h"
73 :
74 : #include <fcntl.h>
75 : #include <sys/stat.h>
76 : #include <time.h>
77 : #include <unistd.h>
78 :
79 : #include "access/commit_ts.h"
80 : #include "access/htup_details.h"
81 : #include "access/subtrans.h"
82 : #include "access/transam.h"
83 : #include "access/twophase.h"
84 : #include "access/twophase_rmgr.h"
85 : #include "access/xact.h"
86 : #include "access/xlog.h"
87 : #include "access/xloginsert.h"
88 : #include "access/xlogreader.h"
89 : #include "access/xlogutils.h"
90 : #include "catalog/pg_type.h"
91 : #include "catalog/storage.h"
92 : #include "funcapi.h"
93 : #include "miscadmin.h"
94 : #include "pg_trace.h"
95 : #include "pgstat.h"
96 : #include "replication/origin.h"
97 : #include "replication/syncrep.h"
98 : #include "replication/walsender.h"
99 : #include "storage/fd.h"
100 : #include "storage/ipc.h"
101 : #include "storage/md.h"
102 : #include "storage/predicate.h"
103 : #include "storage/proc.h"
104 : #include "storage/procarray.h"
105 : #include "storage/sinvaladt.h"
106 : #include "storage/smgr.h"
107 : #include "utils/builtins.h"
108 : #include "utils/memutils.h"
109 : #include "utils/timestamp.h"
110 :
111 : /*
112 : * Directory where Two-phase commit files reside within PGDATA
113 : */
114 : #define TWOPHASE_DIR "pg_twophase"
115 :
116 : /* GUC variable, can't be changed after startup */
117 : int max_prepared_xacts = 0;
118 :
119 : /*
120 : * This struct describes one global transaction that is in prepared state
121 : * or attempting to become prepared.
122 : *
123 : * The lifecycle of a global transaction is:
124 : *
125 : * 1. After checking that the requested GID is not in use, set up an entry in
126 : * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
127 : * and mark it as locked by my backend.
128 : *
129 : * 2. After successfully completing prepare, set valid = true and enter the
130 : * referenced PGPROC into the global ProcArray.
131 : *
132 : * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
133 : * valid and not locked, then mark the entry as locked by storing my current
134 : * backend ID into locking_backend. This prevents concurrent attempts to
135 : * commit or rollback the same prepared xact.
136 : *
137 : * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
138 : * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
139 : * the freelist.
140 : *
141 : * Note that if the preparing transaction fails between steps 1 and 2, the
142 : * entry must be removed so that the GID and the GlobalTransaction struct
143 : * can be reused. See AtAbort_Twophase().
144 : *
145 : * typedef struct GlobalTransactionData *GlobalTransaction appears in
146 : * twophase.h
147 : */
148 :
149 : typedef struct GlobalTransactionData
150 : {
151 : GlobalTransaction next; /* list link for free list */
152 : int pgprocno; /* ID of associated dummy PGPROC */
153 : BackendId dummyBackendId; /* similar to backend id for backends */
154 : TimestampTz prepared_at; /* time of preparation */
155 :
156 : /*
157 : * Note that we need to keep track of two LSNs for each GXACT. We keep
158 : * track of the start LSN because this is the address we must use to read
159 : * state data back from WAL when committing a prepared GXACT. We keep
160 : * track of the end LSN because that is the LSN we need to wait for prior
161 : * to commit.
162 : */
163 : XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
164 : XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
165 : TransactionId xid; /* The GXACT id */
166 :
167 : Oid owner; /* ID of user that executed the xact */
168 : BackendId locking_backend; /* backend currently working on the xact */
169 : bool valid; /* true if PGPROC entry is in proc array */
170 : bool ondisk; /* true if prepare state file is on disk */
171 : bool inredo; /* true if entry was added via xlog_redo */
172 : char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
173 : } GlobalTransactionData;
174 :
175 : /*
176 : * Two Phase Commit shared state. Access to this struct is protected
177 : * by TwoPhaseStateLock.
178 : */
179 : typedef struct TwoPhaseStateData
180 : {
181 : /* Head of linked list of free GlobalTransactionData structs */
182 : GlobalTransaction freeGXacts;
183 :
184 : /* Number of valid prepXacts entries. */
185 : int numPrepXacts;
186 :
187 : /* There are max_prepared_xacts items in this array */
188 : GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
189 : } TwoPhaseStateData;
190 :
191 : static TwoPhaseStateData *TwoPhaseState;
192 :
193 : /*
194 : * Global transaction entry currently locked by us, if any. Note that any
195 : * access to the entry pointed to by this variable must be protected by
196 : * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
197 : * (since it's just local memory).
198 : */
199 : static GlobalTransaction MyLockedGxact = NULL;
200 :
201 : static bool twophaseExitRegistered = false;
202 :
203 : static void RecordTransactionCommitPrepared(TransactionId xid,
204 : int nchildren,
205 : TransactionId *children,
206 : int nrels,
207 : RelFileLocator *rels,
208 : int nstats,
209 : xl_xact_stats_item *stats,
210 : int ninvalmsgs,
211 : SharedInvalidationMessage *invalmsgs,
212 : bool initfileinval,
213 : const char *gid);
214 : static void RecordTransactionAbortPrepared(TransactionId xid,
215 : int nchildren,
216 : TransactionId *children,
217 : int nrels,
218 : RelFileLocator *rels,
219 : int nstats,
220 : xl_xact_stats_item *stats,
221 : const char *gid);
222 : static void ProcessRecords(char *bufptr, TransactionId xid,
223 : const TwoPhaseCallback callbacks[]);
224 : static void RemoveGXact(GlobalTransaction gxact);
225 :
226 : static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
227 : static char *ProcessTwoPhaseBuffer(TransactionId xid,
228 : XLogRecPtr prepare_start_lsn,
229 : bool fromdisk, bool setParent, bool setNextXid);
230 : static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
231 : const char *gid, TimestampTz prepared_at, Oid owner,
232 : Oid databaseid);
233 : static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
234 : static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
235 :
236 : /*
237 : * Initialization of shared memory
238 : */
239 : Size
6505 tgl 240 CBC 4564 : TwoPhaseShmemSize(void)
241 : {
242 : Size size;
243 :
244 : /* Need the fixed struct, the array of pointers, and the GTD structs */
6441 245 4564 : size = offsetof(TwoPhaseStateData, prepXacts);
246 4564 : size = add_size(size, mul_size(max_prepared_xacts,
247 : sizeof(GlobalTransaction)));
248 4564 : size = MAXALIGN(size);
249 4564 : size = add_size(size, mul_size(max_prepared_xacts,
250 : sizeof(GlobalTransactionData)));
251 :
252 4564 : return size;
253 : }
254 :
255 : void
6505 256 1826 : TwoPhaseShmemInit(void)
257 : {
258 : bool found;
259 :
260 1826 : TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
261 : TwoPhaseShmemSize(),
262 : &found);
263 1826 : if (!IsUnderPostmaster)
264 : {
265 : GlobalTransaction gxacts;
266 : int i;
267 :
268 1826 : Assert(!found);
5271 269 1826 : TwoPhaseState->freeGXacts = NULL;
6505 270 1826 : TwoPhaseState->numPrepXacts = 0;
271 :
272 : /*
273 : * Initialize the linked list of free GlobalTransactionData structs
274 : */
275 1826 : gxacts = (GlobalTransaction)
276 1826 : ((char *) TwoPhaseState +
6385 bruce 277 1826 : MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
278 : sizeof(GlobalTransaction) * max_prepared_xacts));
6505 tgl 279 2591 : for (i = 0; i < max_prepared_xacts; i++)
280 : {
281 : /* insert into linked list */
4153 rhaas 282 765 : gxacts[i].next = TwoPhaseState->freeGXacts;
5271 tgl 283 765 : TwoPhaseState->freeGXacts = &gxacts[i];
284 :
285 : /* associate it with a PGPROC assigned by InitProcGlobal */
3896 286 765 : gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
287 :
288 : /*
289 : * Assign a unique ID for each dummy proc, so that the range of
290 : * dummy backend IDs immediately follows the range of normal
291 : * backend IDs. We don't dare to assign a real backend ID to dummy
292 : * procs, because prepared transactions don't take part in cache
293 : * invalidation like a real backend ID would imply, but having a
294 : * unique ID for them is nevertheless handy. This arrangement
295 : * allows you to allocate an array of size (MaxBackends +
296 : * max_prepared_xacts + 1), and have a slot for every backend and
297 : * prepared transaction. Currently multixact.c uses that
298 : * technique.
299 : */
362 rhaas 300 765 : gxacts[i].dummyBackendId = MaxBackends + 1 + i;
301 : }
302 : }
303 : else
6505 tgl 304 UBC 0 : Assert(found);
6505 tgl 305 CBC 1826 : }
306 :
307 : /*
308 : * Exit hook to unlock the global transaction entry we're working on.
309 : */
310 : static void
3251 heikki.linnakangas 311 125 : AtProcExit_Twophase(int code, Datum arg)
312 : {
313 : /* same logic as abort */
314 125 : AtAbort_Twophase();
315 125 : }
316 :
317 : /*
318 : * Abort hook to unlock the global transaction entry we're working on.
319 : */
320 : void
321 20223 : AtAbort_Twophase(void)
322 : {
323 20223 : if (MyLockedGxact == NULL)
324 20221 : return;
325 :
326 : /*
327 : * What to do with the locked global transaction entry? If we were in the
328 : * process of preparing the transaction, but haven't written the WAL
329 : * record and state file yet, the transaction must not be considered as
330 : * prepared. Likewise, if we are in the process of finishing an
331 : * already-prepared transaction, and fail after having already written the
332 : * 2nd phase commit or rollback record to the WAL, the transaction should
333 : * not be considered as prepared anymore. In those cases, just remove the
334 : * entry from shared memory.
335 : *
336 : * Otherwise, the entry must be left in place so that the transaction can
337 : * be finished later, so just unlock it.
338 : *
339 : * If we abort during prepare, after having written the WAL record, we
340 : * might not have transferred all locks and other state to the prepared
341 : * transaction yet. Likewise, if we abort during commit or rollback,
342 : * after having written the WAL record, we might not have released all the
343 : * resources held by the transaction yet. In those cases, the in-memory
344 : * state can be wrong, but it's too late to back out.
345 : */
2125 alvherre 346 2 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
3251 heikki.linnakangas 347 2 : if (!MyLockedGxact->valid)
348 2 : RemoveGXact(MyLockedGxact);
349 : else
3251 heikki.linnakangas 350 UBC 0 : MyLockedGxact->locking_backend = InvalidBackendId;
2125 alvherre 351 CBC 2 : LWLockRelease(TwoPhaseStateLock);
352 :
3251 heikki.linnakangas 353 2 : MyLockedGxact = NULL;
354 : }
355 :
356 : /*
357 : * This is called after we have finished transferring state to the prepared
358 : * PGPROC entry.
359 : */
360 : void
2794 andres 361 384 : PostPrepare_Twophase(void)
362 : {
3251 heikki.linnakangas 363 384 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
364 384 : MyLockedGxact->locking_backend = InvalidBackendId;
365 384 : LWLockRelease(TwoPhaseStateLock);
366 :
367 384 : MyLockedGxact = NULL;
368 384 : }
369 :
370 :
371 : /*
372 : * MarkAsPreparing
373 : * Reserve the GID for the given transaction.
374 : */
375 : GlobalTransaction
6504 tgl 376 367 : MarkAsPreparing(TransactionId xid, const char *gid,
377 : TimestampTz prepared_at, Oid owner, Oid databaseid)
378 : {
379 : GlobalTransaction gxact;
380 : int i;
381 :
6505 382 367 : if (strlen(gid) >= GIDSIZE)
6505 tgl 383 UBC 0 : ereport(ERROR,
384 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
385 : errmsg("transaction identifier \"%s\" is too long",
386 : gid)));
387 :
388 : /* fail immediately if feature is disabled */
5099 tgl 389 CBC 367 : if (max_prepared_xacts == 0)
390 9 : ereport(ERROR,
391 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
392 : errmsg("prepared transactions are disabled"),
393 : errhint("Set max_prepared_transactions to a nonzero value.")));
394 :
395 : /* on first call, register the exit hook */
3251 heikki.linnakangas 396 358 : if (!twophaseExitRegistered)
397 : {
398 65 : before_shmem_exit(AtProcExit_Twophase, 0);
399 65 : twophaseExitRegistered = true;
400 : }
401 :
402 358 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
403 :
404 : /* Check for conflicting GID */
6505 tgl 405 731 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
406 : {
407 375 : gxact = TwoPhaseState->prepXacts[i];
408 375 : if (strcmp(gxact->gid, gid) == 0)
409 : {
410 2 : ereport(ERROR,
411 : (errcode(ERRCODE_DUPLICATE_OBJECT),
412 : errmsg("transaction identifier \"%s\" is already in use",
413 : gid)));
414 : }
415 : }
416 :
417 : /* Get a free gxact from the freelist */
5271 418 356 : if (TwoPhaseState->freeGXacts == NULL)
6505 tgl 419 UBC 0 : ereport(ERROR,
420 : (errcode(ERRCODE_OUT_OF_MEMORY),
421 : errmsg("maximum number of prepared transactions reached"),
422 : errhint("Increase max_prepared_transactions (currently %d).",
423 : max_prepared_xacts)));
5271 tgl 424 CBC 356 : gxact = TwoPhaseState->freeGXacts;
3896 425 356 : TwoPhaseState->freeGXacts = gxact->next;
426 :
2196 simon 427 356 : MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
428 :
429 356 : gxact->ondisk = false;
430 :
431 : /* And insert it into the active array */
432 356 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
433 356 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
434 :
435 356 : LWLockRelease(TwoPhaseStateLock);
436 :
437 356 : return gxact;
438 : }
439 :
440 : /*
441 : * MarkAsPreparingGuts
442 : *
443 : * This uses a gxact struct and puts it into the active array.
444 : * NOTE: this is also used when reloading a gxact after a crash; so avoid
445 : * assuming that we can use very much backend context.
446 : *
447 : * Note: This function should be called with appropriate locks held.
448 : */
449 : static void
450 386 : MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
451 : TimestampTz prepared_at, Oid owner, Oid databaseid)
452 : {
453 : PGPROC *proc;
454 : int i;
455 :
2125 alvherre 456 386 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
457 :
2196 simon 458 386 : Assert(gxact != NULL);
4153 rhaas 459 386 : proc = &ProcGlobal->allProcs[gxact->pgprocno];
460 :
461 : /* Initialize the PGPROC entry */
462 42846 : MemSet(proc, 0, sizeof(PGPROC));
463 386 : proc->pgprocno = gxact->pgprocno;
81 andres 464 GNC 386 : dlist_node_init(&proc->links);
1026 peter 465 CBC 386 : proc->waitStatus = PROC_WAIT_STATUS_OK;
533 noah 466 386 : if (LocalTransactionIdIsValid(MyProc->lxid))
467 : {
468 : /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
469 356 : proc->lxid = MyProc->lxid;
470 356 : proc->backendId = MyBackendId;
471 : }
472 : else
473 : {
474 30 : Assert(AmStartupProcess() || !IsPostmasterEnvironment);
475 : /* GetLockConflicts() uses this to specify a wait on the XID */
476 30 : proc->lxid = xid;
477 30 : proc->backendId = InvalidBackendId;
478 : }
968 andres 479 386 : proc->xid = xid;
969 480 386 : Assert(proc->xmin == InvalidTransactionId);
366 rhaas 481 386 : proc->delayChkptFlags = 0;
874 alvherre 482 386 : proc->statusFlags = 0;
4153 rhaas 483 386 : proc->pid = 0;
484 386 : proc->databaseId = databaseid;
485 386 : proc->roleId = owner;
1700 michael 486 386 : proc->tempNamespaceId = InvalidOid;
2258 andrew 487 386 : proc->isBackgroundWorker = false;
140 andres 488 GNC 386 : proc->lwWaiting = LW_WS_NOT_WAITING;
4087 heikki.linnakangas 489 CBC 386 : proc->lwWaitMode = 0;
4153 rhaas 490 386 : proc->waitLock = NULL;
491 386 : proc->waitProcLock = NULL;
776 fujii 492 386 : pg_atomic_init_u64(&proc->waitStart, 0);
6328 tgl 493 6562 : for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
81 andres 494 GNC 6176 : dlist_init(&proc->myProcLocks[i]);
495 : /* subxid data must be filled later by GXactLoadSubxactData */
968 andres 496 CBC 386 : proc->subxidStatus.overflowed = false;
497 386 : proc->subxidStatus.count = 0;
498 :
6504 tgl 499 386 : gxact->prepared_at = prepared_at;
2196 simon 500 386 : gxact->xid = xid;
6505 tgl 501 386 : gxact->owner = owner;
3251 heikki.linnakangas 502 386 : gxact->locking_backend = MyBackendId;
6505 tgl 503 386 : gxact->valid = false;
2196 simon 504 386 : gxact->inredo = false;
6505 tgl 505 386 : strcpy(gxact->gid, gid);
506 :
507 : /*
508 : * Remember that we have this GlobalTransaction entry locked for us. If we
509 : * abort after this, we must release it.
510 : */
3251 heikki.linnakangas 511 386 : MyLockedGxact = gxact;
6505 tgl 512 386 : }
513 :
514 : /*
515 : * GXactLoadSubxactData
516 : *
517 : * If the transaction being persisted had any subtransactions, this must
518 : * be called before MarkAsPrepared() to load information into the dummy
519 : * PGPROC.
520 : */
521 : static void
522 173 : GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
523 : TransactionId *children)
524 : {
3955 bruce 525 173 : PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
526 :
527 : /* We need no extra lock since the GXACT isn't valid yet */
6505 tgl 528 173 : if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
529 : {
968 andres 530 4 : proc->subxidStatus.overflowed = true;
6505 tgl 531 4 : nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
532 : }
533 173 : if (nsubxacts > 0)
534 : {
4153 rhaas 535 158 : memcpy(proc->subxids.xids, children,
536 : nsubxacts * sizeof(TransactionId));
968 andres 537 158 : proc->subxidStatus.count = nsubxacts;
538 : }
6505 tgl 539 173 : }
540 :
541 : /*
542 : * MarkAsPrepared
543 : * Mark the GXACT as fully valid, and enter it into the global ProcArray.
544 : *
545 : * lock_held indicates whether caller already holds TwoPhaseStateLock.
546 : */
547 : static void
2125 alvherre 548 384 : MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
549 : {
550 : /* Lock here may be overkill, but I'm not convinced of that ... */
551 384 : if (!lock_held)
552 354 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
6505 tgl 553 384 : Assert(!gxact->valid);
554 384 : gxact->valid = true;
2125 alvherre 555 384 : if (!lock_held)
556 354 : LWLockRelease(TwoPhaseStateLock);
557 :
558 : /*
559 : * Put it into the global ProcArray so TransactionIdIsInProgress considers
560 : * the XID as still running.
561 : */
4153 rhaas 562 384 : ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
6505 tgl 563 384 : }
564 :
565 : /*
566 : * LockGXact
567 : * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
568 : */
569 : static GlobalTransaction
6494 570 369 : LockGXact(const char *gid, Oid user)
571 : {
572 : int i;
573 :
574 : /* on first call, register the exit hook */
3251 heikki.linnakangas 575 369 : if (!twophaseExitRegistered)
576 : {
577 60 : before_shmem_exit(AtProcExit_Twophase, 0);
578 60 : twophaseExitRegistered = true;
579 : }
580 :
6505 tgl 581 369 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
582 :
583 658 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
584 : {
6385 bruce 585 652 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
3955 586 652 : PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
587 :
588 : /* Ignore not-yet-valid GIDs */
6505 tgl 589 652 : if (!gxact->valid)
6505 tgl 590 UBC 0 : continue;
6505 tgl 591 CBC 652 : if (strcmp(gxact->gid, gid) != 0)
592 289 : continue;
593 :
594 : /* Found it, but has someone else got it locked? */
3251 heikki.linnakangas 595 363 : if (gxact->locking_backend != InvalidBackendId)
3251 heikki.linnakangas 596 UBC 0 : ereport(ERROR,
597 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
598 : errmsg("prepared transaction with identifier \"%s\" is busy",
599 : gid)));
600 :
6505 tgl 601 CBC 363 : if (user != gxact->owner && !superuser_arg(user))
6505 tgl 602 UBC 0 : ereport(ERROR,
603 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
604 : errmsg("permission denied to finish prepared transaction"),
605 : errhint("Must be superuser or the user that prepared the transaction.")));
606 :
607 : /*
608 : * Note: it probably would be possible to allow committing from
609 : * another database; but at the moment NOTIFY is known not to work and
610 : * there may be some other issues as well. Hence disallow until
611 : * someone gets motivated to make it work.
612 : */
4153 rhaas 613 CBC 363 : if (MyDatabaseId != proc->databaseId)
5899 tgl 614 UBC 0 : ereport(ERROR,
615 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
616 : errmsg("prepared transaction belongs to another database"),
617 : errhint("Connect to the database where the transaction was prepared to finish it.")));
618 :
619 : /* OK for me to lock it */
3251 heikki.linnakangas 620 CBC 363 : gxact->locking_backend = MyBackendId;
621 363 : MyLockedGxact = gxact;
622 :
6505 tgl 623 363 : LWLockRelease(TwoPhaseStateLock);
624 :
625 363 : return gxact;
626 : }
627 :
628 6 : LWLockRelease(TwoPhaseStateLock);
629 :
630 6 : ereport(ERROR,
631 : (errcode(ERRCODE_UNDEFINED_OBJECT),
632 : errmsg("prepared transaction with identifier \"%s\" does not exist",
633 : gid)));
634 :
635 : /* NOTREACHED */
636 : return NULL;
637 : }
638 :
639 : /*
640 : * RemoveGXact
641 : * Remove the prepared transaction from the shared memory array.
642 : *
643 : * NB: caller should have already removed it from ProcArray
644 : */
645 : static void
646 414 : RemoveGXact(GlobalTransaction gxact)
647 : {
648 : int i;
649 :
2125 alvherre 650 414 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
651 :
6505 tgl 652 701 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
653 : {
654 701 : if (gxact == TwoPhaseState->prepXacts[i])
655 : {
656 : /* remove from the active array */
657 414 : TwoPhaseState->numPrepXacts--;
658 414 : TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
659 :
660 : /* and put it back in the freelist */
4153 rhaas 661 414 : gxact->next = TwoPhaseState->freeGXacts;
5271 tgl 662 414 : TwoPhaseState->freeGXacts = gxact;
663 :
6505 664 414 : return;
665 : }
666 : }
667 :
6505 tgl 668 UBC 0 : elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
669 : }
670 :
671 : /*
672 : * Returns an array of all prepared transactions for the user-level
673 : * function pg_prepared_xact.
674 : *
675 : * The returned array and all its elements are copies of internal data
676 : * structures, to minimize the time we need to hold the TwoPhaseStateLock.
677 : *
678 : * WARNING -- we return even those transactions that are not fully prepared
679 : * yet. The caller should filter them out if he doesn't want them.
680 : *
681 : * The returned array is palloc'd.
682 : */
683 : static int
6505 tgl 684 CBC 89 : GetPreparedTransactionList(GlobalTransaction *gxacts)
685 : {
686 : GlobalTransaction array;
687 : int num;
688 : int i;
689 :
690 89 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
691 :
692 89 : if (TwoPhaseState->numPrepXacts == 0)
693 : {
694 48 : LWLockRelease(TwoPhaseStateLock);
695 :
696 48 : *gxacts = NULL;
697 48 : return 0;
698 : }
699 :
700 41 : num = TwoPhaseState->numPrepXacts;
701 41 : array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
702 41 : *gxacts = array;
703 87 : for (i = 0; i < num; i++)
704 46 : memcpy(array + i, TwoPhaseState->prepXacts[i],
705 : sizeof(GlobalTransactionData));
706 :
707 41 : LWLockRelease(TwoPhaseStateLock);
708 :
709 41 : return num;
710 : }
711 :
712 :
713 : /* Working status for pg_prepared_xact */
714 : typedef struct
715 : {
716 : GlobalTransaction array;
717 : int ngxacts;
718 : int currIdx;
719 : } Working_State;
720 :
721 : /*
722 : * pg_prepared_xact
723 : * Produce a view with one row per prepared transaction.
724 : *
725 : * This function is here so we don't have to export the
726 : * GlobalTransactionData struct definition.
727 : */
728 : Datum
729 135 : pg_prepared_xact(PG_FUNCTION_ARGS)
730 : {
731 : FuncCallContext *funcctx;
732 : Working_State *status;
733 :
734 135 : if (SRF_IS_FIRSTCALL())
735 : {
736 : TupleDesc tupdesc;
737 : MemoryContext oldcontext;
738 :
739 : /* create a function context for cross-call persistence */
740 89 : funcctx = SRF_FIRSTCALL_INIT();
741 :
742 : /*
743 : * Switch to memory context appropriate for multiple function calls
744 : */
745 89 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
746 :
747 : /* build tupdesc for result tuples */
748 : /* this had better match pg_prepared_xacts view in system_views.sql */
1601 andres 749 89 : tupdesc = CreateTemplateTupleDesc(5);
6505 tgl 750 89 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
751 : XIDOID, -1, 0);
752 89 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
753 : TEXTOID, -1, 0);
6504 754 89 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
755 : TIMESTAMPTZOID, -1, 0);
756 89 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
757 : OIDOID, -1, 0);
758 89 : TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
759 : OIDOID, -1, 0);
760 :
6505 761 89 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
762 :
763 : /*
764 : * Collect all the 2PC status information that we will format and send
765 : * out as a result set.
766 : */
767 89 : status = (Working_State *) palloc(sizeof(Working_State));
768 89 : funcctx->user_fctx = (void *) status;
769 :
770 89 : status->ngxacts = GetPreparedTransactionList(&status->array);
771 89 : status->currIdx = 0;
772 :
773 89 : MemoryContextSwitchTo(oldcontext);
774 : }
775 :
776 135 : funcctx = SRF_PERCALL_SETUP();
777 135 : status = (Working_State *) funcctx->user_fctx;
778 :
779 135 : while (status->array != NULL && status->currIdx < status->ngxacts)
780 : {
781 46 : GlobalTransaction gxact = &status->array[status->currIdx++];
3955 bruce 782 46 : PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
267 peter 783 GNC 46 : Datum values[5] = {0};
784 46 : bool nulls[5] = {0};
785 : HeapTuple tuple;
786 : Datum result;
787 :
6505 tgl 788 CBC 46 : if (!gxact->valid)
6505 tgl 789 UBC 0 : continue;
790 :
791 : /*
792 : * Form tuple with appropriate data.
793 : */
6505 tgl 794 ECB :
968 andres 795 CBC 46 : values[0] = TransactionIdGetDatum(proc->xid);
5493 tgl 796 46 : values[1] = CStringGetTextDatum(gxact->gid);
6504 797 46 : values[2] = TimestampTzGetDatum(gxact->prepared_at);
6494 tgl 798 GIC 46 : values[3] = ObjectIdGetDatum(gxact->owner);
4153 rhaas 799 CBC 46 : values[4] = ObjectIdGetDatum(proc->databaseId);
6505 tgl 800 ECB :
6505 tgl 801 CBC 46 : tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
6505 tgl 802 GIC 46 : result = HeapTupleGetDatum(tuple);
803 46 : SRF_RETURN_NEXT(funcctx, result);
6505 tgl 804 ECB : }
805 :
6505 tgl 806 GIC 89 : SRF_RETURN_DONE(funcctx);
807 : }
808 :
809 : /*
810 : * TwoPhaseGetGXact
811 : * Get the GlobalTransaction struct for a prepared transaction
812 : * specified by XID
813 : *
814 : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
815 : * caller had better hold it.
4885 heikki.linnakangas 816 ECB : */
817 : static GlobalTransaction
1504 michael 818 CBC 1386 : TwoPhaseGetGXact(TransactionId xid, bool lock_held)
819 : {
3896 tgl 820 GIC 1386 : GlobalTransaction result = NULL;
821 : int i;
822 :
823 : static TransactionId cached_xid = InvalidTransactionId;
3896 tgl 824 ECB : static GlobalTransaction cached_gxact = NULL;
825 :
1504 michael 826 GIC 1386 : Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
827 :
828 : /*
829 : * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
6505 tgl 830 ECB : * repeatedly for the same XID. We can save work with a simple cache.
831 : */
6505 tgl 832 GIC 1386 : if (xid == cached_xid)
3896 tgl 833 CBC 933 : return cached_gxact;
6505 tgl 834 ECB :
1504 michael 835 GIC 453 : if (!lock_held)
1504 michael 836 CBC 384 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
837 :
6505 tgl 838 830 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
839 : {
6385 bruce 840 830 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
841 :
968 andres 842 830 : if (gxact->xid == xid)
6505 tgl 843 ECB : {
3896 tgl 844 GIC 453 : result = gxact;
6505 845 453 : break;
846 : }
6505 tgl 847 ECB : }
848 :
1504 michael 849 GIC 453 : if (!lock_held)
1504 michael 850 CBC 384 : LWLockRelease(TwoPhaseStateLock);
6505 tgl 851 EUB :
6505 tgl 852 GIC 453 : if (result == NULL) /* should not happen */
3896 tgl 853 LBC 0 : elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
6505 tgl 854 ECB :
6505 tgl 855 GIC 453 : cached_xid = xid;
3896 tgl 856 CBC 453 : cached_gxact = result;
857 :
6505 tgl 858 GIC 453 : return result;
859 : }
860 :
861 : /*
862 : * TwoPhaseGetXidByVirtualXID
863 : * Lookup VXID among xacts prepared since last startup.
864 : *
865 : * (This won't find recovered xacts.) If more than one matches, return any
866 : * and set "have_more" to true. To witness multiple matches, a single
867 : * BackendId must consume 2^32 LXIDs, with no intervening database restart.
533 noah 868 ECB : */
869 : TransactionId
533 noah 870 GIC 98 : TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
871 : bool *have_more)
533 noah 872 ECB : {
873 : int i;
533 noah 874 CBC 98 : TransactionId result = InvalidTransactionId;
533 noah 875 ECB :
533 noah 876 GIC 98 : Assert(VirtualTransactionIdIsValid(vxid));
533 noah 877 CBC 98 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
878 :
879 150 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
880 : {
533 noah 881 GIC 52 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
882 : PGPROC *proc;
533 noah 883 ECB : VirtualTransactionId proc_vxid;
884 :
533 noah 885 CBC 52 : if (!gxact->valid)
886 2 : continue;
887 50 : proc = &ProcGlobal->allProcs[gxact->pgprocno];
533 noah 888 GIC 50 : GET_VXID_FROM_PGPROC(proc_vxid, *proc);
889 50 : if (VirtualTransactionIdEquals(vxid, proc_vxid))
533 noah 890 ECB : {
891 : /* Startup process sets proc->backendId to InvalidBackendId. */
533 noah 892 CBC 9 : Assert(!gxact->inredo);
893 :
533 noah 894 GBC 9 : if (result != InvalidTransactionId)
533 noah 895 EUB : {
533 noah 896 UIC 0 : *have_more = true;
533 noah 897 LBC 0 : break;
898 : }
533 noah 899 GIC 9 : result = gxact->xid;
900 : }
533 noah 901 ECB : }
902 :
533 noah 903 CBC 98 : LWLockRelease(TwoPhaseStateLock);
904 :
533 noah 905 GIC 98 : return result;
906 : }
907 :
908 : /*
909 : * TwoPhaseGetDummyBackendId
910 : * Get the dummy backend ID for prepared transaction specified by XID
911 : *
912 : * Dummy backend IDs are similar to real backend IDs of real backends.
913 : * They start at MaxBackends + 1, and are unique across all currently active
914 : * real backends and prepared transactions. If lock_held is set to true,
915 : * TwoPhaseStateLock will not be taken, so the caller had better hold it.
3896 tgl 916 ECB : */
917 : BackendId
1504 michael 918 CBC 107 : TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held)
919 : {
920 107 : GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
921 :
3896 tgl 922 GIC 107 : return gxact->dummyBackendId;
923 : }
924 :
925 : /*
926 : * TwoPhaseGetDummyProc
927 : * Get the PGPROC that represents a prepared transaction specified by XID
928 : *
929 : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
930 : * caller had better hold it.
3896 tgl 931 ECB : */
932 : PGPROC *
1504 michael 933 CBC 1279 : TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
934 : {
935 1279 : GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
936 :
776 fujii 937 GIC 1279 : return &ProcGlobal->allProcs[gxact->pgprocno];
938 : }
939 :
940 : /************************************************************************/
941 : /* State file support */
942 : /************************************************************************/
943 :
/*
 * Format the path of the state file for "xid" into "path", which must be
 * a buffer of at least MAXPGPATH bytes.  The file name is the XID as eight
 * upper-case hex digits inside TWOPHASE_DIR.  Arguments are parenthesized
 * so that arbitrary expressions can be passed.
 */
#define TwoPhaseFilePath(path, xid) \
	snprintf((path), MAXPGPATH, TWOPHASE_DIR "/%08X", (xid))
946 :
/*
 * 2PC state file format:
 *
 *	1. TwoPhaseFileHeader
 *	2. GID (NUL-terminated string; hdr.gidlen bytes including the '\0')
 *	3. TransactionId[] (subtransactions)
 *	4. RelFileLocator[] (files to be deleted at commit)
 *	5. RelFileLocator[] (files to be deleted at abort)
 *	6. xl_xact_stats_item[] (stats entries to be dropped at commit)
 *	7. xl_xact_stats_item[] (stats entries to be dropped at abort)
 *	8. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	9. TwoPhaseRecordOnDisk, followed by its rmgr data (if any)
 *	10. ...
 *	11. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	12. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.  An array with zero
 * entries occupies no space.  The header's total_len always counts the
 * trailing checksum.  The copy written to WAL at PREPARE time consists of
 * items 1-11 only, because WAL's own CRC protects it there.
 */

/*
 * Header for a 2PC state file
 */
#define TWOPHASE_MAGIC	0x57F94534	/* format identifier */

/* The state file header is the same struct as the WAL PREPARE record header */
typedef xl_xact_prepare TwoPhaseFileHeader;

/*
 * Header for each record in a state file
 *
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 */
typedef struct TwoPhaseRecordOnDisk
{
	uint32		len;			/* length of rmgr data */
	TwoPhaseRmgrId rmid;		/* resource manager for this record */
	uint16		info;			/* flag bits for use by rmgr */
} TwoPhaseRecordOnDisk;
982 :
/*
 * During prepare, the state file is assembled in memory before writing it
 * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
 * for that.
 */
typedef struct StateFileChunk
{
	char	   *data;			/* palloc'd buffer for this chunk */
	uint32		len;			/* bytes used in data, including padding */
	struct StateFileChunk *next;	/* next chunk, or NULL at the tail */
} StateFileChunk;

/*
 * File-level state for the state data currently being assembled.
 * StartPrepare initializes it, save_state_data appends to it, and
 * EndPrepare writes it out and then clears head/tail/num_chunks.
 */
static struct xllist
{
	StateFileChunk *head;		/* first data block in the chain */
	StateFileChunk *tail;		/* last block in chain */
	uint32		num_chunks;		/* number of blocks in the chain */
	uint32		bytes_free;		/* free bytes left in tail block */
	uint32		total_len;		/* total data bytes in chain */
} records;
1003 :
1004 :
1005 : /*
1006 : * Append a block of data to records data structure.
1007 : *
1008 : * NB: each block is padded to a MAXALIGN multiple. This must be
1009 : * accounted for when the file is later read!
1010 : *
1011 : * The data is copied, so the caller is free to modify it afterwards.
6505 tgl 1012 ECB : */
1013 : static void
6505 tgl 1014 CBC 3889 : save_state_data(const void *data, uint32 len)
1015 : {
6385 bruce 1016 3889 : uint32 padlen = MAXALIGN(len);
1017 :
6505 tgl 1018 3889 : if (padlen > records.bytes_free)
6505 tgl 1019 ECB : {
3062 heikki.linnakangas 1020 CBC 40 : records.tail->next = palloc0(sizeof(StateFileChunk));
6505 tgl 1021 40 : records.tail = records.tail->next;
1022 40 : records.tail->len = 0;
6505 tgl 1023 GIC 40 : records.tail->next = NULL;
3062 heikki.linnakangas 1024 CBC 40 : records.num_chunks++;
6505 tgl 1025 ECB :
6505 tgl 1026 GIC 40 : records.bytes_free = Max(padlen, 512);
1027 40 : records.tail->data = palloc(records.bytes_free);
6505 tgl 1028 ECB : }
1029 :
6505 tgl 1030 CBC 3889 : memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1031 3889 : records.tail->len += padlen;
1032 3889 : records.bytes_free -= padlen;
6505 tgl 1033 GIC 3889 : records.total_len += padlen;
1034 3889 : }
1035 :
/*
 * Start preparing a state file.
 *
 * Initializes data structure and inserts the 2PC file header record.
 *
 * The file-level "records" chain is reset to a single empty chunk.  A
 * TwoPhaseFileHeader describing gxact is built, and then the header, the
 * GID, and each non-empty variable-length array (subxacts, commit/abort
 * rels, commit/abort stats drops, invalidation messages) are appended.
 * Rmgr records are added later through RegisterTwoPhaseRecord, and
 * EndPrepare completes the header and writes everything to WAL.
 */
void
StartPrepare(GlobalTransaction gxact)
{
	PGPROC	   *proc = &ProcGlobal->allProcs[gxact->pgprocno];
	TransactionId xid = gxact->xid;
	TwoPhaseFileHeader hdr;
	TransactionId *children;
	RelFileLocator *commitrels;
	RelFileLocator *abortrels;
	xl_xact_stats_item *abortstats = NULL;
	xl_xact_stats_item *commitstats = NULL;
	SharedInvalidationMessage *invalmsgs;

	/* Initialize linked list */
	records.head = palloc0(sizeof(StateFileChunk));
	records.head->len = 0;
	records.head->next = NULL;

	/* The first chunk is always big enough to hold the header */
	records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
	records.head->data = palloc(records.bytes_free);

	records.tail = records.head;
	records.num_chunks = 1;

	records.total_len = 0;

	/* Create header */
	hdr.magic = TWOPHASE_MAGIC;
	hdr.total_len = 0;			/* EndPrepare will fill this in */
	hdr.xid = xid;
	hdr.database = proc->databaseId;
	hdr.prepared_at = gxact->prepared_at;
	hdr.owner = gxact->owner;
	hdr.nsubxacts = xactGetCommittedChildren(&children);
	hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
	hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
	hdr.ncommitstats =
		pgstat_get_transactional_drops(true, &commitstats);
	hdr.nabortstats =
		pgstat_get_transactional_drops(false, &abortstats);
	hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
														  &hdr.initfileinval);
	hdr.gidlen = strlen(gxact->gid) + 1;	/* Include '\0' */
	/* EndPrepare will fill the origin data, if necessary */
	hdr.origin_lsn = InvalidXLogRecPtr;
	hdr.origin_timestamp = 0;

	/* The header must be the first thing in the chain; EndPrepare relies on it */
	save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
	save_state_data(gxact->gid, hdr.gidlen);

	/*
	 * Add the additional info about subxacts, deletable files, stats entries
	 * to drop and cache invalidation messages.
	 *
	 * The order of these appends defines the state file layout and must
	 * match the order in which readers (e.g. FinishPreparedTransaction) walk
	 * the buffer.  Empty arrays are skipped; readers advance by
	 * MAXALIGN(0) == 0 bytes for them.
	 */
	if (hdr.nsubxacts > 0)
	{
		save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
		/* While we have the child-xact data, stuff it in the gxact too */
		GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
	}
	if (hdr.ncommitrels > 0)
	{
		save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
		pfree(commitrels);
	}
	if (hdr.nabortrels > 0)
	{
		save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
		pfree(abortrels);
	}
	if (hdr.ncommitstats > 0)
	{
		save_state_data(commitstats,
						hdr.ncommitstats * sizeof(xl_xact_stats_item));
		pfree(commitstats);
	}
	if (hdr.nabortstats > 0)
	{
		save_state_data(abortstats,
						hdr.nabortstats * sizeof(xl_xact_stats_item));
		pfree(abortstats);
	}
	if (hdr.ninvalmsgs > 0)
	{
		save_state_data(invalmsgs,
						hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
		pfree(invalmsgs);
	}
}
1130 :
/*
 * Finish preparing state data and writing it to WAL.
 *
 * This function:
 *	- appends the end-sentinel record;
 *	- completes the header (total_len, which also counts the checksum that
 *	  only the on-disk state file carries, plus the replication origin data
 *	  when a session origin is set);
 *	- inserts the whole chain as one XLOG_XACT_PREPARE record and flushes it;
 *	- stores the record's start and end LSNs in gxact;
 *	- marks the gxact prepared;
 *	- waits for synchronous replication;
 *	- finally resets the in-memory chain.
 */
void
EndPrepare(GlobalTransaction gxact)
{
	TwoPhaseFileHeader *hdr;
	StateFileChunk *record;
	bool		replorigin;

	/* Add the end sentinel to the list of 2PC records */
	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
						   NULL, 0);

	/* Go back and fill in total_len in the file header record */
	hdr = (TwoPhaseFileHeader *) records.head->data;
	Assert(hdr->magic == TWOPHASE_MAGIC);
	hdr->total_len = records.total_len + sizeof(pg_crc32c);

	/* Only a real session origin (not "none"/"do not replicate") is recorded */
	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
				  replorigin_session_origin != DoNotReplicateId);

	if (replorigin)
	{
		hdr->origin_lsn = replorigin_session_origin_lsn;
		hdr->origin_timestamp = replorigin_session_origin_timestamp;
	}

	/*
	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
	 * where we write data to file and then re-read at commit time.
	 */
	if (hdr->total_len > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("two-phase state file maximum length exceeded")));

	/*
	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
	 * cover us, so no need to calculate a separate CRC.
	 *
	 * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
	 * starting immediately after the WAL record is inserted could complete
	 * without fsync'ing our state file. (This is essentially the same kind
	 * of race condition as the COMMIT-to-clog-write case that
	 * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
	 *
	 * We save the PREPARE record's location in the gxact for later use by
	 * CheckPointTwoPhase.
	 *
	 * Room for one XLogRegisterData() call per chunk (see the loop below) is
	 * requested before entering the critical section.
	 */
	XLogEnsureRecordSpace(0, records.num_chunks);

	START_CRIT_SECTION();

	Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
	MyProc->delayChkptFlags |= DELAY_CHKPT_START;

	XLogBeginInsert();
	for (record = records.head; record != NULL; record = record->next)
		XLogRegisterData(record->data, record->len);

	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);

	if (replorigin)
	{
		/* Move LSNs forward for this replication origin */
		replorigin_session_advance(replorigin_session_origin_lsn,
								   gxact->prepare_end_lsn);
	}

	XLogFlush(gxact->prepare_end_lsn);

	/* If we crash now, we have prepared: WAL replay will fix things */

	/* Store record's start location to read that later on Commit */
	gxact->prepare_start_lsn = ProcLastRecPtr;

	/*
	 * Mark the prepared transaction as valid.  As soon as xact.c marks MyProc
	 * as not running our XID (which it will do immediately after this
	 * function returns), others can commit/rollback the xact.
	 *
	 * NB: a side effect of this is to make a dummy ProcArray entry for the
	 * prepared XID.  This must happen before we clear the XID from MyProc /
	 * ProcGlobal->xids[], else there is a window where the XID is not running
	 * according to TransactionIdIsInProgress, and onlookers would be entitled
	 * to assume the xact crashed.  Instead we have a window where the same
	 * XID appears twice in ProcArray, which is OK.
	 */
	MarkAsPrepared(gxact, false);

	/*
	 * Now we can mark ourselves as out of the commit critical section: a
	 * checkpoint starting after this will certainly see the gxact as a
	 * candidate for fsyncing.
	 */
	MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;

	/*
	 * Remember that we have this GlobalTransaction entry locked for us.  If
	 * we crash after this point, it's too late to abort, but we must unlock
	 * it so that the prepared transaction can be committed or rolled back.
	 */
	MyLockedGxact = gxact;

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked the prepare, but still show as
	 * running in the procarray (twice!) and continue to hold locks.
	 */
	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);

	/* Forget the chain; its chunks are not pfree'd individually here */
	records.tail = records.head = NULL;
	records.num_chunks = 0;
}
1252 :
1253 : /*
1254 : * Register a 2PC record to be written to state file.
6505 tgl 1255 ECB : */
1256 : void
6505 tgl 1257 GIC 1660 : RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1258 : const void *data, uint32 len)
1259 : {
6505 tgl 1260 ECB : TwoPhaseRecordOnDisk record;
1261 :
6505 tgl 1262 CBC 1660 : record.rmid = rmid;
1263 1660 : record.info = info;
1264 1660 : record.len = len;
1265 1660 : save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1266 1660 : if (len > 0)
6505 tgl 1267 GIC 1306 : save_state_data(data, len);
1268 1660 : }
1269 :
1270 :
/*
 * Read and validate the state file for xid.
 *
 * Returns the palloc'd contents of the file when it passes all checks: sane
 * size, MAXALIGN'd CRC offset, correct magic number, header total_len equal
 * to the file size, and a matching CRC-32C.  Any failed check, and any I/O
 * error, raises ERROR.
 *
 * If missing_ok is true and the file does not exist (ENOENT), NULL is
 * returned instead of raising an error.  This state can be reached when
 * doing recovery.  Other open failures still raise ERROR.
 */
static char *
ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
{
	char		path[MAXPGPATH];
	char	   *buf;
	TwoPhaseFileHeader *hdr;
	int			fd;
	struct stat stat;
	uint32		crc_offset;
	pg_crc32c	calc_crc,
				file_crc;
	int			r;

	TwoPhaseFilePath(path, xid);

	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
	if (fd < 0)
	{
		if (missing_ok && errno == ENOENT)
			return NULL;

		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));
	}

	/*
	 * Check file length.  We can determine a lower bound pretty easily.  We
	 * set an upper bound to avoid palloc() failure on a corrupt file, though
	 * we can't guarantee that we won't get an out of memory error anyway,
	 * even on a valid file.
	 */
	if (fstat(fd, &stat))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not stat file \"%s\": %m", path)));

	/* Minimum: a header, the end-sentinel record header, and the CRC */
	if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
						MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
						sizeof(pg_crc32c)) ||
		stat.st_size > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg_plural("incorrect size of file \"%s\": %lld byte",
							   "incorrect size of file \"%s\": %lld bytes",
							   (long long int) stat.st_size, path,
							   (long long int) stat.st_size)));

	/* Everything before the CRC is MAXALIGN'd segments, so must be aligned */
	crc_offset = stat.st_size - sizeof(pg_crc32c);
	if (crc_offset != MAXALIGN(crc_offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("incorrect alignment of CRC offset for file \"%s\"",
						path)));

	/*
	 * OK, slurp in the file.
	 */
	buf = (char *) palloc(stat.st_size);

	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
	r = read(fd, buf, stat.st_size);
	if (r != stat.st_size)
	{
		if (r < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(ERROR,
					(errmsg("could not read file \"%s\": read %d of %lld",
							path, r, (long long int) stat.st_size)));
	}

	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	hdr = (TwoPhaseFileHeader *) buf;
	if (hdr->magic != TWOPHASE_MAGIC)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("invalid magic number stored in file \"%s\"",
						path)));

	/* total_len includes the trailing CRC, so it must equal the file size */
	if (hdr->total_len != stat.st_size)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("invalid size stored in file \"%s\"",
						path)));

	/* The CRC covers every byte that precedes it */
	INIT_CRC32C(calc_crc);
	COMP_CRC32C(calc_crc, buf, crc_offset);
	FIN_CRC32C(calc_crc);

	file_crc = *((pg_crc32c *) (buf + crc_offset));

	if (!EQ_CRC32C(calc_crc, file_crc))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
						path)));

	return buf;
}
1387 :
1388 :
1389 : /*
1390 : * Reads 2PC data from xlog. During checkpoint this data will be moved to
1391 : * twophase files and ReadTwoPhaseFile should be used instead.
1392 : *
1393 : * Note clearly that this function can access WAL during normal operation,
1394 : * similarly to the way WALSender or Logical Decoding would do.
2636 simon 1395 ECB : */
1396 : static void
2636 simon 1397 GIC 423 : XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1398 : {
1399 : XLogRecord *record;
1400 : XLogReaderState *xlogreader;
2636 simon 1401 ECB : char *errormsg;
1402 :
699 tmunro 1403 GIC 423 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1404 423 : XL_ROUTINE(.page_read = &read_local_xlog_page,
1405 : .segment_open = &wal_segment_open,
699 tmunro 1406 ECB : .segment_close = &wal_segment_close),
699 tmunro 1407 EUB : NULL);
2636 simon 1408 GIC 423 : if (!xlogreader)
2636 simon 1409 UIC 0 : ereport(ERROR,
1410 : (errcode(ERRCODE_OUT_OF_MEMORY),
1411 : errmsg("out of memory"),
2118 tgl 1412 ECB : errdetail("Failed while allocating a WAL reading processor.")));
2636 simon 1413 :
1169 heikki.linnakangas 1414 GIC 423 : XLogBeginRead(xlogreader, lsn);
699 tmunro 1415 CBC 423 : record = XLogReadRecord(xlogreader, &errormsg);
1416 :
2636 simon 1417 GBC 423 : if (record == NULL)
514 noah 1418 EUB : {
514 noah 1419 UIC 0 : if (errormsg)
1420 0 : ereport(ERROR,
1421 : (errcode_for_file_access(),
1422 : errmsg("could not read two-phase state from WAL at %X/%X: %s",
514 noah 1423 EUB : LSN_FORMAT_ARGS(lsn), errormsg)));
1424 : else
514 noah 1425 UIC 0 : ereport(ERROR,
1426 : (errcode_for_file_access(),
1427 : errmsg("could not read two-phase state from WAL at %X/%X",
1428 : LSN_FORMAT_ARGS(lsn))));
514 noah 1429 ECB : }
2636 simon 1430 :
2636 simon 1431 GBC 423 : if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
2636 simon 1432 GIC 423 : (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
2636 simon 1433 UIC 0 : ereport(ERROR,
1434 : (errcode_for_file_access(),
1435 : errmsg("expected two-phase state data is not present in WAL at %X/%X",
775 peter 1436 ECB : LSN_FORMAT_ARGS(lsn))));
2636 simon 1437 :
2636 simon 1438 GIC 423 : if (len != NULL)
2636 simon 1439 CBC 21 : *len = XLogRecGetDataLen(xlogreader);
2636 simon 1440 ECB :
2495 rhaas 1441 GIC 423 : *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
2636 simon 1442 CBC 423 : memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
2636 simon 1443 ECB :
2636 simon 1444 GIC 423 : XLogReaderFree(xlogreader);
1445 423 : }
1446 :
1447 :
1448 : /*
1449 : * Confirms an xid is prepared, during recovery
4859 simon 1450 ECB : */
1451 : bool
4859 simon 1452 GIC 17 : StandbyTransactionIdIsPrepared(TransactionId xid)
1453 : {
1454 : char *buf;
1455 : TwoPhaseFileHeader *hdr;
4859 simon 1456 ECB : bool result;
1457 :
4859 simon 1458 CBC 17 : Assert(TransactionIdIsValid(xid));
4859 simon 1459 EUB :
4729 tgl 1460 GIC 17 : if (max_prepared_xacts <= 0)
4660 bruce 1461 UIC 0 : return false; /* nothing to do */
4729 tgl 1462 ECB :
4859 simon 1463 : /* Read and validate file */
1675 michael 1464 CBC 17 : buf = ReadTwoPhaseFile(xid, true);
4859 simon 1465 GIC 17 : if (buf == NULL)
1466 17 : return false;
4859 simon 1467 EUB :
1468 : /* Check header also */
4859 simon 1469 UBC 0 : hdr = (TwoPhaseFileHeader *) buf;
4859 simon 1470 UIC 0 : result = TransactionIdEquals(hdr->xid, xid);
4859 simon 1471 UBC 0 : pfree(buf);
1472 :
4859 simon 1473 UIC 0 : return result;
1474 : }
1475 :
1476 : /*
1477 : * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
6505 tgl 1478 ECB : */
1479 : void
6504 tgl 1480 GIC 369 : FinishPreparedTransaction(const char *gid, bool isCommit)
1481 : {
1482 : GlobalTransaction gxact;
1483 : PGPROC *proc;
1484 : TransactionId xid;
1485 : char *buf;
1486 : char *bufptr;
1487 : TwoPhaseFileHeader *hdr;
1488 : TransactionId latestXid;
1489 : TransactionId *children;
1490 : RelFileLocator *commitrels;
1491 : RelFileLocator *abortrels;
1492 : RelFileLocator *delrels;
1493 : int ndelrels;
1494 : xl_xact_stats_item *commitstats;
1495 : xl_xact_stats_item *abortstats;
1496 : SharedInvalidationMessage *invalmsgs;
1497 :
1498 : /*
1499 : * Validate the GID, and lock the GXACT to ensure that two backends do not
6385 bruce 1500 ECB : * try to commit the same GID at once.
6505 tgl 1501 : */
6505 tgl 1502 CBC 369 : gxact = LockGXact(gid, GetUserId());
4153 rhaas 1503 GIC 363 : proc = &ProcGlobal->allProcs[gxact->pgprocno];
968 andres 1504 363 : xid = gxact->xid;
1505 :
1506 : /*
1507 : * Read and validate 2PC state data. State data will typically be stored
1508 : * in WAL files if the LSN is after the last checkpoint record, or moved
2495 rhaas 1509 ECB : * to disk if for some reason they have lived for a long time.
6505 tgl 1510 : */
2636 simon 1511 GIC 363 : if (gxact->ondisk)
1675 michael 1512 CBC 26 : buf = ReadTwoPhaseFile(xid, false);
1513 : else
2636 simon 1514 GIC 337 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
1515 :
1516 :
1517 : /*
6505 tgl 1518 ECB : * Disassemble the header area
1519 : */
6505 tgl 1520 CBC 363 : hdr = (TwoPhaseFileHeader *) buf;
1521 363 : Assert(TransactionIdEquals(hdr->xid, xid));
1522 363 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2586 simon 1523 363 : bufptr += MAXALIGN(hdr->gidlen);
6505 tgl 1524 363 : children = (TransactionId *) bufptr;
1525 363 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
277 rhaas 1526 GNC 363 : commitrels = (RelFileLocator *) bufptr;
1527 363 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
1528 363 : abortrels = (RelFileLocator *) bufptr;
1529 363 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
332 tgl 1530 CBC 363 : commitstats = (xl_xact_stats_item *) bufptr;
368 andres 1531 363 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
332 tgl 1532 363 : abortstats = (xl_xact_stats_item *) bufptr;
368 andres 1533 363 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
4859 simon 1534 GIC 363 : invalmsgs = (SharedInvalidationMessage *) bufptr;
1535 363 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
6505 tgl 1536 ECB :
1537 : /* compute latestXid among all children */
5692 tgl 1538 GIC 363 : latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);
5692 tgl 1539 ECB :
1540 : /* Prevent cancel/die interrupt while cleaning up */
1802 teodor 1541 GIC 363 : HOLD_INTERRUPTS();
1542 :
1543 : /*
1544 : * The order of operations here is critical: make the XLOG entry for
1545 : * commit or abort, then mark the transaction committed or aborted in
1546 : * pg_xact, then remove its PGPROC from the global ProcArray (which means
1547 : * TransactionIdIsInProgress will stop saying the prepared xact is in
1548 : * progress), then run the post-commit or post-abort callbacks. The
6385 bruce 1549 ECB : * callbacks will release the locks the transaction held.
6505 tgl 1550 : */
6505 tgl 1551 GIC 363 : if (isCommit)
1552 325 : RecordTransactionCommitPrepared(xid,
1553 : hdr->nsubxacts, children,
1554 : hdr->ncommitrels, commitrels,
1555 : hdr->ncommitstats,
368 andres 1556 ECB : commitstats,
1557 : hdr->ninvalmsgs, invalmsgs,
1838 simon 1558 CBC 325 : hdr->initfileinval, gid);
1559 : else
6505 tgl 1560 GIC 38 : RecordTransactionAbortPrepared(xid,
1561 : hdr->nsubxacts, children,
1562 : hdr->nabortrels, abortrels,
1563 : hdr->nabortstats,
1564 : abortstats,
1838 simon 1565 ECB : gid);
1566 :
4153 rhaas 1567 GIC 363 : ProcArrayRemove(proc, latestXid);
1568 :
1569 : /*
1570 : * In case we fail while running the callbacks, mark the gxact invalid so
1571 : * no one else will try to commit/rollback, and so it will be recycled if
1572 : * we fail after this point. It is still locked by our backend so it
1573 : * won't go away yet.
1574 : *
6503 tgl 1575 ECB : * (We assume it's safe to do this without taking TwoPhaseStateLock.)
1576 : */
6505 tgl 1577 GIC 363 : gxact->valid = false;
1578 :
1579 : /*
1580 : * We have to remove any files that were supposed to be dropped. For
1581 : * consistency with the regular xact.c code paths, must do this before
1582 : * releasing locks, so do it before running the callbacks.
1583 : *
6505 tgl 1584 ECB : * NB: this code knows that we couldn't be dropping any temp rels ...
1585 : */
6505 tgl 1586 CBC 363 : if (isCommit)
6505 tgl 1587 ECB : {
5254 heikki.linnakangas 1588 GIC 325 : delrels = commitrels;
1589 325 : ndelrels = hdr->ncommitrels;
1590 : }
6505 tgl 1591 ECB : else
1592 : {
5254 heikki.linnakangas 1593 GIC 38 : delrels = abortrels;
1594 38 : ndelrels = hdr->nabortrels;
1595 : }
5254 heikki.linnakangas 1596 ECB :
1597 : /* Make sure files supposed to be dropped are dropped */
1739 fujii 1598 CBC 363 : DropRelationFiles(delrels, ndelrels, false);
6505 tgl 1599 ECB :
368 andres 1600 GIC 363 : if (isCommit)
368 andres 1601 CBC 325 : pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
1602 : else
368 andres 1603 GIC 38 : pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);
1604 :
1605 : /*
1606 : * Handle cache invalidation messages.
1607 : *
1608 : * Relcache init file invalidation requires processing both before and
1609 : * after we send the SI messages, only when committing. See
605 michael 1610 ECB : * AtEOXact_Inval().
1611 : */
605 michael 1612 CBC 363 : if (isCommit)
605 michael 1613 EUB : {
605 michael 1614 CBC 325 : if (hdr->initfileinval)
605 michael 1615 LBC 0 : RelationCacheInitFilePreInvalidate();
605 michael 1616 GBC 325 : SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
605 michael 1617 GIC 325 : if (hdr->initfileinval)
605 michael 1618 UIC 0 : RelationCacheInitFilePostInvalidate();
1619 : }
1620 :
1621 : /*
1622 : * Acquire the two-phase lock. We want to work on the two-phase callbacks
1623 : * while holding it to avoid potential conflicts with other transactions
1624 : * attempting to use the same GID, so the lock is released once the shared
1504 michael 1625 ECB : * memory state is cleared.
1626 : */
1504 michael 1627 GIC 363 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1504 michael 1628 ECB :
6504 tgl 1629 : /* And now do the callbacks */
6504 tgl 1630 GIC 363 : if (isCommit)
6504 tgl 1631 CBC 325 : ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
1632 : else
1633 38 : ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
1634 :
4444 heikki.linnakangas 1635 GIC 363 : PredicateLockTwoPhaseFinish(xid, isCommit);
4444 heikki.linnakangas 1636 ECB :
1637 : /* Clear shared memory state */
1504 michael 1638 GIC 363 : RemoveGXact(gxact);
1639 :
1640 : /*
1641 : * Release the lock as all callbacks are called and shared memory cleanup
1504 michael 1642 ECB : * is done.
1643 : */
1504 michael 1644 GIC 363 : LWLockRelease(TwoPhaseStateLock);
1504 michael 1645 ECB :
1646 : /* Count the prepared xact as committed or aborted */
1460 akapila 1647 GIC 363 : AtEOXact_PgStat(isCommit, false);
1648 :
1649 : /*
2636 simon 1650 ECB : * And now we can clean up any files we may have left.
6505 tgl 1651 : */
2636 simon 1652 GIC 363 : if (gxact->ondisk)
2636 simon 1653 CBC 26 : RemoveTwoPhaseFile(xid, true);
1654 :
3251 heikki.linnakangas 1655 363 : MyLockedGxact = NULL;
1656 :
1802 teodor 1657 363 : RESUME_INTERRUPTS();
1802 teodor 1658 ECB :
6505 tgl 1659 GIC 363 : pfree(buf);
1660 363 : }
1661 :
1662 : /*
1663 : * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
6505 tgl 1664 ECB : */
1665 : static void
6505 tgl 1666 GIC 393 : ProcessRecords(char *bufptr, TransactionId xid,
1667 : const TwoPhaseCallback callbacks[])
6505 tgl 1668 ECB : {
1669 : for (;;)
6505 tgl 1670 GIC 1495 : {
6505 tgl 1671 CBC 1888 : TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
6505 tgl 1672 ECB :
6505 tgl 1673 CBC 1888 : Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
6505 tgl 1674 GIC 1888 : if (record->rmid == TWOPHASE_RM_END_ID)
6505 tgl 1675 CBC 393 : break;
1676 :
1677 1495 : bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
6505 tgl 1678 ECB :
6505 tgl 1679 GIC 1495 : if (callbacks[record->rmid] != NULL)
6385 bruce 1680 1422 : callbacks[record->rmid] (xid, record->info,
6385 bruce 1681 ECB : (void *) bufptr, record->len);
1682 :
6505 tgl 1683 CBC 1495 : bufptr += MAXALIGN(record->len);
1684 : }
6505 tgl 1685 GIC 393 : }
1686 :
1687 : /*
1688 : * Remove the 2PC file for the specified XID.
1689 : *
1690 : * If giveWarning is false, do not complain about file-not-present;
1691 : * this is an expected case during WAL replay.
6505 tgl 1692 ECB : */
1693 : static void
6505 tgl 1694 GIC 28 : RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1695 : {
6385 bruce 1696 ECB : char path[MAXPGPATH];
6505 tgl 1697 :
6505 tgl 1698 GBC 28 : TwoPhaseFilePath(path, xid);
1699 28 : if (unlink(path))
6505 tgl 1700 UIC 0 : if (errno != ENOENT || giveWarning)
1701 0 : ereport(WARNING,
6505 tgl 1702 ECB : (errcode_for_file_access(),
1703 : errmsg("could not remove file \"%s\": %m", path)));
6505 tgl 1704 GIC 28 : }
1705 :
1706 : /*
1707 : * Recreates a state file. This is used in WAL replay and during
1708 : * checkpoint creation.
1709 : *
1710 : * Note: content and len don't include CRC.
6505 tgl 1711 ECB : */
1712 : static void
6505 tgl 1713 GIC 21 : RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1714 : {
1715 : char path[MAXPGPATH];
1716 : pg_crc32c statefile_crc;
1717 : int fd;
6505 tgl 1718 ECB :
1719 : /* Recompute CRC */
3078 heikki.linnakangas 1720 CBC 21 : INIT_CRC32C(statefile_crc);
3078 heikki.linnakangas 1721 GIC 21 : COMP_CRC32C(statefile_crc, content, len);
3078 heikki.linnakangas 1722 CBC 21 : FIN_CRC32C(statefile_crc);
1723 :
6505 tgl 1724 21 : TwoPhaseFilePath(path, xid);
1725 :
3785 heikki.linnakangas 1726 21 : fd = OpenTransientFile(path,
2024 peter_e 1727 EUB : O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
6505 tgl 1728 GIC 21 : if (fd < 0)
6505 tgl 1729 UIC 0 : ereport(ERROR,
1730 : (errcode_for_file_access(),
1731 : errmsg("could not recreate file \"%s\": %m", path)));
6505 tgl 1732 ECB :
1733 : /* Write content and CRC */
1708 michael 1734 CBC 21 : errno = 0;
2213 rhaas 1735 GIC 21 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
6505 tgl 1736 21 : if (write(fd, content, len) != len)
6505 tgl 1737 EUB : {
1749 michael 1738 : /* if write didn't set errno, assume problem is no disk space */
1453 michael 1739 UBC 0 : if (errno == 0)
1453 michael 1740 UIC 0 : errno = ENOSPC;
6505 tgl 1741 0 : ereport(ERROR,
1742 : (errcode_for_file_access(),
1726 michael 1743 ECB : errmsg("could not write file \"%s\": %m", path)));
1744 : }
2917 heikki.linnakangas 1745 GIC 21 : if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
6505 tgl 1746 EUB : {
1749 michael 1747 : /* if write didn't set errno, assume problem is no disk space */
1453 michael 1748 UBC 0 : if (errno == 0)
1453 michael 1749 UIC 0 : errno = ENOSPC;
6505 tgl 1750 0 : ereport(ERROR,
1751 : (errcode_for_file_access(),
1726 michael 1752 ECB : errmsg("could not write file \"%s\": %m", path)));
1753 : }
2213 rhaas 1754 GIC 21 : pgstat_report_wait_end();
1755 :
1756 : /*
1757 : * We must fsync the file because the end-of-replay checkpoint will not do
6385 bruce 1758 ECB : * so, there being no GXACT in shared memory yet to tell it to.
6503 tgl 1759 : */
2213 rhaas 1760 GBC 21 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
6505 tgl 1761 GIC 21 : if (pg_fsync(fd) != 0)
6505 tgl 1762 UIC 0 : ereport(ERROR,
6505 tgl 1763 ECB : (errcode_for_file_access(),
1764 : errmsg("could not fsync file \"%s\": %m", path)));
2213 rhaas 1765 CBC 21 : pgstat_report_wait_end();
6505 tgl 1766 EUB :
3785 heikki.linnakangas 1767 GIC 21 : if (CloseTransientFile(fd) != 0)
6505 tgl 1768 UIC 0 : ereport(ERROR,
6505 tgl 1769 ECB : (errcode_for_file_access(),
1770 : errmsg("could not close file \"%s\": %m", path)));
6505 tgl 1771 GIC 21 : }
1772 :
1773 : /*
1774 : * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1775 : *
1776 : * We must fsync the state file of any GXACT that is valid or has been
1777 : * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1778 : * horizon. (If the gxact isn't valid yet, has not been generated in
1779 : * redo, or has a later LSN, this checkpoint is not responsible for
1780 : * fsyncing it.)
1781 : *
1782 : * This is deliberately run as late as possible in the checkpoint sequence,
1783 : * because GXACTs ordinarily have short lifespans, and so it is quite
1784 : * possible that GXACTs that were valid at checkpoint start will no longer
1785 : * exist if we wait a little bit. With typical checkpoint settings this
1786 : * will be about 3 minutes for an online checkpoint, so as a result we
1787 : * expect that there will be no GXACTs that need to be copied to disk.
1788 : *
1789 : * If a GXACT remains valid across multiple checkpoints, it will already
1790 : * be on disk so we don't bother to repeat that write.
6503 tgl 1791 ECB : */
1792 : void
6503 tgl 1793 GIC 2363 : CheckPointTwoPhase(XLogRecPtr redo_horizon)
6503 tgl 1794 ECB : {
1795 : int i;
2636 simon 1796 CBC 2363 : int serialized_xacts = 0;
6503 tgl 1797 ECB :
6503 tgl 1798 GIC 2363 : if (max_prepared_xacts <= 0)
1799 2197 : return; /* nothing to do */
1800 :
1801 : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1802 :
1803 : /*
1804 : * We are expecting there to be zero GXACTs that need to be copied to
1805 : * disk, so we perform all I/O while holding TwoPhaseStateLock for
1806 : * simplicity. This prevents any new xacts from preparing while this
1807 : * occurs, which shouldn't be a problem since the presence of long-lived
1808 : * prepared xacts indicates the transaction manager isn't active.
1809 : *
1810 : * It's also possible to move I/O out of the lock, but on every error we
1811 : * should check whether somebody committed our transaction in different
1812 : * backend. Let's leave this optimization for future, if somebody will
1813 : * spot that this place cause bottleneck.
1814 : *
1815 : * Note that it isn't possible for there to be a GXACT with a
1816 : * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
366 rhaas 1817 ECB : * because of the efforts with delayChkptFlags.
2636 simon 1818 : */
6503 tgl 1819 GIC 166 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1820 193 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1821 : {
1822 : /*
1823 : * Note that we are using gxact not PGPROC so this works in recovery
2153 bruce 1824 ECB : * also
1825 : */
6385 bruce 1826 CBC 27 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
6503 tgl 1827 ECB :
2196 simon 1828 CBC 27 : if ((gxact->valid || gxact->inredo) &&
2636 simon 1829 GIC 27 : !gxact->ondisk &&
1830 23 : gxact->prepare_end_lsn <= redo_horizon)
1831 : {
1832 : char *buf;
2495 rhaas 1833 ECB : int len;
6503 tgl 1834 :
2636 simon 1835 CBC 21 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
2196 1836 21 : RecreateTwoPhaseFile(gxact->xid, buf, len);
2636 1837 21 : gxact->ondisk = true;
2196 1838 21 : gxact->prepare_start_lsn = InvalidXLogRecPtr;
1839 21 : gxact->prepare_end_lsn = InvalidXLogRecPtr;
2636 simon 1840 GIC 21 : pfree(buf);
1841 21 : serialized_xacts++;
6503 tgl 1842 ECB : }
1843 : }
2636 simon 1844 GIC 166 : LWLockRelease(TwoPhaseStateLock);
1845 :
1846 : /*
1847 : * Flush unconditionally the parent directory to make any information
1848 : * durable on disk. Two-phase files could have been removed and those
1849 : * removals need to be made persistent as well as any files newly created
2204 teodor 1850 ECB : * previously since the last checkpoint.
1851 : */
2204 teodor 1852 GIC 166 : fsync_fname(TWOPHASE_DIR, true);
1853 :
5364 alvherre 1854 ECB : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
2636 simon 1855 :
2636 simon 1856 GIC 166 : if (log_checkpoints && serialized_xacts > 0)
1857 17 : ereport(LOG,
1858 : (errmsg_plural("%u two-phase state file was written "
1859 : "for a long-running prepared transaction",
1860 : "%u two-phase state files were written "
1861 : "for long-running prepared transactions",
1862 : serialized_xacts,
1863 : serialized_xacts)));
1864 : }
1865 :
1866 : /*
1867 : * restoreTwoPhaseData
1868 : *
1869 : * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1870 : * This is called once at the beginning of recovery, saving any extra
1871 : * lookups in the future. Two-phase files that are newer than the
1872 : * minimum XID horizon are discarded on the way.
2196 simon 1873 ECB : */
1874 : void
2196 simon 1875 GIC 1176 : restoreTwoPhaseData(void)
1876 : {
1877 : DIR *cldir;
2153 bruce 1878 ECB : struct dirent *clde;
2196 simon 1879 :
2125 alvherre 1880 CBC 1176 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1952 tgl 1881 GIC 1176 : cldir = AllocateDir(TWOPHASE_DIR);
2196 simon 1882 CBC 3539 : while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
2196 simon 1883 ECB : {
2196 simon 1884 GIC 2363 : if (strlen(clde->d_name) == 8 &&
1885 11 : strspn(clde->d_name, "0123456789ABCDEF") == 8)
1886 : {
1887 : TransactionId xid;
2196 simon 1888 ECB : char *buf;
1889 :
2196 simon 1890 CBC 11 : xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
1891 :
1892 11 : buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
2173 simon 1893 EUB : true, false, false);
2196 simon 1894 GIC 11 : if (buf == NULL)
2196 simon 1895 LBC 0 : continue;
1896 :
1838 simon 1897 GIC 11 : PrepareRedoAdd(buf, InvalidXLogRecPtr,
1898 : InvalidXLogRecPtr, InvalidRepOriginId);
2196 simon 1899 ECB : }
1900 : }
2125 alvherre 1901 CBC 1176 : LWLockRelease(TwoPhaseStateLock);
2196 simon 1902 GIC 1176 : FreeDir(cldir);
1903 1176 : }
1904 :
1905 : /*
1906 : * PrescanPreparedTransactions
1907 : *
1908 : * Scan the shared memory entries of TwoPhaseState and determine the range
1909 : * of valid XIDs present. This is run during database startup, after we
1910 : * have completed reading WAL. ShmemVariableCache->nextXid has been set to
1911 : * one more than the highest XID for which evidence exists in WAL.
1912 : *
1913 : * We throw away any prepared xacts with main XID beyond nextXid --- if any
1914 : * are present, it suggests that the DBA has done a PITR recovery to an
1915 : * earlier point in time without cleaning out pg_twophase. We dare not
1916 : * try to recover such prepared xacts since they likely depend on database
1917 : * state that doesn't exist now.
1918 : *
1919 : * However, we will advance nextXid beyond any subxact XIDs belonging to
1920 : * valid prepared xacts. We need to do this since subxact commit doesn't
1921 : * write a WAL entry, and so there might be no evidence in WAL of those
1922 : * subxact XIDs.
1923 : *
1924 : * On corrupted two-phase files, fail immediately. Keeping around broken
1925 : * entries and let replay continue causes harm on the system, and a new
1926 : * backup should be rolled in.
1927 : *
1928 : * Our other responsibility is to determine and return the oldest valid XID
1929 : * among the prepared xacts (if none, return ShmemVariableCache->nextXid).
1930 : * This is needed to synchronize pg_subtrans startup properly.
1931 : *
1932 : * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1933 : * top-level xids is stored in *xids_p. The number of entries in the array
1934 : * is returned in *nxids_p.
6505 tgl 1935 ECB : */
1936 : TransactionId
4859 simon 1937 CBC 1177 : PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
6505 tgl 1938 ECB : {
971 andres 1939 CBC 1177 : FullTransactionId nextXid = ShmemVariableCache->nextXid;
1940 1177 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
6505 tgl 1941 1177 : TransactionId result = origNextXid;
4859 simon 1942 1177 : TransactionId *xids = NULL;
4859 simon 1943 GIC 1177 : int nxids = 0;
1944 1177 : int allocsize = 0;
2196 simon 1945 ECB : int i;
6505 tgl 1946 :
2125 alvherre 1947 GIC 1177 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2196 simon 1948 1219 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1949 : {
2196 simon 1950 ECB : TransactionId xid;
1951 : char *buf;
2196 simon 1952 CBC 42 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1953 :
1954 42 : Assert(gxact->inredo);
1955 :
1956 42 : xid = gxact->xid;
1957 :
1958 42 : buf = ProcessTwoPhaseBuffer(xid,
1959 : gxact->prepare_start_lsn,
2153 bruce 1960 42 : gxact->ondisk, false, true);
6505 tgl 1961 EUB :
2196 simon 1962 GIC 42 : if (buf == NULL)
2196 simon 1963 UIC 0 : continue;
1964 :
1965 : /*
1966 : * OK, we think this file is valid. Incorporate xid into the
2182 simon 1967 ECB : * running-minimum result.
1968 : */
2182 simon 1969 GIC 42 : if (TransactionIdPrecedes(xid, result))
2182 simon 1970 CBC 36 : result = xid;
1971 :
2196 1972 42 : if (xids_p)
1973 : {
1974 12 : if (nxids == allocsize)
1975 : {
1976 10 : if (nxids == 0)
6505 tgl 1977 ECB : {
2196 simon 1978 GIC 10 : allocsize = 10;
1979 10 : xids = palloc(allocsize * sizeof(TransactionId));
1980 : }
2196 simon 1981 EUB : else
4859 1982 : {
2196 simon 1983 UIC 0 : allocsize = allocsize * 2;
1984 0 : xids = repalloc(xids, allocsize * sizeof(TransactionId));
4859 simon 1985 ECB : }
1986 : }
2196 simon 1987 GIC 12 : xids[nxids++] = xid;
6505 tgl 1988 ECB : }
1989 :
2196 simon 1990 CBC 42 : pfree(buf);
1991 : }
1992 1177 : LWLockRelease(TwoPhaseStateLock);
1993 :
4859 1994 1177 : if (xids_p)
4859 simon 1995 ECB : {
4859 simon 1996 GIC 35 : *xids_p = xids;
1997 35 : *nxids_p = nxids;
4859 simon 1998 ECB : }
1999 :
6505 tgl 2000 GIC 1177 : return result;
2001 : }
2002 :
2003 : /*
2004 : * StandbyRecoverPreparedTransactions
2005 : *
2006 : * Scan the shared memory entries of TwoPhaseState and setup all the required
2007 : * information to allow standby queries to treat prepared transactions as still
2008 : * active.
2009 : *
2010 : * This is never called at the end of recovery - we use
2011 : * RecoverPreparedTransactions() at that point.
2012 : *
2013 : * The lack of calls to SubTransSetParent() calls here is by design;
2014 : * those calls are made by RecoverPreparedTransactions() at the end of recovery
2015 : * for those xacts that need this.
4744 heikki.linnakangas 2016 ECB : */
2017 : void
2173 simon 2018 GIC 35 : StandbyRecoverPreparedTransactions(void)
2019 : {
2196 simon 2020 ECB : int i;
4744 heikki.linnakangas 2021 :
2125 alvherre 2022 GIC 35 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2196 simon 2023 47 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2024 : {
2196 simon 2025 ECB : TransactionId xid;
2026 : char *buf;
2196 simon 2027 CBC 12 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2028 :
2029 12 : Assert(gxact->inredo);
2030 :
2031 12 : xid = gxact->xid;
2032 :
2033 12 : buf = ProcessTwoPhaseBuffer(xid,
2153 bruce 2034 ECB : gxact->prepare_start_lsn,
2153 bruce 2035 CBC 12 : gxact->ondisk, false, false);
2196 simon 2036 GIC 12 : if (buf != NULL)
2404 simon 2037 CBC 12 : pfree(buf);
4744 heikki.linnakangas 2038 ECB : }
2196 simon 2039 GIC 35 : LWLockRelease(TwoPhaseStateLock);
4744 heikki.linnakangas 2040 35 : }
2041 :
2042 : /*
2043 : * RecoverPreparedTransactions
2044 : *
2045 : * Scan the shared memory entries of TwoPhaseState and reload the state for
2046 : * each prepared transaction (reacquire locks, etc).
2047 : *
2048 : * This is run at the end of recovery, but before we allow backends to write
2049 : * WAL.
2050 : *
2051 : * At the end of recovery the way we take snapshots will change. We now need
2052 : * to mark all running transactions with their full SubTransSetParent() info
2053 : * to allow normal snapshots to work correctly if snapshots overflow.
2054 : * We do this here because by definition prepared transactions are the only
2055 : * type of write transaction still running, so this is necessary and
2056 : * complete.
6505 tgl 2057 ECB : */
2058 : void
6505 tgl 2059 GIC 1142 : RecoverPreparedTransactions(void)
2060 : {
2196 simon 2061 ECB : int i;
6505 tgl 2062 :
2125 alvherre 2063 GIC 1142 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2196 simon 2064 1172 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2065 : {
2196 simon 2066 ECB : TransactionId xid;
2067 : char *buf;
2196 simon 2068 GIC 30 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2069 : char *bufptr;
2070 : TwoPhaseFileHeader *hdr;
2071 : TransactionId *subxids;
2196 simon 2072 ECB : const char *gid;
2073 :
2196 simon 2074 GIC 30 : xid = gxact->xid;
2075 :
2076 : /*
2077 : * Reconstruct subtrans state for the transaction --- needed because
2078 : * pg_subtrans is not preserved over a restart. Note that we are
2079 : * linking all the subtransactions directly to the top-level XID;
2080 : * there may originally have been a more complex hierarchy, but
2081 : * there's no need to restore that exactly. It's possible that
2082 : * SubTransSetParent has been set before, if the prepared transaction
2153 bruce 2083 ECB : * generated xid assignment records.
2084 : */
2196 simon 2085 CBC 30 : buf = ProcessTwoPhaseBuffer(xid,
2153 bruce 2086 ECB : gxact->prepare_start_lsn,
2153 bruce 2087 GBC 30 : gxact->ondisk, true, false);
2196 simon 2088 GIC 30 : if (buf == NULL)
2196 simon 2089 LBC 0 : continue;
2090 :
2196 simon 2091 GIC 30 : ereport(LOG,
2196 simon 2092 ECB : (errmsg("recovering prepared transaction %u from shared memory", xid)));
2093 :
2196 simon 2094 CBC 30 : hdr = (TwoPhaseFileHeader *) buf;
2095 30 : Assert(TransactionIdEquals(hdr->xid, xid));
2096 30 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2097 30 : gid = (const char *) bufptr;
2098 30 : bufptr += MAXALIGN(hdr->gidlen);
2099 30 : subxids = (TransactionId *) bufptr;
2100 30 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
277 rhaas 2101 GNC 30 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
2102 30 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
368 andres 2103 CBC 30 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
368 andres 2104 GIC 30 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
2196 simon 2105 30 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2106 :
2107 : /*
2108 : * Recreate its GXACT and dummy PGPROC. But, check whether it was
2153 bruce 2109 ECB : * added in redo and already has a shmem entry for it.
2110 : */
2196 simon 2111 GIC 30 : MarkAsPreparingGuts(gxact, xid, gid,
2112 : hdr->prepared_at,
2113 : hdr->owner, hdr->database);
6505 tgl 2114 ECB :
2115 : /* recovered, so reset the flag for entries generated by redo */
2196 simon 2116 CBC 30 : gxact->inredo = false;
6505 tgl 2117 ECB :
2196 simon 2118 GIC 30 : GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2125 alvherre 2119 CBC 30 : MarkAsPrepared(gxact, true);
2120 :
2125 alvherre 2121 GIC 30 : LWLockRelease(TwoPhaseStateLock);
2122 :
2123 : /*
2125 alvherre 2124 ECB : * Recover other state (notably locks) using resource managers.
2125 : */
2196 simon 2126 GIC 30 : ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2127 :
2128 : /*
2129 : * Release locks held by the standby process after we process each
2130 : * prepared transaction. As a result, we don't need too many
2196 simon 2131 ECB : * additional locks at any one time.
2132 : */
2196 simon 2133 GIC 30 : if (InHotStandby)
2134 6 : StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2135 :
2136 : /*
2137 : * We're done with recovering this transaction. Clear MyLockedGxact,
2153 bruce 2138 ECB : * like we do in PrepareTransaction() during normal operation.
2139 : */
2196 simon 2140 CBC 30 : PostPrepare_Twophase();
2141 :
2142 30 : pfree(buf);
2143 :
2125 alvherre 2144 GIC 30 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2196 simon 2145 ECB : }
2125 alvherre 2146 :
2125 alvherre 2147 GIC 1142 : LWLockRelease(TwoPhaseStateLock);
2196 simon 2148 1142 : }
2149 :
2150 : /*
2151 : * ProcessTwoPhaseBuffer
2152 : *
2153 : * Given a transaction id, read it either from disk or read it directly
2154 : * via shmem xlog record pointer using the provided "prepare_start_lsn".
2155 : *
2156 : * If setParent is true, set up subtransaction parent linkages.
2157 : *
2158 : * If setNextXid is true, set ShmemVariableCache->nextXid to the newest
2159 : * value scanned.
2196 simon 2160 ECB : */
2161 : static char *
2196 simon 2162 GIC 95 : ProcessTwoPhaseBuffer(TransactionId xid,
2163 : XLogRecPtr prepare_start_lsn,
2164 : bool fromdisk,
2182 simon 2165 ECB : bool setParent, bool setNextXid)
2196 2166 : {
971 andres 2167 GIC 95 : FullTransactionId nextXid = ShmemVariableCache->nextXid;
2168 95 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
2169 : TransactionId *subxids;
2170 : char *buf;
2171 : TwoPhaseFileHeader *hdr;
2196 simon 2172 ECB : int i;
2173 :
2125 alvherre 2174 CBC 95 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2125 alvherre 2175 ECB :
2196 simon 2176 GIC 95 : if (!fromdisk)
2177 60 : Assert(prepare_start_lsn != InvalidXLogRecPtr);
2196 simon 2178 ECB :
2179 : /* Already processed? */
2196 simon 2180 GBC 95 : if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2181 : {
2196 simon 2182 UBC 0 : if (fromdisk)
2183 : {
2196 simon 2184 UIC 0 : ereport(WARNING,
2036 peter_e 2185 EUB : (errmsg("removing stale two-phase state file for transaction %u",
2186 : xid)));
2196 simon 2187 UIC 0 : RemoveTwoPhaseFile(xid, true);
2188 : }
2196 simon 2189 EUB : else
2190 : {
2196 simon 2191 UIC 0 : ereport(WARNING,
2036 peter_e 2192 EUB : (errmsg("removing stale two-phase state from memory for transaction %u",
2193 : xid)));
2196 simon 2194 UBC 0 : PrepareRedoRemove(xid, true);
2195 : }
2196 simon 2196 UIC 0 : return NULL;
2197 : }
2196 simon 2198 ECB :
2199 : /* Reject XID if too new */
2196 simon 2200 GBC 95 : if (TransactionIdFollowsOrEquals(xid, origNextXid))
2201 : {
2196 simon 2202 UBC 0 : if (fromdisk)
2203 : {
2196 simon 2204 UIC 0 : ereport(WARNING,
2036 peter_e 2205 EUB : (errmsg("removing future two-phase state file for transaction %u",
2206 : xid)));
2196 simon 2207 UIC 0 : RemoveTwoPhaseFile(xid, true);
2208 : }
2196 simon 2209 EUB : else
2210 : {
2196 simon 2211 UIC 0 : ereport(WARNING,
2036 peter_e 2212 EUB : (errmsg("removing future two-phase state from memory for transaction %u",
2213 : xid)));
2196 simon 2214 UBC 0 : PrepareRedoRemove(xid, true);
2215 : }
2196 simon 2216 UIC 0 : return NULL;
2196 simon 2217 ECB : }
2218 :
2196 simon 2219 GIC 95 : if (fromdisk)
2196 simon 2220 ECB : {
2221 : /* Read and validate file */
1675 michael 2222 GIC 35 : buf = ReadTwoPhaseFile(xid, false);
2223 : }
2224 : else
2196 simon 2225 ECB : {
2226 : /* Read xlog data */
2196 simon 2227 GIC 60 : XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2228 : }
2196 simon 2229 ECB :
2230 : /* Deconstruct header */
2196 simon 2231 GIC 95 : hdr = (TwoPhaseFileHeader *) buf;
2196 simon 2232 GBC 95 : if (!TransactionIdEquals(hdr->xid, xid))
2196 simon 2233 EUB : {
2196 simon 2234 UIC 0 : if (fromdisk)
1675 michael 2235 0 : ereport(ERROR,
2236 : (errcode(ERRCODE_DATA_CORRUPTED),
2237 : errmsg("corrupted two-phase state file for transaction %u",
2118 tgl 2238 EUB : xid)));
2239 : else
1675 michael 2240 UIC 0 : ereport(ERROR,
2241 : (errcode(ERRCODE_DATA_CORRUPTED),
2242 : errmsg("corrupted two-phase state in memory for transaction %u",
2243 : xid)));
2244 : }
2245 :
2246 : /*
2247 : * Examine subtransaction XIDs ... they should all follow main XID, and
971 andres 2248 ECB : * they may force us to advance nextXid.
2196 simon 2249 : */
2196 simon 2250 CBC 95 : subxids = (TransactionId *) (buf +
2251 95 : MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2196 simon 2252 GIC 95 : MAXALIGN(hdr->gidlen));
2196 simon 2253 CBC 1858 : for (i = 0; i < hdr->nsubxacts; i++)
2254 : {
2255 1763 : TransactionId subxid = subxids[i];
2256 :
2196 simon 2257 GIC 1763 : Assert(TransactionIdFollows(subxid, xid));
2182 simon 2258 ECB :
971 andres 2259 : /* update nextXid if needed */
1473 tmunro 2260 GIC 1763 : if (setNextXid)
1473 tmunro 2261 CBC 813 : AdvanceNextFullTransactionIdPastXid(subxid);
2182 simon 2262 ECB :
2196 simon 2263 GIC 1763 : if (setParent)
2173 2264 345 : SubTransSetParent(subxid, xid);
2196 simon 2265 ECB : }
2266 :
2196 simon 2267 GIC 95 : return buf;
2268 : }
2269 :
2270 :
2271 : /*
2272 : * RecordTransactionCommitPrepared
2273 : *
2274 : * This is basically the same as RecordTransactionCommit (q.v. if you change
2275 : * this function): in particular, we must set DELAY_CHKPT_START to avoid a
2276 : * race condition.
2277 : *
2278 : * We know the transaction made at least one XLOG entry (its PREPARE),
2279 : * so it is never possible to optimize out the commit record.
6505 tgl 2280 ECB : */
2281 : static void
6505 tgl 2282 GIC 325 : RecordTransactionCommitPrepared(TransactionId xid,
2283 : int nchildren,
2284 : TransactionId *children,
2285 : int nrels,
2286 : RelFileLocator *rels,
2287 : int nstats,
2288 : xl_xact_stats_item *stats,
2289 : int ninvalmsgs,
2290 : SharedInvalidationMessage *invalmsgs,
2291 : bool initfileinval,
2292 : const char *gid)
6505 tgl 2293 ECB : {
2294 : XLogRecPtr recptr;
2749 alvherre 2295 GIC 325 : TimestampTz committs = GetCurrentTimestamp();
2296 : bool replorigin;
2297 :
2298 : /*
2299 : * Are we using the replication origins feature? Or, in other words, are
2749 alvherre 2300 ECB : * we replaying remote actions?
2301 : */
2749 alvherre 2302 GIC 345 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2749 alvherre 2303 CBC 20 : replorigin_session_origin != DoNotReplicateId);
2304 :
6505 tgl 2305 GIC 325 : START_CRIT_SECTION();
6505 tgl 2306 ECB :
2307 : /* See notes in RecordTransactionCommit */
366 rhaas 2308 GIC 325 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
2309 325 : MyProc->delayChkptFlags |= DELAY_CHKPT_START;
2310 :
2311 : /*
2312 : * Emit the XLOG commit record. Note that we mark 2PC commits as
2313 : * potentially having AccessExclusiveLocks since we don't know whether or
2153 bruce 2314 ECB : * not they do.
2315 : */
2749 alvherre 2316 GIC 325 : recptr = XactLogCommitRecord(committs,
2317 : nchildren, children, nrels, rels,
2318 : nstats, stats,
2947 andres 2319 ECB : ninvalmsgs, invalmsgs,
2320 : initfileinval,
2118 tgl 2321 GIC 325 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2322 : xid, gid);
6505 tgl 2323 ECB :
2324 :
2749 alvherre 2325 CBC 325 : if (replorigin)
2326 : /* Move LSNs forward for this replication origin */
2749 alvherre 2327 GIC 20 : replorigin_session_advance(replorigin_session_origin_lsn,
2328 : XactLastRecEnd);
2329 :
2330 : /*
2331 : * Record commit timestamp. The value comes from plain commit timestamp
2332 : * if replorigin is not enabled, or replorigin already set a value for us
2333 : * in replorigin_session_origin_timestamp otherwise.
2334 : *
2335 : * We don't need to WAL-log anything here, as the commit record written
2749 alvherre 2336 ECB : * above already contains the data.
2337 : */
2749 alvherre 2338 GIC 325 : if (!replorigin || replorigin_session_origin_timestamp == 0)
2749 alvherre 2339 CBC 305 : replorigin_session_origin_timestamp = committs;
2340 :
2749 alvherre 2341 GIC 325 : TransactionTreeSetCommitTsData(xid, nchildren, children,
2342 : replorigin_session_origin_timestamp,
2343 : replorigin_session_origin);
2344 :
2345 : /*
2346 : * We don't currently try to sleep before flush here ... nor is there any
2347 : * support for async commit of a prepared xact (the very idea is probably
2348 : * a contradiction)
2349 : */
6505 tgl 2350 ECB :
2351 : /* Flush XLOG to disk */
6505 tgl 2352 GIC 325 : XLogFlush(recptr);
6505 tgl 2353 ECB :
2354 : /* Mark the transaction committed in pg_xact */
5284 alvherre 2355 GIC 325 : TransactionIdCommitTree(xid, nchildren, children);
6505 tgl 2356 ECB :
2357 : /* Checkpoint can proceed now */
366 rhaas 2358 CBC 325 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
2359 :
6505 tgl 2360 GIC 325 : END_CRIT_SECTION();
2361 :
2362 : /*
2363 : * Wait for synchronous replication, if required.
2364 : *
2365 : * Note that at this stage we have marked clog, but still show as running
4382 bruce 2366 ECB : * in the procarray and continue to hold locks.
4417 simon 2367 : */
2567 rhaas 2368 GIC 325 : SyncRepWaitForLSN(recptr, true);
6505 tgl 2369 325 : }
2370 :
2371 : /*
2372 : * RecordTransactionAbortPrepared
2373 : *
2374 : * This is basically the same as RecordTransactionAbort.
2375 : *
2376 : * We know the transaction made at least one XLOG entry (its PREPARE),
2377 : * so it is never possible to optimize out the abort record.
6505 tgl 2378 ECB : */
2379 : static void
6505 tgl 2380 GIC 38 : RecordTransactionAbortPrepared(TransactionId xid,
2381 : int nchildren,
2382 : TransactionId *children,
2383 : int nrels,
2384 : RelFileLocator *rels,
2385 : int nstats,
2386 : xl_xact_stats_item *stats,
2387 : const char *gid)
2388 : {
2389 : XLogRecPtr recptr;
2390 : bool replorigin;
2391 :
2392 : /*
2393 : * Are we using the replication origins feature? Or, in other words, are
762 akapila 2394 ECB : * we replaying remote actions?
2395 : */
762 akapila 2396 GIC 44 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2397 6 : replorigin_session_origin != DoNotReplicateId);
2398 :
2399 : /*
2400 : * Catch the scenario where we aborted partway through
6505 tgl 2401 ECB : * RecordTransactionCommitPrepared ...
6505 tgl 2402 EUB : */
6505 tgl 2403 GIC 38 : if (TransactionIdDidCommit(xid))
6505 tgl 2404 UIC 0 : elog(PANIC, "cannot abort transaction %u, it was already committed",
6505 tgl 2405 ECB : xid);
2406 :
6505 tgl 2407 GIC 38 : START_CRIT_SECTION();
2408 :
2409 : /*
2410 : * Emit the XLOG commit record. Note that we mark 2PC aborts as
2411 : * potentially having AccessExclusiveLocks since we don't know whether or
2153 bruce 2412 ECB : * not they do.
2413 : */
2947 andres 2414 GIC 38 : recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2415 : nchildren, children,
2947 andres 2416 ECB : nrels, rels,
2417 : nstats, stats,
2118 tgl 2418 GIC 38 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
1838 simon 2419 ECB : xid, gid);
2420 :
762 akapila 2421 CBC 38 : if (replorigin)
2422 : /* Move LSNs forward for this replication origin */
762 akapila 2423 GIC 6 : replorigin_session_advance(replorigin_session_origin_lsn,
2424 : XactLastRecEnd);
762 akapila 2425 ECB :
2426 : /* Always flush, since we're about to remove the 2PC state file */
6505 tgl 2427 GIC 38 : XLogFlush(recptr);
2428 :
2429 : /*
2430 : * Mark the transaction aborted in clog. This is not absolutely necessary
6385 bruce 2431 ECB : * but we may as well do it while we are here.
2432 : */
5284 alvherre 2433 CBC 38 : TransactionIdAbortTree(xid, nchildren, children);
2434 :
6505 tgl 2435 GIC 38 : END_CRIT_SECTION();
2436 :
2437 : /*
2438 : * Wait for synchronous replication, if required.
2439 : *
2440 : * Note that at this stage we have marked clog, but still show as running
4382 bruce 2441 ECB : * in the procarray and continue to hold locks.
4417 simon 2442 : */
2567 rhaas 2443 GIC 38 : SyncRepWaitForLSN(recptr, false);
6505 tgl 2444 38 : }
2445 :
2446 : /*
2447 : * PrepareRedoAdd
2448 : *
2449 : * Store pointers to the start/end of the WAL record along with the xid in
2450 : * a gxact entry in shared memory TwoPhaseState structure. If caller
2451 : * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2452 : * data, the entry is marked as located on disk.
2196 simon 2453 ECB : */
2454 : void
1838 simon 2455 GIC 80 : PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
1838 simon 2456 ECB : XLogRecPtr end_lsn, RepOriginId origin_id)
2457 : {
2196 simon 2458 GIC 80 : TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2459 : char *bufptr;
2460 : const char *gid;
2196 simon 2461 ECB : GlobalTransaction gxact;
2462 :
2125 alvherre 2463 GIC 80 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2196 simon 2464 CBC 80 : Assert(RecoveryInProgress());
2196 simon 2465 ECB :
2196 simon 2466 GIC 80 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2467 80 : gid = (const char *) bufptr;
2468 :
2469 : /*
2470 : * Reserve the GID for the given transaction in the redo code path.
2471 : *
2472 : * This creates a gxact struct and puts it into the active array.
2473 : *
2474 : * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2475 : * shared memory. Hence, we only fill up the bare minimum contents here.
2476 : * The gxact also gets marked with gxact->inredo set to true to indicate
2477 : * that it got added in the redo phase
2478 : */
2196 simon 2479 ECB :
2196 simon 2480 EUB : /* Get a free gxact from the freelist */
2196 simon 2481 GIC 80 : if (TwoPhaseState->freeGXacts == NULL)
2196 simon 2482 UIC 0 : ereport(ERROR,
2483 : (errcode(ERRCODE_OUT_OF_MEMORY),
2484 : errmsg("maximum number of prepared transactions reached"),
2196 simon 2485 ECB : errhint("Increase max_prepared_transactions (currently %d).",
2486 : max_prepared_xacts)));
2196 simon 2487 GIC 80 : gxact = TwoPhaseState->freeGXacts;
2196 simon 2488 CBC 80 : TwoPhaseState->freeGXacts = gxact->next;
2196 simon 2489 ECB :
2196 simon 2490 CBC 80 : gxact->prepared_at = hdr->prepared_at;
2491 80 : gxact->prepare_start_lsn = start_lsn;
2492 80 : gxact->prepare_end_lsn = end_lsn;
2493 80 : gxact->xid = hdr->xid;
2494 80 : gxact->owner = hdr->owner;
2495 80 : gxact->locking_backend = InvalidBackendId;
2496 80 : gxact->valid = false;
2497 80 : gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2153 bruce 2498 GIC 80 : gxact->inredo = true; /* yes, added in redo */
2196 simon 2499 80 : strcpy(gxact->gid, gid);
2196 simon 2500 ECB :
2501 : /* And insert it into the active array */
2196 simon 2502 GIC 80 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2196 simon 2503 CBC 80 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2504 :
1838 simon 2505 GIC 80 : if (origin_id != InvalidRepOriginId)
1838 simon 2506 ECB : {
2507 : /* recover apply progress */
1838 simon 2508 GIC 13 : replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2509 : false /* backward */ , false /* WAL */ );
1838 simon 2510 ECB : }
2511 :
2125 alvherre 2512 GIC 80 : elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2196 simon 2513 80 : }
2514 :
2515 : /*
2516 : * PrepareRedoRemove
2517 : *
2518 : * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2519 : * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2520 : *
2521 : * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2522 : * is updated.
2196 simon 2523 ECB : */
2524 : void
2196 simon 2525 CBC 58 : PrepareRedoRemove(TransactionId xid, bool giveWarning)
2526 : {
2527 58 : GlobalTransaction gxact = NULL;
2528 : int i;
2182 2529 58 : bool found = false;
2196 simon 2530 ECB :
2125 alvherre 2531 GIC 58 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2196 simon 2532 CBC 58 : Assert(RecoveryInProgress());
2533 :
2534 58 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2535 : {
2536 49 : gxact = TwoPhaseState->prepXacts[i];
2537 :
2538 49 : if (gxact->xid == xid)
2196 simon 2539 ECB : {
2196 simon 2540 CBC 49 : Assert(gxact->inredo);
2182 simon 2541 GIC 49 : found = true;
2196 2542 49 : break;
2543 : }
2544 : }
2545 :
2546 : /*
2196 simon 2547 ECB : * Just leave if there is nothing, this is expected during WAL replay.
2548 : */
2182 simon 2549 GIC 58 : if (!found)
2196 2550 9 : return;
2551 :
2552 : /*
2196 simon 2553 ECB : * And now we can clean up any files we may have left.
2554 : */
2125 alvherre 2555 CBC 49 : elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2196 simon 2556 49 : if (gxact->ondisk)
2196 simon 2557 GIC 2 : RemoveTwoPhaseFile(xid, giveWarning);
2558 49 : RemoveGXact(gxact);
2559 : }
2560 :
2561 : /*
2562 : * LookupGXact
2563 : * Check if the prepared transaction with the given GID, lsn and timestamp
2564 : * exists.
2565 : *
2566 : * Note that we always compare with the LSN where prepare ends because that is
2567 : * what is stored as origin_lsn in the 2PC file.
2568 : *
2569 : * This function is primarily used to check if the prepared transaction
2570 : * received from the upstream (remote node) already exists. Checking only GID
2571 : * is not sufficient because a different prepared xact with the same GID can
2572 : * exist on the same node. So, we are ensuring to match origin_lsn and
2573 : * origin_timestamp of prepared xact to avoid the possibility of a match of
2574 : * prepared xact from two different nodes.
634 akapila 2575 ECB : */
2576 : bool
634 akapila 2577 GIC 5 : LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
2578 : TimestampTz origin_prepare_timestamp)
634 akapila 2579 ECB : {
2580 : int i;
634 akapila 2581 CBC 5 : bool found = false;
634 akapila 2582 ECB :
634 akapila 2583 GIC 5 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
634 akapila 2584 CBC 5 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2585 : {
634 akapila 2586 GIC 5 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
634 akapila 2587 ECB :
2588 : /* Ignore not-yet-valid GIDs. */
634 akapila 2589 GIC 5 : if (gxact->valid && strcmp(gxact->gid, gid) == 0)
2590 : {
2591 : char *buf;
2592 : TwoPhaseFileHeader *hdr;
2593 :
2594 : /*
2595 : * We are not expecting collisions of GXACTs (same gid) between
2596 : * publisher and subscribers, so we perform all I/O while holding
2597 : * TwoPhaseStateLock for simplicity.
2598 : *
2599 : * To move the I/O out of the lock, we need to ensure that no
2600 : * other backend commits the prepared xact in the meantime. We can
2601 : * do this optimization if we encounter many collisions in GID
634 akapila 2602 ECB : * between publisher and subscriber.
634 akapila 2603 EUB : */
634 akapila 2604 GIC 5 : if (gxact->ondisk)
634 akapila 2605 UIC 0 : buf = ReadTwoPhaseFile(gxact->xid, false);
634 akapila 2606 ECB : else
2607 : {
634 akapila 2608 GIC 5 : Assert(gxact->prepare_start_lsn);
2609 5 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
634 akapila 2610 ECB : }
2611 :
634 akapila 2612 CBC 5 : hdr = (TwoPhaseFileHeader *) buf;
634 akapila 2613 ECB :
634 akapila 2614 GIC 5 : if (hdr->origin_lsn == prepare_end_lsn &&
634 akapila 2615 CBC 5 : hdr->origin_timestamp == origin_prepare_timestamp)
634 akapila 2616 ECB : {
634 akapila 2617 CBC 5 : found = true;
634 akapila 2618 GIC 5 : pfree(buf);
2619 5 : break;
634 akapila 2620 EUB : }
2621 :
634 akapila 2622 UIC 0 : pfree(buf);
634 akapila 2623 ECB : }
2624 : }
634 akapila 2625 GIC 5 : LWLockRelease(TwoPhaseStateLock);
2626 5 : return found;
2627 : }
|