TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * commit_ts.c
4 : * PostgreSQL commit timestamp manager
5 : *
6 : * This module is a pg_xact-like system that stores the commit timestamp
7 : * for each transaction.
8 : *
9 : * XLOG interactions: this module generates an XLOG record whenever a new
10 : * CommitTs page is initialized to zeroes. Other writes of CommitTS come
11 : * from recording of transaction commit in xact.c, which generates its own
12 : * XLOG records for these events and will re-perform the status update on
13 : * redo; so we need make no additional XLOG entry here.
14 : *
15 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
16 : * Portions Copyright (c) 1994, Regents of the University of California
17 : *
18 : * src/backend/access/transam/commit_ts.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres.h"
23 :
24 : #include "access/commit_ts.h"
25 : #include "access/htup_details.h"
26 : #include "access/slru.h"
27 : #include "access/transam.h"
28 : #include "access/xloginsert.h"
29 : #include "access/xlogutils.h"
30 : #include "catalog/pg_type.h"
31 : #include "funcapi.h"
32 : #include "miscadmin.h"
33 : #include "pg_trace.h"
34 : #include "storage/shmem.h"
35 : #include "utils/builtins.h"
36 : #include "utils/snapmgr.h"
37 : #include "utils/timestamp.h"
38 :
39 : /*
40 : * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
41 : * everywhere else in Postgres.
42 : *
43 : * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
44 : * CommitTs page numbering also wraps around at
45 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
46 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
47 : * explicit notice of that fact in this module, except when comparing segment
48 : * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
49 : */
50 :
51 : /*
52 : * We need 8+2 bytes per xact. Note that enlarging this struct might mean
53 : * the largest possible file name is more than 5 chars long; see
54 : * SlruScanDirectory.
55 : */
56 : typedef struct CommitTimestampEntry
57 : {
58 : TimestampTz time;
59 : RepOriginId nodeid;
60 : } CommitTimestampEntry;
61 :
62 : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
63 : sizeof(RepOriginId))
64 :
65 : #define COMMIT_TS_XACTS_PER_PAGE \
66 : (BLCKSZ / SizeOfCommitTimestampEntry)
67 :
68 : #define TransactionIdToCTsPage(xid) \
69 : ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
70 : #define TransactionIdToCTsEntry(xid) \
71 : ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
72 :
73 : /*
74 : * Link to shared-memory data structures for CommitTs control
75 : */
76 : static SlruCtlData CommitTsCtlData;
77 :
78 : #define CommitTsCtl (&CommitTsCtlData)
79 :
80 : /*
81 : * We keep a cache of the last value set in shared memory.
82 : *
83 : * This is also good place to keep the activation status. We keep this
84 : * separate from the GUC so that the standby can activate the module if the
85 : * primary has it active independently of the value of the GUC.
86 : *
87 : * This is protected by CommitTsLock. In some places, we use commitTsActive
88 : * without acquiring the lock; where this happens, a comment explains the
89 : * rationale for it.
90 : */
91 : typedef struct CommitTimestampShared
92 : {
93 : TransactionId xidLastCommit;
94 : CommitTimestampEntry dataLastCommit;
95 : bool commitTsActive;
96 : } CommitTimestampShared;
97 :
98 : static CommitTimestampShared *commitTsShared;
99 :
100 :
101 : /* GUC variable */
102 : bool track_commit_timestamp;
103 :
104 : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
105 : TransactionId *subxids, TimestampTz ts,
106 : RepOriginId nodeid, int pageno);
107 : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
108 : RepOriginId nodeid, int slotno);
109 : static void error_commit_ts_disabled(void);
110 : static int ZeroCommitTsPage(int pageno, bool writeXlog);
111 : static bool CommitTsPagePrecedes(int page1, int page2);
112 : static void ActivateCommitTs(void);
113 : static void DeactivateCommitTs(void);
114 : static void WriteZeroPageXlogRec(int pageno);
115 : static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
116 :
117 : /*
118 : * TransactionTreeSetCommitTsData
119 : *
120 : * Record the final commit timestamp of transaction entries in the commit log
121 : * for a transaction and its subtransaction tree, as efficiently as possible.
122 : *
123 : * xid is the top level transaction id.
124 : *
125 : * subxids is an array of xids of length nsubxids, representing subtransactions
126 : * in the tree of xid. In various cases nsubxids may be zero.
127 : * The reason why tracking just the parent xid commit timestamp is not enough
128 : * is that the subtrans SLRU does not stay valid across crashes (it's not
129 : * permanent) so we need to keep the information about them here. If the
130 : * subtrans implementation changes in the future, we might want to revisit the
131 ECB : * decision of storing timestamp info for each subxid.
132 : */
133 : void
134 GIC 312745 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
135 : TransactionId *subxids, TimestampTz timestamp,
136 : RepOriginId nodeid)
137 : {
138 : int i;
139 : TransactionId headxid;
140 : TransactionId newestXact;
141 :
142 : /*
143 : * No-op if the module is not active.
144 : *
145 : * An unlocked read here is fine, because in a standby (the only place
146 : * where the flag can change in flight) this routine is only called by the
147 ECB : * recovery process, which is also the only process which can change the
148 : * flag.
149 : */
150 GIC 312745 : if (!commitTsShared->commitTsActive)
151 312697 : return;
152 :
153 : /*
154 ECB : * Figure out the latest Xid in this batch: either the last subxid if
155 EUB : * there's any, otherwise the parent xid.
156 : */
157 CBC 48 : if (nsubxids > 0)
158 UIC 0 : newestXact = subxids[nsubxids - 1];
159 : else
160 GIC 48 : newestXact = xid;
161 :
162 : /*
163 : * We split the xids to set the timestamp to in groups belonging to the
164 : * same SLRU page; the first element in each such set is its head. The
165 : * first group has the main XID as the head; subsequent sets use the first
166 ECB : * subxid not on the previous page as head. This way, we only have to
167 : * lock/modify each SLRU page once.
168 : */
169 GBC 48 : headxid = xid;
170 CBC 48 : i = 0;
171 : for (;;)
172 UIC 0 : {
173 CBC 48 : int pageno = TransactionIdToCTsPage(headxid);
174 : int j;
175 EUB :
176 GBC 48 : for (j = i; j < nsubxids; j++)
177 : {
178 UIC 0 : if (TransactionIdToCTsPage(subxids[j]) != pageno)
179 0 : break;
180 ECB : }
181 : /* subxids[i..j] are on the same page as the head */
182 :
183 GIC 48 : SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
184 ECB : pageno);
185 :
186 : /* if we wrote out all subxids, we're done. */
187 GIC 48 : if (j >= nsubxids)
188 48 : break;
189 :
190 : /*
191 EUB : * Set the new head and skip over it, as well as over the subxids we
192 : * just wrote.
193 : */
194 UIC 0 : headxid = subxids[j];
195 0 : i = j + 1;
196 ECB : }
197 :
198 : /* update the cached value in shared memory */
199 CBC 48 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
200 GIC 48 : commitTsShared->xidLastCommit = xid;
201 48 : commitTsShared->dataLastCommit.time = timestamp;
202 CBC 48 : commitTsShared->dataLastCommit.nodeid = nodeid;
203 ECB :
204 : /* and move forwards our endpoint, if needed */
205 GIC 48 : if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
206 39 : ShmemVariableCache->newestCommitTsXid = newestXact;
207 48 : LWLockRelease(CommitTsLock);
208 : }
209 :
210 : /*
211 : * Record the commit timestamp of transaction entries in the commit log for all
212 ECB : * entries on a single page. Atomic only on this page.
213 : */
214 : static void
215 GIC 48 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
216 : TransactionId *subxids, TimestampTz ts,
217 : RepOriginId nodeid, int pageno)
218 : {
219 ECB : int slotno;
220 : int i;
221 :
222 GIC 48 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
223 ECB :
224 CBC 48 : slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
225 EUB :
226 GIC 48 : TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
227 CBC 48 : for (i = 0; i < nsubxids; i++)
228 UIC 0 : TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
229 ECB :
230 CBC 48 : CommitTsCtl->shared->page_dirty[slotno] = true;
231 :
232 GIC 48 : LWLockRelease(CommitTsSLRULock);
233 48 : }
234 :
235 : /*
236 : * Sets the commit timestamp of a single transaction.
237 : *
238 ECB : * Must be called with CommitTsSLRULock held
239 : */
240 : static void
241 CBC 48 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
242 : RepOriginId nodeid, int slotno)
243 : {
244 48 : int entryno = TransactionIdToCTsEntry(xid);
245 : CommitTimestampEntry entry;
246 ECB :
247 CBC 48 : Assert(TransactionIdIsNormal(xid));
248 :
249 48 : entry.time = ts;
250 48 : entry.nodeid = nodeid;
251 :
252 48 : memcpy(CommitTsCtl->shared->page_buffer[slotno] +
253 GIC 48 : SizeOfCommitTimestampEntry * entryno,
254 : &entry, SizeOfCommitTimestampEntry);
255 48 : }
256 :
257 : /*
258 : * Interrogate the commit timestamp of a transaction.
259 : *
260 : * The return value indicates whether a commit timestamp record was found for
261 : * the given xid. The timestamp value is returned in *ts (which may not be
262 : * null), and the origin node for the Xid is returned in *nodeid, if it's not
263 ECB : * null.
264 : */
265 : bool
266 CBC 31 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
267 ECB : RepOriginId *nodeid)
268 : {
269 GIC 31 : int pageno = TransactionIdToCTsPage(xid);
270 31 : int entryno = TransactionIdToCTsEntry(xid);
271 : int slotno;
272 : CommitTimestampEntry entry;
273 ECB : TransactionId oldestCommitTsXid;
274 : TransactionId newestCommitTsXid;
275 :
276 GIC 31 : if (!TransactionIdIsValid(xid))
277 CBC 3 : ereport(ERROR,
278 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
279 : errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
280 28 : else if (!TransactionIdIsNormal(xid))
281 ECB : {
282 : /* frozen and bootstrap xids are always committed far in the past */
283 CBC 6 : *ts = 0;
284 GIC 6 : if (nodeid)
285 2 : *nodeid = 0;
286 CBC 6 : return false;
287 : }
288 :
289 22 : LWLockAcquire(CommitTsLock, LW_SHARED);
290 ECB :
291 : /* Error if module not enabled */
292 GIC 22 : if (!commitTsShared->commitTsActive)
293 3 : error_commit_ts_disabled();
294 :
295 : /*
296 ECB : * If we're asked for the cached value, return that. Otherwise, fall
297 : * through to read from SLRU.
298 : */
299 CBC 19 : if (commitTsShared->xidLastCommit == xid)
300 ECB : {
301 GIC 9 : *ts = commitTsShared->dataLastCommit.time;
302 CBC 9 : if (nodeid)
303 2 : *nodeid = commitTsShared->dataLastCommit.nodeid;
304 :
305 GIC 9 : LWLockRelease(CommitTsLock);
306 CBC 9 : return *ts != 0;
307 ECB : }
308 :
309 CBC 10 : oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
310 10 : newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
311 : /* neither is invalid, or both are */
312 GIC 10 : Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
313 10 : LWLockRelease(CommitTsLock);
314 :
315 ECB : /*
316 : * Return empty if the requested value is outside our valid range.
317 : */
318 GIC 20 : if (!TransactionIdIsValid(oldestCommitTsXid) ||
319 CBC 17 : TransactionIdPrecedes(xid, oldestCommitTsXid) ||
320 7 : TransactionIdPrecedes(newestCommitTsXid, xid))
321 EUB : {
322 CBC 3 : *ts = 0;
323 GIC 3 : if (nodeid)
324 UIC 0 : *nodeid = InvalidRepOriginId;
325 GIC 3 : return false;
326 ECB : }
327 :
328 : /* lock is acquired by SimpleLruReadPage_ReadOnly */
329 CBC 7 : slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
330 GIC 7 : memcpy(&entry,
331 7 : CommitTsCtl->shared->page_buffer[slotno] +
332 CBC 7 : SizeOfCommitTimestampEntry * entryno,
333 ECB : SizeOfCommitTimestampEntry);
334 EUB :
335 GIC 7 : *ts = entry.time;
336 CBC 7 : if (nodeid)
337 LBC 0 : *nodeid = entry.nodeid;
338 :
339 GIC 7 : LWLockRelease(CommitTsSLRULock);
340 7 : return *ts != 0;
341 : }
342 :
343 : /*
344 : * Return the Xid of the latest committed transaction. (As far as this module
345 : * is concerned, anyway; it's up to the caller to ensure the value is useful
346 : * for its purposes.)
347 : *
348 : * ts and nodeid are filled with the corresponding data; they can be passed
349 ECB : * as NULL if not wanted.
350 : */
351 : TransactionId
352 GIC 3 : GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
353 ECB : {
354 : TransactionId xid;
355 :
356 CBC 3 : LWLockAcquire(CommitTsLock, LW_SHARED);
357 EUB :
358 : /* Error if module not enabled */
359 CBC 3 : if (!commitTsShared->commitTsActive)
360 LBC 0 : error_commit_ts_disabled();
361 ECB :
362 CBC 3 : xid = commitTsShared->xidLastCommit;
363 3 : if (ts)
364 3 : *ts = commitTsShared->dataLastCommit.time;
365 GIC 3 : if (nodeid)
366 CBC 3 : *nodeid = commitTsShared->dataLastCommit.nodeid;
367 GIC 3 : LWLockRelease(CommitTsLock);
368 :
369 3 : return xid;
370 ECB : }
371 :
372 : static void
373 GIC 3 : error_commit_ts_disabled(void)
374 : {
375 3 : ereport(ERROR,
376 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
377 : errmsg("could not get commit timestamp data"),
378 : RecoveryInProgress() ?
379 : errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
380 : "track_commit_timestamp") :
381 : errhint("Make sure the configuration parameter \"%s\" is set.",
382 : "track_commit_timestamp")));
383 : }
384 :
385 : /*
386 ECB : * SQL-callable wrapper to obtain commit time of a transaction
387 : */
388 : Datum
389 GIC 26 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
390 : {
391 26 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
392 ECB : TimestampTz ts;
393 : bool found;
394 :
395 CBC 26 : found = TransactionIdGetCommitTsData(xid, &ts, NULL);
396 :
397 21 : if (!found)
398 GIC 7 : PG_RETURN_NULL();
399 :
400 14 : PG_RETURN_TIMESTAMPTZ(ts);
401 : }
402 :
403 :
404 : /*
405 : * pg_last_committed_xact
406 : *
407 : * SQL-callable wrapper to obtain some information about the latest
408 : * committed transaction: transaction ID, timestamp and replication
409 ECB : * origin.
410 : */
411 : Datum
412 GIC 3 : pg_last_committed_xact(PG_FUNCTION_ARGS)
413 : {
414 : TransactionId xid;
415 : RepOriginId nodeid;
416 : TimestampTz ts;
417 : Datum values[3];
418 : bool nulls[3];
419 : TupleDesc tupdesc;
420 ECB : HeapTuple htup;
421 :
422 : /* and construct a tuple with our data */
423 GBC 3 : xid = GetLatestCommitTsData(&ts, &nodeid);
424 :
425 GNC 3 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
426 UNC 0 : elog(ERROR, "return type must be a row type");
427 ECB :
428 CBC 3 : if (!TransactionIdIsNormal(xid))
429 : {
430 UIC 0 : memset(nulls, true, sizeof(nulls));
431 ECB : }
432 : else
433 : {
434 GIC 3 : values[0] = TransactionIdGetDatum(xid);
435 3 : nulls[0] = false;
436 :
437 3 : values[1] = TimestampTzGetDatum(ts);
438 3 : nulls[1] = false;
439 :
440 3 : values[2] = ObjectIdGetDatum((Oid) nodeid);
441 3 : nulls[2] = false;
442 : }
443 ECB :
444 GIC 3 : htup = heap_form_tuple(tupdesc, values, nulls);
445 ECB :
446 GIC 3 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
447 : }
448 :
449 : /*
450 : * pg_xact_commit_timestamp_origin
451 : *
452 : * SQL-callable wrapper to obtain commit timestamp and replication origin
453 : * of a given transaction.
454 ECB : */
455 : Datum
456 CBC 5 : pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
457 EUB : {
458 GIC 5 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
459 ECB : RepOriginId nodeid;
460 : TimestampTz ts;
461 : Datum values[2];
462 : bool nulls[2];
463 : TupleDesc tupdesc;
464 : HeapTuple htup;
465 : bool found;
466 :
467 GIC 5 : found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
468 ECB :
469 GNC 4 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
470 UNC 0 : elog(ERROR, "return type must be a row type");
471 :
472 GIC 4 : if (!found)
473 : {
474 2 : memset(nulls, true, sizeof(nulls));
475 : }
476 : else
477 ECB : {
478 GIC 2 : values[0] = TimestampTzGetDatum(ts);
479 CBC 2 : nulls[0] = false;
480 :
481 GIC 2 : values[1] = ObjectIdGetDatum((Oid) nodeid);
482 2 : nulls[1] = false;
483 : }
484 :
485 4 : htup = heap_form_tuple(tupdesc, values, nulls);
486 ECB :
487 GIC 4 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
488 ECB : }
489 :
490 : /*
491 : * Number of shared CommitTS buffers.
492 : *
493 : * We use a very similar logic as for the number of CLOG buffers (except we
494 : * scale up twice as fast with shared buffers, and the maximum is twice as
495 : * high); see comments in CLOGShmemBuffers.
496 : */
497 : Size
498 GIC 4564 : CommitTsShmemBuffers(void)
499 : {
500 4564 : return Min(256, Max(4, NBuffers / 256));
501 ECB : }
502 :
503 : /*
504 : * Shared memory sizing for CommitTs
505 : */
506 : Size
507 GIC 2738 : CommitTsShmemSize(void)
508 ECB : {
509 GIC 2738 : return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
510 : sizeof(CommitTimestampShared);
511 : }
512 ECB :
513 : /*
514 : * Initialize CommitTs at system startup (postmaster start or standalone
515 : * backend)
516 : */
517 : void
518 CBC 1826 : CommitTsShmemInit(void)
519 ECB : {
520 : bool found;
521 :
522 GBC 1826 : CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
523 CBC 1826 : SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0,
524 GIC 1826 : CommitTsSLRULock, "pg_commit_ts",
525 : LWTRANCHE_COMMITTS_BUFFER,
526 : SYNC_HANDLER_COMMIT_TS);
527 1826 : SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
528 :
529 1826 : commitTsShared = ShmemInitStruct("CommitTs shared",
530 : sizeof(CommitTimestampShared),
531 : &found);
532 ECB :
533 GIC 1826 : if (!IsUnderPostmaster)
534 : {
535 1826 : Assert(!found);
536 :
537 1826 : commitTsShared->xidLastCommit = InvalidTransactionId;
538 1826 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
539 CBC 1826 : commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
540 GIC 1826 : commitTsShared->commitTsActive = false;
541 : }
542 : else
543 UIC 0 : Assert(found);
544 GIC 1826 : }
545 :
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 *
 * Unlike most other SLRU modules there is currently nothing to do here:
 * segments are created when the server is started with this module enabled.
 * See ActivateCommitTs.
 */
void
BootStrapCommitTs(void)
{
    /* intentionally empty */
}
561 :
562 : /*
563 : * Initialize (or reinitialize) a page of CommitTs to zeroes.
564 : * If writeXlog is true, also emit an XLOG record saying we did this.
565 : *
566 : * The page is not actually written, just set up in shared memory.
567 : * The slot number of the new page is returned.
568 ECB : *
569 : * Control lock must be held at entry, and will be held at exit.
570 : */
571 : static int
572 GIC 7 : ZeroCommitTsPage(int pageno, bool writeXlog)
573 : {
574 : int slotno;
575 :
576 7 : slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
577 :
578 CBC 7 : if (writeXlog)
579 UIC 0 : WriteZeroPageXlogRec(pageno);
580 :
581 GIC 7 : return slotno;
582 : }
583 :
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized ShmemVariableCache->nextXid.
 *
 * All the work is delegated to ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
    ActivateCommitTs();
}
593 ECB :
594 : /*
595 : * This must be called ONCE during postmaster or standalone-backend startup,
596 : * after recovery has finished.
597 : */
598 : void
599 GIC 1142 : CompleteCommitTsInitialization(void)
600 ECB : {
601 : /*
602 : * If the feature is not enabled, turn it off for good. This also removes
603 : * any leftover data.
604 : *
605 : * Conversely, we activate the module if the feature is enabled. This is
606 : * necessary for primary and standby as the activation depends on the
607 : * control file contents at the beginning of recovery or when a
608 : * XLOG_PARAMETER_CHANGE is replayed.
609 : */
610 GIC 1142 : if (!track_commit_timestamp)
611 1131 : DeactivateCommitTs();
612 : else
613 11 : ActivateCommitTs();
614 1142 : }
615 ECB :
616 : /*
617 : * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
618 EUB : * XLog record during recovery.
619 : */
620 ECB : void
621 CBC 19 : CommitTsParameterChange(bool newvalue, bool oldvalue)
622 ECB : {
623 : /*
624 : * If the commit_ts module is disabled in this server and we get word from
625 : * the primary server that it is enabled there, activate it so that we can
626 : * replay future WAL records involving it; also mark it as active on
627 : * pg_control. If the old value was already set, we already did this, so
628 : * don't do anything.
629 : *
630 : * If the module is disabled in the primary, disable it here too, unless
631 : * the module is enabled locally.
632 : *
633 : * Note this only runs in the recovery process, so an unlocked read is
634 : * fine.
635 : */
636 GIC 19 : if (newvalue)
637 : {
638 2 : if (!commitTsShared->commitTsActive)
639 UIC 0 : ActivateCommitTs();
640 : }
641 CBC 17 : else if (commitTsShared->commitTsActive)
642 GIC 1 : DeactivateCommitTs();
643 19 : }
644 :
645 : /*
646 : * Activate this module whenever necessary.
647 ECB : * This must happen during postmaster or standalone-backend startup,
648 : * or during WAL replay anytime the track_commit_timestamp setting is
649 : * changed in the primary.
650 : *
651 : * The reason why this SLRU needs separate activation/deactivation functions is
652 : * that it can be enabled/disabled during start and the activation/deactivation
653 : * on the primary is propagated to the standby via replay. Other SLRUs don't
654 : * have this property and they can be just initialized during normal startup.
655 : *
656 : * This is in charge of creating the currently active segment, if it's not
657 : * already there. The reason for this is that the server might have been
658 : * running with this module disabled for a while and thus might have skipped
659 : * the normal creation point.
660 : */
661 : static void
662 CBC 19 : ActivateCommitTs(void)
663 ECB : {
664 : TransactionId xid;
665 : int pageno;
666 :
667 : /* If we've done this already, there's nothing to do */
668 GIC 19 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
669 19 : if (commitTsShared->commitTsActive)
670 : {
671 3 : LWLockRelease(CommitTsLock);
672 3 : return;
673 : }
674 16 : LWLockRelease(CommitTsLock);
675 :
676 16 : xid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
677 16 : pageno = TransactionIdToCTsPage(xid);
678 ECB :
679 : /*
680 : * Re-Initialize our idea of the latest page number.
681 : */
682 CBC 16 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
683 GIC 16 : CommitTsCtl->shared->latest_page_number = pageno;
684 CBC 16 : LWLockRelease(CommitTsSLRULock);
685 :
686 : /*
687 ECB : * If CommitTs is enabled, but it wasn't in the previous server run, we
688 : * need to set the oldest and newest values to the next Xid; that way, we
689 : * will not try to read data that might not have been set.
690 : *
691 : * XXX does this have a problem if a server is started with commitTs
692 : * enabled, then started with commitTs disabled, then restarted with it
693 : * enabled again? It doesn't look like it does, because there should be a
694 : * checkpoint that sets the value to InvalidTransactionId at end of
695 : * recovery; and so any chance of injecting new transactions without
696 : * CommitTs values would occur after the oldestCommitTsXid has been set to
697 : * Invalid temporarily.
698 : */
699 CBC 16 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
700 16 : if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
701 ECB : {
702 GIC 10 : ShmemVariableCache->oldestCommitTsXid =
703 10 : ShmemVariableCache->newestCommitTsXid = ReadNextTransactionId();
704 : }
705 16 : LWLockRelease(CommitTsLock);
706 :
707 : /* Create the current segment file, if necessary */
708 16 : if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
709 : {
710 : int slotno;
711 :
712 7 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
713 7 : slotno = ZeroCommitTsPage(pageno, false);
714 7 : SimpleLruWritePage(CommitTsCtl, slotno);
715 CBC 7 : Assert(!CommitTsCtl->shared->page_dirty[slotno]);
716 GIC 7 : LWLockRelease(CommitTsSLRULock);
717 : }
718 :
719 : /* Change the activation status in shared memory. */
720 16 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
721 16 : commitTsShared->commitTsActive = true;
722 16 : LWLockRelease(CommitTsLock);
723 : }
724 ECB :
725 : /*
726 : * Deactivate this module.
727 : *
728 : * This must be called when the track_commit_timestamp parameter is turned off.
729 : * This happens during postmaster or standalone-backend startup, or during WAL
730 : * replay.
731 : *
732 : * Resets CommitTs into invalid state to make sure we don't hand back
733 : * possibly-invalid data; also removes segments of old data.
734 : */
735 : static void
736 GIC 1132 : DeactivateCommitTs(void)
737 : {
738 : /*
739 : * Cleanup the status in the shared memory.
740 : *
741 : * We reset everything in the commitTsShared record to prevent user from
742 : * getting confusing data about last committed transaction on the standby
743 : * when the module was activated repeatedly on the primary.
744 ECB : */
745 CBC 1132 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
746 ECB :
747 CBC 1132 : commitTsShared->commitTsActive = false;
748 GIC 1132 : commitTsShared->xidLastCommit = InvalidTransactionId;
749 1132 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
750 1132 : commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
751 :
752 1132 : ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;
753 CBC 1132 : ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
754 :
755 GIC 1132 : LWLockRelease(CommitTsLock);
756 :
757 : /*
758 : * Remove *all* files. This is necessary so that there are no leftover
759 : * files; in the case where this feature is later enabled after running
760 ECB : * with it disabled for some time there may be a gap in the file sequence.
761 : * (We can probably tolerate out-of-sequence files, as they are going to
762 : * be overwritten anyway when we wrap around, but it seems better to be
763 : * tidy.)
764 : */
765 GIC 1132 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
766 1132 : (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
767 1132 : LWLockRelease(CommitTsSLRULock);
768 1132 : }
769 :
770 : /*
771 : * Perform a checkpoint --- either during shutdown, or on-the-fly
772 : */
773 : void
774 2363 : CheckPointCommitTs(void)
775 ECB : {
776 : /*
777 : * Write dirty CommitTs pages to disk. This may result in sync requests
778 : * queued for later handling by ProcessSyncRequests(), as part of the
779 : * checkpoint.
780 : */
781 GIC 2363 : SimpleLruWriteAll(CommitTsCtl, true);
782 2363 : }
783 :
784 ECB : /*
785 : * Make sure that CommitTs has room for a newly-allocated XID.
786 : *
787 : * NB: this is called while holding XidGenLock. We want it to be very fast
788 : * most of the time; even when it's not so fast, no actual I/O need happen
789 : * unless we're forced to write out a dirty CommitTs or xlog page to make room
790 : * in shared memory.
791 : *
792 : * NB: the current implementation relies on track_commit_timestamp being
793 : * PGC_POSTMASTER.
794 : */
795 : void
796 GBC 301130 : ExtendCommitTs(TransactionId newestXact)
797 : {
798 EUB : int pageno;
799 :
800 : /*
801 : * Nothing to do if module not enabled. Note we do an unlocked read of
802 : * the flag here, which is okay because this routine is only called from
803 : * GetNewTransactionId, which is never called in a standby.
804 : */
805 GIC 301130 : Assert(!InRecovery);
806 301130 : if (!commitTsShared->commitTsActive)
807 301095 : return;
808 :
809 : /*
810 : * No work except at first XID of a page. But beware: just after
811 : * wraparound, the first XID of page zero is FirstNormalTransactionId.
812 : */
813 CBC 35 : if (TransactionIdToCTsEntry(newestXact) != 0 &&
814 : !TransactionIdEquals(newestXact, FirstNormalTransactionId))
815 GIC 35 : return;
816 :
817 UIC 0 : pageno = TransactionIdToCTsPage(newestXact);
818 :
819 0 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
820 :
821 ECB : /* Zero the page and make an XLOG entry about it */
822 UIC 0 : ZeroCommitTsPage(pageno, !InRecovery);
823 :
824 LBC 0 : LWLockRelease(CommitTsSLRULock);
825 : }
826 ECB :
827 : /*
828 : * Remove all CommitTs segments before the one holding the passed
829 EUB : * transaction ID.
830 : *
831 : * Note that we don't need to flush XLOG here.
832 : */
833 : void
834 GIC 317 : TruncateCommitTs(TransactionId oldestXact)
835 : {
836 : int cutoffPage;
837 :
838 : /*
839 ECB : * The cutoff point is the start of the segment containing oldestXact. We
840 : * pass the *page* containing oldestXact to SimpleLruTruncate.
841 : */
842 GIC 317 : cutoffPage = TransactionIdToCTsPage(oldestXact);
843 :
844 : /* Check to see if there's any files that could be removed */
845 CBC 317 : if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
846 ECB : &cutoffPage))
847 GIC 317 : return; /* nothing to remove */
848 EUB :
849 : /* Write XLOG record */
850 UBC 0 : WriteTruncateXlogRec(cutoffPage, oldestXact);
851 EUB :
852 : /* Now we can remove the old CommitTs segment(s) */
853 UIC 0 : SimpleLruTruncate(CommitTsCtl, cutoffPage);
854 : }
855 ECB :
856 : /*
857 : * Set the limit values between which commit TS can be consulted.
858 : */
859 : void
860 CBC 1481 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
861 : {
862 : /*
863 : * Be careful not to overwrite values that are either further into the
864 : * "future" or signal a disabled committs.
865 : */
866 1481 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
867 GIC 1481 : if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
868 ECB : {
869 LBC 0 : if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
870 UBC 0 : ShmemVariableCache->oldestCommitTsXid = oldestXact;
871 0 : if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
872 LBC 0 : ShmemVariableCache->newestCommitTsXid = newestXact;
873 ECB : }
874 : else
875 : {
876 GIC 1481 : Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
877 1481 : ShmemVariableCache->oldestCommitTsXid = oldestXact;
878 1481 : ShmemVariableCache->newestCommitTsXid = newestXact;
879 : }
880 1481 : LWLockRelease(CommitTsLock);
881 1481 : }
882 :
883 : /*
884 : * Move forwards the oldest commitTS value that can be consulted
885 : */
886 : void
887 317 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
888 : {
889 317 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
890 317 : if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
891 UIC 0 : TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
892 0 : ShmemVariableCache->oldestCommitTsXid = oldestXact;
893 GIC 317 : LWLockRelease(CommitTsLock);
894 317 : }
895 :
896 :
897 : /*
898 : * Decide whether a commitTS page number is "older" for truncation purposes.
899 : * Analogous to CLOGPagePrecedes().
900 ECB : *
901 : * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
902 : * introduces differences compared to CLOG and the other SLRUs having (1 <<
903 : * 31) % per_page == 0. This function never tests exactly
904 : * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
905 : * there are two possible counts of page boundaries between oldestXact and the
906 : * latest XID assigned, depending on whether oldestXact is within the first
907 : * 128 entries of its page. Since this function doesn't know the location of
908 : * oldestXact within page2, it returns false for one page that actually is
909 : * expendable. This is a wider (yet still negligible) version of the
910 : * truncation opportunity that CLOGPagePrecedes() cannot recognize.
911 : *
912 : * For the sake of a worked example, number entries with decimal values such
913 : * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
914 : * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
915 : * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
916 : * because entry=2.85 is the border that toggles whether entries precede the
917 : * last entry of the oldestXact page. While page 2 is expendable at
918 : * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
919 EUB : */
920 : static bool
921 GBC 71214 : CommitTsPagePrecedes(int page1, int page2)
922 EUB : {
923 : TransactionId xid1;
924 : TransactionId xid2;
925 :
926 GIC 71214 : xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
927 71214 : xid1 += FirstNormalTransactionId + 1;
928 71214 : xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
929 71214 : xid2 += FirstNormalTransactionId + 1;
930 EUB :
931 GIC 113212 : return (TransactionIdPrecedes(xid1, xid2) &&
932 41998 : TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
933 : }
934 EUB :
935 :
936 : /*
937 : * Write a ZEROPAGE xlog record
938 : */
939 : static void
940 UBC 0 : WriteZeroPageXlogRec(int pageno)
941 : {
942 UIC 0 : XLogBeginInsert();
943 0 : XLogRegisterData((char *) (&pageno), sizeof(int));
944 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
945 0 : }
946 EUB :
947 : /*
948 : * Write a TRUNCATE xlog record
949 : */
950 : static void
951 UBC 0 : WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
952 : {
953 EUB : xl_commit_ts_truncate xlrec;
954 :
955 UIC 0 : xlrec.pageno = pageno;
956 0 : xlrec.oldestXid = oldestXid;
957 :
958 UBC 0 : XLogBeginInsert();
959 UIC 0 : XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
960 UBC 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
961 UIC 0 : }
962 EUB :
963 : /*
964 : * CommitTS resource manager's routines
965 : */
966 : void
967 UIC 0 : commit_ts_redo(XLogReaderState *record)
968 EUB : {
969 UIC 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
970 EUB :
971 : /* Backup blocks are not used in commit_ts records */
972 UBC 0 : Assert(!XLogRecHasAnyBlockRefs(record));
973 :
974 UIC 0 : if (info == COMMIT_TS_ZEROPAGE)
975 : {
976 : int pageno;
977 : int slotno;
978 EUB :
979 UIC 0 : memcpy(&pageno, XLogRecGetData(record), sizeof(int));
980 EUB :
981 UIC 0 : LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
982 :
983 UBC 0 : slotno = ZeroCommitTsPage(pageno, false);
984 0 : SimpleLruWritePage(CommitTsCtl, slotno);
985 UIC 0 : Assert(!CommitTsCtl->shared->page_dirty[slotno]);
986 :
987 0 : LWLockRelease(CommitTsSLRULock);
988 : }
989 0 : else if (info == COMMIT_TS_TRUNCATE)
990 EUB : {
991 UIC 0 : xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
992 EUB :
993 UIC 0 : AdvanceOldestCommitTsXid(trunc->oldestXid);
994 :
995 : /*
996 : * During XLOG replay, latest_page_number isn't set up yet; insert a
997 : * suitable value to bypass the sanity test in SimpleLruTruncate.
998 : */
999 0 : CommitTsCtl->shared->latest_page_number = trunc->pageno;
1000 :
1001 0 : SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1002 : }
1003 : else
1004 0 : elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1005 0 : }
1006 :
1007 : /*
1008 : * Entrypoint for sync.c to sync commit_ts files.
1009 : */
1010 : int
1011 0 : committssyncfiletag(const FileTag *ftag, char *path)
1012 : {
1013 0 : return SlruSyncFileTag(CommitTsCtl, ftag, path);
1014 : }
|