Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * read_stream.c
4 : : * Mechanism for accessing buffered relation data with look-ahead
5 : : *
6 : : * Code that needs to access relation data typically pins blocks one at a
7 : : * time, often in a predictable order that might be sequential or data-driven.
8 : : * Calling the simple ReadBuffer() function for each block is inefficient,
9 : : * because blocks that are not yet in the buffer pool require I/O operations
10 : : * that are small and might stall waiting for storage. This mechanism looks
11 : : * into the future and calls StartReadBuffers() and WaitReadBuffers() to read
12 : : * neighboring blocks together and ahead of time, with an adaptive look-ahead
13 : : * distance.
14 : : *
15 : : * A user-provided callback generates a stream of block numbers that is used
16 : : * to form reads of up to io_combine_limit, by attempting to merge them with a
17 : : * pending read. When that isn't possible, the existing pending read is sent
18 : : * to StartReadBuffers() so that a new one can begin to form.
19 : : *
20 : : * The algorithm for controlling the look-ahead distance tries to classify the
21 : : * stream into three ideal behaviors:
22 : : *
23 : : * A) No I/O is necessary, because the requested blocks are fully cached
24 : : * already. There is no benefit to looking ahead more than one block, so
25 : : * distance is 1. This is the default initial assumption.
26 : : *
27 : : * B) I/O is necessary, but fadvise is undesirable because the access is
28 : : * sequential, or impossible because direct I/O is enabled or the system
29 : : * doesn't support advice. There is no benefit in looking ahead more than
                                 30                 :                 :  * io_combine_limit, because in this case the only goal is larger read system
31 : : * calls. Looking further ahead would pin many buffers and perform
                                 32                 :                 :  * speculative work for no benefit.
33 : : *
                                 34                 :                 :  * C) I/O is necessary, it appears random, and this system supports fadvise.
35 : : * We'll look further ahead in order to reach the configured level of I/O
36 : : * concurrency.
37 : : *
38 : : * The distance increases rapidly and decays slowly, so that it moves towards
39 : : * those levels as different I/O patterns are discovered. For example, a
40 : : * sequential scan of fully cached data doesn't bother looking ahead, but a
41 : : * sequential scan that hits a region of uncached blocks will start issuing
42 : : * increasingly wide read calls until it plateaus at io_combine_limit.
43 : : *
44 : : * The main data structure is a circular queue of buffers of size
45 : : * max_pinned_buffers plus some extra space for technical reasons, ready to be
46 : : * returned by read_stream_next_buffer(). Each buffer also has an optional
47 : : * variable sized object that is passed from the callback to the consumer of
48 : : * buffers.
49 : : *
50 : : * Parallel to the queue of buffers, there is a circular queue of in-progress
51 : : * I/Os that have been started with StartReadBuffers(), and for which
52 : : * WaitReadBuffers() must be called before returning the buffer.
53 : : *
                                 54                 :                 :  * For example, if the callback returns block numbers 10, 42, 43, 60 in
55 : : * successive calls, then these data structures might appear as follows:
56 : : *
57 : : * buffers buf/data ios
58 : : *
59 : : * +----+ +-----+ +--------+
60 : : * | | | | +----+ 42..44 | <- oldest_io_index
61 : : * +----+ +-----+ | +--------+
62 : : * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
63 : : * +----+ +-----+ | | +--------+
64 : : * | 42 | | ? |<-+ | | | <- next_io_index
65 : : * +----+ +-----+ | +--------+
66 : : * | 43 | | ? | | | |
67 : : * +----+ +-----+ | +--------+
68 : : * | 44 | | ? | | | |
69 : : * +----+ +-----+ | +--------+
70 : : * | 60 | | ? |<---+
71 : : * +----+ +-----+
72 : : * next_buffer_index -> | | | |
73 : : * +----+ +-----+
74 : : *
75 : : * In the example, 5 buffers are pinned, and the next buffer to be streamed to
76 : : * the client is block 10. Block 10 was a hit and has no associated I/O, but
77 : : * the range 42..44 requires an I/O wait before its buffers are returned, as
78 : : * does block 60.
79 : : *
80 : : *
81 : : * Portions Copyright (c) 2024, PostgreSQL Global Development Group
82 : : * Portions Copyright (c) 1994, Regents of the University of California
83 : : *
84 : : * IDENTIFICATION
85 : : * src/backend/storage/aio/read_stream.c
86 : : *
87 : : *-------------------------------------------------------------------------
88 : : */
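
/*
 * Illustrative usage (not part of this file): a minimal sketch of how a
 * caller might drive a read stream.  The read_stream_* functions and the
 * ReadStreamBlockNumberCB callback shape are the API declared in
 * storage/read_stream.h; the callback, its state struct, and the variable
 * names below are invented for the example.
 *
 *     typedef struct DemoScanState
 *     {
 *         BlockNumber next_block;
 *         BlockNumber nblocks;
 *     } DemoScanState;
 *
 *     static BlockNumber
 *     demo_next_block_cb(ReadStream *stream,
 *                        void *callback_private_data,
 *                        void *per_buffer_data)
 *     {
 *         DemoScanState *state = callback_private_data;
 *
 *         if (state->next_block >= state->nblocks)
 *             return InvalidBlockNumber;      /* signals end-of-stream */
 *         return state->next_block++;
 *     }
 *
 *     stream = read_stream_begin_relation(READ_STREAM_FULL, NULL,
 *                                         rel, MAIN_FORKNUM,
 *                                         demo_next_block_cb, &state, 0);
 *     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 *     {
 *         ... examine the pinned buffer ...
 *         ReleaseBuffer(buffer);
 *     }
 *     read_stream_end(stream);
 */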
89 : : #include "postgres.h"
90 : :
91 : : #include "catalog/pg_tablespace.h"
92 : : #include "miscadmin.h"
93 : : #include "storage/fd.h"
94 : : #include "storage/smgr.h"
95 : : #include "storage/read_stream.h"
96 : : #include "utils/memdebug.h"
97 : : #include "utils/rel.h"
98 : : #include "utils/spccache.h"
99 : :
100 : : typedef struct InProgressIO
101 : : {
102 : : int16 buffer_index;
103 : : ReadBuffersOperation op;
104 : : } InProgressIO;
105 : :
106 : : /*
107 : : * State for managing a stream of reads.
108 : : */
109 : : struct ReadStream
110 : : {
111 : : int16 max_ios;
112 : : int16 ios_in_progress;
113 : : int16 queue_size;
114 : : int16 max_pinned_buffers;
115 : : int16 pinned_buffers;
116 : : int16 distance;
117 : : bool advice_enabled;
118 : :
119 : : /*
120 : : * Small buffer of block numbers, useful for 'ungetting' to resolve flow
121 : : * control problems when I/Os are split. Also useful for batch-loading
122 : : * block numbers in the fast path.
123 : : */
124 : : BlockNumber blocknums[16];
125 : : int16 blocknums_count;
126 : : int16 blocknums_next;
127 : :
128 : : /*
129 : : * The callback that will tell us which block numbers to read, and an
                                130                 :                 :  * opaque pointer that will be passed to it for its own purposes.
131 : : */
132 : : ReadStreamBlockNumberCB callback;
133 : : void *callback_private_data;
134 : :
135 : : /* Next expected block, for detecting sequential access. */
136 : : BlockNumber seq_blocknum;
137 : :
138 : : /* The read operation we are currently preparing. */
139 : : BlockNumber pending_read_blocknum;
140 : : int16 pending_read_nblocks;
141 : :
142 : : /* Space for buffers and optional per-buffer private data. */
143 : : size_t per_buffer_data_size;
144 : : void *per_buffer_data;
145 : :
146 : : /* Read operations that have been started but not waited for yet. */
147 : : InProgressIO *ios;
148 : : int16 oldest_io_index;
149 : : int16 next_io_index;
150 : :
151 : : bool fast_path;
152 : :
153 : : /* Circular queue of buffers. */
154 : : int16 oldest_buffer_index; /* Next pinned buffer to return */
155 : : int16 next_buffer_index; /* Index of next buffer to pin */
156 : : Buffer buffers[FLEXIBLE_ARRAY_MEMBER];
157 : : };
158 : :
159 : : /*
160 : : * Return a pointer to the per-buffer data by index.
161 : : */
162 : : static inline void *
11 tmunro@postgresql.or 163 :GNC 1506190 : get_per_buffer_data(ReadStream *stream, int16 buffer_index)
164 : : {
165 : 3012380 : return (char *) stream->per_buffer_data +
166 : 1506190 : stream->per_buffer_data_size * buffer_index;
167 : : }
168 : :
169 : : /*
170 : : * Ask the callback which block it would like us to read next, with a small
171 : : * buffer in front to allow read_stream_unget_block() to work and to allow the
172 : : * fast path to skip this function and work directly from the array.
173 : : */
174 : : static inline BlockNumber
175 : 1506190 : read_stream_get_block(ReadStream *stream, void *per_buffer_data)
176 : : {
177 [ + + ]: 1506190 : if (stream->blocknums_next < stream->blocknums_count)
178 : 29545 : return stream->blocknums[stream->blocknums_next++];
179 : :
180 : : /*
181 : : * We only bother to fetch one at a time here (but see the fast path which
182 : : * uses more).
183 : : */
184 : 1476645 : return stream->callback(stream,
185 : : stream->callback_private_data,
186 : : per_buffer_data);
187 : : }
188 : :
189 : : /*
190 : : * In order to deal with short reads in StartReadBuffers(), we sometimes need
191 : : * to defer handling of a block until later.
192 : : */
193 : : static inline void
11 tmunro@postgresql.or 194 :UNC 0 : read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
195 : : {
196 [ # # ]: 0 : if (stream->blocknums_next == stream->blocknums_count)
197 : : {
198 : : /* Never initialized or entirely consumed. Re-initialize. */
199 : 0 : stream->blocknums[0] = blocknum;
200 : 0 : stream->blocknums_count = 1;
201 : 0 : stream->blocknums_next = 0;
202 : : }
203 : : else
204 : : {
                                205                 :                 :  /* Must be the last value returned from the blocknums array. */
206 [ # # ]: 0 : Assert(stream->blocknums_next > 0);
207 : 0 : stream->blocknums_next--;
208 [ # # ]: 0 : Assert(stream->blocknums[stream->blocknums_next] == blocknum);
209 : : }
210 : 0 : }
211 : :
212 : : #ifndef READ_STREAM_DISABLE_FAST_PATH
213 : : static void
11 tmunro@postgresql.or 214 :GNC 179148 : read_stream_fill_blocknums(ReadStream *stream)
215 : : {
216 : : BlockNumber blocknum;
217 : 179148 : int i = 0;
218 : :
219 : : do
220 : : {
221 : 1899178 : blocknum = stream->callback(stream,
222 : : stream->callback_private_data,
223 : : NULL);
224 : 1899178 : stream->blocknums[i++] = blocknum;
225 [ + + + + ]: 1899178 : } while (i < lengthof(stream->blocknums) &&
226 : : blocknum != InvalidBlockNumber);
227 : 179148 : stream->blocknums_count = i;
228 : 179148 : stream->blocknums_next = 0;
229 : 179148 : }
230 : : #endif
231 : :
232 : : static void
233 : 805176 : read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
234 : : {
235 : : bool need_wait;
236 : : int nblocks;
237 : : int flags;
238 : : int16 io_index;
239 : : int16 overflow;
240 : : int16 buffer_index;
241 : :
242 : : /* This should only be called with a pending read. */
243 [ - + ]: 805176 : Assert(stream->pending_read_nblocks > 0);
244 [ - + ]: 805176 : Assert(stream->pending_read_nblocks <= io_combine_limit);
245 : :
246 : : /* We had better not exceed the pin limit by starting this read. */
247 [ - + ]: 805176 : Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
248 : : stream->max_pinned_buffers);
249 : :
250 : : /* We had better not be overwriting an existing pinned buffer. */
251 [ + + ]: 805176 : if (stream->pinned_buffers > 0)
252 [ - + ]: 2488 : Assert(stream->next_buffer_index != stream->oldest_buffer_index);
253 : : else
254 [ - + ]: 802688 : Assert(stream->next_buffer_index == stream->oldest_buffer_index);
255 : :
256 : : /*
257 : : * If advice hasn't been suppressed, this system supports it, and this
258 : : * isn't a strictly sequential pattern, then we'll issue advice.
259 : : */
260 [ + + ]: 805176 : if (!suppress_advice &&
261 [ + + ]: 377738 : stream->advice_enabled &&
262 [ + + ]: 18150 : stream->pending_read_blocknum != stream->seq_blocknum)
263 : 1873 : flags = READ_BUFFERS_ISSUE_ADVICE;
264 : : else
265 : 803303 : flags = 0;
266 : :
                                267                 :                 :  /* We say how many blocks we want to read, but the number may be smaller on return. */
268 : 805176 : buffer_index = stream->next_buffer_index;
269 : 805176 : io_index = stream->next_io_index;
270 : 805176 : nblocks = stream->pending_read_nblocks;
271 : 805176 : need_wait = StartReadBuffers(&stream->ios[io_index].op,
272 : 805176 : &stream->buffers[buffer_index],
273 : : stream->pending_read_blocknum,
274 : : &nblocks,
275 : : flags);
276 : 805176 : stream->pinned_buffers += nblocks;
277 : :
278 : : /* Remember whether we need to wait before returning this buffer. */
279 [ + + ]: 805176 : if (!need_wait)
280 : : {
281 : : /* Look-ahead distance decays, no I/O necessary (behavior A). */
282 [ + + ]: 559151 : if (stream->distance > 1)
283 : 1092 : stream->distance--;
284 : : }
285 : : else
286 : : {
287 : : /*
                                288                 :                 :  * Remember to call WaitReadBuffers() before returning the head buffer.
289 : : * Look-ahead distance will be adjusted after waiting.
290 : : */
291 : 246025 : stream->ios[io_index].buffer_index = buffer_index;
292 [ + + ]: 246025 : if (++stream->next_io_index == stream->max_ios)
293 : 232738 : stream->next_io_index = 0;
294 [ - + ]: 246025 : Assert(stream->ios_in_progress < stream->max_ios);
295 : 246025 : stream->ios_in_progress++;
296 : 246025 : stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
297 : : }
298 : :
299 : : /*
300 : : * We gave a contiguous range of buffer space to StartReadBuffers(), but
301 : : * we want it to wrap around at queue_size. Slide overflowing buffers to
302 : : * the front of the array.
303 : : */
304 : 805176 : overflow = (buffer_index + nblocks) - stream->queue_size;
305 [ + + ]: 805176 : if (overflow > 0)
306 : 1782 : memmove(&stream->buffers[0],
307 : 1782 : &stream->buffers[stream->queue_size],
308 : : sizeof(stream->buffers[0]) * overflow);
309 : :
310 : : /* Compute location of start of next read, without using % operator. */
311 : 805176 : buffer_index += nblocks;
312 [ + + ]: 805176 : if (buffer_index >= stream->queue_size)
313 : 146665 : buffer_index -= stream->queue_size;
314 [ + - - + ]: 805176 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
315 : 805176 : stream->next_buffer_index = buffer_index;
316 : :
317 : : /* Adjust the pending read to cover the remaining portion, if any. */
318 : 805176 : stream->pending_read_blocknum += nblocks;
319 : 805176 : stream->pending_read_nblocks -= nblocks;
320 : 805176 : }
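
/*
 * Worked example of the wrap-around handling above (numbers chosen for
 * illustration only): with queue_size = 10, buffer_index = 8 and a read of
 * nblocks = 4, StartReadBuffers() filled buffers[8..11] using the extra
 * overflow elements allocated past the queue.  overflow = (8 + 4) - 10 = 2,
 * so buffers[10..11] are copied down to buffers[0..1], and the next read
 * will start at next_buffer_index = 12 - 10 = 2.
 */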
321 : :
322 : : static void
323 : 1587164 : read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
324 : : {
325 [ + + ]: 2430531 : while (stream->ios_in_progress < stream->max_ios &&
326 [ + + ]: 2425072 : stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
327 : : {
328 : : BlockNumber blocknum;
329 : : int16 buffer_index;
330 : : void *per_buffer_data;
331 : :
332 [ - + ]: 1506190 : if (stream->pending_read_nblocks == io_combine_limit)
333 : : {
11 tmunro@postgresql.or 334 :UNC 0 : read_stream_start_pending_read(stream, suppress_advice);
335 : 0 : suppress_advice = false;
336 : 0 : continue;
337 : : }
338 : :
339 : : /*
340 : : * See which block the callback wants next in the stream. We need to
341 : : * compute the index of the Nth block of the pending read including
342 : : * wrap-around, but we don't want to use the expensive % operator.
343 : : */
11 tmunro@postgresql.or 344 :GNC 1506190 : buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
345 [ + + ]: 1506190 : if (buffer_index >= stream->queue_size)
346 : 19050 : buffer_index -= stream->queue_size;
347 [ + - - + ]: 1506190 : Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
348 : 1506190 : per_buffer_data = get_per_buffer_data(stream, buffer_index);
349 : 1506190 : blocknum = read_stream_get_block(stream, per_buffer_data);
350 [ + + ]: 1506190 : if (blocknum == InvalidBlockNumber)
351 : : {
352 : : /* End of stream. */
353 : 662823 : stream->distance = 0;
354 : 662823 : break;
355 : : }
356 : :
357 : : /* Can we merge it with the pending read? */
358 [ + + ]: 843367 : if (stream->pending_read_nblocks > 0 &&
359 [ + - ]: 40325 : stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
360 : : {
361 : 40325 : stream->pending_read_nblocks++;
362 : 40325 : continue;
363 : : }
364 : :
365 : : /* We have to start the pending read before we can build another. */
7 366 [ - + ]: 803042 : while (stream->pending_read_nblocks > 0)
367 : : {
11 tmunro@postgresql.or 368 :UNC 0 : read_stream_start_pending_read(stream, suppress_advice);
369 : 0 : suppress_advice = false;
370 [ # # ]: 0 : if (stream->ios_in_progress == stream->max_ios)
371 : : {
372 : : /* And we've hit the limit. Rewind, and stop here. */
373 : 0 : read_stream_unget_block(stream, blocknum);
374 : 0 : return;
375 : : }
376 : : }
377 : :
378 : : /* This is the start of a new pending read. */
11 tmunro@postgresql.or 379 :GNC 803042 : stream->pending_read_blocknum = blocknum;
380 : 803042 : stream->pending_read_nblocks = 1;
381 : : }
382 : :
383 : : /*
384 : : * We don't start the pending read just because we've hit the distance
385 : : * limit, preferring to give it another chance to grow to full
386 : : * io_combine_limit size once more buffers have been consumed. However,
387 : : * if we've already reached io_combine_limit, or we've reached the
388 : : * distance limit and there isn't anything pinned yet, or the callback has
389 : : * signaled end-of-stream, we start the read immediately.
390 : : */
391 [ + + ]: 1587164 : if (stream->pending_read_nblocks > 0 &&
392 [ + + ]: 828491 : (stream->pending_read_nblocks == io_combine_limit ||
393 [ + + ]: 827010 : (stream->pending_read_nblocks == stream->distance &&
394 [ - + ]: 800882 : stream->pinned_buffers == 0) ||
395 [ + + ]: 26128 : stream->distance == 0) &&
396 [ + + ]: 805213 : stream->ios_in_progress < stream->max_ios)
397 : 805176 : read_stream_start_pending_read(stream, suppress_advice);
398 : : }
399 : :
400 : : /*
401 : : * Create a new read stream object that can be used to perform the equivalent
402 : : * of a series of ReadBuffer() calls for one fork of one relation.
403 : : * Internally, it generates larger vectored reads where possible by looking
404 : : * ahead. The callback should return block numbers or InvalidBlockNumber to
405 : : * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
406 : : * write extra data for each block into the space provided to it. It will
407 : : * also receive callback_private_data for its own purposes.
408 : : */
409 : : ReadStream *
410 : 314077 : read_stream_begin_relation(int flags,
411 : : BufferAccessStrategy strategy,
412 : : Relation rel,
413 : : ForkNumber forknum,
414 : : ReadStreamBlockNumberCB callback,
415 : : void *callback_private_data,
416 : : size_t per_buffer_data_size)
417 : : {
418 : : ReadStream *stream;
419 : : size_t size;
420 : : int16 queue_size;
421 : : int16 max_ios;
422 : : int strategy_pin_limit;
423 : : uint32 max_pinned_buffers;
424 : : Oid tablespace_id;
425 : : SMgrRelation smgr;
426 : :
427 : 314077 : smgr = RelationGetSmgr(rel);
428 : :
429 : : /*
430 : : * Decide how many I/Os we will allow to run at the same time. That
431 : : * currently means advice to the kernel to tell it that we will soon read.
432 : : * This number also affects how far we look ahead for opportunities to
433 : : * start more I/Os.
434 : : */
435 : 314077 : tablespace_id = smgr->smgr_rlocator.locator.spcOid;
436 [ + + + + ]: 619172 : if (!OidIsValid(MyDatabaseId) ||
437 [ - + ]: 374642 : IsCatalogRelation(rel) ||
438 : 69547 : IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
439 : : {
440 : : /*
441 : : * Avoid circularity while trying to look up tablespace settings or
442 : : * before spccache.c is ready.
443 : : */
444 : 244530 : max_ios = effective_io_concurrency;
445 : : }
446 [ + + ]: 69547 : else if (flags & READ_STREAM_MAINTENANCE)
447 : 2782 : max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
448 : : else
449 : 66765 : max_ios = get_tablespace_io_concurrency(tablespace_id);
450 : 314077 : max_ios = Min(max_ios, PG_INT16_MAX);
451 : :
452 : : /*
453 : : * Choose the maximum number of buffers we're prepared to pin. We try to
454 : : * pin fewer if we can, though. We clamp it to at least io_combine_limit
455 : : * so that we can have a chance to build up a full io_combine_limit sized
456 : : * read, even when max_ios is zero. Be careful not to allow int16 to
457 : : * overflow (even though that's not possible with the current GUC range
458 : : * limits), allowing also for the spare entry and the overflow space.
459 : : */
460 : 314077 : max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
461 : 314077 : max_pinned_buffers = Min(max_pinned_buffers,
462 : : PG_INT16_MAX - io_combine_limit - 1);
463 : :
464 : : /* Give the strategy a chance to limit the number of buffers we pin. */
8 465 : 314077 : strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
466 : 314077 : max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
467 : :
468 : : /* Don't allow this backend to pin more than its share of buffers. */
11 469 [ + + ]: 314077 : if (SmgrIsTemp(smgr))
470 : 5344 : LimitAdditionalLocalPins(&max_pinned_buffers);
471 : : else
472 : 308733 : LimitAdditionalPins(&max_pinned_buffers);
473 [ - + ]: 314077 : Assert(max_pinned_buffers > 0);
474 : :
475 : : /*
476 : : * We need one extra entry for buffers and per-buffer data, because users
477 : : * of per-buffer data have access to the object until the next call to
478 : : * read_stream_next_buffer(), so we need a gap between the head and tail
479 : : * of the queue so that we don't clobber it.
480 : : */
481 : 314077 : queue_size = max_pinned_buffers + 1;
482 : :
483 : : /*
                                484                 :                 :  * Allocate the object, the buffers, the ios and per_buffer_data space in
485 : : * one big chunk. Though we have queue_size buffers, we want to be able
486 : : * to assume that all the buffers for a single read are contiguous (i.e.
487 : : * don't wrap around halfway through), so we allow temporary overflows of
488 : : * up to the maximum possible read size by allocating an extra
489 : : * io_combine_limit - 1 elements.
490 : : */
491 : 314077 : size = offsetof(ReadStream, buffers);
492 : 314077 : size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
493 [ + - ]: 314077 : size += sizeof(InProgressIO) * Max(1, max_ios);
494 : 314077 : size += per_buffer_data_size * queue_size;
495 : 314077 : size += MAXIMUM_ALIGNOF * 2;
496 : 314077 : stream = (ReadStream *) palloc(size);
497 : 314077 : memset(stream, 0, offsetof(ReadStream, buffers));
498 : 314077 : stream->ios = (InProgressIO *)
499 : 314077 : MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
500 [ - + ]: 314077 : if (per_buffer_data_size > 0)
11 tmunro@postgresql.or 501 :UNC 0 : stream->per_buffer_data = (void *)
502 [ # # ]: 0 : MAXALIGN(&stream->ios[Max(1, max_ios)]);
503 : :
504 : : #ifdef USE_PREFETCH
505 : :
506 : : /*
507 : : * This system supports prefetching advice. We can use it as long as
508 : : * direct I/O isn't enabled, the caller hasn't promised sequential access
509 : : * (overriding our detection heuristics), and max_ios hasn't been set to
510 : : * zero.
511 : : */
11 tmunro@postgresql.or 512 [ + + ]:GNC 314077 : if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
513 [ + + + - ]: 313974 : (flags & READ_STREAM_SEQUENTIAL) == 0 &&
514 : : max_ios > 0)
515 : 9570 : stream->advice_enabled = true;
516 : : #endif
517 : :
518 : : /*
519 : : * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
520 : : * above. If we had real asynchronous I/O we might need a slightly
521 : : * different definition.
522 : : */
523 [ - + ]: 314077 : if (max_ios == 0)
11 tmunro@postgresql.or 524 :UNC 0 : max_ios = 1;
525 : :
11 tmunro@postgresql.or 526 :GNC 314077 : stream->max_ios = max_ios;
527 : 314077 : stream->per_buffer_data_size = per_buffer_data_size;
528 : 314077 : stream->max_pinned_buffers = max_pinned_buffers;
529 : 314077 : stream->queue_size = queue_size;
530 : 314077 : stream->callback = callback;
531 : 314077 : stream->callback_private_data = callback_private_data;
532 : :
533 : : /*
534 : : * Skip the initial ramp-up phase if the caller says we're going to be
535 : : * reading the whole relation. This way we start out assuming we'll be
536 : : * doing full io_combine_limit sized reads (behavior B).
537 : : */
538 [ + + ]: 314077 : if (flags & READ_STREAM_FULL)
539 : 2751 : stream->distance = Min(max_pinned_buffers, io_combine_limit);
540 : : else
541 : 311326 : stream->distance = 1;
542 : :
543 : : /*
                                544                 :                 :  * Since we always access the same relation, we can initialize
545 : : * parts of the ReadBuffersOperation objects and leave them that way, to
546 : : * avoid wasting CPU cycles writing to them for each read.
547 : : */
548 [ + + ]: 705377 : for (int i = 0; i < max_ios; ++i)
549 : : {
550 : 391300 : stream->ios[i].op.rel = rel;
551 : 391300 : stream->ios[i].op.smgr = RelationGetSmgr(rel);
552 : 391300 : stream->ios[i].op.smgr_persistence = 0;
553 : 391300 : stream->ios[i].op.forknum = forknum;
554 : 391300 : stream->ios[i].op.strategy = strategy;
555 : : }
556 : :
557 : 314077 : return stream;
558 : : }
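
/*
 * Illustrative sketch of per-buffer data (names invented for the example):
 * when per_buffer_data_size is non-zero, the callback is handed a scratch
 * slot of that size for each block it returns, and the consumer gets a
 * pointer to the same slot together with the pinned buffer.
 *
 *     typedef struct DemoPerBuffer
 *     {
 *         BlockNumber blocknum;       /* whatever the consumer needs per block */
 *     } DemoPerBuffer;
 *
 *     static BlockNumber
 *     demo_cb(ReadStream *stream, void *callback_private_data,
 *             void *per_buffer_data)
 *     {
 *         DemoPerBuffer *slot = per_buffer_data;
 *         BlockNumber blocknum = ...;           /* pick the next block */
 *
 *         if (blocknum != InvalidBlockNumber)
 *             slot->blocknum = blocknum;
 *         return blocknum;
 *     }
 *
 *     stream = read_stream_begin_relation(0, NULL, rel, MAIN_FORKNUM,
 *                                         demo_cb, &state,
 *                                         sizeof(DemoPerBuffer));
 *     while ((buffer = read_stream_next_buffer(stream, &p)) != InvalidBuffer)
 *     {
 *         DemoPerBuffer *slot = p;    /* valid only until the next call */
 *         ...
 *         ReleaseBuffer(buffer);
 *     }
 *     read_stream_end(stream);
 */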
559 : :
560 : : /*
561 : : * Pull one pinned buffer out of a stream. Each call returns successive
562 : : * blocks in the order specified by the callback. If per_buffer_data_size was
563 : : * set to a non-zero size, *per_buffer_data receives a pointer to the extra
564 : : * per-buffer data that the callback had a chance to populate, which remains
565 : : * valid until the next call to read_stream_next_buffer(). When the stream
566 : : * runs out of data, InvalidBuffer is returned. The caller may decide to end
567 : : * the stream early at any time by calling read_stream_end().
568 : : */
569 : : Buffer
570 : 3705969 : read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
571 : : {
572 : : Buffer buffer;
573 : : int16 oldest_buffer_index;
574 : :
575 : : #ifndef READ_STREAM_DISABLE_FAST_PATH
576 : :
577 : : /*
578 : : * A fast path for all-cached scans (behavior A). This is the same as the
579 : : * usual algorithm, but it is specialized for no I/O and no per-buffer
580 : : * data, so we can skip the queue management code, stay in the same buffer
581 : : * slot and use singular StartReadBuffer().
582 : : */
583 [ + + ]: 3705969 : if (likely(stream->fast_path))
584 : : {
585 : : BlockNumber next_blocknum;
586 : :
587 : : /* Fast path assumptions. */
588 [ - + ]: 1441940 : Assert(stream->ios_in_progress == 0);
589 [ - + ]: 1441940 : Assert(stream->pinned_buffers == 1);
590 [ - + ]: 1441940 : Assert(stream->distance == 1);
8 591 [ - + ]: 1441940 : Assert(stream->pending_read_nblocks == 0);
11 592 [ - + ]: 1441940 : Assert(stream->per_buffer_data_size == 0);
593 : :
594 : : /* We're going to return the buffer we pinned last time. */
595 : 1441940 : oldest_buffer_index = stream->oldest_buffer_index;
596 [ - + ]: 1441940 : Assert((oldest_buffer_index + 1) % stream->queue_size ==
597 : : stream->next_buffer_index);
598 : 1441940 : buffer = stream->buffers[oldest_buffer_index];
599 [ - + ]: 1441940 : Assert(buffer != InvalidBuffer);
600 : :
601 : : /* Choose the next block to pin. */
602 [ + + ]: 1441940 : if (unlikely(stream->blocknums_next == stream->blocknums_count))
603 : 179148 : read_stream_fill_blocknums(stream);
604 : 1441940 : next_blocknum = stream->blocknums[stream->blocknums_next++];
605 : :
8 606 [ + + ]: 1441940 : if (likely(next_blocknum != InvalidBlockNumber))
607 : : {
608 : : /*
609 : : * Pin a buffer for the next call. Same buffer entry, and
610 : : * arbitrary I/O entry (they're all free). We don't have to
611 : : * adjust pinned_buffers because we're transferring one to caller
                                612                 :                 :  * adjust pinned_buffers because we're transferring one to the caller
613 : : */
614 [ + + + + ]: 1374720 : if (likely(!StartReadBuffer(&stream->ios[0].op,
615 : : &stream->buffers[oldest_buffer_index],
616 : : next_blocknum,
617 : : stream->advice_enabled ?
618 : : READ_BUFFERS_ISSUE_ADVICE : 0)))
619 : : {
620 : : /* Fast return. */
621 : 1366469 : return buffer;
622 : : }
623 : :
624 : : /* Next call must wait for I/O for the newly pinned buffer. */
11 625 : 8251 : stream->oldest_io_index = 0;
626 : 8251 : stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
627 : 8251 : stream->ios_in_progress = 1;
628 : 8251 : stream->ios[0].buffer_index = oldest_buffer_index;
629 : 8251 : stream->seq_blocknum = next_blocknum + 1;
630 : : }
631 : : else
632 : : {
633 : : /* No more blocks, end of stream. */
8 634 : 67220 : stream->distance = 0;
635 : 67220 : stream->oldest_buffer_index = stream->next_buffer_index;
636 : 67220 : stream->pinned_buffers = 0;
637 : : }
638 : :
639 : 75471 : stream->fast_path = false;
11 640 : 75471 : return buffer;
641 : : }
642 : : #endif
643 : :
644 [ + + ]: 2264029 : if (unlikely(stream->pinned_buffers == 0))
645 : : {
646 [ - + ]: 1915347 : Assert(stream->oldest_buffer_index == stream->next_buffer_index);
647 : :
648 : : /* End of stream reached? */
649 [ + + ]: 1915347 : if (stream->distance == 0)
650 : 1104303 : return InvalidBuffer;
651 : :
652 : : /*
653 : : * The usual order of operations is that we look ahead at the bottom
654 : : * of this function after potentially finishing an I/O and making
655 : : * space for more, but if we're just starting up we'll need to crank
656 : : * the handle to get started.
657 : : */
658 : 811044 : read_stream_look_ahead(stream, true);
659 : :
660 : : /* End of stream reached? */
661 [ + + ]: 811044 : if (stream->pinned_buffers == 0)
662 : : {
663 [ - + ]: 383606 : Assert(stream->distance == 0);
664 : 383606 : return InvalidBuffer;
665 : : }
666 : : }
667 : :
668 : : /* Grab the oldest pinned buffer and associated per-buffer data. */
669 [ - + ]: 776120 : Assert(stream->pinned_buffers > 0);
670 : 776120 : oldest_buffer_index = stream->oldest_buffer_index;
671 [ + - - + ]: 776120 : Assert(oldest_buffer_index >= 0 &&
672 : : oldest_buffer_index < stream->queue_size);
673 : 776120 : buffer = stream->buffers[oldest_buffer_index];
674 [ - + ]: 776120 : if (per_buffer_data)
11 tmunro@postgresql.or 675 :UNC 0 : *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
676 : :
11 tmunro@postgresql.or 677 [ - + ]:GNC 776120 : Assert(BufferIsValid(buffer));
678 : :
679 : : /* Do we have to wait for an associated I/O first? */
680 [ + + ]: 776120 : if (stream->ios_in_progress > 0 &&
681 [ + + ]: 259753 : stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
682 : : {
683 : 254272 : int16 io_index = stream->oldest_io_index;
684 : : int16 distance;
685 : :
686 : : /* Sanity check that we still agree on the buffers. */
687 [ - + ]: 254272 : Assert(stream->ios[io_index].op.buffers ==
688 : : &stream->buffers[oldest_buffer_index]);
689 : :
690 : 254272 : WaitReadBuffers(&stream->ios[io_index].op);
691 : :
692 [ - + ]: 254272 : Assert(stream->ios_in_progress > 0);
693 : 254272 : stream->ios_in_progress--;
694 [ + + ]: 254272 : if (++stream->oldest_io_index == stream->max_ios)
695 : 240952 : stream->oldest_io_index = 0;
696 : :
697 [ + + ]: 254272 : if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
698 : : {
699 : : /* Distance ramps up fast (behavior C). */
700 : 84 : distance = stream->distance * 2;
701 : 84 : distance = Min(distance, stream->max_pinned_buffers);
702 : 84 : stream->distance = distance;
703 : : }
704 : : else
705 : : {
706 : : /* No advice; move towards io_combine_limit (behavior B). */
707 [ - + ]: 254188 : if (stream->distance > io_combine_limit)
708 : : {
11 tmunro@postgresql.or 709 :UNC 0 : stream->distance--;
710 : : }
711 : : else
712 : : {
11 tmunro@postgresql.or 713 :GNC 254188 : distance = stream->distance * 2;
714 : 254188 : distance = Min(distance, io_combine_limit);
715 : 254188 : distance = Min(distance, stream->max_pinned_buffers);
716 : 254188 : stream->distance = distance;
717 : : }
718 : : }
719 : : }
720 : :
721 : : #ifdef CLOBBER_FREED_MEMORY
722 : : /* Clobber old buffer and per-buffer data for debugging purposes. */
723 : 776120 : stream->buffers[oldest_buffer_index] = InvalidBuffer;
724 : :
725 : : /*
726 : : * The caller will get access to the per-buffer data, until the next call.
727 : : * We wipe the one before, which is never occupied because queue_size
728 : : * allowed one extra element. This will hopefully trip up client code
729 : : * that is holding a dangling pointer to it.
730 : : */
731 [ - + ]: 776120 : if (stream->per_buffer_data)
11 tmunro@postgresql.or 732 [ # # ]:UNC 0 : wipe_mem(get_per_buffer_data(stream,
733 : : oldest_buffer_index == 0 ?
734 : 0 : stream->queue_size - 1 :
735 : 0 : oldest_buffer_index - 1),
736 : : stream->per_buffer_data_size);
737 : : #endif
738 : :
739 : : /* Pin transferred to caller. */
11 tmunro@postgresql.or 740 [ - + ]:GNC 776120 : Assert(stream->pinned_buffers > 0);
741 : 776120 : stream->pinned_buffers--;
742 : :
743 : : /* Advance oldest buffer, with wrap-around. */
744 : 776120 : stream->oldest_buffer_index++;
745 [ + + ]: 776120 : if (stream->oldest_buffer_index == stream->queue_size)
746 : 141420 : stream->oldest_buffer_index = 0;
747 : :
748 : : /* Prepare for the next call. */
749 : 776120 : read_stream_look_ahead(stream, false);
750 : :
751 : : #ifndef READ_STREAM_DISABLE_FAST_PATH
752 : : /* See if we can take the fast path for all-cached scans next time. */
753 [ + + ]: 776120 : if (stream->ios_in_progress == 0 &&
754 [ + + ]: 545989 : stream->pinned_buffers == 1 &&
755 [ + + ]: 158826 : stream->distance == 1 &&
8 756 [ + + ]: 151582 : stream->pending_read_nblocks == 0 &&
11 757 [ + - ]: 151432 : stream->per_buffer_data_size == 0)
758 : : {
759 : 151432 : stream->fast_path = true;
760 : : }
761 : : #endif
762 : :
763 : 776120 : return buffer;
764 : : }
765 : :
766 : : /*
767 : : * Reset a read stream by releasing any queued up buffers, allowing the stream
768 : : * to be used again for different blocks. This can be used to clear an
769 : : * end-of-stream condition and start again, or to throw away blocks that were
770 : : * speculatively read and read some different blocks instead.
771 : : */
772 : : void
773 : 810476 : read_stream_reset(ReadStream *stream)
774 : : {
775 : : Buffer buffer;
776 : :
777 : : /* Stop looking ahead. */
778 : 810476 : stream->distance = 0;
779 : :
780 : : /* Forget buffered block numbers and fast path state. */
8 781 : 810476 : stream->blocknums_next = 0;
782 : 810476 : stream->blocknums_count = 0;
783 : 810476 : stream->fast_path = false;
784 : :
785 : : /* Unpin anything that wasn't consumed. */
11 786 [ + + ]: 901220 : while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
787 : 90744 : ReleaseBuffer(buffer);
788 : :
789 [ - + ]: 810476 : Assert(stream->pinned_buffers == 0);
790 [ - + ]: 810476 : Assert(stream->ios_in_progress == 0);
791 : :
792 : : /* Start off assuming data is cached. */
793 : 810476 : stream->distance = 1;
794 : 810476 : }
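
/*
 * Illustrative sketch of a rescan using read_stream_reset() (surrounding
 * variables are assumed from the earlier usage example, not taken from any
 * particular caller):
 *
 *     /* the stream previously returned InvalidBuffer (end of stream) */
 *     state.next_block = 0;            /* rewind the callback's own position */
 *     read_stream_reset(stream);       /* drop any pins, clear end-of-stream */
 *     while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 *     {
 *         ... process the block again ...
 *         ReleaseBuffer(buffer);
 *     }
 */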
795 : :
796 : : /*
797 : : * Release and free a read stream.
798 : : */
799 : : void
800 : 312912 : read_stream_end(ReadStream *stream)
801 : : {
802 : 312912 : read_stream_reset(stream);
803 : 312912 : pfree(stream);
804 : 312912 : }
|