/*-------------------------------------------------------------------------
 *
 * xlogprefetcher.c
 *      Prefetching support for recovery.
 *
 * Portions Copyright (c) 2022-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *      src/backend/access/transam/xlogprefetcher.c
 *
 * This module provides a drop-in replacement for an XLogReader that tries to
 * minimize I/O stalls by looking ahead in the WAL.  If blocks that will be
 * accessed in the near future are not already in the buffer pool, it initiates
 * I/Os that might complete before the caller eventually needs the data.  When
 * referenced blocks are found in the buffer pool already, the buffer is
 * recorded in the decoded record so that XLogReadBufferForRedo() can try to
 * avoid a second buffer mapping table lookup.
 *
 * Currently, only the main fork is considered for prefetching, and
 * prefetching is only effective on systems where PrefetchBuffer() does
 * something useful (mainly Linux).
 *
 *-------------------------------------------------------------------------
 */
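
/*
 * A minimal usage sketch (an editorial illustration; the real caller is the
 * recovery loop, and its exact sequence may differ):
 *
 *     XLogPrefetcher *prefetcher = XLogPrefetcherAllocate(reader);
 *     char       *errmsg;
 *     XLogRecord *record;
 *
 *     XLogPrefetcherBeginRead(prefetcher, start_lsn);
 *     while ((record = XLogPrefetcherReadRecord(prefetcher, &errmsg)) != NULL)
 *     {
 *         ... redo the record; blocks it references may already have ...
 *         ... I/O in flight, or a recent-buffer hint recorded ...
 *     }
 *     XLogPrefetcherFree(prefetcher);
 */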

#include "postgres.h"

#include "access/xlog.h"
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/pg_class.h"
#include "catalog/pg_control.h"
#include "catalog/storage_xlog.h"
#include "commands/dbcommands_xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "port/atomics.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
#include "utils/fmgrprotos.h"
#include "utils/guc_hooks.h"
#include "utils/hsearch.h"
#include "utils/timestamp.h"

/*
 * Every time we process this much WAL, we'll update the values in
 * pg_stat_recovery_prefetch.
 */
#define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ

/*
 * To detect repeated access to the same block and skip useless extra system
 * calls, we remember a small window of recently prefetched blocks.
 */
#define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4

/*
 * When maintenance_io_concurrency is not saturated, we're prepared to look
 * ahead up to N times that number of block references.
 */
#define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4

/* Define to log internal debugging messages. */
/* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */

/* GUCs */
int         recovery_prefetch = RECOVERY_PREFETCH_TRY;

#ifdef USE_PREFETCH
#define RecoveryPrefetchEnabled() \
        (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
         maintenance_io_concurrency > 0)
#else
#define RecoveryPrefetchEnabled() false
#endif

static int  XLogPrefetchReconfigureCount = 0;

/*
 * Enum used to report whether an IO should be started.
 */
typedef enum
{
    LRQ_NEXT_NO_IO,
    LRQ_NEXT_IO,
    LRQ_NEXT_AGAIN
} LsnReadQueueNextStatus;

/*
 * Type of callback that can decide which block to prefetch next.  For now
 * there is only one.
 */
typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
                                                       XLogRecPtr *lsn);

/*
 * A simple circular queue of LSNs, used to control the number of
 * (potentially) inflight IOs.  This stands in for a later, more general IO
 * control mechanism, which is why it has the apparently unnecessary
 * indirection through a function pointer.
 */
typedef struct LsnReadQueue
{
    LsnReadQueueNextFun next;
    uintptr_t   lrq_private;
    uint32      max_inflight;
    uint32      inflight;
    uint32      completed;
    uint32      head;
    uint32      tail;
    uint32      size;
    struct
    {
        bool        io;
        XLogRecPtr  lsn;
    }           queue[FLEXIBLE_ARRAY_MEMBER];
} LsnReadQueue;

/*
 * A prefetcher.  This is a mechanism that wraps an XLogReader, prefetching
 * blocks that will soon be referenced, to try to avoid IO stalls.
 */
struct XLogPrefetcher
{
    /* WAL reader and current reading state. */
    XLogReaderState *reader;
    DecodedXLogRecord *record;
    int         next_block_id;

    /* When to publish stats. */
    XLogRecPtr  next_stats_shm_lsn;

    /* Book-keeping to avoid accessing blocks that don't exist yet. */
    HTAB       *filter_table;
    dlist_head  filter_queue;

    /* Book-keeping to avoid repeat prefetches. */
    RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
    BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
    int         recent_idx;

    /* Book-keeping to disable prefetching temporarily. */
    XLogRecPtr  no_readahead_until;

    /* IO depth manager. */
    LsnReadQueue *streaming_read;

    XLogRecPtr  begin_ptr;

    int         reconfigure_count;
};

/*
 * A temporary filter used to track block ranges that haven't been created
 * yet, whole relations that haven't been created yet, and whole relations
 * that (we assume) have already been dropped, or will be created by bulk WAL
 * operators.
 */
typedef struct XLogPrefetcherFilter
{
    RelFileLocator rlocator;
    XLogRecPtr  filter_until_replayed;
    BlockNumber filter_from_block;
    dlist_node  link;
} XLogPrefetcherFilter;

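/*
 * For example (an editorial illustration of the machinery below): replaying
 * an XLOG_SMGR_CREATE record for relation R installs a filter with
 * rlocator = R, filter_from_block = 0 and filter_until_replayed = the
 * record's LSN, suppressing all prefetches into R until that record has been
 * replayed, while an XLOG_SMGR_TRUNCATE record filters only blocks >= the
 * truncation point.  See XLogPrefetcherNextBlock() and
 * XLogPrefetcherAddFilter().
 */
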
/*
 * Counters exposed in shared memory for pg_stat_recovery_prefetch.
 */
typedef struct XLogPrefetchStats
{
    pg_atomic_uint64 reset_time;    /* Time of last reset. */
    pg_atomic_uint64 prefetch;      /* Prefetches initiated. */
    pg_atomic_uint64 hit;           /* Blocks already in cache. */
    pg_atomic_uint64 skip_init;     /* Zero-inited blocks skipped. */
    pg_atomic_uint64 skip_new;      /* New/missing blocks filtered. */
    pg_atomic_uint64 skip_fpw;      /* FPWs skipped. */
    pg_atomic_uint64 skip_rep;      /* Repeat accesses skipped. */

    /* Dynamic values */
    int         wal_distance;       /* Number of WAL bytes ahead. */
    int         block_distance;     /* Number of block references ahead. */
    int         io_depth;           /* Number of I/Os in progress. */
} XLogPrefetchStats;

static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
                                           RelFileLocator rlocator,
                                           BlockNumber blockno,
                                           XLogRecPtr lsn);
static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
                                            RelFileLocator rlocator,
                                            BlockNumber blockno);
static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
                                                 XLogRecPtr replaying_lsn);
static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
                                                      XLogRecPtr *lsn);

static XLogPrefetchStats *SharedStats;

static inline LsnReadQueue *
lrq_alloc(uint32 max_distance,
          uint32 max_inflight,
          uintptr_t lrq_private,
          LsnReadQueueNextFun next)
{
    LsnReadQueue *lrq;
    uint32      size;

    Assert(max_distance >= max_inflight);

    size = max_distance + 1;    /* full ring buffer has a gap */
    lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
    lrq->lrq_private = lrq_private;
    lrq->max_inflight = max_inflight;
    lrq->size = size;
    lrq->next = next;
    lrq->head = 0;
    lrq->tail = 0;
    lrq->inflight = 0;
    lrq->completed = 0;

    return lrq;
}

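/*
 * A worked note on the geometry (an editorial addition): one ring slot is
 * always left unused so that head == tail can unambiguously mean "empty",
 * hence size = max_distance + 1.  An lrq built with max_distance = 8 can
 * therefore hold at most inflight + completed = 8 entries, which is the
 * "size - 1" bound enforced by lrq_prefetch() below.
 */
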
static inline void
lrq_free(LsnReadQueue *lrq)
{
    pfree(lrq);
}

static inline uint32
lrq_inflight(LsnReadQueue *lrq)
{
    return lrq->inflight;
}

static inline uint32
lrq_completed(LsnReadQueue *lrq)
{
    return lrq->completed;
}

static inline void
lrq_prefetch(LsnReadQueue *lrq)
{
    /* Try to start as many IOs as we can within our limits. */
    while (lrq->inflight < lrq->max_inflight &&
           lrq->inflight + lrq->completed < lrq->size - 1)
    {
        Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
        switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
        {
            case LRQ_NEXT_AGAIN:
                return;
            case LRQ_NEXT_IO:
                lrq->queue[lrq->head].io = true;
                lrq->inflight++;
                break;
            case LRQ_NEXT_NO_IO:
                lrq->queue[lrq->head].io = false;
                lrq->completed++;
                break;
        }
        lrq->head++;
        if (lrq->head == lrq->size)
            lrq->head = 0;
    }
}

static inline void
lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
{
    /*
     * We know that LSNs before 'lsn' have been replayed, so we can now assume
     * that any IOs that were started before then have finished.
     */
    while (lrq->tail != lrq->head &&
           lrq->queue[lrq->tail].lsn < lsn)
    {
        if (lrq->queue[lrq->tail].io)
            lrq->inflight--;
        else
            lrq->completed--;
        lrq->tail++;
        if (lrq->tail == lrq->size)
            lrq->tail = 0;
    }
    if (RecoveryPrefetchEnabled())
        lrq_prefetch(lrq);
}

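/*
 * For example (an editorial illustration): if entries were queued with LSNs
 * 10, 20 and 30, then lrq_complete_lsn(lrq, 25) retires the entries for 10
 * and 20, decrementing 'inflight' or 'completed' as each entry's 'io' flag
 * dictates, leaves 30 in place, and then tops the queue back up via
 * lrq_prefetch().
 */
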
size_t
XLogPrefetchShmemSize(void)
{
    return sizeof(XLogPrefetchStats);
}

/*
 * Reset all counters to zero.
 */
void
XLogPrefetchResetStats(void)
{
    pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
    pg_atomic_write_u64(&SharedStats->prefetch, 0);
    pg_atomic_write_u64(&SharedStats->hit, 0);
    pg_atomic_write_u64(&SharedStats->skip_init, 0);
    pg_atomic_write_u64(&SharedStats->skip_new, 0);
    pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
    pg_atomic_write_u64(&SharedStats->skip_rep, 0);
}

void
XLogPrefetchShmemInit(void)
{
    bool        found;

    SharedStats = (XLogPrefetchStats *)
        ShmemInitStruct("XLogPrefetchStats",
                        sizeof(XLogPrefetchStats),
                        &found);

    if (!found)
    {
        pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
        pg_atomic_init_u64(&SharedStats->prefetch, 0);
        pg_atomic_init_u64(&SharedStats->hit, 0);
        pg_atomic_init_u64(&SharedStats->skip_init, 0);
        pg_atomic_init_u64(&SharedStats->skip_new, 0);
        pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
        pg_atomic_init_u64(&SharedStats->skip_rep, 0);
    }
}

/*
 * Called when any GUC is changed that affects prefetching.
 */
void
XLogPrefetchReconfigure(void)
{
    XLogPrefetchReconfigureCount++;
}

/*
 * Increment a counter in shared memory.  This is equivalent to (*counter)++
 * on a plain uint64 without any memory barrier or locking, except on
 * platforms where readers can't read uint64 without possibly observing a
 * torn value.
 */
static inline void
XLogPrefetchIncrement(pg_atomic_uint64 *counter)
{
    Assert(AmStartupProcess() || !IsUnderPostmaster);
    pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
}

/*
 * Create a prefetcher that is ready to begin prefetching blocks referenced by
 * WAL records.
 */
XLogPrefetcher *
XLogPrefetcherAllocate(XLogReaderState *reader)
{
    XLogPrefetcher *prefetcher;
    static HASHCTL hash_table_ctl = {
        .keysize = sizeof(RelFileLocator),
        .entrysize = sizeof(XLogPrefetcherFilter)
    };

    prefetcher = palloc0(sizeof(XLogPrefetcher));

    prefetcher->reader = reader;
    prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
                                           &hash_table_ctl,
                                           HASH_ELEM | HASH_BLOBS);
    dlist_init(&prefetcher->filter_queue);

    SharedStats->wal_distance = 0;
    SharedStats->block_distance = 0;
    SharedStats->io_depth = 0;

    /* First usage will cause streaming_read to be allocated. */
    prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;

    return prefetcher;
}

/*
 * Destroy a prefetcher and release all resources.
 */
void
XLogPrefetcherFree(XLogPrefetcher *prefetcher)
{
    lrq_free(prefetcher->streaming_read);
    hash_destroy(prefetcher->filter_table);
    pfree(prefetcher);
}

/*
 * Provide access to the reader.
 */
XLogReaderState *
XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
{
    return prefetcher->reader;
}

/*
 * Update the statistics visible in the pg_stat_recovery_prefetch view.
 */
void
XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
{
    uint32      io_depth;
    uint32      completed;
    int64       wal_distance;

    /* How far ahead of replay are we now? */
    if (prefetcher->reader->decode_queue_tail)
    {
        wal_distance =
            prefetcher->reader->decode_queue_tail->lsn -
            prefetcher->reader->decode_queue_head->lsn;
    }
    else
    {
        wal_distance = 0;
    }

    /* How many IOs are currently in flight and completed? */
    io_depth = lrq_inflight(prefetcher->streaming_read);
    completed = lrq_completed(prefetcher->streaming_read);

    /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
    SharedStats->io_depth = io_depth;
    SharedStats->block_distance = io_depth + completed;
    SharedStats->wal_distance = wal_distance;

    prefetcher->next_stats_shm_lsn =
        prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
}

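/*
 * (Editorial note: the callback documented next is installed by
 * XLogPrefetcherReadRecord() below, via
 *
 *     lrq_alloc(max_distance, max_inflight,
 *               (uintptr_t) prefetcher, XLogPrefetcherNextBlock);
 *
 * so its 'pgsr_private' argument is really the XLogPrefetcher pointer.)
 */
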
/*
 * A callback that examines the next block reference in the WAL, and possibly
 * starts an IO so that a later read will be fast.
 *
 * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
 *
 * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
 * that isn't in the buffer pool, and the kernel has been asked to start
 * reading it to make a future read system call faster.  An LSN is written to
 * *lsn, and the I/O will be considered to have completed once that LSN is
 * replayed.
 *
 * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
 * that it was already in the buffer pool, or we decided for various reasons
 * not to prefetch.
 */
static LsnReadQueueNextStatus
XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
{
    XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
    XLogReaderState *reader = prefetcher->reader;
    XLogRecPtr  replaying_lsn = reader->ReadRecPtr;

    /*
     * We keep track of the record and block we're up to between calls with
     * prefetcher->record and prefetcher->next_block_id.
     */
    for (;;)
    {
        DecodedXLogRecord *record;

        /* Try to read a new future record, if we don't already have one. */
        if (prefetcher->record == NULL)
        {
            bool        nonblocking;

            /*
             * If there are already records or an error queued up that could
             * be replayed, we don't want to block here.  Otherwise, it's OK
             * to block waiting for more data: presumably the caller has
             * nothing else to do.
             */
            nonblocking = XLogReaderHasQueuedRecordOrError(reader);

            /* Readahead is disabled until we replay past a certain point. */
            if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
                return LRQ_NEXT_AGAIN;

            record = XLogReadAhead(prefetcher->reader, nonblocking);
            if (record == NULL)
            {
                /*
                 * We can't read any more, due to an error or lack of data in
                 * nonblocking mode.  Don't try to read ahead again until
                 * we've replayed everything already decoded.
                 */
                if (nonblocking && prefetcher->reader->decode_queue_tail)
                    prefetcher->no_readahead_until =
                        prefetcher->reader->decode_queue_tail->lsn;

                return LRQ_NEXT_AGAIN;
            }

            /*
             * If prefetching is disabled, we don't need to analyze the record
             * or issue any prefetches.  We just need to cause one record to
             * be decoded.
             */
            if (!RecoveryPrefetchEnabled())
            {
                *lsn = InvalidXLogRecPtr;
                return LRQ_NEXT_NO_IO;
            }

            /* We have a new record to process. */
            prefetcher->record = record;
            prefetcher->next_block_id = 0;
        }
        else
        {
            /* Continue to process from last call, or last loop. */
            record = prefetcher->record;
        }

        /*
         * Check for operations that require us to filter out block ranges, or
         * pause readahead completely.
         */
        if (replaying_lsn < record->lsn)
        {
            uint8       rmid = record->header.xl_rmid;
            uint8       record_type = record->header.xl_info & ~XLR_INFO_MASK;

            if (rmid == RM_XLOG_ID)
            {
                if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
                    record_type == XLOG_END_OF_RECOVERY)
                {
                    /*
                     * These records might change the TLI.  Avoid potential
                     * bugs if we were to allow "read TLI" and "replay TLI" to
                     * differ without more analysis.
                     */
                    prefetcher->no_readahead_until = record->lsn;

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                    elog(XLOGPREFETCHER_DEBUG_LEVEL,
                         "suppressing all readahead until %X/%X is replayed due to possible TLI change",
                         LSN_FORMAT_ARGS(record->lsn));
#endif

                    /* Fall through so we move past this record. */
                }
            }
            else if (rmid == RM_DBASE_ID)
            {
                /*
                 * When databases are created with the file-copy strategy,
                 * there are no WAL records to tell us about the creation of
                 * individual relations.
                 */
                if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
                {
                    xl_dbase_create_file_copy_rec *xlrec =
                        (xl_dbase_create_file_copy_rec *) record->main_data;
                    RelFileLocator rlocator =
                        {InvalidOid, xlrec->db_id, InvalidRelFileNumber};

                    /*
                     * Don't try to prefetch anything in this database until
                     * it has been created, or we might confuse the blocks of
                     * different generations, if a database OID or
                     * relfilenumber is reused.  It's also more efficient than
                     * discovering that relations don't exist on disk yet with
                     * ENOENT errors.
                     */
                    XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                    elog(XLOGPREFETCHER_DEBUG_LEVEL,
                         "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
                         rlocator.dbOid,
                         LSN_FORMAT_ARGS(record->lsn));
#endif
                }
            }
            else if (rmid == RM_SMGR_ID)
            {
                if (record_type == XLOG_SMGR_CREATE)
                {
                    xl_smgr_create *xlrec = (xl_smgr_create *)
                        record->main_data;

                    if (xlrec->forkNum == MAIN_FORKNUM)
                    {
                        /*
                         * Don't prefetch anything for this whole relation
                         * until it has been created.  Otherwise we might
                         * confuse the blocks of different generations, if a
                         * relfilenumber is reused.  This also avoids the need
                         * to discover the problem via extra syscalls that
                         * report ENOENT.
                         */
                        XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
                                                record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                        elog(XLOGPREFETCHER_DEBUG_LEVEL,
                             "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
                             xlrec->rlocator.spcOid,
                             xlrec->rlocator.dbOid,
                             xlrec->rlocator.relNumber,
                             LSN_FORMAT_ARGS(record->lsn));
#endif
                    }
                }
                else if (record_type == XLOG_SMGR_TRUNCATE)
                {
                    xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
                        record->main_data;

                    /*
                     * Don't consider prefetching anything in the truncated
                     * range until the truncation has been performed.
                     */
                    XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
                                            xlrec->blkno,
                                            record->lsn);

#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                    elog(XLOGPREFETCHER_DEBUG_LEVEL,
                         "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
                         xlrec->rlocator.spcOid,
                         xlrec->rlocator.dbOid,
                         xlrec->rlocator.relNumber,
                         xlrec->blkno,
                         LSN_FORMAT_ARGS(record->lsn));
#endif
                }
            }
        }

        /* Scan the block references, starting where we left off last time. */
        while (prefetcher->next_block_id <= record->max_block_id)
        {
            int         block_id = prefetcher->next_block_id++;
            DecodedBkpBlock *block = &record->blocks[block_id];
            SMgrRelation reln;
            PrefetchBufferResult result;

            if (!block->in_use)
                continue;

            Assert(!BufferIsValid(block->prefetch_buffer));

            /*
             * Record the LSN of this record.  When it's replayed,
             * LsnReadQueue will consider any IOs submitted for earlier LSNs
             * to be finished.
             */
            *lsn = record->lsn;

            /* We don't try to prefetch anything but the main fork for now. */
            if (block->forknum != MAIN_FORKNUM)
            {
                return LRQ_NEXT_NO_IO;
            }

            /*
             * If there is a full page image attached, we won't be reading the
             * page, so don't bother trying to prefetch.
             */
            if (block->has_image)
            {
                XLogPrefetchIncrement(&SharedStats->skip_fpw);
                return LRQ_NEXT_NO_IO;
            }

            /* There is no point in reading a page that will be zeroed. */
            if (block->flags & BKPBLOCK_WILL_INIT)
            {
                XLogPrefetchIncrement(&SharedStats->skip_init);
                return LRQ_NEXT_NO_IO;
            }

            /* Should we skip prefetching this block due to a filter? */
            if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
            {
                XLogPrefetchIncrement(&SharedStats->skip_new);
                return LRQ_NEXT_NO_IO;
            }

            /* There is no point in repeatedly prefetching the same block. */
            for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
            {
                if (block->blkno == prefetcher->recent_block[i] &&
                    RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
                {
                    /*
                     * XXX If we also remembered where it was, we could set
                     * recent_buffer so that recovery could skip smgropen()
                     * and a buffer table lookup.
                     */
                    XLogPrefetchIncrement(&SharedStats->skip_rep);
                    return LRQ_NEXT_NO_IO;
                }
            }
            prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
            prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
            prefetcher->recent_idx =
                (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;

            /*
             * We could try to have a fast path for repeated references to the
             * same relation (with some scheme to handle invalidations
             * safely), but for now we'll call smgropen() every time.
             */
            reln = smgropen(block->rlocator, InvalidBackendId);

            /*
             * If the relation file doesn't exist on disk, for example because
             * we're replaying after a crash and the file will be created and
             * then unlinked by WAL that hasn't been replayed yet, suppress
             * further prefetching in the relation until this record is
             * replayed.
             */
            if (!smgrexists(reln, MAIN_FORKNUM))
            {
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                elog(XLOGPREFETCHER_DEBUG_LEVEL,
                     "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
                     reln->smgr_rlocator.locator.spcOid,
                     reln->smgr_rlocator.locator.dbOid,
                     reln->smgr_rlocator.locator.relNumber,
                     LSN_FORMAT_ARGS(record->lsn));
#endif
                XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
                                        record->lsn);
                XLogPrefetchIncrement(&SharedStats->skip_new);
                return LRQ_NEXT_NO_IO;
            }

            /*
             * If the relation isn't big enough to contain the referenced
             * block yet, suppress prefetching of this block and higher until
             * this record is replayed.
             */
            if (block->blkno >= smgrnblocks(reln, block->forknum))
            {
#ifdef XLOGPREFETCHER_DEBUG_LEVEL
                elog(XLOGPREFETCHER_DEBUG_LEVEL,
                     "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
                     reln->smgr_rlocator.locator.spcOid,
                     reln->smgr_rlocator.locator.dbOid,
                     reln->smgr_rlocator.locator.relNumber,
                     block->blkno,
                     LSN_FORMAT_ARGS(record->lsn));
#endif
                XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
                                        record->lsn);
                XLogPrefetchIncrement(&SharedStats->skip_new);
                return LRQ_NEXT_NO_IO;
            }

            /* Try to initiate prefetching. */
            result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
            if (BufferIsValid(result.recent_buffer))
            {
                /* Cache hit, nothing to do. */
                XLogPrefetchIncrement(&SharedStats->hit);
                block->prefetch_buffer = result.recent_buffer;
                return LRQ_NEXT_NO_IO;
            }
            else if (result.initiated_io)
            {
                /* Cache miss, I/O (presumably) started. */
                XLogPrefetchIncrement(&SharedStats->prefetch);
                block->prefetch_buffer = InvalidBuffer;
                return LRQ_NEXT_IO;
            }
            else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
            {
                /*
                 * This shouldn't be possible, because we already determined
                 * that the relation exists on disk and is big enough.
                 * Something is wrong with the cache invalidation for
                 * smgrexists(), smgrnblocks(), or the file was unlinked or
                 * truncated beneath our feet?
                 */
                elog(ERROR,
                     "could not prefetch relation %u/%u/%u block %u",
                     reln->smgr_rlocator.locator.spcOid,
                     reln->smgr_rlocator.locator.dbOid,
                     reln->smgr_rlocator.locator.relNumber,
                     block->blkno);
            }
        }

        /*
         * Several callsites need to be able to read exactly one record
         * without any internal readahead.  Examples: xlog.c reading
         * checkpoint records with emode set to PANIC, which might otherwise
         * cause XLogPageRead() to panic on some future page, and xlog.c
         * determining where to start writing WAL next, which depends on the
         * contents of the reader's internal buffer after reading one record.
         * Therefore, don't even think about prefetching until the first
         * record after XLogPrefetcherBeginRead() has been consumed.
         */
        if (prefetcher->reader->decode_queue_tail &&
            prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
            return LRQ_NEXT_AGAIN;

        /* Advance to the next record. */
        prefetcher->record = NULL;
    }
    pg_unreachable();
}

/*
 * Expose statistics about recovery prefetching.
 */
Datum
pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    Datum       values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
    bool        nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];

    InitMaterializedSRF(fcinfo, 0);

    for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
        nulls[i] = false;

    values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
    values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
    values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
    values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
    values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
    values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
    values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
    values[7] = Int32GetDatum(SharedStats->wal_distance);
    values[8] = Int32GetDatum(SharedStats->block_distance);
    values[9] = Int32GetDatum(SharedStats->io_depth);
    tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);

    return (Datum) 0;
}

/*
 * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
 * has been replayed.
 */
static inline void
XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
                        BlockNumber blockno, XLogRecPtr lsn)
{
    XLogPrefetcherFilter *filter;
    bool        found;

    filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
    if (!found)
    {
        /*
         * Don't allow any prefetching of this block or higher until replayed.
         */
        filter->filter_until_replayed = lsn;
        filter->filter_from_block = blockno;
        dlist_push_head(&prefetcher->filter_queue, &filter->link);
    }
    else
    {
        /*
         * We were already filtering this rlocator.  Extend the filter's
         * lifetime to cover this WAL record, but leave the lower of the block
         * numbers there because we don't want to have to track individual
         * blocks.
         */
        filter->filter_until_replayed = lsn;
        dlist_delete(&filter->link);
        dlist_push_head(&prefetcher->filter_queue, &filter->link);
        filter->filter_from_block = Min(filter->filter_from_block, blockno);
    }
}

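/*
 * For instance (an editorial illustration): if a relation is truncated at
 * block 100 and then at block 50 before either record is replayed, the
 * single filter entry ends up covering blocks >= 50 until the second
 * record's LSN is replayed, which is safe, at the cost of also suppressing
 * prefetches of blocks 50..99 a little longer than strictly necessary.
 */
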
367 tmunro 893 ECB : /*
894 : * Have we replayed any records that caused us to begin filtering a block
895 : * range? That means that relations should have been created, extended or
896 : * dropped as required, so we can stop filtering out accesses to a given
897 : * relfilenumber.
898 : */
899 : static inline void
367 tmunro 900 GIC 2506683 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
901 : {
367 tmunro 902 CBC 2531239 : while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
903 : {
904 608883 : XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
905 : link,
367 tmunro 906 ECB : &prefetcher->filter_queue);
907 :
367 tmunro 908 GIC 608883 : if (filter->filter_until_replayed >= replaying_lsn)
909 584327 : break;
367 tmunro 910 ECB :
367 tmunro 911 CBC 24556 : dlist_delete(&filter->link);
367 tmunro 912 GIC 24556 : hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
367 tmunro 913 ECB : }
367 tmunro 914 CBC 2506683 : }
915 :
367 tmunro 916 ECB : /*
917 : * Check if a given block should be skipped due to a filter.
918 : */
919 : static inline bool
277 rhaas 920 GNC 2564650 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
921 : BlockNumber blockno)
367 tmunro 922 ECB : {
923 : /*
924 : * Test for empty queue first, because we expect it to be empty most of
925 : * the time and we can avoid the hash table lookup in that case.
926 : */
367 tmunro 927 GIC 2564650 : if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
928 : {
367 tmunro 929 ECB : XLogPrefetcherFilter *filter;
930 :
931 : /* See if the block range is filtered. */
277 rhaas 932 GNC 583644 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
367 tmunro 933 GIC 583644 : if (filter && filter->filter_from_block <= blockno)
367 tmunro 934 ECB : {
935 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
936 : elog(XLOGPREFETCHER_DEBUG_LEVEL,
937 : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
938 : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
939 : LSN_FORMAT_ARGS(filter->filter_until_replayed),
940 : filter->filter_from_block);
941 : #endif
367 tmunro 942 GIC 315982 : return true;
943 : }
367 tmunro 944 ECB :
945 : /* See if the whole database is filtered. */
277 rhaas 946 GNC 267662 : rlocator.relNumber = InvalidRelFileNumber;
947 267662 : rlocator.spcOid = InvalidOid;
948 267662 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
367 tmunro 949 CBC 267662 : if (filter)
367 tmunro 950 ECB : {
951 : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
952 : elog(XLOGPREFETCHER_DEBUG_LEVEL,
953 : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
954 : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
955 : LSN_FORMAT_ARGS(filter->filter_until_replayed));
956 : #endif
367 tmunro 957 UIC 0 : return true;
958 : }
367 tmunro 959 EUB : }
960 :
367 tmunro 961 GIC 2248668 : return false;
962 : }
367 tmunro 963 ECB :
/*
 * A wrapper for XLogBeginRead() that also resets the prefetcher.
 */
void
XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
{
    /* This will forget about any in-flight IO. */
    prefetcher->reconfigure_count--;

    /* Book-keeping to avoid readahead on first read. */
    prefetcher->begin_ptr = recPtr;

    prefetcher->no_readahead_until = 0;

    /* This will forget about any queued up records in the decoder. */
    XLogBeginRead(prefetcher->reader, recPtr);
}

/*
 * A wrapper for XLogReadRecord() that provides the same interface, but also
 * tries to initiate I/O for blocks referenced in future WAL records.
 */
XLogRecord *
XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
{
    DecodedXLogRecord *record;
    XLogRecPtr  replayed_up_to;

    /*
     * See if it's time to reset the prefetching machinery, because a relevant
     * GUC was changed.
     */
    if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
    {
        uint32      max_distance;
        uint32      max_inflight;

        if (prefetcher->streaming_read)
            lrq_free(prefetcher->streaming_read);

        if (RecoveryPrefetchEnabled())
        {
            Assert(maintenance_io_concurrency > 0);
            max_inflight = maintenance_io_concurrency;
            max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
        }
        else
        {
            max_inflight = 1;
            max_distance = 1;
        }

        prefetcher->streaming_read = lrq_alloc(max_distance,
                                               max_inflight,
                                               (uintptr_t) prefetcher,
                                               XLogPrefetcherNextBlock);

        prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
    }

    /*
     * Release last returned record, if there is one, as it's now been
     * replayed.
     */
    replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);

    /*
     * Can we drop any filters yet?  If we were waiting for a relation to be
     * created or extended, it is now OK to access blocks in the covered
     * range.
     */
    XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);

    /*
     * All IO initiated by earlier WAL is now completed.  This might trigger
     * further prefetching.
     */
    lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);

    /*
     * If there's nothing queued yet, then start prefetching to cause at least
     * one record to be queued.
     */
    if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
    {
        Assert(lrq_inflight(prefetcher->streaming_read) == 0);
        Assert(lrq_completed(prefetcher->streaming_read) == 0);
        lrq_prefetch(prefetcher->streaming_read);
    }

    /* Read the next record. */
    record = XLogNextRecord(prefetcher->reader, errmsg);
    if (!record)
        return NULL;

    /*
     * The record we just got is the "current" one, for the benefit of the
     * XLogRecXXX() macros.
     */
    Assert(record == prefetcher->reader->record);

    /*
     * If maintenance_io_concurrency is set very low, we might have started
     * prefetching some but not all of the blocks referenced in the record
     * we're about to return.  Forget about the rest of the blocks in this
     * record by dropping the prefetcher's reference to it.
     */
    if (record == prefetcher->record)
        prefetcher->record = NULL;

    /*
     * See if it's time to compute some statistics, because enough WAL has
     * been processed.
     */
    if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
        XLogPrefetcherComputeStats(prefetcher);

    Assert(record == prefetcher->reader->record);

    return &record->header;
}

bool
check_recovery_prefetch(int *new_value, void **extra, GucSource source)
{
#ifndef USE_PREFETCH
    if (*new_value == RECOVERY_PREFETCH_ON)
    {
        GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
        return false;
    }
#endif

    return true;
}

void
assign_recovery_prefetch(int new_value, void *extra)
{
    /* Reconfigure prefetching, because a setting it depends on changed. */
    recovery_prefetch = new_value;
    if (AmStartupProcess())
        XLogPrefetchReconfigure();
}