Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * xlogprefetcher.c
4 : : * Prefetching support for recovery.
5 : : *
6 : : * Portions Copyright (c) 2022-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/access/transam/xlogprefetcher.c
12 : : *
13 : : * This module provides a drop-in replacement for an XLogReader that tries to
14 : : * minimize I/O stalls by looking ahead in the WAL. If blocks that will be
15 : : * accessed in the near future are not already in the buffer pool, it initiates
16 : : * I/Os that might complete before the caller eventually needs the data. When
17 : : * referenced blocks are found in the buffer pool already, the buffer is
18 : : * recorded in the decoded record so that XLogReadBufferForRedo() can try to
19 : : * avoid a second buffer mapping table lookup.
20 : : *
21 : : * Currently, only the main fork is considered for prefetching. Currently,
22 : : * prefetching is only effective on systems where PrefetchBuffer() does
23 : : * something useful (mainly Linux).
24 : : *
25 : : *-------------------------------------------------------------------------
26 : : */
27 : :
28 : : #include "postgres.h"
29 : :
30 : : #include "access/xlogprefetcher.h"
31 : : #include "access/xlogreader.h"
32 : : #include "catalog/pg_control.h"
33 : : #include "catalog/storage_xlog.h"
34 : : #include "commands/dbcommands_xlog.h"
35 : : #include "funcapi.h"
36 : : #include "miscadmin.h"
37 : : #include "port/atomics.h"
38 : : #include "storage/bufmgr.h"
39 : : #include "storage/shmem.h"
40 : : #include "storage/smgr.h"
41 : : #include "utils/fmgrprotos.h"
42 : : #include "utils/guc_hooks.h"
43 : : #include "utils/hsearch.h"
44 : : #include "utils/timestamp.h"
45 : :
46 : : /*
47 : : * Every time we process this much WAL, we'll update the values in
48 : : * pg_stat_recovery_prefetch.
49 : : */
50 : : #define XLOGPREFETCHER_STATS_DISTANCE BLCKSZ
51 : :
52 : : /*
53 : : * To detect repeated access to the same block and skip useless extra system
54 : : * calls, we remember a small window of recently prefetched blocks.
55 : : */
56 : : #define XLOGPREFETCHER_SEQ_WINDOW_SIZE 4
57 : :
58 : : /*
59 : : * When maintenance_io_concurrency is not saturated, we're prepared to look
60 : : * ahead up to N times that number of block references.
61 : : */
62 : : #define XLOGPREFETCHER_DISTANCE_MULTIPLIER 4
63 : :
64 : : /* Define to log internal debugging messages. */
65 : : /* #define XLOGPREFETCHER_DEBUG_LEVEL LOG */
66 : :
67 : : /* GUCs */
68 : : int recovery_prefetch = RECOVERY_PREFETCH_TRY;
69 : :
70 : : #ifdef USE_PREFETCH
71 : : #define RecoveryPrefetchEnabled() \
72 : : (recovery_prefetch != RECOVERY_PREFETCH_OFF && \
73 : : maintenance_io_concurrency > 0)
74 : : #else
75 : : #define RecoveryPrefetchEnabled() false
76 : : #endif
77 : :
78 : : static int XLogPrefetchReconfigureCount = 0;
79 : :
/*
 * What the "next" callback of an LsnReadQueue reports back to lrq_prefetch().
 */
typedef enum
{
	LRQ_NEXT_NO_IO,				/* entry queued, no I/O was started */
	LRQ_NEXT_IO,				/* entry queued, an I/O was started */
	LRQ_NEXT_AGAIN,				/* nothing queued; stop and retry later */
} LsnReadQueueNextStatus;
89 : :
90 : : /*
91 : : * Type of callback that can decide which block to prefetch next. For now
92 : : * there is only one.
93 : : */
94 : : typedef LsnReadQueueNextStatus (*LsnReadQueueNextFun) (uintptr_t lrq_private,
95 : : XLogRecPtr *lsn);
96 : :
97 : : /*
98 : : * A simple circular queue of LSNs, using to control the number of
99 : : * (potentially) inflight IOs. This stands in for a later more general IO
100 : : * control mechanism, which is why it has the apparently unnecessary
101 : : * indirection through a function pointer.
102 : : */
103 : : typedef struct LsnReadQueue
104 : : {
105 : : LsnReadQueueNextFun next;
106 : : uintptr_t lrq_private;
107 : : uint32 max_inflight;
108 : : uint32 inflight;
109 : : uint32 completed;
110 : : uint32 head;
111 : : uint32 tail;
112 : : uint32 size;
113 : : struct
114 : : {
115 : : bool io;
116 : : XLogRecPtr lsn;
117 : : } queue[FLEXIBLE_ARRAY_MEMBER];
118 : : } LsnReadQueue;
119 : :
120 : : /*
121 : : * A prefetcher. This is a mechanism that wraps an XLogReader, prefetching
122 : : * blocks that will be soon be referenced, to try to avoid IO stalls.
123 : : */
124 : : struct XLogPrefetcher
125 : : {
126 : : /* WAL reader and current reading state. */
127 : : XLogReaderState *reader;
128 : : DecodedXLogRecord *record;
129 : : int next_block_id;
130 : :
131 : : /* When to publish stats. */
132 : : XLogRecPtr next_stats_shm_lsn;
133 : :
134 : : /* Book-keeping to avoid accessing blocks that don't exist yet. */
135 : : HTAB *filter_table;
136 : : dlist_head filter_queue;
137 : :
138 : : /* Book-keeping to avoid repeat prefetches. */
139 : : RelFileLocator recent_rlocator[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
140 : : BlockNumber recent_block[XLOGPREFETCHER_SEQ_WINDOW_SIZE];
141 : : int recent_idx;
142 : :
143 : : /* Book-keeping to disable prefetching temporarily. */
144 : : XLogRecPtr no_readahead_until;
145 : :
146 : : /* IO depth manager. */
147 : : LsnReadQueue *streaming_read;
148 : :
149 : : XLogRecPtr begin_ptr;
150 : :
151 : : int reconfigure_count;
152 : : };
153 : :
154 : : /*
155 : : * A temporary filter used to track block ranges that haven't been created
156 : : * yet, whole relations that haven't been created yet, and whole relations
157 : : * that (we assume) have already been dropped, or will be created by bulk WAL
158 : : * operators.
159 : : */
160 : : typedef struct XLogPrefetcherFilter
161 : : {
162 : : RelFileLocator rlocator;
163 : : XLogRecPtr filter_until_replayed;
164 : : BlockNumber filter_from_block;
165 : : dlist_node link;
166 : : } XLogPrefetcherFilter;
167 : :
168 : : /*
169 : : * Counters exposed in shared memory for pg_stat_recovery_prefetch.
170 : : */
171 : : typedef struct XLogPrefetchStats
172 : : {
173 : : pg_atomic_uint64 reset_time; /* Time of last reset. */
174 : : pg_atomic_uint64 prefetch; /* Prefetches initiated. */
175 : : pg_atomic_uint64 hit; /* Blocks already in cache. */
176 : : pg_atomic_uint64 skip_init; /* Zero-inited blocks skipped. */
177 : : pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
178 : : pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
179 : : pg_atomic_uint64 skip_rep; /* Repeat accesses skipped. */
180 : :
181 : : /* Dynamic values */
182 : : int wal_distance; /* Number of WAL bytes ahead. */
183 : : int block_distance; /* Number of block references ahead. */
184 : : int io_depth; /* Number of I/Os in progress. */
185 : : } XLogPrefetchStats;
186 : :
187 : : static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
188 : : RelFileLocator rlocator,
189 : : BlockNumber blockno,
190 : : XLogRecPtr lsn);
191 : : static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
192 : : RelFileLocator rlocator,
193 : : BlockNumber blockno);
194 : : static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
195 : : XLogRecPtr replaying_lsn);
196 : : static LsnReadQueueNextStatus XLogPrefetcherNextBlock(uintptr_t pgsr_private,
197 : : XLogRecPtr *lsn);
198 : :
199 : : static XLogPrefetchStats *SharedStats;
200 : :
201 : : static inline LsnReadQueue *
738 tmunro@postgresql.or 202 :CBC 1816 : lrq_alloc(uint32 max_distance,
203 : : uint32 max_inflight,
204 : : uintptr_t lrq_private,
205 : : LsnReadQueueNextFun next)
206 : : {
207 : : LsnReadQueue *lrq;
208 : : uint32 size;
209 : :
210 [ - + ]: 1816 : Assert(max_distance >= max_inflight);
211 : :
212 : 1816 : size = max_distance + 1; /* full ring buffer has a gap */
213 : 1816 : lrq = palloc(offsetof(LsnReadQueue, queue) + sizeof(lrq->queue[0]) * size);
214 : 1816 : lrq->lrq_private = lrq_private;
215 : 1816 : lrq->max_inflight = max_inflight;
216 : 1816 : lrq->size = size;
217 : 1816 : lrq->next = next;
218 : 1816 : lrq->head = 0;
219 : 1816 : lrq->tail = 0;
220 : 1816 : lrq->inflight = 0;
221 : 1816 : lrq->completed = 0;
222 : :
223 : 1816 : return lrq;
224 : : }
225 : :
226 : : static inline void
227 : 1722 : lrq_free(LsnReadQueue *lrq)
228 : : {
229 : 1722 : pfree(lrq);
230 : 1722 : }
231 : :
232 : : static inline uint32
233 : 71211 : lrq_inflight(LsnReadQueue *lrq)
234 : : {
235 : 71211 : return lrq->inflight;
236 : : }
237 : :
238 : : static inline uint32
239 : 71211 : lrq_completed(LsnReadQueue *lrq)
240 : : {
241 : 71211 : return lrq->completed;
242 : : }
243 : :
244 : : static inline void
245 : 2984132 : lrq_prefetch(LsnReadQueue *lrq)
246 : : {
247 : : /* Try to start as many IOs as we can within our limits. */
248 [ + + ]: 9106213 : while (lrq->inflight < lrq->max_inflight &&
249 [ + + ]: 6105905 : lrq->inflight + lrq->completed < lrq->size - 1)
250 : : {
251 [ - + ]: 3459515 : Assert(((lrq->head + 1) % lrq->size) != lrq->tail);
252 [ + + + - ]: 3459515 : switch (lrq->next(lrq->lrq_private, &lrq->queue[lrq->head].lsn))
253 : : {
254 : 321483 : case LRQ_NEXT_AGAIN:
255 : 321483 : return;
256 : 54785 : case LRQ_NEXT_IO:
257 : 54785 : lrq->queue[lrq->head].io = true;
258 : 54785 : lrq->inflight++;
259 : 54785 : break;
260 : 3083164 : case LRQ_NEXT_NO_IO:
261 : 3083164 : lrq->queue[lrq->head].io = false;
262 : 3083164 : lrq->completed++;
263 : 3083164 : break;
264 : : }
265 : 3137949 : lrq->head++;
266 [ + + ]: 3137949 : if (lrq->head == lrq->size)
267 : 76470 : lrq->head = 0;
268 : : }
269 : : }
270 : :
271 : : static inline void
272 : 2984115 : lrq_complete_lsn(LsnReadQueue *lrq, XLogRecPtr lsn)
273 : : {
274 : : /*
275 : : * We know that LSNs before 'lsn' have been replayed, so we can now assume
276 : : * that any IOs that were started before then have finished.
277 : : */
278 [ + + ]: 9105888 : while (lrq->tail != lrq->head &&
279 [ + + ]: 6081859 : lrq->queue[lrq->tail].lsn < lsn)
280 : : {
281 [ + + ]: 3137658 : if (lrq->queue[lrq->tail].io)
282 : 54784 : lrq->inflight--;
283 : : else
284 : 3082874 : lrq->completed--;
285 : 3137658 : lrq->tail++;
286 [ + + ]: 3137658 : if (lrq->tail == lrq->size)
287 : 76464 : lrq->tail = 0;
288 : : }
289 [ + - + - ]: 2984115 : if (RecoveryPrefetchEnabled())
290 : 2984115 : lrq_prefetch(lrq);
291 : 2984032 : }
292 : :
293 : : size_t
294 : 1679 : XLogPrefetchShmemSize(void)
295 : : {
296 : 1679 : return sizeof(XLogPrefetchStats);
297 : : }
298 : :
299 : : /*
300 : : * Reset all counters to zero.
301 : : */
302 : : void
738 tmunro@postgresql.or 303 :GBC 3 : XLogPrefetchResetStats(void)
304 : : {
305 : 3 : pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
306 : 3 : pg_atomic_write_u64(&SharedStats->prefetch, 0);
307 : 3 : pg_atomic_write_u64(&SharedStats->hit, 0);
308 : 3 : pg_atomic_write_u64(&SharedStats->skip_init, 0);
309 : 3 : pg_atomic_write_u64(&SharedStats->skip_new, 0);
310 : 3 : pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
311 : 3 : pg_atomic_write_u64(&SharedStats->skip_rep, 0);
312 : 3 : }
313 : :
314 : : void
738 tmunro@postgresql.or 315 :CBC 898 : XLogPrefetchShmemInit(void)
316 : : {
317 : : bool found;
318 : :
319 : 898 : SharedStats = (XLogPrefetchStats *)
320 : 898 : ShmemInitStruct("XLogPrefetchStats",
321 : : sizeof(XLogPrefetchStats),
322 : : &found);
323 : :
324 [ + - ]: 898 : if (!found)
325 : : {
326 : 898 : pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
327 : 898 : pg_atomic_init_u64(&SharedStats->prefetch, 0);
328 : 898 : pg_atomic_init_u64(&SharedStats->hit, 0);
329 : 898 : pg_atomic_init_u64(&SharedStats->skip_init, 0);
330 : 898 : pg_atomic_init_u64(&SharedStats->skip_new, 0);
331 : 898 : pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
332 : 898 : pg_atomic_init_u64(&SharedStats->skip_rep, 0);
333 : : }
334 : 898 : }
335 : :
336 : : /*
337 : : * Called when any GUC is changed that affects prefetching.
338 : : */
339 : : void
340 : 26 : XLogPrefetchReconfigure(void)
341 : : {
342 : 26 : XLogPrefetchReconfigureCount++;
343 : 26 : }
344 : :
345 : : /*
346 : : * Increment a counter in shared memory. This is equivalent to *counter++ on a
347 : : * plain uint64 without any memory barrier or locking, except on platforms
348 : : * where readers can't read uint64 without possibly observing a torn value.
349 : : */
350 : : static inline void
351 : 3124088 : XLogPrefetchIncrement(pg_atomic_uint64 *counter)
352 : : {
353 [ + + - + ]: 3124088 : Assert(AmStartupProcess() || !IsUnderPostmaster);
354 : 3124088 : pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
355 : 3124088 : }
356 : :
357 : : /*
358 : : * Create a prefetcher that is ready to begin prefetching blocks referenced by
359 : : * WAL records.
360 : : */
361 : : XLogPrefetcher *
362 : 823 : XLogPrefetcherAllocate(XLogReaderState *reader)
363 : : {
364 : : XLogPrefetcher *prefetcher;
365 : : static HASHCTL hash_table_ctl = {
366 : : .keysize = sizeof(RelFileLocator),
367 : : .entrysize = sizeof(XLogPrefetcherFilter)
368 : : };
369 : :
370 : 823 : prefetcher = palloc0(sizeof(XLogPrefetcher));
371 : :
372 : 823 : prefetcher->reader = reader;
373 : 823 : prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
374 : : &hash_table_ctl,
375 : : HASH_ELEM | HASH_BLOBS);
376 : 823 : dlist_init(&prefetcher->filter_queue);
377 : :
378 : 823 : SharedStats->wal_distance = 0;
379 : 823 : SharedStats->block_distance = 0;
380 : 823 : SharedStats->io_depth = 0;
381 : :
382 : : /* First usage will cause streaming_read to be allocated. */
383 : 823 : prefetcher->reconfigure_count = XLogPrefetchReconfigureCount - 1;
384 : :
385 : 823 : return prefetcher;
386 : : }
387 : :
388 : : /*
389 : : * Destroy a prefetcher and release all resources.
390 : : */
391 : : void
392 : 729 : XLogPrefetcherFree(XLogPrefetcher *prefetcher)
393 : : {
394 : 729 : lrq_free(prefetcher->streaming_read);
395 : 729 : hash_destroy(prefetcher->filter_table);
396 : 729 : pfree(prefetcher);
397 : 729 : }
398 : :
399 : : /*
400 : : * Provide access to the reader.
401 : : */
402 : : XLogReaderState *
403 : 2983901 : XLogPrefetcherGetReader(XLogPrefetcher *prefetcher)
404 : : {
405 : 2983901 : return prefetcher->reader;
406 : : }
407 : :
408 : : /*
409 : : * Update the statistics visible in the pg_stat_recovery_prefetch view.
410 : : */
411 : : void
412 : 71194 : XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher)
413 : : {
414 : : uint32 io_depth;
415 : : uint32 completed;
416 : : int64 wal_distance;
417 : :
418 : :
419 : : /* How far ahead of replay are we now? */
420 [ + + ]: 71194 : if (prefetcher->reader->decode_queue_tail)
421 : : {
422 : 58239 : wal_distance =
423 : 58239 : prefetcher->reader->decode_queue_tail->lsn -
424 : 58239 : prefetcher->reader->decode_queue_head->lsn;
425 : : }
426 : : else
427 : : {
428 : 12955 : wal_distance = 0;
429 : : }
430 : :
431 : : /* How many IOs are currently in flight and completed? */
432 : 71194 : io_depth = lrq_inflight(prefetcher->streaming_read);
433 : 71194 : completed = lrq_completed(prefetcher->streaming_read);
434 : :
435 : : /* Update the instantaneous stats visible in pg_stat_recovery_prefetch. */
436 : 71194 : SharedStats->io_depth = io_depth;
437 : 71194 : SharedStats->block_distance = io_depth + completed;
438 : 71194 : SharedStats->wal_distance = wal_distance;
439 : :
440 : 71194 : prefetcher->next_stats_shm_lsn =
441 : 71194 : prefetcher->reader->ReadRecPtr + XLOGPREFETCHER_STATS_DISTANCE;
442 : 71194 : }
443 : :
444 : : /*
445 : : * A callback that examines the next block reference in the WAL, and possibly
446 : : * starts an IO so that a later read will be fast.
447 : : *
448 : : * Returns LRQ_NEXT_AGAIN if no more WAL data is available yet.
449 : : *
450 : : * Returns LRQ_NEXT_IO if the next block reference is for a main fork block
451 : : * that isn't in the buffer pool, and the kernel has been asked to start
452 : : * reading it to make a future read system call faster. An LSN is written to
453 : : * *lsn, and the I/O will be considered to have completed once that LSN is
454 : : * replayed.
455 : : *
456 : : * Returns LRQ_NEXT_NO_IO if we examined the next block reference and found
457 : : * that it was already in the buffer pool, or we decided for various reasons
458 : : * not to prefetch.
459 : : */
460 : : static LsnReadQueueNextStatus
461 : 3459515 : XLogPrefetcherNextBlock(uintptr_t pgsr_private, XLogRecPtr *lsn)
462 : : {
463 : 3459515 : XLogPrefetcher *prefetcher = (XLogPrefetcher *) pgsr_private;
464 : 3459515 : XLogReaderState *reader = prefetcher->reader;
465 : 3459515 : XLogRecPtr replaying_lsn = reader->ReadRecPtr;
466 : :
467 : : /*
468 : : * We keep track of the record and block we're up to between calls with
469 : : * prefetcher->record and prefetcher->next_block_id.
470 : : */
471 : : for (;;)
472 : 2982164 : {
473 : : DecodedXLogRecord *record;
474 : :
475 : : /* Try to read a new future record, if we don't already have one. */
476 [ + + ]: 6441679 : if (prefetcher->record == NULL)
477 : : {
478 : : bool nonblocking;
479 : :
480 : : /*
481 : : * If there are already records or an error queued up that could
482 : : * be replayed, we don't want to block here. Otherwise, it's OK
483 : : * to block waiting for more data: presumably the caller has
484 : : * nothing else to do.
485 : : */
486 : 3303736 : nonblocking = XLogReaderHasQueuedRecordOrError(reader);
487 : :
488 : : /* Readahead is disabled until we replay past a certain point. */
728 489 [ + + + + ]: 3303736 : if (nonblocking && replaying_lsn <= prefetcher->no_readahead_until)
738 490 : 306200 : return LRQ_NEXT_AGAIN;
491 : :
492 : 2997536 : record = XLogReadAhead(prefetcher->reader, nonblocking);
493 [ + + ]: 2997453 : if (record == NULL)
494 : : {
495 : : /*
496 : : * We can't read any more, due to an error or lack of data in
497 : : * nonblocking mode. Don't try to read ahead again until
498 : : * we've replayed everything already decoded.
499 : : */
728 500 [ + + + - ]: 13493 : if (nonblocking && prefetcher->reader->decode_queue_tail)
501 : 13231 : prefetcher->no_readahead_until =
502 : 13231 : prefetcher->reader->decode_queue_tail->lsn;
503 : :
738 504 : 13493 : return LRQ_NEXT_AGAIN;
505 : : }
506 : :
507 : : /*
508 : : * If prefetching is disabled, we don't need to analyze the record
509 : : * or issue any prefetches. We just need to cause one record to
510 : : * be decoded.
511 : : */
512 [ + - - + ]: 2983960 : if (!RecoveryPrefetchEnabled())
513 : : {
738 tmunro@postgresql.or 514 :UBC 0 : *lsn = InvalidXLogRecPtr;
515 : 0 : return LRQ_NEXT_NO_IO;
516 : : }
517 : :
518 : : /* We have a new record to process. */
738 tmunro@postgresql.or 519 :CBC 2983960 : prefetcher->record = record;
520 : 2983960 : prefetcher->next_block_id = 0;
521 : : }
522 : : else
523 : : {
524 : : /* Continue to process from last call, or last loop. */
525 : 3137943 : record = prefetcher->record;
526 : : }
527 : :
528 : : /*
529 : : * Check for operations that require us to filter out block ranges, or
530 : : * pause readahead completely.
531 : : */
532 [ + - ]: 6121903 : if (replaying_lsn < record->lsn)
533 : : {
534 : 6121903 : uint8 rmid = record->header.xl_rmid;
535 : 6121903 : uint8 record_type = record->header.xl_info & ~XLR_INFO_MASK;
536 : :
537 [ + + ]: 6121903 : if (rmid == RM_XLOG_ID)
538 : : {
539 [ + + + + ]: 90905 : if (record_type == XLOG_CHECKPOINT_SHUTDOWN ||
540 : : record_type == XLOG_END_OF_RECOVERY)
541 : : {
542 : : /*
543 : : * These records might change the TLI. Avoid potential
544 : : * bugs if we were to allow "read TLI" and "replay TLI" to
545 : : * differ without more analysis.
546 : : */
547 : 1350 : prefetcher->no_readahead_until = record->lsn;
548 : :
549 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
550 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
551 : : "suppressing all readahead until %X/%X is replayed due to possible TLI change",
552 : : LSN_FORMAT_ARGS(record->lsn));
553 : : #endif
554 : :
555 : : /* Fall through so we move past this record. */
556 : : }
557 : : }
558 [ + + ]: 6030998 : else if (rmid == RM_DBASE_ID)
559 : : {
560 : : /*
561 : : * When databases are created with the file-copy strategy,
562 : : * there are no WAL records to tell us about the creation of
563 : : * individual relations.
564 : : */
565 [ + + ]: 78 : if (record_type == XLOG_DBASE_CREATE_FILE_COPY)
566 : : {
567 : 16 : xl_dbase_create_file_copy_rec *xlrec =
568 : : (xl_dbase_create_file_copy_rec *) record->main_data;
648 rhaas@postgresql.org 569 : 16 : RelFileLocator rlocator =
570 : 16 : {InvalidOid, xlrec->db_id, InvalidRelFileNumber};
571 : :
572 : : /*
573 : : * Don't try to prefetch anything in this database until
574 : : * it has been created, or we might confuse the blocks of
575 : : * different generations, if a database OID or
576 : : * relfilenumber is reused. It's also more efficient than
577 : : * discovering that relations don't exist on disk yet with
578 : : * ENOENT errors.
579 : : */
580 : 16 : XLogPrefetcherAddFilter(prefetcher, rlocator, 0, record->lsn);
581 : :
582 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
583 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
584 : : "suppressing prefetch in database %u until %X/%X is replayed due to raw file copy",
585 : : rlocator.dbOid,
586 : : LSN_FORMAT_ARGS(record->lsn));
587 : : #endif
588 : : }
589 : : }
738 tmunro@postgresql.or 590 [ + + ]: 6030920 : else if (rmid == RM_SMGR_ID)
591 : : {
592 [ + + ]: 18512 : if (record_type == XLOG_SMGR_CREATE)
593 : : {
594 : 18459 : xl_smgr_create *xlrec = (xl_smgr_create *)
595 : : record->main_data;
596 : :
597 [ + + ]: 18459 : if (xlrec->forkNum == MAIN_FORKNUM)
598 : : {
599 : : /*
600 : : * Don't prefetch anything for this whole relation
601 : : * until it has been created. Otherwise we might
602 : : * confuse the blocks of different generations, if a
603 : : * relfilenumber is reused. This also avoids the need
604 : : * to discover the problem via extra syscalls that
605 : : * report ENOENT.
606 : : */
648 rhaas@postgresql.org 607 : 15934 : XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator, 0,
608 : : record->lsn);
609 : :
610 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
611 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
612 : : "suppressing prefetch in relation %u/%u/%u until %X/%X is replayed, which creates the relation",
613 : : xlrec->rlocator.spcOid,
614 : : xlrec->rlocator.dbOid,
615 : : xlrec->rlocator.relNumber,
616 : : LSN_FORMAT_ARGS(record->lsn));
617 : : #endif
618 : : }
619 : : }
738 tmunro@postgresql.or 620 [ + - ]: 53 : else if (record_type == XLOG_SMGR_TRUNCATE)
621 : : {
622 : 53 : xl_smgr_truncate *xlrec = (xl_smgr_truncate *)
623 : : record->main_data;
624 : :
625 : : /*
626 : : * Don't consider prefetching anything in the truncated
627 : : * range until the truncation has been performed.
628 : : */
648 rhaas@postgresql.org 629 : 53 : XLogPrefetcherAddFilter(prefetcher, xlrec->rlocator,
630 : : xlrec->blkno,
631 : : record->lsn);
632 : :
633 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
634 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
635 : : "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, which truncates the relation",
636 : : xlrec->rlocator.spcOid,
637 : : xlrec->rlocator.dbOid,
638 : : xlrec->rlocator.relNumber,
639 : : xlrec->blkno,
640 : : LSN_FORMAT_ARGS(record->lsn));
641 : : #endif
642 : : }
643 : : }
644 : : }
645 : :
646 : : /* Scan the block references, starting where we left off last time. */
738 tmunro@postgresql.or 647 [ + + ]: 6124254 : while (prefetcher->next_block_id <= record->max_block_id)
648 : : {
649 : 3140300 : int block_id = prefetcher->next_block_id++;
650 : 3140300 : DecodedBkpBlock *block = &record->blocks[block_id];
651 : : SMgrRelation reln;
652 : : PrefetchBufferResult result;
653 : :
654 [ + + ]: 3140300 : if (!block->in_use)
655 : 2130 : continue;
656 : :
600 john.naylor@postgres 657 [ - + ]: 3138170 : Assert(!BufferIsValid(block->prefetch_buffer));
658 : :
659 : : /*
660 : : * Record the LSN of this record. When it's replayed,
661 : : * LsnReadQueue will consider any IOs submitted for earlier LSNs
662 : : * to be finished.
663 : : */
738 tmunro@postgresql.or 664 : 3138170 : *lsn = record->lsn;
665 : :
666 : : /* We don't try to prefetch anything but the main fork for now. */
667 [ + + ]: 3138170 : if (block->forknum != MAIN_FORKNUM)
668 : : {
669 : 3137949 : return LRQ_NEXT_NO_IO;
670 : : }
671 : :
672 : : /*
673 : : * If there is a full page image attached, we won't be reading the
674 : : * page, so don't bother trying to prefetch.
675 : : */
676 [ + + ]: 3124309 : if (block->has_image)
677 : : {
678 : 54929 : XLogPrefetchIncrement(&SharedStats->skip_fpw);
679 : 54929 : return LRQ_NEXT_NO_IO;
680 : : }
681 : :
682 : : /* There is no point in reading a page that will be zeroed. */
683 [ + + ]: 3069380 : if (block->flags & BKPBLOCK_WILL_INIT)
684 : : {
685 : 52925 : XLogPrefetchIncrement(&SharedStats->skip_init);
686 : 52925 : return LRQ_NEXT_NO_IO;
687 : : }
688 : :
689 : : /* Should we skip prefetching this block due to a filter? */
648 rhaas@postgresql.org 690 [ + + ]: 3016455 : if (XLogPrefetcherIsFiltered(prefetcher, block->rlocator, block->blkno))
691 : : {
738 tmunro@postgresql.or 692 : 352595 : XLogPrefetchIncrement(&SharedStats->skip_new);
693 : 352595 : return LRQ_NEXT_NO_IO;
694 : : }
695 : :
696 : : /* There is no point in repeatedly prefetching the same block. */
697 [ + + ]: 8125036 : for (int i = 0; i < XLOGPREFETCHER_SEQ_WINDOW_SIZE; ++i)
698 : : {
699 [ + + ]: 7536378 : if (block->blkno == prefetcher->recent_block[i] &&
648 rhaas@postgresql.org 700 [ + + + + : 2199123 : RelFileLocatorEquals(block->rlocator, prefetcher->recent_rlocator[i]))
+ + ]
701 : : {
702 : : /*
703 : : * XXX If we also remembered where it was, we could set
704 : : * recent_buffer so that recovery could skip smgropen()
705 : : * and a buffer table lookup.
706 : : */
738 tmunro@postgresql.or 707 : 2075202 : XLogPrefetchIncrement(&SharedStats->skip_rep);
708 : 2075202 : return LRQ_NEXT_NO_IO;
709 : : }
710 : : }
648 rhaas@postgresql.org 711 : 588658 : prefetcher->recent_rlocator[prefetcher->recent_idx] = block->rlocator;
738 tmunro@postgresql.or 712 : 588658 : prefetcher->recent_block[prefetcher->recent_idx] = block->blkno;
713 : 588658 : prefetcher->recent_idx =
714 : 588658 : (prefetcher->recent_idx + 1) % XLOGPREFETCHER_SEQ_WINDOW_SIZE;
715 : :
716 : : /*
717 : : * We could try to have a fast path for repeated references to the
718 : : * same relation (with some scheme to handle invalidations
719 : : * safely), but for now we'll call smgropen() every time.
720 : : */
42 heikki.linnakangas@i 721 :GNC 588658 : reln = smgropen(block->rlocator, INVALID_PROC_NUMBER);
722 : :
723 : : /*
724 : : * If the relation file doesn't exist on disk, for example because
725 : : * we're replaying after a crash and the file will be created and
726 : : * then unlinked by WAL that hasn't been replayed yet, suppress
727 : : * further prefetching in the relation until this record is
728 : : * replayed.
729 : : */
738 tmunro@postgresql.or 730 [ + + ]:CBC 588658 : if (!smgrexists(reln, MAIN_FORKNUM))
731 : : {
732 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
733 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
734 : : "suppressing all prefetch in relation %u/%u/%u until %X/%X is replayed, because the relation does not exist on disk",
735 : : reln->smgr_rlocator.locator.spcOid,
736 : : reln->smgr_rlocator.locator.dbOid,
737 : : reln->smgr_rlocator.locator.relNumber,
738 : : LSN_FORMAT_ARGS(record->lsn));
739 : : #endif
648 rhaas@postgresql.org 740 :GBC 6 : XLogPrefetcherAddFilter(prefetcher, block->rlocator, 0,
741 : : record->lsn);
738 tmunro@postgresql.or 742 : 6 : XLogPrefetchIncrement(&SharedStats->skip_new);
743 : 6 : return LRQ_NEXT_NO_IO;
744 : : }
745 : :
746 : : /*
747 : : * If the relation isn't big enough to contain the referenced
748 : : * block yet, suppress prefetching of this block and higher until
749 : : * this record is replayed.
750 : : */
738 tmunro@postgresql.or 751 [ + + ]:CBC 588652 : if (block->blkno >= smgrnblocks(reln, block->forknum))
752 : : {
753 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
754 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
755 : : "suppressing prefetch in relation %u/%u/%u from block %u until %X/%X is replayed, because the relation is too small",
756 : : reln->smgr_rlocator.locator.spcOid,
757 : : reln->smgr_rlocator.locator.dbOid,
758 : : reln->smgr_rlocator.locator.relNumber,
759 : : block->blkno,
760 : : LSN_FORMAT_ARGS(record->lsn));
761 : : #endif
648 rhaas@postgresql.org 762 : 14798 : XLogPrefetcherAddFilter(prefetcher, block->rlocator, block->blkno,
763 : : record->lsn);
738 tmunro@postgresql.or 764 : 14798 : XLogPrefetchIncrement(&SharedStats->skip_new);
765 : 14798 : return LRQ_NEXT_NO_IO;
766 : : }
767 : :
768 : : /* Try to initiate prefetching. */
769 : 573854 : result = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
770 [ + + ]: 573854 : if (BufferIsValid(result.recent_buffer))
771 : : {
772 : : /* Cache hit, nothing to do. */
773 : 518848 : XLogPrefetchIncrement(&SharedStats->hit);
774 : 518848 : block->prefetch_buffer = result.recent_buffer;
775 : 518848 : return LRQ_NEXT_NO_IO;
776 : : }
777 [ + + ]: 55006 : else if (result.initiated_io)
778 : : {
779 : : /* Cache miss, I/O (presumably) started. */
780 : 54785 : XLogPrefetchIncrement(&SharedStats->prefetch);
781 : 54785 : block->prefetch_buffer = InvalidBuffer;
782 : 54785 : return LRQ_NEXT_IO;
783 : : }
372 784 [ - + ]: 221 : else if ((io_direct_flags & IO_DIRECT_DATA) == 0)
785 : : {
786 : : /*
787 : : * This shouldn't be possible, because we already determined
788 : : * that the relation exists on disk and is big enough.
789 : : * Something is wrong with the cache invalidation for
790 : : * smgrexists(), smgrnblocks(), or the file was unlinked or
791 : : * truncated beneath our feet?
792 : : */
738 tmunro@postgresql.or 793 [ # # ]:UBC 0 : elog(ERROR,
794 : : "could not prefetch relation %u/%u/%u block %u",
795 : : reln->smgr_rlocator.locator.spcOid,
796 : : reln->smgr_rlocator.locator.dbOid,
797 : : reln->smgr_rlocator.locator.relNumber,
798 : : block->blkno);
799 : : }
800 : : }
801 : :
802 : : /*
803 : : * Several callsites need to be able to read exactly one record
804 : : * without any internal readahead. Examples: xlog.c reading
805 : : * checkpoint records with emode set to PANIC, which might otherwise
806 : : * cause XLogPageRead() to panic on some future page, and xlog.c
807 : : * determining where to start writing WAL next, which depends on the
808 : : * contents of the reader's internal buffer after reading one record.
809 : : * Therefore, don't even think about prefetching until the first
810 : : * record after XLogPrefetcherBeginRead() has been consumed.
811 : : */
738 tmunro@postgresql.or 812 [ + + ]:CBC 2983954 : if (prefetcher->reader->decode_queue_tail &&
813 [ + + ]: 2983953 : prefetcher->reader->decode_queue_tail->lsn == prefetcher->begin_ptr)
814 : 1790 : return LRQ_NEXT_AGAIN;
815 : :
816 : : /* Advance to the next record. */
817 : 2982164 : prefetcher->record = NULL;
818 : : }
819 : : pg_unreachable();
820 : : }
821 : :
822 : : /*
823 : : * Expose statistics about recovery prefetching.
824 : : */
825 : : Datum
738 tmunro@postgresql.or 826 :GBC 6 : pg_stat_get_recovery_prefetch(PG_FUNCTION_ARGS)
827 : : {
828 : : #define PG_STAT_GET_RECOVERY_PREFETCH_COLS 10
829 : 6 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
830 : : Datum values[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
831 : : bool nulls[PG_STAT_GET_RECOVERY_PREFETCH_COLS];
832 : :
544 michael@paquier.xyz 833 : 6 : InitMaterializedSRF(fcinfo, 0);
834 : :
738 tmunro@postgresql.or 835 [ + + ]: 66 : for (int i = 0; i < PG_STAT_GET_RECOVERY_PREFETCH_COLS; ++i)
836 : 60 : nulls[i] = false;
837 : :
838 : 6 : values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
839 : 6 : values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
840 : 6 : values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->hit));
841 : 6 : values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_init));
842 : 6 : values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
843 : 6 : values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
844 : 6 : values[6] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_rep));
845 : 6 : values[7] = Int32GetDatum(SharedStats->wal_distance);
846 : 6 : values[8] = Int32GetDatum(SharedStats->block_distance);
847 : 6 : values[9] = Int32GetDatum(SharedStats->io_depth);
848 : 6 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
849 : :
850 : 6 : return (Datum) 0;
851 : : }
852 : :
853 : : /*
854 : : * Don't prefetch any blocks >= 'blockno' from a given 'rlocator', until 'lsn'
855 : : * has been replayed.
856 : : */
857 : : static inline void
648 rhaas@postgresql.org 858 :CBC 30807 : XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
859 : : BlockNumber blockno, XLogRecPtr lsn)
860 : : {
861 : : XLogPrefetcherFilter *filter;
862 : : bool found;
863 : :
864 : 30807 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_ENTER, &found);
738 tmunro@postgresql.or 865 [ + + ]: 30807 : if (!found)
866 : : {
867 : : /*
868 : : * Don't allow any prefetching of this block or higher until replayed.
869 : : */
870 : 30789 : filter->filter_until_replayed = lsn;
871 : 30789 : filter->filter_from_block = blockno;
872 : 30789 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
873 : : }
874 : : else
875 : : {
876 : : /*
877 : : * We were already filtering this rlocator. Extend the filter's
878 : : * lifetime to cover this WAL record, but leave the lower of the block
879 : : * numbers there because we don't want to have to track individual
880 : : * blocks.
881 : : */
882 : 18 : filter->filter_until_replayed = lsn;
883 : 18 : dlist_delete(&filter->link);
884 : 18 : dlist_push_head(&prefetcher->filter_queue, &filter->link);
885 : 18 : filter->filter_from_block = Min(filter->filter_from_block, blockno);
886 : : }
887 : 30807 : }
888 : :
889 : : /*
890 : : * Have we replayed any records that caused us to begin filtering a block
891 : : * range? That means that relations should have been created, extended or
892 : : * dropped as required, so we can stop filtering out accesses to a given
893 : : * relfilenumber.
894 : : */
895 : : static inline void
896 : 2984115 : XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
897 : : {
898 [ + + ]: 3014899 : while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
899 : : {
900 : 738900 : XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
901 : : link,
902 : : &prefetcher->filter_queue);
903 : :
904 [ + + ]: 738900 : if (filter->filter_until_replayed >= replaying_lsn)
905 : 708116 : break;
906 : :
907 : 30784 : dlist_delete(&filter->link);
908 : 30784 : hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
909 : : }
910 : 2984115 : }
911 : :
912 : : /*
913 : : * Check if a given block should be skipped due to a filter.
914 : : */
915 : : static inline bool
648 rhaas@postgresql.org 916 : 3016455 : XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileLocator rlocator,
917 : : BlockNumber blockno)
918 : : {
919 : : /*
920 : : * Test for empty queue first, because we expect it to be empty most of
921 : : * the time and we can avoid the hash table lookup in that case.
922 : : */
738 tmunro@postgresql.or 923 [ + + ]: 3016455 : if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
924 : : {
925 : : XLogPrefetcherFilter *filter;
926 : :
927 : : /* See if the block range is filtered. */
648 rhaas@postgresql.org 928 : 686473 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
738 tmunro@postgresql.or 929 [ + + + + ]: 686473 : if (filter && filter->filter_from_block <= blockno)
930 : : {
931 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
932 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
933 : : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (blocks >= %u filtered)",
934 : : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
935 : : LSN_FORMAT_ARGS(filter->filter_until_replayed),
936 : : filter->filter_from_block);
937 : : #endif
938 : 352595 : return true;
939 : : }
940 : :
941 : : /* See if the whole database is filtered. */
648 rhaas@postgresql.org 942 : 333878 : rlocator.relNumber = InvalidRelFileNumber;
943 : 333878 : rlocator.spcOid = InvalidOid;
944 : 333878 : filter = hash_search(prefetcher->filter_table, &rlocator, HASH_FIND, NULL);
738 tmunro@postgresql.or 945 [ - + ]: 333878 : if (filter)
946 : : {
947 : : #ifdef XLOGPREFETCHER_DEBUG_LEVEL
948 : : elog(XLOGPREFETCHER_DEBUG_LEVEL,
949 : : "prefetch of %u/%u/%u block %u suppressed; filtering until LSN %X/%X is replayed (whole database)",
950 : : rlocator.spcOid, rlocator.dbOid, rlocator.relNumber, blockno,
951 : : LSN_FORMAT_ARGS(filter->filter_until_replayed));
952 : : #endif
738 tmunro@postgresql.or 953 :UBC 0 : return true;
954 : : }
955 : : }
956 : :
738 tmunro@postgresql.or 957 :CBC 2663860 : return false;
958 : : }
959 : :
960 : : /*
961 : : * A wrapper for XLogBeginRead() that also resets the prefetcher.
962 : : */
963 : : void
964 : 1790 : XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr)
965 : : {
966 : : /* This will forget about any in-flight IO. */
967 : 1790 : prefetcher->reconfigure_count--;
968 : :
969 : : /* Book-keeping to avoid readahead on first read. */
970 : 1790 : prefetcher->begin_ptr = recPtr;
971 : :
972 : 1790 : prefetcher->no_readahead_until = 0;
973 : :
974 : : /* This will forget about any queued up records in the decoder. */
975 : 1790 : XLogBeginRead(prefetcher->reader, recPtr);
976 : 1790 : }
977 : :
978 : : /*
979 : : * A wrapper for XLogReadRecord() that provides the same interface, but also
980 : : * tries to initiate I/O for blocks referenced in future WAL records.
981 : : */
982 : : XLogRecord *
983 : 2984115 : XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg)
984 : : {
985 : : DecodedXLogRecord *record;
986 : : XLogRecPtr replayed_up_to;
987 : :
988 : : /*
989 : : * See if it's time to reset the prefetching machinery, because a relevant
990 : : * GUC was changed.
991 : : */
992 [ + + ]: 2984115 : if (unlikely(XLogPrefetchReconfigureCount != prefetcher->reconfigure_count))
993 : : {
994 : : uint32 max_distance;
995 : : uint32 max_inflight;
996 : :
997 [ + + ]: 1816 : if (prefetcher->streaming_read)
998 : 993 : lrq_free(prefetcher->streaming_read);
999 : :
1000 [ + - + - ]: 1816 : if (RecoveryPrefetchEnabled())
1001 : : {
584 1002 [ - + ]: 1816 : Assert(maintenance_io_concurrency > 0);
1003 : 1816 : max_inflight = maintenance_io_concurrency;
738 1004 : 1816 : max_distance = max_inflight * XLOGPREFETCHER_DISTANCE_MULTIPLIER;
1005 : : }
1006 : : else
1007 : : {
738 tmunro@postgresql.or 1008 :UBC 0 : max_inflight = 1;
1009 : 0 : max_distance = 1;
1010 : : }
1011 : :
738 tmunro@postgresql.or 1012 :CBC 1816 : prefetcher->streaming_read = lrq_alloc(max_distance,
1013 : : max_inflight,
1014 : : (uintptr_t) prefetcher,
1015 : : XLogPrefetcherNextBlock);
1016 : :
1017 : 1816 : prefetcher->reconfigure_count = XLogPrefetchReconfigureCount;
1018 : : }
1019 : :
1020 : : /*
1021 : : * Release last returned record, if there is one, as it's now been
1022 : : * replayed.
1023 : : */
584 1024 : 2984115 : replayed_up_to = XLogReleasePreviousRecord(prefetcher->reader);
1025 : :
1026 : : /*
1027 : : * Can we drop any filters yet? If we were waiting for a relation to be
1028 : : * created or extended, it is now OK to access blocks in the covered
1029 : : * range.
1030 : : */
1031 : 2984115 : XLogPrefetcherCompleteFilters(prefetcher, replayed_up_to);
1032 : :
1033 : : /*
1034 : : * All IO initiated by earlier WAL is now completed. This might trigger
1035 : : * further prefetching.
1036 : : */
1037 : 2984115 : lrq_complete_lsn(prefetcher->streaming_read, replayed_up_to);
1038 : :
1039 : : /*
1040 : : * If there's nothing queued yet, then start prefetching to cause at least
1041 : : * one record to be queued.
1042 : : */
738 1043 [ + + ]: 2984032 : if (!XLogReaderHasQueuedRecordOrError(prefetcher->reader))
1044 : : {
584 1045 [ - + ]: 17 : Assert(lrq_inflight(prefetcher->streaming_read) == 0);
1046 [ - + ]: 17 : Assert(lrq_completed(prefetcher->streaming_read) == 0);
738 1047 : 17 : lrq_prefetch(prefetcher->streaming_read);
1048 : : }
1049 : :
1050 : : /* Read the next record. */
1051 : 2984032 : record = XLogNextRecord(prefetcher->reader, errmsg);
1052 [ + + ]: 2984032 : if (!record)
1053 : 357 : return NULL;
1054 : :
1055 : : /*
1056 : : * The record we just got is the "current" one, for the benefit of the
1057 : : * XLogRecXXX() macros.
1058 : : */
1059 [ - + ]: 2983675 : Assert(record == prefetcher->reader->record);
1060 : :
1061 : : /*
1062 : : * If maintenance_io_concurrency is set very low, we might have started
1063 : : * prefetching some but not all of the blocks referenced in the record
1064 : : * we're about to return. Forget about the rest of the blocks in this
1065 : : * record by dropping the prefetcher's reference to it.
1066 : : */
584 1067 [ + + ]: 2983675 : if (record == prefetcher->record)
1068 : 1790 : prefetcher->record = NULL;
1069 : :
1070 : : /*
1071 : : * See if it's time to compute some statistics, because enough WAL has
1072 : : * been processed.
1073 : : */
738 1074 [ + + ]: 2983675 : if (unlikely(record->lsn >= prefetcher->next_stats_shm_lsn))
1075 : 57510 : XLogPrefetcherComputeStats(prefetcher);
1076 : :
1077 [ - + ]: 2983675 : Assert(record == prefetcher->reader->record);
1078 : :
1079 : 2983675 : return &record->header;
1080 : : }
1081 : :
1082 : : bool
1083 : 928 : check_recovery_prefetch(int *new_value, void **extra, GucSource source)
1084 : : {
1085 : : #ifndef USE_PREFETCH
1086 : : if (*new_value == RECOVERY_PREFETCH_ON)
1087 : : {
1088 : : GUC_check_errdetail("recovery_prefetch is not supported on platforms that lack posix_fadvise().");
1089 : : return false;
1090 : : }
1091 : : #endif
1092 : :
1093 : 928 : return true;
1094 : : }
1095 : :
1096 : : void
1097 : 928 : assign_recovery_prefetch(int new_value, void *extra)
1098 : : {
1099 : : /* Reconfigure prefetching, because a setting it depends on changed. */
1100 : 928 : recovery_prefetch = new_value;
1101 [ - + ]: 928 : if (AmStartupProcess())
738 tmunro@postgresql.or 1102 :UBC 0 : XLogPrefetchReconfigure();
738 tmunro@postgresql.or 1103 :CBC 928 : }
|