TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_walinspect.c
4 : * Functions to inspect contents of PostgreSQL Write-Ahead Log
5 : *
6 : * Copyright (c) 2022-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/pg_walinspect/pg_walinspect.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/xlog.h"
16 : #include "access/xlog_internal.h"
17 : #include "access/xlogreader.h"
18 : #include "access/xlogrecovery.h"
19 : #include "access/xlogstats.h"
20 : #include "access/xlogutils.h"
21 : #include "funcapi.h"
22 : #include "miscadmin.h"
23 : #include "utils/array.h"
24 : #include "utils/builtins.h"
25 : #include "utils/pg_lsn.h"
26 :
27 : /*
28 : * NOTE: For any code change or issue fix here, it is highly recommended to
29 : * give a thought about doing the same in pg_waldump tool as well.
30 : */
31 :
32 GIC 6 : PG_MODULE_MAGIC;
33 ECB :
34 GNC 5 : PG_FUNCTION_INFO_V1(pg_get_wal_block_info);
35 GIC 5 : PG_FUNCTION_INFO_V1(pg_get_wal_record_info);
36 CBC 7 : PG_FUNCTION_INFO_V1(pg_get_wal_records_info);
37 5 : PG_FUNCTION_INFO_V1(pg_get_wal_records_info_till_end_of_wal);
38 5 : PG_FUNCTION_INFO_V1(pg_get_wal_stats);
39 5 : PG_FUNCTION_INFO_V1(pg_get_wal_stats_till_end_of_wal);
40 ECB :
41 : static void ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn);
42 : static XLogRecPtr GetCurrentLSN(void);
43 : static XLogReaderState *InitXLogReaderState(XLogRecPtr lsn);
44 : static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
45 : static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
46 : bool *nulls, uint32 ncols);
47 : static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
48 : XLogRecPtr start_lsn,
49 : XLogRecPtr end_lsn);
50 : static void GetXLogSummaryStats(XLogStats *stats, ReturnSetInfo *rsinfo,
51 : Datum *values, bool *nulls, uint32 ncols,
52 : bool stats_per_record);
53 : static void FillXLogStatsRow(const char *name, uint64 n, uint64 total_count,
54 : uint64 rec_len, uint64 total_rec_len,
55 : uint64 fpi_len, uint64 total_fpi_len,
56 : uint64 tot_len, uint64 total_len,
57 : Datum *values, bool *nulls, uint32 ncols);
58 : static void GetWalStats(FunctionCallInfo fcinfo,
59 : XLogRecPtr start_lsn,
60 : XLogRecPtr end_lsn,
61 : bool stats_per_record);
62 : static void GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
63 : bool show_data);
64 :
65 : /*
66 : * Return the LSN up to which the server has WAL.
67 : */
68 : static XLogRecPtr
69 GNC 28 : GetCurrentLSN(void)
70 : {
71 : XLogRecPtr curr_lsn;
72 :
73 : /*
74 : * We determine the current LSN of the server similar to how page_read
75 : * callback read_local_xlog_page_no_wait does.
76 ECB : */
77 GIC 28 : if (!RecoveryInProgress())
78 GNC 28 : curr_lsn = GetFlushRecPtr(NULL);
79 : else
80 UNC 0 : curr_lsn = GetXLogReplayRecPtr(NULL);
81 :
82 GNC 28 : Assert(!XLogRecPtrIsInvalid(curr_lsn));
83 :
84 28 : return curr_lsn;
85 : }
86 ECB :
87 : /*
88 : * Intialize WAL reader and identify first valid LSN.
89 : */
90 : static XLogReaderState *
91 GIC 19 : InitXLogReaderState(XLogRecPtr lsn)
92 : {
93 : XLogReaderState *xlogreader;
94 : ReadLocalXLogPageNoWaitPrivate *private_data;
95 ECB : XLogRecPtr first_valid_record;
96 :
97 : /*
98 : * Reading WAL below the first page of the first segments isn't allowed.
99 : * This is a bootstrap WAL page and the page_read callback fails to read
100 : * it.
101 : */
102 GIC 19 : if (lsn < XLOG_BLCKSZ)
103 4 : ereport(ERROR,
104 : (errmsg("could not read WAL at LSN %X/%X",
105 : LSN_FORMAT_ARGS(lsn))));
106 ECB :
107 : private_data = (ReadLocalXLogPageNoWaitPrivate *)
108 GIC 15 : palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
109 :
110 15 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
111 15 : XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
112 ECB : .segment_open = &wal_segment_open,
113 : .segment_close = &wal_segment_close),
114 : private_data);
115 :
116 GIC 15 : if (xlogreader == NULL)
117 UIC 0 : ereport(ERROR,
118 : (errcode(ERRCODE_OUT_OF_MEMORY),
119 : errmsg("out of memory"),
120 ECB : errdetail("Failed while allocating a WAL reading processor.")));
121 EUB :
122 : /* first find a valid recptr to start from */
123 GIC 15 : first_valid_record = XLogFindNextRecord(xlogreader, lsn);
124 :
125 15 : if (XLogRecPtrIsInvalid(first_valid_record))
126 UIC 0 : ereport(ERROR,
127 ECB : (errmsg("could not find a valid record after %X/%X",
128 : LSN_FORMAT_ARGS(lsn))));
129 :
130 GBC 15 : return xlogreader;
131 : }
132 :
133 : /*
134 ECB : * Read next WAL record.
135 : *
136 : * By design, to be less intrusive in a running system, no slot is allocated
137 : * to reserve the WAL we're about to read. Therefore this function can
138 : * encounter read errors for historical WAL.
139 : *
140 : * We guard against ordinary errors trying to read WAL that hasn't been
141 : * written yet by limiting end_lsn to the flushed WAL, but that can also
142 : * encounter errors if the flush pointer falls in the middle of a record. In
143 : * that case we'll return NULL.
144 : */
145 : static XLogRecord *
146 GIC 34339 : ReadNextXLogRecord(XLogReaderState *xlogreader)
147 : {
148 : XLogRecord *record;
149 : char *errormsg;
150 ECB :
151 GIC 34339 : record = XLogReadRecord(xlogreader, &errormsg);
152 :
153 34339 : if (record == NULL)
154 : {
155 ECB : ReadLocalXLogPageNoWaitPrivate *private_data;
156 :
157 : /* return NULL, if end of WAL is reached */
158 GIC 8 : private_data = (ReadLocalXLogPageNoWaitPrivate *)
159 : xlogreader->private_data;
160 :
161 8 : if (private_data->end_of_wal)
162 CBC 8 : return NULL;
163 :
164 UIC 0 : if (errormsg)
165 LBC 0 : ereport(ERROR,
166 ECB : (errcode_for_file_access(),
167 : errmsg("could not read WAL at %X/%X: %s",
168 EUB : LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
169 : else
170 UIC 0 : ereport(ERROR,
171 : (errcode_for_file_access(),
172 : errmsg("could not read WAL at %X/%X",
173 : LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
174 EUB : }
175 :
176 GIC 34331 : return record;
177 : }
178 :
179 : /*
180 : * Output values that make up a row describing caller's WAL record.
181 : *
182 : * This function leaks memory. Caller may need to use its own custom memory
183 : * context.
184 : *
185 : * Keep this in sync with GetWALBlockInfo.
186 : */
187 : static void
188 34284 : GetWALRecordInfo(XLogReaderState *record, Datum *values,
189 : bool *nulls, uint32 ncols)
190 : {
191 : const char *record_type;
192 : RmgrData desc;
193 34284 : uint32 fpi_len = 0;
194 : StringInfoData rec_desc;
195 : StringInfoData rec_blk_ref;
196 CBC 34284 : int i = 0;
197 :
198 GIC 34284 : desc = GetRmgr(XLogRecGetRmid(record));
199 GNC 34284 : record_type = desc.rm_identify(XLogRecGetInfo(record));
200 :
201 34284 : if (record_type == NULL)
202 UNC 0 : record_type = psprintf("UNKNOWN (%x)", XLogRecGetInfo(record) & ~XLR_INFO_MASK);
203 :
204 CBC 34284 : initStringInfo(&rec_desc);
205 GIC 34284 : desc.rm_desc(&rec_desc, record);
206 ECB :
207 GNC 34284 : if (XLogRecHasAnyBlockRefs(record))
208 : {
209 34270 : initStringInfo(&rec_blk_ref);
210 34270 : XLogRecGetBlockRefInfo(record, false, true, &rec_blk_ref, &fpi_len);
211 : }
212 ECB :
213 CBC 34284 : values[i++] = LSNGetDatum(record->ReadRecPtr);
214 GIC 34284 : values[i++] = LSNGetDatum(record->EndRecPtr);
215 CBC 34284 : values[i++] = LSNGetDatum(XLogRecGetPrev(record));
216 GIC 34284 : values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
217 CBC 34284 : values[i++] = CStringGetTextDatum(desc.rm_name);
218 GNC 34284 : values[i++] = CStringGetTextDatum(record_type);
219 GIC 34284 : values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
220 GNC 34284 : values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
221 CBC 34284 : values[i++] = UInt32GetDatum(fpi_len);
222 :
223 GNC 34284 : if (rec_desc.len > 0)
224 34273 : values[i++] = CStringGetTextDatum(rec_desc.data);
225 : else
226 11 : nulls[i++] = true;
227 :
228 34284 : if (XLogRecHasAnyBlockRefs(record))
229 34270 : values[i++] = CStringGetTextDatum(rec_blk_ref.data);
230 : else
231 14 : nulls[i++] = true;
232 ECB :
233 CBC 34284 : Assert(i == ncols);
234 34284 : }
235 ECB :
236 :
237 : /*
238 : * Output one or more rows in rsinfo tuple store, each describing a single
239 : * block reference from caller's WAL record. (Should only be called with
240 : * records that have block references.)
241 : *
242 : * This function leaks memory. Caller may need to use its own custom memory
243 : * context.
244 : *
245 : * Keep this in sync with GetWALRecordInfo.
246 : */
247 : static void
248 GNC 18 : GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
249 : bool show_data)
250 : {
251 : #define PG_GET_WAL_BLOCK_INFO_COLS 20
252 : int block_id;
253 18 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
254 : RmgrData desc;
255 : const char *record_type;
256 : StringInfoData rec_desc;
257 :
258 18 : Assert(XLogRecHasAnyBlockRefs(record));
259 :
260 18 : desc = GetRmgr(XLogRecGetRmid(record));
261 18 : record_type = desc.rm_identify(XLogRecGetInfo(record));
262 :
263 18 : if (record_type == NULL)
264 UNC 0 : record_type = psprintf("UNKNOWN (%x)",
265 0 : XLogRecGetInfo(record) & ~XLR_INFO_MASK);
266 :
267 GNC 18 : initStringInfo(&rec_desc);
268 18 : desc.rm_desc(&rec_desc, record);
269 :
270 36 : for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
271 : {
272 : DecodedBkpBlock *blk;
273 : BlockNumber blkno;
274 : RelFileLocator rnode;
275 : ForkNumber forknum;
276 18 : Datum values[PG_GET_WAL_BLOCK_INFO_COLS] = {0};
277 18 : bool nulls[PG_GET_WAL_BLOCK_INFO_COLS] = {0};
278 18 : uint32 block_data_len = 0,
279 18 : block_fpi_len = 0;
280 18 : ArrayType *block_fpi_info = NULL;
281 18 : int i = 0;
282 :
283 18 : if (!XLogRecHasBlockRef(record, block_id))
284 UNC 0 : continue;
285 :
286 GNC 18 : blk = XLogRecGetBlock(record, block_id);
287 :
288 18 : (void) XLogRecGetBlockTagExtended(record, block_id,
289 : &rnode, &forknum, &blkno, NULL);
290 :
291 : /* Save block_data_len */
292 18 : if (blk->has_data)
293 17 : block_data_len = blk->data_len;
294 :
295 18 : if (blk->has_image)
296 : {
297 : /* Block reference has an FPI, so prepare relevant output */
298 : int bitcnt;
299 1 : int cnt = 0;
300 : Datum *flags;
301 :
302 : /* Save block_fpi_len */
303 1 : block_fpi_len = blk->bimg_len;
304 :
305 : /* Construct and save block_fpi_info */
306 1 : bitcnt = pg_popcount((const char *) &blk->bimg_info,
307 : sizeof(uint8));
308 1 : flags = (Datum *) palloc0(sizeof(Datum) * bitcnt);
309 1 : if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) != 0)
310 1 : flags[cnt++] = CStringGetTextDatum("HAS_HOLE");
311 1 : if (blk->apply_image)
312 1 : flags[cnt++] = CStringGetTextDatum("APPLY");
313 1 : if ((blk->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
314 UNC 0 : flags[cnt++] = CStringGetTextDatum("COMPRESS_PGLZ");
315 GNC 1 : if ((blk->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
316 UNC 0 : flags[cnt++] = CStringGetTextDatum("COMPRESS_LZ4");
317 GNC 1 : if ((blk->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
318 UNC 0 : flags[cnt++] = CStringGetTextDatum("COMPRESS_ZSTD");
319 :
320 GNC 1 : Assert(cnt <= bitcnt);
321 1 : block_fpi_info = construct_array_builtin(flags, cnt, TEXTOID);
322 : }
323 :
324 : /* start_lsn, end_lsn, prev_lsn, and blockid outputs */
325 18 : values[i++] = LSNGetDatum(record->ReadRecPtr);
326 18 : values[i++] = LSNGetDatum(record->EndRecPtr);
327 18 : values[i++] = LSNGetDatum(XLogRecGetPrev(record));
328 18 : values[i++] = Int16GetDatum(block_id);
329 :
330 : /* relfile and block related outputs */
331 18 : values[i++] = ObjectIdGetDatum(blk->rlocator.spcOid);
332 18 : values[i++] = ObjectIdGetDatum(blk->rlocator.dbOid);
333 18 : values[i++] = ObjectIdGetDatum(blk->rlocator.relNumber);
334 18 : values[i++] = Int16GetDatum(forknum);
335 18 : values[i++] = Int64GetDatum((int64) blkno);
336 :
337 : /* xid, resource_manager, and record_type outputs */
338 18 : values[i++] = TransactionIdGetDatum(XLogRecGetXid(record));
339 18 : values[i++] = CStringGetTextDatum(desc.rm_name);
340 18 : values[i++] = CStringGetTextDatum(record_type);
341 :
342 : /*
343 : * record_length, main_data_length, block_data_len, and
344 : * block_fpi_length outputs
345 : */
346 18 : values[i++] = UInt32GetDatum(XLogRecGetTotalLen(record));
347 18 : values[i++] = UInt32GetDatum(XLogRecGetDataLen(record));
348 18 : values[i++] = UInt32GetDatum(block_data_len);
349 18 : values[i++] = UInt32GetDatum(block_fpi_len);
350 :
351 : /* block_fpi_info (text array) output */
352 18 : if (block_fpi_info)
353 1 : values[i++] = PointerGetDatum(block_fpi_info);
354 : else
355 17 : nulls[i++] = true;
356 :
357 : /* description output (describes WAL record) */
358 18 : if (rec_desc.len > 0)
359 18 : values[i++] = CStringGetTextDatum(rec_desc.data);
360 : else
361 UNC 0 : nulls[i++] = true;
362 :
363 : /* block_data output */
364 GNC 18 : if (blk->has_data && show_data)
365 17 : {
366 : bytea *block_data;
367 :
368 17 : block_data = (bytea *) palloc(block_data_len + VARHDRSZ);
369 17 : SET_VARSIZE(block_data, block_data_len + VARHDRSZ);
370 17 : memcpy(VARDATA(block_data), blk->data, block_data_len);
371 17 : values[i++] = PointerGetDatum(block_data);
372 : }
373 : else
374 1 : nulls[i++] = true;
375 :
376 : /* block_fpi_data output */
377 18 : if (blk->has_image && show_data)
378 1 : {
379 : PGAlignedBlock buf;
380 : Page page;
381 : bytea *block_fpi_data;
382 :
383 1 : page = (Page) buf.data;
384 1 : if (!RestoreBlockImage(record, block_id, page))
385 UNC 0 : ereport(ERROR,
386 : (errcode(ERRCODE_INTERNAL_ERROR),
387 : errmsg_internal("%s", record->errormsg_buf)));
388 :
389 GNC 1 : block_fpi_data = (bytea *) palloc(BLCKSZ + VARHDRSZ);
390 1 : SET_VARSIZE(block_fpi_data, BLCKSZ + VARHDRSZ);
391 1 : memcpy(VARDATA(block_fpi_data), page, BLCKSZ);
392 1 : values[i++] = PointerGetDatum(block_fpi_data);
393 : }
394 : else
395 17 : nulls[i++] = true;
396 :
397 18 : Assert(i == PG_GET_WAL_BLOCK_INFO_COLS);
398 :
399 : /* Store a tuple for this block reference */
400 18 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
401 : values, nulls);
402 : }
403 :
404 : #undef PG_GET_WAL_FPI_BLOCK_COLS
405 18 : }
406 :
407 : /*
408 : * Get WAL record info, unnested by block reference
409 : */
410 : Datum
411 7 : pg_get_wal_block_info(PG_FUNCTION_ARGS)
412 : {
413 7 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
414 7 : XLogRecPtr end_lsn = PG_GETARG_LSN(1);
415 7 : bool show_data = PG_GETARG_BOOL(2);
416 : XLogReaderState *xlogreader;
417 : MemoryContext old_cxt;
418 : MemoryContext tmp_cxt;
419 :
420 7 : ValidateInputLSNs(start_lsn, &end_lsn);
421 :
422 5 : InitMaterializedSRF(fcinfo, 0);
423 :
424 5 : xlogreader = InitXLogReaderState(start_lsn);
425 :
426 4 : tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
427 : "pg_get_wal_block_info temporary cxt",
428 : ALLOCSET_DEFAULT_SIZES);
429 :
430 29 : while (ReadNextXLogRecord(xlogreader) &&
431 26 : xlogreader->EndRecPtr <= end_lsn)
432 : {
433 25 : CHECK_FOR_INTERRUPTS();
434 :
435 25 : if (!XLogRecHasAnyBlockRefs(xlogreader))
436 7 : continue;
437 :
438 : /* Use the tmp context so we can clean up after each tuple is done */
439 18 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
440 :
441 18 : GetWALBlockInfo(fcinfo, xlogreader, show_data);
442 :
443 : /* clean up and switch back */
444 18 : MemoryContextSwitchTo(old_cxt);
445 18 : MemoryContextReset(tmp_cxt);
446 : }
447 :
448 4 : MemoryContextDelete(tmp_cxt);
449 4 : pfree(xlogreader->private_data);
450 4 : XLogReaderFree(xlogreader);
451 :
452 4 : PG_RETURN_VOID();
453 : }
454 :
455 ECB : /*
456 : * Get WAL record info.
457 : */
458 : Datum
459 GIC 3 : pg_get_wal_record_info(PG_FUNCTION_ARGS)
460 ECB : {
461 : #define PG_GET_WAL_RECORD_INFO_COLS 11
462 : Datum result;
463 GNC 3 : Datum values[PG_GET_WAL_RECORD_INFO_COLS] = {0};
464 3 : bool nulls[PG_GET_WAL_RECORD_INFO_COLS] = {0};
465 ECB : XLogRecPtr lsn;
466 : XLogRecPtr curr_lsn;
467 : XLogReaderState *xlogreader;
468 : TupleDesc tupdesc;
469 : HeapTuple tuple;
470 :
471 GIC 3 : lsn = PG_GETARG_LSN(0);
472 GNC 3 : curr_lsn = GetCurrentLSN();
473 :
474 3 : if (lsn > curr_lsn)
475 CBC 1 : ereport(ERROR,
476 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
477 : errmsg("WAL input LSN must be less than current LSN"),
478 : errdetail("Current WAL LSN on the database system is at %X/%X.",
479 : LSN_FORMAT_ARGS(curr_lsn))));
480 :
481 : /* Build a tuple descriptor for our result type. */
482 GIC 2 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
483 UIC 0 : elog(ERROR, "return type must be a row type");
484 ECB :
485 GIC 2 : xlogreader = InitXLogReaderState(lsn);
486 ECB :
487 CBC 1 : if (!ReadNextXLogRecord(xlogreader))
488 UIC 0 : ereport(ERROR,
489 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
490 EUB : errmsg("could not read WAL at %X/%X",
491 : LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
492 :
493 CBC 1 : GetWALRecordInfo(xlogreader, values, nulls, PG_GET_WAL_RECORD_INFO_COLS);
494 :
495 GIC 1 : pfree(xlogreader->private_data);
496 1 : XLogReaderFree(xlogreader);
497 :
498 1 : tuple = heap_form_tuple(tupdesc, values, nulls);
499 CBC 1 : result = HeapTupleGetDatum(tuple);
500 ECB :
501 CBC 1 : PG_RETURN_DATUM(result);
502 ECB : #undef PG_GET_WAL_RECORD_INFO_COLS
503 : }
504 :
505 : /*
506 : * Validate start and end LSNs coming from the function inputs.
507 : *
508 : * If end_lsn is found to be higher than the current LSN reported by the
509 : * cluster, use the current LSN as the upper bound.
510 EUB : */
511 : static void
512 GNC 21 : ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn)
513 ECB : {
514 GNC 21 : XLogRecPtr curr_lsn = GetCurrentLSN();
515 :
516 21 : if (start_lsn > curr_lsn)
517 GIC 3 : ereport(ERROR,
518 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
519 : errmsg("WAL start LSN must be less than current LSN"),
520 : errdetail("Current WAL LSN on the database system is at %X/%X.",
521 : LSN_FORMAT_ARGS(curr_lsn))));
522 EUB :
523 GNC 18 : if (start_lsn > *end_lsn)
524 GBC 3 : ereport(ERROR,
525 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
526 EUB : errmsg("WAL start LSN must be less than end LSN")));
527 :
528 GNC 15 : if (*end_lsn > curr_lsn)
529 3 : *end_lsn = curr_lsn;
530 CBC 15 : }
531 :
532 : /*
533 : * Get info of all WAL records between start LSN and end LSN.
534 ECB : */
535 : static void
536 CBC 8 : GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
537 ECB : XLogRecPtr end_lsn)
538 : {
539 : #define PG_GET_WAL_RECORDS_INFO_COLS 11
540 : XLogReaderState *xlogreader;
541 CBC 8 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
542 ECB : MemoryContext old_cxt;
543 : MemoryContext tmp_cxt;
544 :
545 GNC 8 : Assert(start_lsn <= end_lsn);
546 :
547 CBC 8 : InitMaterializedSRF(fcinfo, 0);
548 ECB :
549 CBC 8 : xlogreader = InitXLogReaderState(start_lsn);
550 :
551 GIC 7 : tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
552 ECB : "GetWALRecordsInfo temporary cxt",
553 : ALLOCSET_DEFAULT_SIZES);
554 :
555 CBC 34290 : while (ReadNextXLogRecord(xlogreader) &&
556 GIC 34287 : xlogreader->EndRecPtr <= end_lsn)
557 : {
558 GNC 34283 : Datum values[PG_GET_WAL_RECORDS_INFO_COLS] = {0};
559 34283 : bool nulls[PG_GET_WAL_RECORDS_INFO_COLS] = {0};
560 :
561 ECB : /* Use the tmp context so we can clean up after each tuple is done */
562 CBC 34283 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
563 :
564 34283 : GetWALRecordInfo(xlogreader, values, nulls,
565 : PG_GET_WAL_RECORDS_INFO_COLS);
566 :
567 34283 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
568 ECB : values, nulls);
569 :
570 EUB : /* clean up and switch back */
571 GIC 34283 : MemoryContextSwitchTo(old_cxt);
572 34283 : MemoryContextReset(tmp_cxt);
573 ECB :
574 CBC 34283 : CHECK_FOR_INTERRUPTS();
575 : }
576 :
577 7 : MemoryContextDelete(tmp_cxt);
578 7 : pfree(xlogreader->private_data);
579 7 : XLogReaderFree(xlogreader);
580 ECB :
581 : #undef PG_GET_WAL_RECORDS_INFO_COLS
582 GIC 7 : }
583 ECB :
584 : /*
585 : * Get info of all WAL records between start LSN and end LSN.
586 : */
587 : Datum
588 GIC 9 : pg_get_wal_records_info(PG_FUNCTION_ARGS)
589 ECB : {
590 GNC 9 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
591 9 : XLogRecPtr end_lsn = PG_GETARG_LSN(1);
592 :
593 9 : ValidateInputLSNs(start_lsn, &end_lsn);
594 CBC 7 : GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
595 ECB :
596 CBC 6 : PG_RETURN_VOID();
597 : }
598 :
599 : /*
600 : * Fill single row of record counts and sizes for an rmgr or record.
601 ECB : */
602 : static void
603 CBC 66 : FillXLogStatsRow(const char *name,
604 : uint64 n, uint64 total_count,
605 ECB : uint64 rec_len, uint64 total_rec_len,
606 : uint64 fpi_len, uint64 total_fpi_len,
607 : uint64 tot_len, uint64 total_len,
608 : Datum *values, bool *nulls, uint32 ncols)
609 : {
610 : double n_pct,
611 : rec_len_pct,
612 : fpi_len_pct,
613 : tot_len_pct;
614 CBC 66 : int i = 0;
615 :
616 66 : n_pct = 0;
617 66 : if (total_count != 0)
618 GIC 66 : n_pct = 100 * (double) n / total_count;
619 :
620 CBC 66 : rec_len_pct = 0;
621 GIC 66 : if (total_rec_len != 0)
622 CBC 66 : rec_len_pct = 100 * (double) rec_len / total_rec_len;
623 :
624 GIC 66 : fpi_len_pct = 0;
625 CBC 66 : if (total_fpi_len != 0)
626 LBC 0 : fpi_len_pct = 100 * (double) fpi_len / total_fpi_len;
627 :
628 GIC 66 : tot_len_pct = 0;
629 CBC 66 : if (total_len != 0)
630 66 : tot_len_pct = 100 * (double) tot_len / total_len;
631 ECB :
632 GIC 66 : values[i++] = CStringGetTextDatum(name);
633 CBC 66 : values[i++] = Int64GetDatum(n);
634 GIC 66 : values[i++] = Float8GetDatum(n_pct);
635 66 : values[i++] = Int64GetDatum(rec_len);
636 66 : values[i++] = Float8GetDatum(rec_len_pct);
637 66 : values[i++] = Int64GetDatum(fpi_len);
638 66 : values[i++] = Float8GetDatum(fpi_len_pct);
639 66 : values[i++] = Int64GetDatum(tot_len);
640 CBC 66 : values[i++] = Float8GetDatum(tot_len_pct);
641 :
642 GIC 66 : Assert(i == ncols);
643 66 : }
644 ECB :
645 : /*
646 : * Get summary statistics about the records seen so far.
647 : */
648 : static void
649 GIC 3 : GetXLogSummaryStats(XLogStats *stats, ReturnSetInfo *rsinfo,
650 : Datum *values, bool *nulls, uint32 ncols,
651 : bool stats_per_record)
652 ECB : {
653 : MemoryContext old_cxt;
654 : MemoryContext tmp_cxt;
655 CBC 3 : uint64 total_count = 0;
656 GIC 3 : uint64 total_rec_len = 0;
657 CBC 3 : uint64 total_fpi_len = 0;
658 3 : uint64 total_len = 0;
659 : int ri;
660 :
661 : /*
662 : * Each row shows its percentages of the total, so make a first pass to
663 : * calculate column totals.
664 : */
665 771 : for (ri = 0; ri <= RM_MAX_ID; ri++)
666 EUB : {
667 GIC 768 : if (!RmgrIdIsValid(ri))
668 CBC 318 : continue;
669 :
670 450 : total_count += stats->rmgr_stats[ri].count;
671 GBC 450 : total_rec_len += stats->rmgr_stats[ri].rec_len;
672 GIC 450 : total_fpi_len += stats->rmgr_stats[ri].fpi_len;
673 : }
674 3 : total_len = total_rec_len + total_fpi_len;
675 :
676 GNC 3 : tmp_cxt = AllocSetContextCreate(CurrentMemoryContext,
677 : "GetXLogSummaryStats temporary cxt",
678 : ALLOCSET_DEFAULT_SIZES);
679 :
680 CBC 771 : for (ri = 0; ri <= RM_MAX_ID; ri++)
681 : {
682 ECB : uint64 count;
683 : uint64 rec_len;
684 : uint64 fpi_len;
685 : uint64 tot_len;
686 : RmgrData desc;
687 :
688 CBC 768 : if (!RmgrIdIsValid(ri))
689 GIC 702 : continue;
690 :
691 450 : if (!RmgrIdExists(ri))
692 384 : continue;
693 :
694 66 : desc = GetRmgr(ri);
695 :
696 66 : if (stats_per_record)
697 : {
698 : int rj;
699 ECB :
700 UIC 0 : for (rj = 0; rj < MAX_XLINFO_TYPES; rj++)
701 ECB : {
702 : const char *id;
703 :
704 LBC 0 : count = stats->record_stats[ri][rj].count;
705 UIC 0 : rec_len = stats->record_stats[ri][rj].rec_len;
706 0 : fpi_len = stats->record_stats[ri][rj].fpi_len;
707 0 : tot_len = rec_len + fpi_len;
708 :
709 : /* Skip undefined combinations and ones that didn't occur */
710 LBC 0 : if (count == 0)
711 0 : continue;
712 :
713 UNC 0 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
714 :
715 : /* the upper four bits in xl_info are the rmgr's */
716 UIC 0 : id = desc.rm_identify(rj << 4);
717 LBC 0 : if (id == NULL)
718 0 : id = psprintf("UNKNOWN (%x)", rj << 4);
719 ECB :
720 UIC 0 : FillXLogStatsRow(psprintf("%s/%s", desc.rm_name, id), count,
721 : total_count, rec_len, total_rec_len, fpi_len,
722 : total_fpi_len, tot_len, total_len,
723 : values, nulls, ncols);
724 :
725 LBC 0 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
726 : values, nulls);
727 :
728 : /* clean up and switch back */
729 UNC 0 : MemoryContextSwitchTo(old_cxt);
730 0 : MemoryContextReset(tmp_cxt);
731 : }
732 : }
733 : else
734 ECB : {
735 GIC 66 : count = stats->rmgr_stats[ri].count;
736 66 : rec_len = stats->rmgr_stats[ri].rec_len;
737 66 : fpi_len = stats->rmgr_stats[ri].fpi_len;
738 CBC 66 : tot_len = rec_len + fpi_len;
739 :
740 GNC 66 : old_cxt = MemoryContextSwitchTo(tmp_cxt);
741 :
742 CBC 66 : FillXLogStatsRow(desc.rm_name, count, total_count, rec_len,
743 : total_rec_len, fpi_len, total_fpi_len, tot_len,
744 ECB : total_len, values, nulls, ncols);
745 :
746 CBC 66 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
747 : values, nulls);
748 :
749 : /* clean up and switch back */
750 GNC 66 : MemoryContextSwitchTo(old_cxt);
751 66 : MemoryContextReset(tmp_cxt);
752 : }
753 : }
754 :
755 3 : MemoryContextDelete(tmp_cxt);
756 CBC 3 : }
757 ECB :
758 : /*
759 : * Get WAL stats between start LSN and end LSN.
760 : */
761 : static void
762 GNC 4 : GetWalStats(FunctionCallInfo fcinfo, XLogRecPtr start_lsn, XLogRecPtr end_lsn,
763 : bool stats_per_record)
764 : {
765 ECB : #define PG_GET_WAL_STATS_COLS 9
766 : XLogReaderState *xlogreader;
767 GNC 4 : XLogStats stats = {0};
768 CBC 4 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
769 GNC 4 : Datum values[PG_GET_WAL_STATS_COLS] = {0};
770 4 : bool nulls[PG_GET_WAL_STATS_COLS] = {0};
771 :
772 4 : Assert(start_lsn <= end_lsn);
773 :
774 CBC 4 : InitMaterializedSRF(fcinfo, 0);
775 ECB :
776 GIC 4 : xlogreader = InitXLogReaderState(start_lsn);
777 ECB :
778 CBC 22 : while (ReadNextXLogRecord(xlogreader) &&
779 17 : xlogreader->EndRecPtr <= end_lsn)
780 ECB : {
781 GIC 16 : XLogRecStoreStats(&stats, xlogreader);
782 :
783 CBC 16 : CHECK_FOR_INTERRUPTS();
784 : }
785 :
786 GIC 3 : pfree(xlogreader->private_data);
787 3 : XLogReaderFree(xlogreader);
788 :
789 CBC 3 : GetXLogSummaryStats(&stats, rsinfo, values, nulls,
790 : PG_GET_WAL_STATS_COLS,
791 ECB : stats_per_record);
792 :
793 : #undef PG_GET_WAL_STATS_COLS
794 CBC 3 : }
795 :
796 : /*
797 : * Get stats of all WAL records between start LSN and end LSN.
798 ECB : */
799 : Datum
800 GIC 5 : pg_get_wal_stats(PG_FUNCTION_ARGS)
801 : {
802 GNC 5 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
803 5 : XLogRecPtr end_lsn = PG_GETARG_LSN(1);
804 5 : bool stats_per_record = PG_GETARG_BOOL(2);
805 ECB :
806 GNC 5 : ValidateInputLSNs(start_lsn, &end_lsn);
807 CBC 3 : GetWalStats(fcinfo, start_lsn, end_lsn, stats_per_record);
808 ECB :
809 GIC 2 : PG_RETURN_VOID();
810 ECB : }
811 :
/*
 * The following functions were removed from the extension's SQL interface in
 * version 1.1, but their C implementations are kept around for compatibility.
 */
816 : Datum
817 GNC 2 : pg_get_wal_records_info_till_end_of_wal(PG_FUNCTION_ARGS)
818 : {
819 2 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
820 2 : XLogRecPtr end_lsn = GetCurrentLSN();
821 :
822 2 : if (start_lsn > end_lsn)
823 1 : ereport(ERROR,
824 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
825 : errmsg("WAL start LSN must be less than current LSN"),
826 : errdetail("Current WAL LSN on the database system is at %X/%X.",
827 : LSN_FORMAT_ARGS(end_lsn))));
828 :
829 1 : GetWALRecordsInfo(fcinfo, start_lsn, end_lsn);
830 :
831 1 : PG_RETURN_VOID();
832 : }
833 :
834 ECB : Datum
835 CBC 2 : pg_get_wal_stats_till_end_of_wal(PG_FUNCTION_ARGS)
836 ECB : {
837 GNC 2 : XLogRecPtr start_lsn = PG_GETARG_LSN(0);
838 2 : XLogRecPtr end_lsn = GetCurrentLSN();
839 2 : bool stats_per_record = PG_GETARG_BOOL(1);
840 ECB :
841 GNC 2 : if (start_lsn > end_lsn)
842 1 : ereport(ERROR,
843 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
844 : errmsg("WAL start LSN must be less than current LSN"),
845 : errdetail("Current WAL LSN on the database system is at %X/%X.",
846 : LSN_FORMAT_ARGS(end_lsn))));
847 ECB :
848 CBC 1 : GetWalStats(fcinfo, start_lsn, end_lsn, stats_per_record);
849 :
850 1 : PG_RETURN_VOID();
851 ECB : }
|