Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * commit_ts.c
4 : * PostgreSQL commit timestamp manager
5 : *
6 : * This module is a pg_xact-like system that stores the commit timestamp
7 : * for each transaction.
8 : *
9 : * XLOG interactions: this module generates an XLOG record whenever a new
10 : * CommitTs page is initialized to zeroes. Other writes of CommitTS come
11 : * from recording of transaction commit in xact.c, which generates its own
12 : * XLOG records for these events and will re-perform the status update on
13 : * redo; so we need make no additional XLOG entry here.
14 : *
15 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
16 : * Portions Copyright (c) 1994, Regents of the University of California
17 : *
18 : * src/backend/access/transam/commit_ts.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres.h"
23 :
24 : #include "access/commit_ts.h"
25 : #include "access/htup_details.h"
26 : #include "access/slru.h"
27 : #include "access/transam.h"
28 : #include "access/xloginsert.h"
29 : #include "access/xlogutils.h"
30 : #include "catalog/pg_type.h"
31 : #include "funcapi.h"
32 : #include "miscadmin.h"
33 : #include "pg_trace.h"
34 : #include "storage/shmem.h"
35 : #include "utils/builtins.h"
36 : #include "utils/snapmgr.h"
37 : #include "utils/timestamp.h"
38 :
39 : /*
40 : * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
41 : * everywhere else in Postgres.
42 : *
43 : * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44 : * CommitTs page numbering also wraps around at
45 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
47 : * explicit notice of that fact in this module, except when comparing segment
48 : * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49 : */
50 :
51 : /*
52 : * We need 8+2 bytes per xact. Note that enlarging this struct might mean
53 : * the largest possible file name is more than 5 chars long; see
54 : * SlruScanDirectory.
55 : */
56 : typedef struct CommitTimestampEntry
57 : {
58 : TimestampTz time;
59 : RepOriginId nodeid;
60 : } CommitTimestampEntry;
61 :
62 : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
63 : sizeof(RepOriginId))
64 :
65 : #define COMMIT_TS_XACTS_PER_PAGE \
66 : (BLCKSZ / SizeOfCommitTimestampEntry)
67 :
68 : #define TransactionIdToCTsPage(xid) \
69 : ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
70 : #define TransactionIdToCTsEntry(xid) \
71 : ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72 :
73 : /*
74 : * Link to shared-memory data structures for CommitTs control
75 : */
76 : static SlruCtlData CommitTsCtlData;
77 :
78 : #define CommitTsCtl (&CommitTsCtlData)
79 :
80 : /*
81 : * We keep a cache of the last value set in shared memory.
82 : *
83 : * This is also good place to keep the activation status. We keep this
84 : * separate from the GUC so that the standby can activate the module if the
85 : * primary has it active independently of the value of the GUC.
86 : *
87 : * This is protected by CommitTsLock. In some places, we use commitTsActive
88 : * without acquiring the lock; where this happens, a comment explains the
89 : * rationale for it.
90 : */
91 : typedef struct CommitTimestampShared
92 : {
93 : TransactionId xidLastCommit;
94 : CommitTimestampEntry dataLastCommit;
95 : bool commitTsActive;
96 : } CommitTimestampShared;
97 :
98 : static CommitTimestampShared *commitTsShared;
99 :
100 :
101 : /* GUC variable */
102 : bool track_commit_timestamp;
103 :
104 : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
105 : TransactionId *subxids, TimestampTz ts,
106 : RepOriginId nodeid, int pageno);
107 : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
108 : RepOriginId nodeid, int slotno);
109 : static void error_commit_ts_disabled(void);
110 : static int ZeroCommitTsPage(int pageno, bool writeXlog);
111 : static bool CommitTsPagePrecedes(int page1, int page2);
112 : static void ActivateCommitTs(void);
113 : static void DeactivateCommitTs(void);
114 : static void WriteZeroPageXlogRec(int pageno);
115 : static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
116 :
117 : /*
118 : * TransactionTreeSetCommitTsData
119 : *
120 : * Record the final commit timestamp of transaction entries in the commit log
121 : * for a transaction and its subtransaction tree, as efficiently as possible.
122 : *
123 : * xid is the top level transaction id.
124 : *
125 : * subxids is an array of xids of length nsubxids, representing subtransactions
126 : * in the tree of xid. In various cases nsubxids may be zero.
127 : * The reason why tracking just the parent xid commit timestamp is not enough
128 : * is that the subtrans SLRU does not stay valid across crashes (it's not
129 : * permanent) so we need to keep the information about them here. If the
130 : * subtrans implementation changes in the future, we might want to revisit the
3049 alvherre 131 ECB : * decision of storing timestamp info for each subxid.
132 : */
133 : void
3049 alvherre 134 GIC 312745 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
135 : TransactionId *subxids, TimestampTz timestamp,
136 : RepOriginId nodeid)
137 : {
138 : int i;
139 : TransactionId headxid;
140 : TransactionId newestXact;
141 :
142 : /*
143 : * No-op if the module is not active.
144 : *
145 : * An unlocked read here is fine, because in a standby (the only place
146 : * where the flag can change in flight) this routine is only called by the
2495 rhaas 147 ECB : * recovery process, which is also the only process which can change the
148 : * flag.
149 : */
2721 alvherre 150 GIC 312745 : if (!commitTsShared->commitTsActive)
3049 151 312697 : return;
152 :
153 : /*
3049 alvherre 154 ECB : * Figure out the latest Xid in this batch: either the last subxid if
3049 alvherre 155 EUB : * there's any, otherwise the parent xid.
156 : */
3049 alvherre 157 CBC 48 : if (nsubxids > 0)
3049 alvherre 158 UIC 0 : newestXact = subxids[nsubxids - 1];
159 : else
3049 alvherre 160 GIC 48 : newestXact = xid;
161 :
162 : /*
163 : * We split the xids to set the timestamp to in groups belonging to the
164 : * same SLRU page; the first element in each such set is its head. The
165 : * first group has the main XID as the head; subsequent sets use the first
2878 bruce 166 ECB : * subxid not on the previous page as head. This way, we only have to
167 : * lock/modify each SLRU page once.
168 : */
443 michael 169 GBC 48 : headxid = xid;
443 michael 170 CBC 48 : i = 0;
171 : for (;;)
3049 alvherre 172 UIC 0 : {
3049 alvherre 173 CBC 48 : int pageno = TransactionIdToCTsPage(headxid);
174 : int j;
3049 alvherre 175 EUB :
3049 alvherre 176 GBC 48 : for (j = i; j < nsubxids; j++)
177 : {
3049 alvherre 178 UIC 0 : if (TransactionIdToCTsPage(subxids[j]) != pageno)
179 0 : break;
3049 alvherre 180 ECB : }
181 : /* subxids[i..j] are on the same page as the head */
182 :
3049 alvherre 183 GIC 48 : SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
3049 alvherre 184 ECB : pageno);
185 :
186 : /* if we wrote out all subxids, we're done. */
443 michael 187 GIC 48 : if (j >= nsubxids)
3049 alvherre 188 48 : break;
189 :
190 : /*
2878 bruce 191 EUB : * Set the new head and skip over it, as well as over the subxids we
192 : * just wrote.
193 : */
3049 alvherre 194 UIC 0 : headxid = subxids[j];
443 michael 195 0 : i = j + 1;
3049 alvherre 196 ECB : }
197 :
198 : /* update the cached value in shared memory */
3049 alvherre 199 CBC 48 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
3049 alvherre 200 GIC 48 : commitTsShared->xidLastCommit = xid;
201 48 : commitTsShared->dataLastCommit.time = timestamp;
3049 alvherre 202 CBC 48 : commitTsShared->dataLastCommit.nodeid = nodeid;
3049 alvherre 203 ECB :
204 : /* and move forwards our endpoint, if needed */
2659 mail 205 GIC 48 : if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
206 39 : ShmemVariableCache->newestCommitTsXid = newestXact;
3049 alvherre 207 48 : LWLockRelease(CommitTsLock);
208 : }
209 :
210 : /*
211 : * Record the commit timestamp of transaction entries in the commit log for all
3049 alvherre 212 ECB : * entries on a single page. Atomic only on this page.
213 : */
214 : static void
3049 alvherre 215 GIC 48 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
216 : TransactionId *subxids, TimestampTz ts,
217 : RepOriginId nodeid, int pageno)
218 : {
3049 alvherre 219 ECB : int slotno;
220 : int i;
221 :
1059 tgl 222 GIC 48 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
3049 alvherre 223 ECB :
3049 alvherre 224 CBC 48 : slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
3049 alvherre 225 EUB :
3049 alvherre 226 GIC 48 : TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
3049 alvherre 227 CBC 48 : for (i = 0; i < nsubxids; i++)
3049 alvherre 228 UIC 0 : TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
3049 alvherre 229 ECB :
3049 alvherre 230 CBC 48 : CommitTsCtl->shared->page_dirty[slotno] = true;
231 :
1059 tgl 232 GIC 48 : LWLockRelease(CommitTsSLRULock);
3049 alvherre 233 48 : }
234 :
235 : /*
236 : * Sets the commit timestamp of a single transaction.
237 : *
1059 tgl 238 ECB : * Must be called with CommitTsSLRULock held
239 : */
240 : static void
3049 alvherre 241 CBC 48 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
242 : RepOriginId nodeid, int slotno)
243 : {
244 48 : int entryno = TransactionIdToCTsEntry(xid);
245 : CommitTimestampEntry entry;
3049 alvherre 246 ECB :
3049 alvherre 247 CBC 48 : Assert(TransactionIdIsNormal(xid));
248 :
249 48 : entry.time = ts;
250 48 : entry.nodeid = nodeid;
251 :
252 48 : memcpy(CommitTsCtl->shared->page_buffer[slotno] +
3049 alvherre 253 GIC 48 : SizeOfCommitTimestampEntry * entryno,
254 : &entry, SizeOfCommitTimestampEntry);
255 48 : }
256 :
257 : /*
258 : * Interrogate the commit timestamp of a transaction.
259 : *
260 : * The return value indicates whether a commit timestamp record was found for
261 : * the given xid. The timestamp value is returned in *ts (which may not be
262 : * null), and the origin node for the Xid is returned in *nodeid, if it's not
2788 alvherre 263 ECB : * null.
264 : */
265 : bool
3049 alvherre 266 CBC 31 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
2902 andres 267 ECB : RepOriginId *nodeid)
268 : {
3049 alvherre 269 GIC 31 : int pageno = TransactionIdToCTsPage(xid);
270 31 : int entryno = TransactionIdToCTsEntry(xid);
271 : int slotno;
272 : CommitTimestampEntry entry;
2659 mail 273 ECB : TransactionId oldestCommitTsXid;
274 : TransactionId newestCommitTsXid;
275 :
2327 alvherre 276 GIC 31 : if (!TransactionIdIsValid(xid))
2721 alvherre 277 CBC 3 : ereport(ERROR,
278 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
279 : errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
2327 280 28 : else if (!TransactionIdIsNormal(xid))
2327 alvherre 281 ECB : {
282 : /* frozen and bootstrap xids are always committed far in the past */
2327 alvherre 283 CBC 6 : *ts = 0;
2327 alvherre 284 GIC 6 : if (nodeid)
285 2 : *nodeid = 0;
2327 alvherre 286 CBC 6 : return false;
287 : }
288 :
2721 289 22 : LWLockAcquire(CommitTsLock, LW_SHARED);
2721 alvherre 290 ECB :
291 : /* Error if module not enabled */
2721 alvherre 292 GIC 22 : if (!commitTsShared->commitTsActive)
2684 293 3 : error_commit_ts_disabled();
294 :
295 : /*
2721 alvherre 296 ECB : * If we're asked for the cached value, return that. Otherwise, fall
297 : * through to read from SLRU.
3049 298 : */
2721 alvherre 299 CBC 19 : if (commitTsShared->xidLastCommit == xid)
2721 alvherre 300 ECB : {
2721 alvherre 301 GIC 9 : *ts = commitTsShared->dataLastCommit.time;
2721 alvherre 302 CBC 9 : if (nodeid)
303 2 : *nodeid = commitTsShared->dataLastCommit.nodeid;
304 :
2721 alvherre 305 GIC 9 : LWLockRelease(CommitTsLock);
2721 alvherre 306 CBC 9 : return *ts != 0;
2721 alvherre 307 ECB : }
308 :
2659 mail 309 CBC 10 : oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
310 10 : newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
311 : /* neither is invalid, or both are */
2659 mail 312 GIC 10 : Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
3049 alvherre 313 10 : LWLockRelease(CommitTsLock);
314 :
2721 alvherre 315 ECB : /*
316 : * Return empty if the requested value is outside our valid range.
317 : */
2659 mail 318 GIC 20 : if (!TransactionIdIsValid(oldestCommitTsXid) ||
2659 mail 319 CBC 17 : TransactionIdPrecedes(xid, oldestCommitTsXid) ||
320 7 : TransactionIdPrecedes(newestCommitTsXid, xid))
3049 alvherre 321 EUB : {
2788 alvherre 322 CBC 3 : *ts = 0;
3049 alvherre 323 GIC 3 : if (nodeid)
2902 andres 324 UIC 0 : *nodeid = InvalidRepOriginId;
3049 alvherre 325 GIC 3 : return false;
3049 alvherre 326 ECB : }
327 :
328 : /* lock is acquired by SimpleLruReadPage_ReadOnly */
3049 alvherre 329 CBC 7 : slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
3049 alvherre 330 GIC 7 : memcpy(&entry,
331 7 : CommitTsCtl->shared->page_buffer[slotno] +
3049 alvherre 332 CBC 7 : SizeOfCommitTimestampEntry * entryno,
3049 alvherre 333 ECB : SizeOfCommitTimestampEntry);
3049 alvherre 334 EUB :
2788 alvherre 335 GIC 7 : *ts = entry.time;
3049 alvherre 336 CBC 7 : if (nodeid)
3049 alvherre 337 LBC 0 : *nodeid = entry.nodeid;
338 :
1059 tgl 339 GIC 7 : LWLockRelease(CommitTsSLRULock);
3049 alvherre 340 7 : return *ts != 0;
341 : }
342 :
343 : /*
344 : * Return the Xid of the latest committed transaction. (As far as this module
345 : * is concerned, anyway; it's up to the caller to ensure the value is useful
346 : * for its purposes.)
347 : *
348 : * ts and nodeid are filled with the corresponding data; they can be passed
3049 alvherre 349 ECB : * as NULL if not wanted.
350 : */
351 : TransactionId
2902 andres 352 GIC 3 : GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
3049 alvherre 353 ECB : {
354 : TransactionId xid;
355 :
2721 alvherre 356 CBC 3 : LWLockAcquire(CommitTsLock, LW_SHARED);
2721 alvherre 357 EUB :
358 : /* Error if module not enabled */
2721 alvherre 359 CBC 3 : if (!commitTsShared->commitTsActive)
2684 alvherre 360 LBC 0 : error_commit_ts_disabled();
3049 alvherre 361 ECB :
3049 alvherre 362 CBC 3 : xid = commitTsShared->xidLastCommit;
363 3 : if (ts)
364 3 : *ts = commitTsShared->dataLastCommit.time;
3049 alvherre 365 GIC 3 : if (nodeid)
3049 alvherre 366 CBC 3 : *nodeid = commitTsShared->dataLastCommit.nodeid;
3049 alvherre 367 GIC 3 : LWLockRelease(CommitTsLock);
368 :
369 3 : return xid;
3049 alvherre 370 ECB : }
371 :
2684 372 : static void
2684 alvherre 373 GIC 3 : error_commit_ts_disabled(void)
374 : {
375 3 : ereport(ERROR,
376 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
377 : errmsg("could not get commit timestamp data"),
378 : RecoveryInProgress() ?
379 : errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
380 : "track_commit_timestamp") :
381 : errhint("Make sure the configuration parameter \"%s\" is set.",
382 : "track_commit_timestamp")));
383 : }
384 :
385 : /*
3049 alvherre 386 ECB : * SQL-callable wrapper to obtain commit time of a transaction
387 : */
388 : Datum
3049 alvherre 389 GIC 26 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
390 : {
888 peter 391 26 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
2878 bruce 392 ECB : TimestampTz ts;
393 : bool found;
3049 alvherre 394 :
3049 alvherre 395 CBC 26 : found = TransactionIdGetCommitTsData(xid, &ts, NULL);
396 :
397 21 : if (!found)
3049 alvherre 398 GIC 7 : PG_RETURN_NULL();
399 :
400 14 : PG_RETURN_TIMESTAMPTZ(ts);
401 : }
402 :
403 :
404 : /*
405 : * pg_last_committed_xact
406 : *
407 : * SQL-callable wrapper to obtain some information about the latest
408 : * committed transaction: transaction ID, timestamp and replication
1001 michael 409 ECB : * origin.
410 : */
411 : Datum
3049 alvherre 412 GIC 3 : pg_last_committed_xact(PG_FUNCTION_ARGS)
413 : {
414 : TransactionId xid;
415 : RepOriginId nodeid;
416 : TimestampTz ts;
417 : Datum values[3];
418 : bool nulls[3];
419 : TupleDesc tupdesc;
3049 alvherre 420 ECB : HeapTuple htup;
421 :
422 : /* and construct a tuple with our data */
1001 michael 423 GBC 3 : xid = GetLatestCommitTsData(&ts, &nodeid);
424 :
109 michael 425 GNC 3 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
109 michael 426 UNC 0 : elog(ERROR, "return type must be a row type");
3049 alvherre 427 ECB :
3049 alvherre 428 CBC 3 : if (!TransactionIdIsNormal(xid))
429 : {
3049 alvherre 430 UIC 0 : memset(nulls, true, sizeof(nulls));
3049 alvherre 431 ECB : }
432 : else
433 : {
3049 alvherre 434 GIC 3 : values[0] = TransactionIdGetDatum(xid);
435 3 : nulls[0] = false;
436 :
437 3 : values[1] = TimestampTzGetDatum(ts);
438 3 : nulls[1] = false;
439 :
1001 michael 440 3 : values[2] = ObjectIdGetDatum((Oid) nodeid);
441 3 : nulls[2] = false;
442 : }
3049 alvherre 443 ECB :
3049 alvherre 444 GIC 3 : htup = heap_form_tuple(tupdesc, values, nulls);
3049 alvherre 445 ECB :
3049 alvherre 446 GIC 3 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
447 : }
448 :
449 : /*
450 : * pg_xact_commit_timestamp_origin
451 : *
452 : * SQL-callable wrapper to obtain commit timestamp and replication origin
453 : * of a given transaction.
1001 michael 454 ECB : */
455 : Datum
1001 michael 456 CBC 5 : pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
1001 michael 457 EUB : {
888 peter 458 GIC 5 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
1001 michael 459 ECB : RepOriginId nodeid;
460 : TimestampTz ts;
461 : Datum values[2];
462 : bool nulls[2];
463 : TupleDesc tupdesc;
464 : HeapTuple htup;
465 : bool found;
466 :
1001 michael 467 GIC 5 : found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
1001 michael 468 ECB :
109 michael 469 GNC 4 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
109 michael 470 UNC 0 : elog(ERROR, "return type must be a row type");
471 :
1001 michael 472 GIC 4 : if (!found)
473 : {
474 2 : memset(nulls, true, sizeof(nulls));
475 : }
476 : else
1001 michael 477 ECB : {
1001 michael 478 GIC 2 : values[0] = TimestampTzGetDatum(ts);
1001 michael 479 CBC 2 : nulls[0] = false;
480 :
1001 michael 481 GIC 2 : values[1] = ObjectIdGetDatum((Oid) nodeid);
482 2 : nulls[1] = false;
483 : }
484 :
485 4 : htup = heap_form_tuple(tupdesc, values, nulls);
1001 michael 486 ECB :
1001 michael 487 GIC 4 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
1001 michael 488 ECB : }
489 :
490 : /*
491 : * Number of shared CommitTS buffers.
492 : *
493 : * We use a very similar logic as for the number of CLOG buffers (except we
494 : * scale up twice as fast with shared buffers, and the maximum is twice as
495 : * high); see comments in CLOGShmemBuffers.
496 : */
3049 alvherre 497 : Size
3049 alvherre 498 GIC 4564 : CommitTsShmemBuffers(void)
499 : {
495 500 4564 : return Min(256, Max(4, NBuffers / 256));
3049 alvherre 501 ECB : }
502 :
503 : /*
504 : * Shared memory sizing for CommitTs
505 : */
506 : Size
3049 alvherre 507 GIC 2738 : CommitTsShmemSize(void)
3049 alvherre 508 ECB : {
3049 alvherre 509 GIC 2738 : return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
510 : sizeof(CommitTimestampShared);
511 : }
3049 alvherre 512 ECB :
513 : /*
514 : * Initialize CommitTs at system startup (postmaster start or standalone
515 : * backend)
516 : */
517 : void
3049 alvherre 518 CBC 1826 : CommitTsShmemInit(void)
3049 alvherre 519 ECB : {
520 : bool found;
521 :
3049 alvherre 522 GBC 1826 : CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
1059 tgl 523 CBC 1826 : SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0,
1059 tgl 524 GIC 1826 : CommitTsSLRULock, "pg_commit_ts",
525 : LWTRANCHE_COMMITTS_BUFFER,
526 : SYNC_HANDLER_COMMIT_TS);
813 noah 527 1826 : SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
528 :
3049 alvherre 529 1826 : commitTsShared = ShmemInitStruct("CommitTs shared",
530 : sizeof(CommitTimestampShared),
531 : &found);
3049 alvherre 532 ECB :
3049 alvherre 533 GIC 1826 : if (!IsUnderPostmaster)
534 : {
535 1826 : Assert(!found);
536 :
537 1826 : commitTsShared->xidLastCommit = InvalidTransactionId;
538 1826 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
2902 andres 539 CBC 1826 : commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
2721 alvherre 540 GIC 1826 : commitTsShared->commitTsActive = false;
541 : }
542 : else
3049 alvherre 543 UIC 0 : Assert(found);
3049 alvherre 544 GIC 1826 : }
545 :
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Intentionally empty.  Unlike most other SLRU modules there is nothing
	 * to do here at present: segments are created when the server is
	 * started with this module enabled.  See ActivateCommitTs.
	 */
}
561 :
562 : /*
563 : * Initialize (or reinitialize) a page of CommitTs to zeroes.
564 : * If writeXlog is true, also emit an XLOG record saying we did this.
565 : *
566 : * The page is not actually written, just set up in shared memory.
567 : * The slot number of the new page is returned.
3049 alvherre 568 ECB : *
569 : * Control lock must be held at entry, and will be held at exit.
570 : */
571 : static int
3049 alvherre 572 GIC 7 : ZeroCommitTsPage(int pageno, bool writeXlog)
573 : {
574 : int slotno;
575 :
576 7 : slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
577 :
3049 alvherre 578 CBC 7 : if (writeXlog)
3049 alvherre 579 UIC 0 : WriteZeroPageXlogRec(pageno);
580 :
3049 alvherre 581 GIC 7 : return slotno;
582 : }
583 :
/*
 * Startup hook: activate the module.
 *
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
 */
void
StartupCommitTs(void)
{
	ActivateCommitTs();
}
3049 alvherre 593 ECB :
594 : /*
595 : * This must be called ONCE during postmaster or standalone-backend startup,
596 : * after recovery has finished.
597 : */
598 : void
2953 alvherre 599 GIC 1142 : CompleteCommitTsInitialization(void)
2953 alvherre 600 ECB : {
601 : /*
602 : * If the feature is not enabled, turn it off for good. This also removes
603 : * any leftover data.
604 : *
605 : * Conversely, we activate the module if the feature is enabled. This is
606 : * necessary for primary and standby as the activation depends on the
607 : * control file contents at the beginning of recovery or when a
608 : * XLOG_PARAMETER_CHANGE is replayed.
609 : */
2953 alvherre 610 GIC 1142 : if (!track_commit_timestamp)
2721 611 1131 : DeactivateCommitTs();
612 : else
2676 613 11 : ActivateCommitTs();
2953 614 1142 : }
2953 alvherre 615 ECB :
616 : /*
2747 617 : * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
1656 michael 618 EUB : * XLog record during recovery.
619 : */
2747 alvherre 620 ECB : void
2747 alvherre 621 CBC 19 : CommitTsParameterChange(bool newvalue, bool oldvalue)
2747 alvherre 622 ECB : {
623 : /*
624 : * If the commit_ts module is disabled in this server and we get word from
625 : * the primary server that it is enabled there, activate it so that we can
626 : * replay future WAL records involving it; also mark it as active on
627 : * pg_control. If the old value was already set, we already did this, so
628 : * don't do anything.
629 : *
630 : * If the module is disabled in the primary, disable it here too, unless
631 : * the module is enabled locally.
632 : *
633 : * Note this only runs in the recovery process, so an unlocked read is
634 : * fine.
635 : */
2747 alvherre 636 GIC 19 : if (newvalue)
637 : {
2721 638 2 : if (!commitTsShared->commitTsActive)
2747 alvherre 639 UIC 0 : ActivateCommitTs();
640 : }
2721 alvherre 641 CBC 17 : else if (commitTsShared->commitTsActive)
2721 alvherre 642 GIC 1 : DeactivateCommitTs();
2747 643 19 : }
644 :
645 : /*
646 : * Activate this module whenever necessary.
2253 heikki.linnakangas 647 ECB : * This must happen during postmaster or standalone-backend startup,
2878 bruce 648 : * or during WAL replay anytime the track_commit_timestamp setting is
649 : * changed in the primary.
2953 alvherre 650 : *
651 : * The reason why this SLRU needs separate activation/deactivation functions is
652 : * that it can be enabled/disabled during start and the activation/deactivation
1029 andres 653 : * on the primary is propagated to the standby via replay. Other SLRUs don't
654 : * have this property and they can be just initialized during normal startup.
3049 alvherre 655 : *
656 : * This is in charge of creating the currently active segment, if it's not
657 : * already there. The reason for this is that the server might have been
658 : * running with this module disabled for a while and thus might have skipped
659 : * the normal creation point.
660 : */
2747 661 : static void
2953 alvherre 662 CBC 19 : ActivateCommitTs(void)
3049 alvherre 663 ECB : {
664 : TransactionId xid;
665 : int pageno;
666 :
667 : /* If we've done this already, there's nothing to do */
2676 alvherre 668 GIC 19 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
669 19 : if (commitTsShared->commitTsActive)
670 : {
671 3 : LWLockRelease(CommitTsLock);
672 3 : return;
673 : }
674 16 : LWLockRelease(CommitTsLock);
675 :
971 andres 676 16 : xid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
2676 alvherre 677 16 : pageno = TransactionIdToCTsPage(xid);
3049 alvherre 678 ECB :
679 : /*
680 : * Re-Initialize our idea of the latest page number.
681 : */
1059 tgl 682 CBC 16 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
3049 alvherre 683 GIC 16 : CommitTsCtl->shared->latest_page_number = pageno;
1059 tgl 684 CBC 16 : LWLockRelease(CommitTsSLRULock);
685 :
686 : /*
3049 alvherre 687 ECB : * If CommitTs is enabled, but it wasn't in the previous server run, we
688 : * need to set the oldest and newest values to the next Xid; that way, we
689 : * will not try to read data that might not have been set.
690 : *
691 : * XXX does this have a problem if a server is started with commitTs
692 : * enabled, then started with commitTs disabled, then restarted with it
693 : * enabled again? It doesn't look like it does, because there should be a
694 : * checkpoint that sets the value to InvalidTransactionId at end of
695 : * recovery; and so any chance of injecting new transactions without
696 : * CommitTs values would occur after the oldestCommitTsXid has been set to
697 : * Invalid temporarily.
698 : */
3049 alvherre 699 CBC 16 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
2659 mail 700 16 : if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
3049 alvherre 701 ECB : {
2659 mail 702 GIC 10 : ShmemVariableCache->oldestCommitTsXid =
783 tmunro 703 10 : ShmemVariableCache->newestCommitTsXid = ReadNextTransactionId();
704 : }
3049 alvherre 705 16 : LWLockRelease(CommitTsLock);
706 :
707 : /* Create the current segment file, if necessary */
708 16 : if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
709 : {
710 : int slotno;
711 :
1059 tgl 712 7 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
3049 alvherre 713 7 : slotno = ZeroCommitTsPage(pageno, false);
714 7 : SimpleLruWritePage(CommitTsCtl, slotno);
3049 alvherre 715 CBC 7 : Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1059 tgl 716 GIC 7 : LWLockRelease(CommitTsSLRULock);
717 : }
718 :
719 : /* Change the activation status in shared memory. */
2721 alvherre 720 16 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
721 16 : commitTsShared->commitTsActive = true;
722 16 : LWLockRelease(CommitTsLock);
723 : }
3049 alvherre 724 ECB :
725 : /*
2953 726 : * Deactivate this module.
727 : *
728 : * This must be called when the track_commit_timestamp parameter is turned off.
729 : * This happens during postmaster or standalone-backend startup, or during WAL
730 : * replay.
731 : *
732 : * Resets CommitTs into invalid state to make sure we don't hand back
733 : * possibly-invalid data; also removes segments of old data.
734 : */
735 : static void
2721 alvherre 736 GIC 1132 : DeactivateCommitTs(void)
737 : {
738 : /*
739 : * Cleanup the status in the shared memory.
740 : *
741 : * We reset everything in the commitTsShared record to prevent user from
742 : * getting confusing data about last committed transaction on the standby
743 : * when the module was activated repeatedly on the primary.
2953 alvherre 744 ECB : */
2953 alvherre 745 CBC 1132 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
2721 alvherre 746 ECB :
2721 alvherre 747 CBC 1132 : commitTsShared->commitTsActive = false;
2721 alvherre 748 GIC 1132 : commitTsShared->xidLastCommit = InvalidTransactionId;
749 1132 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
750 1132 : commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
751 :
2659 mail 752 1132 : ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
2659 mail 753 CBC 1132 : ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
754 :
2953 alvherre 755 GIC 1132 : LWLockRelease(CommitTsLock);
756 :
757 : /*
758 : * Remove *all* files. This is necessary so that there are no leftover
759 : * files; in the case where this feature is later enabled after running
2747 alvherre 760 ECB : * with it disabled for some time there may be a gap in the file sequence.
761 : * (We can probably tolerate out-of-sequence files, as they are going to
762 : * be overwritten anyway when we wrap around, but it seems better to be
763 : * tidy.)
764 : */
1059 tgl 765 GIC 1132 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
2747 alvherre 766 1132 : (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
1059 tgl 767 1132 : LWLockRelease(CommitTsSLRULock);
2953 alvherre 768 1132 : }
769 :
770 : /*
771 : * Perform a checkpoint --- either during shutdown, or on-the-fly
772 : */
773 : void
3049 774 2363 : CheckPointCommitTs(void)
3049 alvherre 775 ECB : {
776 : /*
777 : * Write dirty CommitTs pages to disk. This may result in sync requests
778 : * queued for later handling by ProcessSyncRequests(), as part of the
779 : * checkpoint.
780 : */
926 tmunro 781 GIC 2363 : SimpleLruWriteAll(CommitTsCtl, true);
3049 alvherre 782 2363 : }
783 :
3049 alvherre 784 ECB : /*
785 : * Make sure that CommitTs has room for a newly-allocated XID.
786 : *
787 : * NB: this is called while holding XidGenLock. We want it to be very fast
788 : * most of the time; even when it's not so fast, no actual I/O need happen
789 : * unless we're forced to write out a dirty CommitTs or xlog page to make room
790 : * in shared memory.
791 : *
792 : * NB: the current implementation relies on track_commit_timestamp being
793 : * PGC_POSTMASTER.
794 : */
795 : void
3049 alvherre 796 GBC 301130 : ExtendCommitTs(TransactionId newestXact)
797 : {
3049 alvherre 798 EUB : int pageno;
799 :
800 : /*
2495 rhaas 801 : * Nothing to do if module not enabled. Note we do an unlocked read of
802 : * the flag here, which is okay because this routine is only called from
2721 alvherre 803 : * GetNewTransactionId, which is never called in a standby.
804 : */
2721 alvherre 805 GIC 301130 : Assert(!InRecovery);
806 301130 : if (!commitTsShared->commitTsActive)
3049 807 301095 : return;
808 :
809 : /*
810 : * No work except at first XID of a page. But beware: just after
811 : * wraparound, the first XID of page zero is FirstNormalTransactionId.
812 : */
3049 alvherre 813 CBC 35 : if (TransactionIdToCTsEntry(newestXact) != 0 &&
814 : !TransactionIdEquals(newestXact, FirstNormalTransactionId))
3049 alvherre 815 GIC 35 : return;
816 :
3049 alvherre 817 UIC 0 : pageno = TransactionIdToCTsPage(newestXact);
818 :
1059 tgl 819 0 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
820 :
3049 alvherre 821 ECB : /* Zero the page and make an XLOG entry about it */
3049 alvherre 822 UIC 0 : ZeroCommitTsPage(pageno, !InRecovery);
823 :
1059 tgl 824 LBC 0 : LWLockRelease(CommitTsSLRULock);
825 : }
3049 alvherre 826 ECB :
827 : /*
828 : * Remove all CommitTs segments before the one holding the passed
3049 alvherre 829 EUB : * transaction ID.
830 : *
831 : * Note that we don't need to flush XLOG here.
832 : */
833 : void
2721 alvherre 834 GIC 317 : TruncateCommitTs(TransactionId oldestXact)
835 : {
836 : int cutoffPage;
837 :
838 : /*
3049 alvherre 839 ECB : * The cutoff point is the start of the segment containing oldestXact. We
840 : * pass the *page* containing oldestXact to SimpleLruTruncate.
841 : */
3049 alvherre 842 GIC 317 : cutoffPage = TransactionIdToCTsPage(oldestXact);
843 :
844 : /* Check to see if there's any files that could be removed */
3049 alvherre 845 CBC 317 : if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
3049 alvherre 846 ECB : &cutoffPage))
3049 alvherre 847 GIC 317 : return; /* nothing to remove */
3049 alvherre 848 EUB :
849 : /* Write XLOG record */
2271 alvherre 850 UBC 0 : WriteTruncateXlogRec(cutoffPage, oldestXact);
3049 alvherre 851 EUB :
852 : /* Now we can remove the old CommitTs segment(s) */
3049 alvherre 853 UIC 0 : SimpleLruTruncate(CommitTsCtl, cutoffPage);
854 : }
3049 alvherre 855 ECB :
856 : /*
857 : * Set the limit values between which commit TS can be consulted.
858 : */
859 : void
3049 alvherre 860 CBC 1481 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
861 : {
862 : /*
863 : * Be careful not to overwrite values that are either further into the
864 : * "future" or signal a disabled committs.
865 : */
866 1481 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
2659 mail 867 GIC 1481 : if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
3049 alvherre 868 ECB : {
2659 mail 869 LBC 0 : if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
2659 mail 870 UBC 0 : ShmemVariableCache->oldestCommitTsXid = oldestXact;
871 0 : if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
2659 mail 872 LBC 0 : ShmemVariableCache->newestCommitTsXid = newestXact;
3049 alvherre 873 ECB : }
874 : else
875 : {
2659 mail 876 GIC 1481 : Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
2358 alvherre 877 1481 : ShmemVariableCache->oldestCommitTsXid = oldestXact;
878 1481 : ShmemVariableCache->newestCommitTsXid = newestXact;
879 : }
3049 880 1481 : LWLockRelease(CommitTsLock);
881 1481 : }
882 :
883 : /*
884 : * Move forwards the oldest commitTS value that can be consulted
885 : */
886 : void
2659 mail 887 317 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
888 : {
3049 alvherre 889 317 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
2659 mail 890 317 : if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
2118 tgl 891 UIC 0 : TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
2659 mail 892 0 : ShmemVariableCache->oldestCommitTsXid = oldestXact;
3049 alvherre 893 GIC 317 : LWLockRelease(CommitTsLock);
894 317 : }
895 :
896 :
897 : /*
898 : * Decide whether a commitTS page number is "older" for truncation purposes.
899 : * Analogous to CLOGPagePrecedes().
3049 alvherre 900 ECB : *
901 : * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
902 : * introduces differences compared to CLOG and the other SLRUs having (1 <<
903 : * 31) % per_page == 0. This function never tests exactly
904 : * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
813 noah 905 : * there are two possible counts of page boundaries between oldestXact and the
906 : * latest XID assigned, depending on whether oldestXact is within the first
907 : * 128 entries of its page. Since this function doesn't know the location of
908 : * oldestXact within page2, it returns false for one page that actually is
909 : * expendable. This is a wider (yet still negligible) version of the
910 : * truncation opportunity that CLOGPagePrecedes() cannot recognize.
911 : *
912 : * For the sake of a worked example, number entries with decimal values such
913 : * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
914 : * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
915 : * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
916 : * because entry=2.85 is the border that toggles whether entries precede the
917 : * last entry of the oldestXact page. While page 2 is expendable at
918 : * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
3049 alvherre 919 EUB : */
920 : static bool
3049 alvherre 921 GBC 71214 : CommitTsPagePrecedes(int page1, int page2)
3049 alvherre 922 EUB : {
923 : TransactionId xid1;
924 : TransactionId xid2;
925 :
3049 alvherre 926 GIC 71214 : xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
813 noah 927 71214 : xid1 += FirstNormalTransactionId + 1;
3049 alvherre 928 71214 : xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
813 noah 929 71214 : xid2 += FirstNormalTransactionId + 1;
3049 alvherre 930 EUB :
813 noah 931 GIC 113212 : return (TransactionIdPrecedes(xid1, xid2) &&
932 41998 : TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
933 : }
3049 alvherre 934 EUB :
935 :
936 : /*
937 : * Write a ZEROPAGE xlog record
938 : */
939 : static void
3049 alvherre 940 UBC 0 : WriteZeroPageXlogRec(int pageno)
941 : {
3049 alvherre 942 UIC 0 : XLogBeginInsert();
943 0 : XLogRegisterData((char *) (&pageno), sizeof(int));
944 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
945 0 : }
3049 alvherre 946 EUB :
947 : /*
948 : * Write a TRUNCATE xlog record
949 : */
950 : static void
2271 alvherre 951 UBC 0 : WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
952 : {
2271 alvherre 953 EUB : xl_commit_ts_truncate xlrec;
954 :
2271 alvherre 955 UIC 0 : xlrec.pageno = pageno;
956 0 : xlrec.oldestXid = oldestXid;
957 :
3049 alvherre 958 UBC 0 : XLogBeginInsert();
2271 alvherre 959 UIC 0 : XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
3049 alvherre 960 UBC 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
3049 alvherre 961 UIC 0 : }
3049 alvherre 962 EUB :
963 : /*
964 : * CommitTS resource manager's routines
965 : */
966 : void
3049 alvherre 967 UIC 0 : commit_ts_redo(XLogReaderState *record)
3049 alvherre 968 EUB : {
3049 alvherre 969 UIC 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
3049 alvherre 970 EUB :
971 : /* Backup blocks are not used in commit_ts records */
3049 alvherre 972 UBC 0 : Assert(!XLogRecHasAnyBlockRefs(record));
973 :
3049 alvherre 974 UIC 0 : if (info == COMMIT_TS_ZEROPAGE)
975 : {
976 : int pageno;
977 : int slotno;
3049 alvherre 978 EUB :
3049 alvherre 979 UIC 0 : memcpy(&pageno, XLogRecGetData(record), sizeof(int));
3049 alvherre 980 EUB :
1059 tgl 981 UIC 0 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
982 :
3049 alvherre 983 UBC 0 : slotno = ZeroCommitTsPage(pageno, false);
984 0 : SimpleLruWritePage(CommitTsCtl, slotno);
3049 alvherre 985 UIC 0 : Assert(!CommitTsCtl->shared->page_dirty[slotno]);
986 :
1059 tgl 987 0 : LWLockRelease(CommitTsSLRULock);
988 : }
3049 alvherre 989 0 : else if (info == COMMIT_TS_TRUNCATE)
3049 alvherre 990 EUB : {
2271 alvherre 991 UIC 0 : xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
3049 alvherre 992 EUB :
2271 alvherre 993 UIC 0 : AdvanceOldestCommitTsXid(trunc->oldestXid);
994 :
995 : /*
996 : * During XLOG replay, latest_page_number isn't set up yet; insert a
997 : * suitable value to bypass the sanity test in SimpleLruTruncate.
998 : */
999 0 : CommitTsCtl->shared->latest_page_number = trunc->pageno;
1000 :
1001 0 : SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1002 : }
1003 : else
3049 1004 0 : elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1005 0 : }
1006 :
1007 : /*
1008 : * Entrypoint for sync.c to sync commit_ts files.
1009 : */
1010 : int
926 tmunro 1011 0 : committssyncfiletag(const FileTag *ftag, char *path)
1012 : {
1013 0 : return SlruSyncFileTag(CommitTsCtl, ftag, path);
1014 : }
|