Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * logtape.c
4 : * Management of "logical tapes" within temporary files.
5 : *
6 : * This module exists to support sorting via multiple merge passes (see
7 : * tuplesort.c). Merging is an ideal algorithm for tape devices, but if
8 : * we implement it on disk by creating a separate file for each "tape",
9 : * there is an annoying problem: the peak space usage is at least twice
10 : * the volume of actual data to be sorted. (This must be so because each
11 : * datum will appear in both the input and output tapes of the final
12 : * merge pass.)
13 : *
14 : * We can work around this problem by recognizing that any one tape
15 : * dataset (with the possible exception of the final output) is written
16 : * and read exactly once in a perfectly sequential manner. Therefore,
17 : * a datum once read will not be required again, and we can recycle its
18 : * space for use by the new tape dataset(s) being generated. In this way,
19 : * the total space usage is essentially just the actual data volume, plus
20 : * insignificant bookkeeping and start/stop overhead.
21 : *
22 : * Few OSes allow arbitrary parts of a file to be released back to the OS,
23 : * so we have to implement this space-recycling ourselves within a single
24 : * logical file. logtape.c exists to perform this bookkeeping and provide
25 : * the illusion of N independent tape devices to tuplesort.c. Note that
26 : * logtape.c itself depends on buffile.c to provide a "logical file" of
27 : * larger size than the underlying OS may support.
28 : *
29 : * For simplicity, we allocate and release space in the underlying file
30 : * in BLCKSZ-size blocks. Space allocation boils down to keeping track
31 : * of which blocks in the underlying file belong to which logical tape,
32 : * plus any blocks that are free (recycled and not yet reused).
33 : * The blocks in each logical tape form a chain, with a prev- and next-
34 : * pointer in each block.
35 : *
36 : * The initial write pass is guaranteed to fill the underlying file
37 : * perfectly sequentially, no matter how data is divided into logical tapes.
38 : * Once we begin merge passes, the access pattern becomes considerably
39 : * less predictable --- but the seeking involved should be comparable to
40 : * what would happen if we kept each logical tape in a separate file,
41 : * so there's no serious performance penalty paid to obtain the space
42 : * savings of recycling. We try to localize the write accesses by always
43 : * writing to the lowest-numbered free block when we have a choice; it's
44 : * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
45 : * policy for free blocks would be better?)
46 : *
47 : * To further make the I/Os more sequential, we can use a larger buffer
48 : * when reading, and read multiple blocks from the same tape in one go,
49 : * whenever the buffer becomes empty.
50 : *
51 : * To support the above policy of writing to the lowest free block, the
52 : * freelist is a min heap.
53 : *
54 : * Since all the bookkeeping and buffer memory is allocated with palloc(),
55 : * and the underlying file(s) are made with OpenTemporaryFile, all resources
56 : * for a logical tape set are certain to be cleaned up even if processing
57 : * is aborted by ereport(ERROR). To avoid confusion, the caller should take
58 : * care that all calls for a single LogicalTapeSet are made in the same
59 : * palloc context.
60 : *
61 : * To support parallel sort operations involving coordinated callers to
62 : * tuplesort.c routines across multiple workers, it is necessary to
63 : * concatenate each worker BufFile/tapeset into one single logical tapeset
64 : * managed by the leader. Workers should have produced one final
65 : * materialized tape (their entire output) when this happens in leader.
66 : * There will always be the same number of runs as input tapes, and the same
67 : * number of input tapes as participants (worker Tuplesortstates).
68 : *
69 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
70 : * Portions Copyright (c) 1994, Regents of the University of California
71 : *
72 : * IDENTIFICATION
73 : * src/backend/utils/sort/logtape.c
74 : *
75 : *-------------------------------------------------------------------------
76 : */
77 :
78 : #include "postgres.h"
79 :
80 : #include <fcntl.h>
81 :
82 : #include "storage/buffile.h"
83 : #include "utils/builtins.h"
84 : #include "utils/logtape.h"
85 : #include "utils/memdebug.h"
86 : #include "utils/memutils.h"
87 :
/*
 * A TapeBlockTrailer is stored at the end of each BLCKSZ block.
 *
 * The first block of a tape has prev == -1.  The last block of a tape
 * stores the number of valid bytes on the block, negated, in 'next'.
 * Therefore next < 0 indicates the last block.
 */
typedef struct TapeBlockTrailer
{
	long		prev;			/* previous block on this tape, or -1 on first
								 * block */
	long		next;			/* next block on this tape, or # of valid
								 * bytes on last block (if < 0) */
} TapeBlockTrailer;

/* Bytes of tape data a block can hold: everything except the trailer. */
#define TapeBlockPayloadSize  (BLCKSZ - sizeof(TapeBlockTrailer))

/*
 * The trailer occupies the last sizeof(TapeBlockTrailer) bytes of a block.
 * The argument is parenthesized so that an argument like "p + 1" is
 * evaluated in its own pointer type before being converted to char *.
 */
#define TapeBlockGetTrailer(buf) \
	((TapeBlockTrailer *) ((char *) (buf) + TapeBlockPayloadSize))

/* A negative 'next' marks the final block of a tape. */
#define TapeBlockIsLast(buf) (TapeBlockGetTrailer(buf)->next < 0)

/* Valid bytes in a block: the full payload, or -next on the last block. */
#define TapeBlockGetNBytes(buf) \
	(TapeBlockIsLast(buf) ? \
	 (- TapeBlockGetTrailer(buf)->next) : TapeBlockPayloadSize)

/* Mark a block as the last one, holding 'nbytes' valid bytes. */
#define TapeBlockSetNBytes(buf, nbytes) \
	(TapeBlockGetTrailer(buf)->next = -(nbytes))
113 :
/*
 * Per-tape reservation of block numbers.
 *
 * If several tapes are written at the same time (as in HashAgg), taking
 * block numbers one at a time from the shared free list would interleave
 * the tapes and fragment each of them.  Instead each tape reserves a batch
 * of block numbers for itself.  The first batch is TAPE_WRITE_PREALLOC_MIN
 * blocks; every refill doubles the batch, up to TAPE_WRITE_PREALLOC_MAX.
 *
 * Only the numbers are reserved -- the filesystem is not touched -- so a
 * write may land past the current end of file.  ltsWriteBlock() covers such
 * gaps by writing zero-filled blocks.
 */
#define TAPE_WRITE_PREALLOC_MIN		8
#define TAPE_WRITE_PREALLOC_MAX		128
126 :
127 : /*
128 : * This data structure represents a single "logical tape" within the set
129 : * of logical tapes stored in the same file.
130 : *
131 : * While writing, we hold the current partially-written data block in the
132 : * buffer. While reading, we can hold multiple blocks in the buffer. Note
133 : * that we don't retain the trailers of a block when it's read into the
134 : * buffer. The buffer therefore contains one large contiguous chunk of data
135 : * from the tape.
136 : */
137 : struct LogicalTape
138 : {
139 : LogicalTapeSet *tapeSet; /* tape set this tape is part of */
140 :
141 : bool writing; /* T while in write phase */
142 : bool frozen; /* T if blocks should not be freed when read */
143 : bool dirty; /* does buffer need to be written? */
144 :
145 : /*
146 : * Block numbers of the first, current, and next block of the tape.
147 : *
148 : * The "current" block number is only valid when writing, or reading from
149 : * a frozen tape. (When reading from an unfrozen tape, we use a larger
150 : * read buffer that holds multiple blocks, so the "current" block is
151 : * ambiguous.)
152 : *
153 : * When concatenation of worker tape BufFiles is performed, an offset to
154 : * the first block in the unified BufFile space is applied during reads.
155 : */
156 : long firstBlockNumber;
157 : long curBlockNumber;
158 : long nextBlockNumber;
159 : long offsetBlockNumber;
160 :
161 : /*
162 : * Buffer for current data block(s).
163 : */
164 : char *buffer; /* physical buffer (separately palloc'd) */
165 : int buffer_size; /* allocated size of the buffer */
166 : int max_size; /* highest useful, safe buffer_size */
167 : int pos; /* next read/write position in buffer */
168 : int nbytes; /* total # of valid bytes in buffer */
169 :
170 : /*
171 : * Preallocated block numbers are held in an array sorted in descending
172 : * order; blocks are consumed from the end of the array (lowest block
173 : * numbers first).
174 : */
175 : long *prealloc;
176 : int nprealloc; /* number of elements in list */
177 : int prealloc_size; /* number of elements list can hold */
178 : };
179 :
180 : /*
181 : * This data structure represents a set of related "logical tapes" sharing
182 : * space in a single underlying file. (But that "file" may be multiple files
183 : * if needed to escape OS limits on file size; buffile.c handles that for us.)
184 : * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
185 : * demand.
186 : */
187 : struct LogicalTapeSet
188 : {
189 : BufFile *pfile; /* underlying file for whole tape set */
190 : SharedFileSet *fileset;
191 : int worker; /* worker # if shared, -1 for leader/serial */
192 :
193 : /*
194 : * File size tracking. nBlocksWritten is the size of the underlying file,
195 : * in BLCKSZ blocks. nBlocksAllocated is the number of blocks allocated
196 : * by ltsReleaseBlock(), and it is always greater than or equal to
197 : * nBlocksWritten. Blocks between nBlocksAllocated and nBlocksWritten are
198 : * blocks that have been allocated for a tape, but have not been written
199 : * to the underlying file yet. nHoleBlocks tracks the total number of
200 : * blocks that are in unused holes between worker spaces following BufFile
201 : * concatenation.
202 : */
203 : long nBlocksAllocated; /* # of blocks allocated */
204 : long nBlocksWritten; /* # of blocks used in underlying file */
205 : long nHoleBlocks; /* # of "hole" blocks left */
206 :
207 : /*
208 : * We store the numbers of recycled-and-available blocks in freeBlocks[].
209 : * When there are no such blocks, we extend the underlying file.
210 : *
211 : * If forgetFreeSpace is true then any freed blocks are simply forgotten
212 : * rather than being remembered in freeBlocks[]. See notes for
213 : * LogicalTapeSetForgetFreeSpace().
214 : */
215 : bool forgetFreeSpace; /* are we remembering free blocks? */
216 : long *freeBlocks; /* resizable array holding minheap */
217 : long nFreeBlocks; /* # of currently free blocks */
218 : Size freeBlocksLen; /* current allocated length of freeBlocks[] */
219 : bool enable_prealloc; /* preallocate write blocks? */
220 : };
221 :
222 : static LogicalTape *ltsCreateTape(LogicalTapeSet *lts);
223 : static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, const void *buffer);
224 : static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
225 : static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
226 : static long ltsGetFreeBlock(LogicalTapeSet *lts);
227 : static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
228 : static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
229 : static void ltsInitReadBuffer(LogicalTape *lt);
230 :
231 :
232 : /*
233 : * Write a block-sized buffer to the specified block of the underlying file.
234 : *
235 : * No need for an error return convention; we ereport() on any error.
236 : */
237 : static void
100 peter 238 GNC 28343 : ltsWriteBlock(LogicalTapeSet *lts, long blocknum, const void *buffer)
239 : {
240 : /*
241 : * BufFile does not support "holes", so if we're about to write a block
242 : * that's past the current end of file, fill the space between the current
243 : * end of file and the target block with zeros.
244 : *
245 : * This can happen either when tapes preallocate blocks; or for the last
246 : * block of a tape which might not have been flushed.
247 : *
248 : * Note that BufFile concatenation can leave "holes" in BufFile between
249 : * worker-owned block ranges. These are tracked for reporting purposes
250 : * only. We never read from nor write to these hole blocks, and so they
251 : * are not considered here.
252 : */
2258 heikki.linnakangas 253 CBC 30569 : while (blocknum > lts->nBlocksWritten)
254 : {
255 : PGIOAlignedBlock zerobuf;
256 :
1681 tgl 257 2226 : MemSet(zerobuf.data, 0, sizeof(zerobuf));
258 :
259 2226 : ltsWriteBlock(lts, lts->nBlocksWritten, zerobuf.data);
260 : }
261 :
262 : /* Write the requested block */
1027 tmunro 263 28343 : if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
7198 tgl 264 UBC 0 : ereport(ERROR,
265 : (errcode_for_file_access(),
266 : errmsg("could not seek to block %ld of temporary file",
267 : blocknum)));
1027 tmunro 268 CBC 28343 : BufFileWrite(lts->pfile, buffer, BLCKSZ);
269 :
270 : /* Update nBlocksWritten, if we extended the file */
2258 heikki.linnakangas 271 28343 : if (blocknum == lts->nBlocksWritten)
272 9552 : lts->nBlocksWritten++;
8576 tgl 273 28343 : }
274 :
275 : /*
276 : * Read a block-sized buffer from the specified block of the underlying file.
277 : *
278 : * No need for an error return convention; we ereport() on any error. This
279 : * module should never attempt to read a block it doesn't know is there.
280 : */
281 : static void
282 25938 : ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
283 : {
1027 tmunro 284 GIC 25938 : if (BufFileSeekBlock(lts->pfile, blocknum) != 0)
7198 tgl 285 UIC 0 : ereport(ERROR,
286 : (errcode_for_file_access(),
1027 tmunro 287 ECB : errmsg("could not seek to block %ld of temporary file",
7198 tgl 288 : blocknum)));
83 peter 289 GNC 25938 : BufFileReadExact(lts->pfile, buffer, BLCKSZ);
8576 tgl 290 GIC 25938 : }
8576 tgl 291 ECB :
292 : /*
2379 heikki.linnakangas 293 : * Read as many blocks as we can into the per-tape buffer.
294 : *
295 : * Returns true if anything was read, 'false' on EOF.
296 : */
297 : static bool
538 heikki.linnakangas 298 CBC 31836 : ltsReadFillBuffer(LogicalTape *lt)
2379 heikki.linnakangas 299 ECB : {
2379 heikki.linnakangas 300 GIC 31836 : lt->pos = 0;
301 31836 : lt->nbytes = 0;
2379 heikki.linnakangas 302 ECB :
303 : do
304 : {
2299 heikki.linnakangas 305 CBC 39190 : char *thisbuf = lt->buffer + lt->nbytes;
1892 rhaas 306 GIC 39190 : long datablocknum = lt->nextBlockNumber;
307 :
2299 heikki.linnakangas 308 ECB : /* Fetch next block number */
1892 rhaas 309 CBC 39190 : if (datablocknum == -1L)
2299 heikki.linnakangas 310 13506 : break; /* EOF */
1892 rhaas 311 ECB : /* Apply worker offset, needed for leader tapesets */
1892 rhaas 312 GIC 25684 : datablocknum += lt->offsetBlockNumber;
2379 heikki.linnakangas 313 ECB :
314 : /* Read the block */
100 peter 315 GNC 25684 : ltsReadBlock(lt->tapeSet, datablocknum, thisbuf);
2379 heikki.linnakangas 316 CBC 25684 : if (!lt->frozen)
538 heikki.linnakangas 317 GIC 25333 : ltsReleaseBlock(lt->tapeSet, datablocknum);
2299 heikki.linnakangas 318 CBC 25684 : lt->curBlockNumber = lt->nextBlockNumber;
319 :
2299 heikki.linnakangas 320 GIC 25684 : lt->nbytes += TapeBlockGetNBytes(thisbuf);
2299 heikki.linnakangas 321 CBC 25684 : if (TapeBlockIsLast(thisbuf))
322 : {
2299 heikki.linnakangas 323 GIC 14106 : lt->nextBlockNumber = -1L;
2379 heikki.linnakangas 324 ECB : /* EOF */
2379 heikki.linnakangas 325 GIC 14106 : break;
2379 heikki.linnakangas 326 ECB : }
327 : else
2299 heikki.linnakangas 328 GIC 11578 : lt->nextBlockNumber = TapeBlockGetTrailer(thisbuf)->next;
329 :
2379 heikki.linnakangas 330 ECB : /* Advance to next block, if we have buffer space left */
2299 heikki.linnakangas 331 GIC 11578 : } while (lt->buffer_size - lt->nbytes > BLCKSZ);
2379 heikki.linnakangas 332 ECB :
2379 heikki.linnakangas 333 GIC 31836 : return (lt->nbytes > 0);
334 : }
335 :
/* Index of the left child of heap slot i. */
static inline unsigned long
left_offset(unsigned long i)
{
	return (i << 1) + 1;
}
341 :
/* Index of the right child of heap slot i. */
static inline unsigned long
right_offset(unsigned long i)
{
	return (i << 1) + 2;
}
347 :
/* Index of the parent of heap slot i (meaningful only for i > 0). */
static inline unsigned long
parent_offset(unsigned long i)
{
	return (i - 1) >> 1;
}
6242 tgl 353 ECB :
8576 354 : /*
355 : * Get the next block for writing.
940 jdavis 356 : */
357 : static long
940 jdavis 358 GIC 26117 : ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt)
359 : {
360 26117 : if (lts->enable_prealloc)
361 14037 : return ltsGetPreallocBlock(lts, lt);
362 : else
363 12080 : return ltsGetFreeBlock(lts);
940 jdavis 364 ECB : }
365 :
366 : /*
367 : * Select the lowest currently unused block from the tape set's global free
368 : * list min heap.
369 : */
370 : static long
8576 tgl 371 GIC 120320 : ltsGetFreeBlock(LogicalTapeSet *lts)
372 : {
1060 tgl 373 CBC 120320 : long *heap = lts->freeBlocks;
1060 tgl 374 ECB : long blocknum;
375 : int heapsize;
376 : long holeval;
481 377 : unsigned long holepos;
378 :
1158 jdavis 379 : /* freelist empty; allocate a new block */
1158 jdavis 380 CBC 120320 : if (lts->nFreeBlocks == 0)
1158 jdavis 381 GIC 9780 : return lts->nBlocksAllocated++;
382 :
383 : /* easy if heap contains one element */
1158 jdavis 384 CBC 110540 : if (lts->nFreeBlocks == 1)
385 : {
1158 jdavis 386 GIC 480 : lts->nFreeBlocks--;
1158 jdavis 387 CBC 480 : return lts->freeBlocks[0];
388 : }
389 :
481 tgl 390 ECB : /* remove top of minheap */
1158 jdavis 391 CBC 110060 : blocknum = heap[0];
392 :
481 tgl 393 ECB : /* we'll replace it with end of minheap array */
481 tgl 394 CBC 110060 : holeval = heap[--lts->nFreeBlocks];
1158 jdavis 395 ECB :
396 : /* sift down */
481 tgl 397 GIC 110060 : holepos = 0; /* holepos is where the "hole" is */
1158 jdavis 398 CBC 110060 : heapsize = lts->nFreeBlocks;
1158 jdavis 399 ECB : while (true)
1158 jdavis 400 CBC 663068 : {
481 tgl 401 773128 : unsigned long left = left_offset(holepos);
402 773128 : unsigned long right = right_offset(holepos);
1158 jdavis 403 EUB : unsigned long min_child;
404 :
1158 jdavis 405 CBC 773128 : if (left < heapsize && right < heapsize)
1158 jdavis 406 GIC 668982 : min_child = (heap[left] < heap[right]) ? left : right;
1158 jdavis 407 CBC 104146 : else if (left < heapsize)
408 22255 : min_child = left;
1158 jdavis 409 GIC 81891 : else if (right < heapsize)
1158 jdavis 410 LBC 0 : min_child = right;
1158 jdavis 411 ECB : else
1158 jdavis 412 GIC 81891 : break;
1158 jdavis 413 ECB :
481 tgl 414 GIC 691237 : if (heap[min_child] >= holeval)
1158 jdavis 415 CBC 28169 : break;
416 :
481 tgl 417 GIC 663068 : heap[holepos] = heap[min_child];
418 663068 : holepos = min_child;
419 : }
420 110060 : heap[holepos] = holeval;
421 :
1158 jdavis 422 110060 : return blocknum;
423 : }
8576 tgl 424 ECB :
425 : /*
426 : * Return the lowest free block number from the tape's preallocation list.
940 jdavis 427 : * Refill the preallocation list with blocks from the tape set's free list if
428 : * necessary.
429 : */
1048 430 : static long
1048 jdavis 431 GIC 14037 : ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt)
1048 jdavis 432 ECB : {
433 : /* sorted in descending order, so return the last element */
1048 jdavis 434 GIC 14037 : if (lt->nprealloc > 0)
1048 jdavis 435 CBC 519 : return lt->prealloc[--lt->nprealloc];
436 :
1048 jdavis 437 GIC 13518 : if (lt->prealloc == NULL)
1048 jdavis 438 ECB : {
1048 jdavis 439 CBC 13506 : lt->prealloc_size = TAPE_WRITE_PREALLOC_MIN;
1048 jdavis 440 GBC 13506 : lt->prealloc = (long *) palloc(sizeof(long) * lt->prealloc_size);
1048 jdavis 441 ECB : }
1048 jdavis 442 CBC 12 : else if (lt->prealloc_size < TAPE_WRITE_PREALLOC_MAX)
443 : {
444 : /* when the preallocation list runs out, double the size */
1048 jdavis 445 GIC 12 : lt->prealloc_size *= 2;
1048 jdavis 446 CBC 12 : if (lt->prealloc_size > TAPE_WRITE_PREALLOC_MAX)
1048 jdavis 447 LBC 0 : lt->prealloc_size = TAPE_WRITE_PREALLOC_MAX;
1048 jdavis 448 GIC 12 : lt->prealloc = (long *) repalloc(lt->prealloc,
1048 jdavis 449 CBC 12 : sizeof(long) * lt->prealloc_size);
450 : }
451 :
1048 jdavis 452 ECB : /* refill preallocation list */
1048 jdavis 453 GIC 13518 : lt->nprealloc = lt->prealloc_size;
454 121758 : for (int i = lt->nprealloc; i > 0; i--)
1048 jdavis 455 ECB : {
1048 jdavis 456 GIC 108240 : lt->prealloc[i - 1] = ltsGetFreeBlock(lts);
457 :
458 : /* verify descending order */
459 108240 : Assert(i == lt->nprealloc || lt->prealloc[i - 1] > lt->prealloc[i]);
460 : }
461 :
1048 jdavis 462 CBC 13518 : return lt->prealloc[--lt->nprealloc];
463 : }
464 :
465 : /*
466 : * Return a block# to the freelist.
467 : */
468 : static void
8576 tgl 469 GIC 119536 : ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
8576 tgl 470 ECB : {
1060 471 : long *heap;
472 : unsigned long holepos;
473 :
474 : /*
475 : * Do nothing if we're no longer interested in remembering free space.
6242 476 : */
6242 tgl 477 GIC 119536 : if (lts->forgetFreeSpace)
478 6784 : return;
479 :
480 : /*
481 : * Enlarge freeBlocks array if full.
8576 tgl 482 ECB : */
8576 tgl 483 GBC 112752 : if (lts->nFreeBlocks >= lts->freeBlocksLen)
484 : {
1158 jdavis 485 ECB : /*
486 : * If the freelist becomes very large, just return and leak this free
487 : * block.
488 : */
947 jdavis 489 GIC 36 : if (lts->freeBlocksLen * 2 * sizeof(long) > MaxAllocSize)
1158 jdavis 490 UIC 0 : return;
1158 jdavis 491 ECB :
8576 tgl 492 CBC 36 : lts->freeBlocksLen *= 2;
493 36 : lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
2118 tgl 494 GIC 36 : lts->freeBlocksLen * sizeof(long));
495 : }
8397 bruce 496 ECB :
497 : /* create a "hole" at end of minheap array */
1158 jdavis 498 CBC 112752 : heap = lts->freeBlocks;
481 tgl 499 GIC 112752 : holepos = lts->nFreeBlocks;
1158 jdavis 500 CBC 112752 : lts->nFreeBlocks++;
1158 jdavis 501 ECB :
502 : /* sift up to insert blocknum */
481 tgl 503 CBC 497598 : while (holepos != 0)
1158 jdavis 504 ECB : {
481 tgl 505 GIC 479780 : unsigned long parent = parent_offset(holepos);
1060 tgl 506 ECB :
481 tgl 507 GIC 479780 : if (heap[parent] < blocknum)
1158 jdavis 508 94934 : break;
509 :
481 tgl 510 384846 : heap[holepos] = heap[parent];
511 384846 : holepos = parent;
512 : }
513 112752 : heap[holepos] = blocknum;
514 : }
8576 tgl 515 ECB :
516 : /*
1151 jdavis 517 : * Lazily allocate and initialize the read buffer. This avoids waste when many
518 : * tapes are open at once, but not all are active between rewinding and
519 : * reading.
520 : */
521 : static void
538 heikki.linnakangas 522 CBC 14109 : ltsInitReadBuffer(LogicalTape *lt)
1151 jdavis 523 ECB : {
1146 jdavis 524 CBC 14109 : Assert(lt->buffer_size > 0);
525 14109 : lt->buffer = palloc(lt->buffer_size);
526 :
527 : /* Read the first block, or reset if tape is empty */
1151 jdavis 528 GIC 14109 : lt->nextBlockNumber = lt->firstBlockNumber;
529 14109 : lt->pos = 0;
530 14109 : lt->nbytes = 0;
538 heikki.linnakangas 531 14109 : ltsReadFillBuffer(lt);
1151 jdavis 532 14109 : }
533 :
534 : /*
535 : * Create a tape set, backed by a temporary underlying file.
536 : *
537 : * The tape set is initially empty. Use LogicalTapeCreate() to create
538 : * tapes in it.
539 : *
540 : * In a single-process sort, pass NULL argument for fileset, and -1 for
541 : * worker.
542 : *
543 : * In a parallel sort, parallel workers pass the shared fileset handle and
544 : * their own worker number. After the workers have finished, create the
545 : * tape set in the leader, passing the shared fileset handle and -1 for
546 : * worker, and use LogicalTapeImport() to import the worker tapes into it.
547 : *
548 : * Currently, the leader will only import worker tapes into the set, it does
538 heikki.linnakangas 549 ECB : * not create tapes of its own, although in principle that should work.
550 : *
551 : * If preallocate is true, blocks for each individual tape are allocated in
552 : * batches. This avoids fragmentation when writing multiple tapes at the
553 : * same time.
554 : */
555 : LogicalTapeSet *
538 heikki.linnakangas 556 CBC 367 : LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
8576 tgl 557 ECB : {
558 : LogicalTapeSet *lts;
559 :
560 : /*
2969 561 : * Create top-level struct including per-tape LogicalTape structs.
8576 562 : */
1129 jdavis 563 CBC 367 : lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
2258 heikki.linnakangas 564 367 : lts->nBlocksAllocated = 0L;
2258 heikki.linnakangas 565 GIC 367 : lts->nBlocksWritten = 0L;
1892 rhaas 566 CBC 367 : lts->nHoleBlocks = 0L;
6242 tgl 567 367 : lts->forgetFreeSpace = false;
8576 tgl 568 GIC 367 : lts->freeBlocksLen = 32; /* reasonable initial guess */
569 367 : lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
570 367 : lts->nFreeBlocks = 0;
940 jdavis 571 367 : lts->enable_prealloc = preallocate;
572 :
538 heikki.linnakangas 573 367 : lts->fileset = fileset;
574 367 : lts->worker = worker;
575 :
576 : /*
577 : * Create temp BufFile storage as required.
1892 rhaas 578 ECB : *
538 heikki.linnakangas 579 : * In leader, we hijack the BufFile of the first tape that's imported, and
580 : * concatenate the BufFiles of any subsequent tapes to that. Hence don't
581 : * create a BufFile here. Things are simpler for the worker case and the
582 : * serial case, though. They are generally very similar -- workers use a
583 : * shared fileset, whereas serial sorts use a conventional serial BufFile.
1892 rhaas 584 : */
538 heikki.linnakangas 585 CBC 367 : if (fileset && worker == -1)
538 heikki.linnakangas 586 GIC 71 : lts->pfile = NULL;
1892 rhaas 587 296 : else if (fileset)
1892 rhaas 588 ECB : {
589 : char filename[MAXPGPATH];
590 :
1892 rhaas 591 GIC 206 : pg_itoa(worker, filename);
587 akapila 592 206 : lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
593 : }
594 : else
1892 rhaas 595 90 : lts->pfile = BufFileCreateTemp(false);
596 :
8576 tgl 597 367 : return lts;
598 : }
599 :
600 : /*
601 : * Claim ownership of a logical tape from an existing shared BufFile.
538 heikki.linnakangas 602 ECB : *
603 : * Caller should be leader process. Though tapes are marked as frozen in
604 : * workers, they are not frozen when opened within leader, since unfrozen tapes
605 : * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
606 : * for random access.)
607 : */
608 : LogicalTape *
538 heikki.linnakangas 609 GIC 142 : LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
8576 tgl 610 ECB : {
611 : LogicalTape *lt;
612 : long tapeblocks;
613 : char filename[MAXPGPATH];
614 : BufFile *file;
615 : int64 filesize;
616 :
538 heikki.linnakangas 617 CBC 142 : lt = ltsCreateTape(lts);
538 heikki.linnakangas 618 ECB :
619 : /*
620 : * build concatenated view of all buffiles, remembering the block number
621 : * where each source file begins.
622 : */
538 heikki.linnakangas 623 GIC 142 : pg_itoa(worker, filename);
538 heikki.linnakangas 624 CBC 142 : file = BufFileOpenFileSet(<s->fileset->fs, filename, O_RDONLY, false);
625 142 : filesize = BufFileSize(file);
626 :
538 heikki.linnakangas 627 ECB : /*
628 : * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
629 : * block offset into each tape as we go.
630 : */
538 heikki.linnakangas 631 GIC 142 : lt->firstBlockNumber = shared->firstblocknumber;
538 heikki.linnakangas 632 CBC 142 : if (lts->pfile == NULL)
633 : {
538 heikki.linnakangas 634 GIC 71 : lts->pfile = file;
538 heikki.linnakangas 635 CBC 71 : lt->offsetBlockNumber = 0L;
8576 tgl 636 ECB : }
637 : else
638 : {
538 heikki.linnakangas 639 GIC 71 : lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
640 : }
641 : /* Don't allocate more for read buffer than could possibly help */
642 142 : lt->max_size = Min(MaxAllocSize, filesize);
643 142 : tapeblocks = filesize / BLCKSZ;
644 :
538 heikki.linnakangas 645 ECB : /*
646 : * Update # of allocated blocks and # blocks written to reflect the
647 : * imported BufFile. Allocated/written blocks include space used by holes
648 : * left between concatenated BufFiles. Also track the number of hole
649 : * blocks so that we can later work backwards to calculate the number of
650 : * physical blocks for instrumentation.
651 : */
538 heikki.linnakangas 652 GIC 142 : lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;
653 :
654 142 : lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
655 142 : lts->nBlocksWritten = lts->nBlocksAllocated;
656 :
657 142 : return lt;
658 : }
659 :
538 heikki.linnakangas 660 ECB : /*
661 : * Close a logical tape set and release all resources.
662 : *
663 : * NOTE: This doesn't close any of the tapes! You must close them
664 : * first, or you can let them be destroyed along with the memory context.
665 : */
666 : void
538 heikki.linnakangas 667 GIC 367 : LogicalTapeSetClose(LogicalTapeSet *lts)
668 : {
669 367 : BufFileClose(lts->pfile);
8576 tgl 670 367 : pfree(lts->freeBlocks);
671 367 : pfree(lts);
672 367 : }
8576 tgl 673 ECB :
674 : /*
675 : * Create a logical tape in the given tapeset.
676 : *
677 : * The tape is initialized in write state.
678 : */
679 : LogicalTape *
538 heikki.linnakangas 680 GIC 25945 : LogicalTapeCreate(LogicalTapeSet *lts)
681 : {
538 heikki.linnakangas 682 ECB : /*
538 heikki.linnakangas 683 EUB : * The only thing that currently prevents creating new tapes in leader is
684 : * the fact that BufFiles opened using BufFileOpenShared() are read-only
538 heikki.linnakangas 685 ECB : * by definition, but that could be changed if it seemed worthwhile. For
686 : * now, writing to the leader tape will raise a "Bad file descriptor"
687 : * error, so tuplesort must avoid writing to the leader tape altogether.
688 : */
538 heikki.linnakangas 689 CBC 25945 : if (lts->fileset && lts->worker == -1)
538 heikki.linnakangas 690 UIC 0 : elog(ERROR, "cannot create new tapes in leader process");
691 :
538 heikki.linnakangas 692 GIC 25945 : return ltsCreateTape(lts);
693 : }
694 :
695 : static LogicalTape *
538 heikki.linnakangas 696 CBC 26087 : ltsCreateTape(LogicalTapeSet *lts)
538 heikki.linnakangas 697 ECB : {
698 : LogicalTape *lt;
699 :
700 : /*
701 : * Create per-tape struct. Note we allocate the I/O buffer lazily.
702 : */
538 heikki.linnakangas 703 CBC 26087 : lt = palloc(sizeof(LogicalTape));
704 26087 : lt->tapeSet = lts;
705 26087 : lt->writing = true;
706 26087 : lt->frozen = false;
538 heikki.linnakangas 707 GIC 26087 : lt->dirty = false;
538 heikki.linnakangas 708 CBC 26087 : lt->firstBlockNumber = -1L;
709 26087 : lt->curBlockNumber = -1L;
710 26087 : lt->nextBlockNumber = -1L;
711 26087 : lt->offsetBlockNumber = 0L;
712 26087 : lt->buffer = NULL;
713 26087 : lt->buffer_size = 0;
714 : /* palloc() larger than MaxAllocSize would fail */
715 26087 : lt->max_size = MaxAllocSize;
538 heikki.linnakangas 716 GIC 26087 : lt->pos = 0;
717 26087 : lt->nbytes = 0;
718 26087 : lt->prealloc = NULL;
719 26087 : lt->nprealloc = 0;
720 26087 : lt->prealloc_size = 0;
721 :
722 26087 : return lt;
723 : }
724 :
725 : /*
538 heikki.linnakangas 726 ECB : * Close a logical tape.
727 : *
728 : * Note: This doesn't return any blocks to the free list! You must read
729 : * the tape to the end first, to reuse the space. In current use, though,
730 : * we only close tapes after fully reading them.
731 : */
732 : void
538 heikki.linnakangas 733 GIC 13980 : LogicalTapeClose(LogicalTape *lt)
734 : {
735 13980 : if (lt->buffer)
736 13980 : pfree(lt->buffer);
737 13980 : pfree(lt);
738 13980 : }
739 :
740 : /*
741 : * Mark a logical tape set as not needing management of free space anymore.
742 : *
6242 tgl 743 ECB : * This should be called if the caller does not intend to write any more data
744 : * into the tape set, but is reading from un-frozen tapes. Since no more
745 : * writes are planned, remembering free blocks is no longer useful. Setting
746 : * this flag lets us avoid wasting time and space in ltsReleaseBlock(), which
747 : * is not designed to handle large numbers of free blocks.
748 : */
749 : void
6242 tgl 750 GIC 119 : LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
751 : {
752 119 : lts->forgetFreeSpace = true;
753 119 : }
6242 tgl 754 ECB :
755 : /*
8576 756 : * Write to a logical tape.
757 : *
758 : * There are no error returns; we ereport() on failure.
759 : */
760 : void
100 peter 761 GNC 9297487 : LogicalTapeWrite(LogicalTape *lt, const void *ptr, size_t size)
762 : {
538 heikki.linnakangas 763 CBC 9297487 : LogicalTapeSet *lts = lt->tapeSet;
764 : size_t nthistime;
8576 tgl 765 ECB :
8576 tgl 766 CBC 9297487 : Assert(lt->writing);
1892 rhaas 767 GIC 9297487 : Assert(lt->offsetBlockNumber == 0L);
8576 tgl 768 ECB :
769 : /* Allocate data buffer and first block on first write */
6258 tgl 770 CBC 9297487 : if (lt->buffer == NULL)
2379 heikki.linnakangas 771 ECB : {
6258 tgl 772 GIC 14179 : lt->buffer = (char *) palloc(BLCKSZ);
2379 heikki.linnakangas 773 CBC 14179 : lt->buffer_size = BLCKSZ;
2379 heikki.linnakangas 774 ECB : }
2299 heikki.linnakangas 775 GIC 9297487 : if (lt->curBlockNumber == -1)
6258 tgl 776 ECB : {
2299 heikki.linnakangas 777 GIC 14179 : Assert(lt->firstBlockNumber == -1);
778 14179 : Assert(lt->pos == 0);
2299 heikki.linnakangas 779 ECB :
940 jdavis 780 CBC 14179 : lt->curBlockNumber = ltsGetBlock(lts, lt);
2299 heikki.linnakangas 781 GIC 14179 : lt->firstBlockNumber = lt->curBlockNumber;
2299 heikki.linnakangas 782 ECB :
2299 heikki.linnakangas 783 GIC 14179 : TapeBlockGetTrailer(lt->buffer)->prev = -1L;
784 : }
785 :
2379 786 9297487 : Assert(lt->buffer_size == BLCKSZ);
8576 tgl 787 CBC 18602804 : while (size > 0)
788 : {
1036 jdavis 789 GIC 9305317 : if (lt->pos >= (int) TapeBlockPayloadSize)
8576 tgl 790 EUB : {
791 : /* Buffer full, dump it out */
792 : long nextBlockNumber;
793 :
2299 heikki.linnakangas 794 GIC 11938 : if (!lt->dirty)
795 : {
796 : /* Hmm, went directly from reading to writing? */
7198 tgl 797 LBC 0 : elog(ERROR, "invalid logtape state: should be dirty");
798 : }
799 :
2299 heikki.linnakangas 800 ECB : /*
801 : * First allocate the next block, so that we can store it in the
802 : * 'next' pointer of this block.
803 : */
538 heikki.linnakangas 804 CBC 11938 : nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
2299 heikki.linnakangas 805 ECB :
806 : /* set the next-pointer and dump the current block. */
2299 heikki.linnakangas 807 CBC 11938 : TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
100 peter 808 GNC 11938 : ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
809 :
2299 heikki.linnakangas 810 ECB : /* initialize the prev-pointer of the next block */
2299 heikki.linnakangas 811 CBC 11938 : TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
812 11938 : lt->curBlockNumber = nextBlockNumber;
8576 tgl 813 11938 : lt->pos = 0;
8576 tgl 814 GIC 11938 : lt->nbytes = 0;
8576 tgl 815 ECB : }
816 :
2299 heikki.linnakangas 817 CBC 9305317 : nthistime = TapeBlockPayloadSize - lt->pos;
8576 tgl 818 9305317 : if (nthistime > size)
819 9293376 : nthistime = size;
820 9305317 : Assert(nthistime > 0);
8576 tgl 821 ECB :
8576 tgl 822 CBC 9305317 : memcpy(lt->buffer + lt->pos, ptr, nthistime);
823 :
824 9305317 : lt->dirty = true;
8576 tgl 825 GIC 9305317 : lt->pos += nthistime;
826 9305317 : if (lt->nbytes < lt->pos)
827 9305317 : lt->nbytes = lt->pos;
100 peter 828 GNC 9305317 : ptr = (const char *) ptr + nthistime;
8576 tgl 829 GIC 9305317 : size -= nthistime;
830 : }
831 9297487 : }
832 :
833 : /*
834 : * Rewind logical tape and switch from writing to reading.
835 : *
836 : * The tape must currently be in writing state, or "frozen" in read state.
837 : *
838 : * 'buffer_size' specifies how much memory to use for the read buffer.
2299 heikki.linnakangas 839 ECB : * Regardless of the argument, the actual amount of memory used is between
840 : * BLCKSZ and MaxAllocSize, and is a multiple of BLCKSZ. The given value is
841 : * rounded down and truncated to fit those constraints, if necessary. If the
842 : * tape is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ
843 : * byte buffer is used.
844 : */
845 : void
538 heikki.linnakangas 846 CBC 14109 : LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
8576 tgl 847 ECB : {
538 heikki.linnakangas 848 GIC 14109 : LogicalTapeSet *lts = lt->tapeSet;
849 :
850 : /*
2370 heikki.linnakangas 851 ECB : * Round and cap buffer_size if needed.
852 : */
2370 heikki.linnakangas 853 GIC 14109 : if (lt->frozen)
854 3 : buffer_size = BLCKSZ;
2370 heikki.linnakangas 855 ECB : else
8576 tgl 856 : {
857 : /* need at least one block */
2370 heikki.linnakangas 858 GIC 14106 : if (buffer_size < BLCKSZ)
2370 heikki.linnakangas 859 CBC 288 : buffer_size = BLCKSZ;
860 :
861 : /* palloc() larger than max_size is unlikely to be helpful */
1892 rhaas 862 14106 : if (buffer_size > lt->max_size)
1892 rhaas 863 GIC 142 : buffer_size = lt->max_size;
864 :
865 : /* round down to BLCKSZ boundary */
2370 heikki.linnakangas 866 14106 : buffer_size -= buffer_size % BLCKSZ;
867 : }
2370 heikki.linnakangas 868 ECB :
2370 heikki.linnakangas 869 GIC 14109 : if (lt->writing)
870 : {
871 : /*
872 : * Completion of a write phase. Flush last partial data block, and
873 : * rewind for normal (destructive) read.
874 : */
875 14106 : if (lt->dirty)
876 : {
877 : /*
878 : * As long as we've filled the buffer at least once, its contents
879 : * are entirely defined from valgrind's point of view, even though
880 : * contents beyond the current end point may be stale. But it's
881 : * possible - at least in the case of a parallel sort - to sort
1872 rhaas 882 ECB : * such small amount of data that we do not fill the buffer even
883 : * once. Tell valgrind that its contents are defined, so it
884 : * doesn't bleat.
885 : */
886 : VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
887 : lt->buffer_size - lt->nbytes);
888 :
2299 heikki.linnakangas 889 GIC 13964 : TapeBlockSetNBytes(lt->buffer, lt->nbytes);
100 peter 890 GNC 13964 : ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
891 : }
2370 heikki.linnakangas 892 GIC 14106 : lt->writing = false;
8576 tgl 893 ECB : }
894 : else
895 : {
896 : /*
2370 heikki.linnakangas 897 : * This is only OK if tape is frozen; we rewind for (another) read
898 : * pass.
899 : */
2370 heikki.linnakangas 900 CBC 3 : Assert(lt->frozen);
2370 heikki.linnakangas 901 ECB : }
902 :
2370 heikki.linnakangas 903 GIC 14109 : if (lt->buffer)
2370 heikki.linnakangas 904 CBC 13967 : pfree(lt->buffer);
905 :
1146 jdavis 906 ECB : /* the buffer is lazily allocated, but set the size here */
2370 heikki.linnakangas 907 CBC 14109 : lt->buffer = NULL;
1146 jdavis 908 14109 : lt->buffer_size = buffer_size;
1048 jdavis 909 ECB :
910 : /* free the preallocation list, and return unused block numbers */
1048 jdavis 911 CBC 14109 : if (lt->prealloc != NULL)
912 : {
913 107709 : for (int i = lt->nprealloc; i > 0; i--)
1048 jdavis 914 GIC 94203 : ltsReleaseBlock(lts, lt->prealloc[i - 1]);
915 13506 : pfree(lt->prealloc);
916 13506 : lt->prealloc = NULL;
917 13506 : lt->nprealloc = 0;
918 13506 : lt->prealloc_size = 0;
919 : }
2370 heikki.linnakangas 920 14109 : }
2370 heikki.linnakangas 921 ECB :
922 : /*
8576 tgl 923 : * Read from a logical tape.
924 : *
925 : * Early EOF is indicated by return value less than #bytes requested.
926 : */
927 : size_t
538 heikki.linnakangas 928 CBC 9435087 : LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
8576 tgl 929 ECB : {
8397 bruce 930 GIC 9435087 : size_t nread = 0;
8397 bruce 931 ECB : size_t nthistime;
932 :
8397 bruce 933 CBC 9435087 : Assert(!lt->writing);
934 :
1151 jdavis 935 GIC 9435087 : if (lt->buffer == NULL)
538 heikki.linnakangas 936 CBC 14109 : ltsInitReadBuffer(lt);
1151 jdavis 937 ECB :
8576 tgl 938 GIC 18859374 : while (size > 0)
939 : {
8576 tgl 940 CBC 9437793 : if (lt->pos >= lt->nbytes)
8576 tgl 941 ECB : {
942 : /* Try to load more data into buffer. */
538 heikki.linnakangas 943 CBC 17727 : if (!ltsReadFillBuffer(lt))
8576 tgl 944 GIC 13506 : break; /* EOF */
8576 tgl 945 ECB : }
946 :
8576 tgl 947 CBC 9424287 : nthistime = lt->nbytes - lt->pos;
948 9424287 : if (nthistime > size)
949 9405963 : nthistime = size;
950 9424287 : Assert(nthistime > 0);
951 :
8576 tgl 952 GIC 9424287 : memcpy(ptr, lt->buffer + lt->pos, nthistime);
8576 tgl 953 ECB :
8576 tgl 954 GIC 9424287 : lt->pos += nthistime;
100 peter 955 GNC 9424287 : ptr = (char *) ptr + nthistime;
8576 tgl 956 GIC 9424287 : size -= nthistime;
957 9424287 : nread += nthistime;
958 : }
959 :
960 9435087 : return nread;
961 : }
962 :
963 : /*
964 : * "Freeze" the contents of a tape so that it can be read multiple times
965 : * and/or read backwards. Once a tape is frozen, its contents will not
966 : * be released until the LogicalTapeSet is destroyed. This is expected
967 : * to be used only for the final output pass of a merge.
968 : *
969 : * This *must* be called just at the end of a write pass, before the
970 : * tape is rewound (after rewind is too late!). It performs a rewind
971 : * and switch to read mode "for free". An immediately following rewind-
972 : * for-read call is OK but not necessary.
973 : *
1892 rhaas 974 ECB : * share output argument is set with details of storage used for tape after
975 : * freezing, which may be passed to LogicalTapeSetCreate within leader
976 : * process later. This metadata is only of interest to worker callers
977 : * freezing their final output for leader (single materialized tape).
978 : * Serial sorts should set share to NULL.
8576 tgl 979 : */
980 : void
538 heikki.linnakangas 981 GIC 215 : LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
982 : {
983 215 : LogicalTapeSet *lts = lt->tapeSet;
984 :
8576 tgl 985 CBC 215 : Assert(lt->writing);
1892 rhaas 986 GIC 215 : Assert(lt->offsetBlockNumber == 0L);
987 :
988 : /*
989 : * Completion of a write phase. Flush last partial data block, and rewind
990 : * for nondestructive read.
991 : */
8576 tgl 992 215 : if (lt->dirty)
993 : {
994 : /*
995 : * As long as we've filled the buffer at least once, its contents are
996 : * entirely defined from valgrind's point of view, even though
997 : * contents beyond the current end point may be stale. But it's
1888 rhaas 998 ECB : * possible - at least in the case of a parallel sort - to sort such
999 : * small amount of data that we do not fill the buffer even once. Tell
1000 : * valgrind that its contents are defined, so it doesn't bleat.
1001 : */
1002 : VALGRIND_MAKE_MEM_DEFINED(lt->buffer + lt->nbytes,
1003 : lt->buffer_size - lt->nbytes);
1004 :
2299 heikki.linnakangas 1005 GIC 215 : TapeBlockSetNBytes(lt->buffer, lt->nbytes);
100 peter 1006 GNC 215 : ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
1007 : }
8576 tgl 1008 GIC 215 : lt->writing = false;
1009 215 : lt->frozen = true;
1010 :
2379 heikki.linnakangas 1011 ECB : /*
1012 : * The seek and backspace functions assume a single block read buffer.
2379 heikki.linnakangas 1013 EUB : * That's OK with current usage. A larger buffer is helpful to make the
1014 : * read pattern of the backing file look more sequential to the OS, when
1015 : * we're reading from multiple tapes. But at the end of a sort, when a
1016 : * tape is frozen, we only read from a single tape anyway.
1017 : */
2379 heikki.linnakangas 1018 GIC 215 : if (!lt->buffer || lt->buffer_size != BLCKSZ)
1019 : {
2379 heikki.linnakangas 1020 LBC 0 : if (lt->buffer)
1021 0 : pfree(lt->buffer);
1022 0 : lt->buffer = palloc(BLCKSZ);
2379 heikki.linnakangas 1023 UIC 0 : lt->buffer_size = BLCKSZ;
2379 heikki.linnakangas 1024 ECB : }
2379 heikki.linnakangas 1025 EUB :
8576 tgl 1026 ECB : /* Read the first block, or reset if tape is empty */
2299 heikki.linnakangas 1027 CBC 215 : lt->curBlockNumber = lt->firstBlockNumber;
8576 tgl 1028 215 : lt->pos = 0;
8576 tgl 1029 GIC 215 : lt->nbytes = 0;
2299 heikki.linnakangas 1030 ECB :
2299 heikki.linnakangas 1031 CBC 215 : if (lt->firstBlockNumber == -1L)
2299 heikki.linnakangas 1032 UIC 0 : lt->nextBlockNumber = -1L;
100 peter 1033 GNC 215 : ltsReadBlock(lt->tapeSet, lt->curBlockNumber, lt->buffer);
2299 heikki.linnakangas 1034 CBC 215 : if (TapeBlockIsLast(lt->buffer))
2299 heikki.linnakangas 1035 GIC 181 : lt->nextBlockNumber = -1L;
2299 heikki.linnakangas 1036 ECB : else
2299 heikki.linnakangas 1037 CBC 34 : lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
2299 heikki.linnakangas 1038 GIC 215 : lt->nbytes = TapeBlockGetNBytes(lt->buffer);
1892 rhaas 1039 ECB :
1040 : /* Handle extra steps when caller is to share its tapeset */
1892 rhaas 1041 GIC 215 : if (share)
1042 : {
587 akapila 1043 206 : BufFileExportFileSet(lts->pfile);
1892 rhaas 1044 206 : share->firstblocknumber = lt->firstBlockNumber;
1045 : }
8576 tgl 1046 215 : }
1047 :
1048 : /*
1049 : * Backspace the tape a given number of bytes. (We also support a more
1050 : * general seek interface, see below.)
1051 : *
1052 : * *Only* a frozen-for-read tape can be backed up; we don't support
1053 : * random access during write, and an unfrozen read tape may have
1054 : * already discarded the desired data!
8576 tgl 1055 ECB : *
1056 : * Returns the number of bytes backed up. It can be less than the
2299 heikki.linnakangas 1057 : * requested amount, if there isn't that much data before the current
1058 : * position. The tape is positioned to the beginning of the tape in
1059 : * that case.
8576 tgl 1060 : */
1061 : size_t
538 heikki.linnakangas 1062 CBC 36 : LogicalTapeBackspace(LogicalTape *lt, size_t size)
8576 tgl 1063 EUB : {
2299 heikki.linnakangas 1064 GIC 36 : size_t seekpos = 0;
1065 :
8576 tgl 1066 36 : Assert(lt->frozen);
2379 heikki.linnakangas 1067 36 : Assert(lt->buffer_size == BLCKSZ);
8576 tgl 1068 ECB :
1151 jdavis 1069 GIC 36 : if (lt->buffer == NULL)
538 heikki.linnakangas 1070 LBC 0 : ltsInitReadBuffer(lt);
1151 jdavis 1071 ECB :
1072 : /*
1073 : * Easy case for seek within current block.
1074 : */
8576 tgl 1075 GIC 36 : if (size <= (size_t) lt->pos)
1076 : {
1077 33 : lt->pos -= (int) size;
2299 heikki.linnakangas 1078 33 : return size;
8576 tgl 1079 ECB : }
8397 bruce 1080 :
1081 : /*
2299 heikki.linnakangas 1082 : * Not-so-easy case, have to walk back the chain of blocks. This
1083 : * implementation would be pretty inefficient for long seeks, but we
1084 : * really aren't doing that (a seek over one tuple is typical).
1085 : */
2299 heikki.linnakangas 1086 GIC 3 : seekpos = (size_t) lt->pos; /* part within this block */
2299 heikki.linnakangas 1087 CBC 3 : while (size > seekpos)
8576 tgl 1088 EUB : {
2299 heikki.linnakangas 1089 CBC 3 : long prev = TapeBlockGetTrailer(lt->buffer)->prev;
8576 tgl 1090 ECB :
2299 heikki.linnakangas 1091 GIC 3 : if (prev == -1L)
1092 : {
2299 heikki.linnakangas 1093 EUB : /* Tried to back up beyond the beginning of tape. */
2299 heikki.linnakangas 1094 GIC 3 : if (lt->curBlockNumber != lt->firstBlockNumber)
2299 heikki.linnakangas 1095 UBC 0 : elog(ERROR, "unexpected end of tape");
2299 heikki.linnakangas 1096 GBC 3 : lt->pos = 0;
2299 heikki.linnakangas 1097 GIC 3 : return seekpos;
1098 : }
1099 :
100 peter 1100 UNC 0 : ltsReadBlock(lt->tapeSet, prev, lt->buffer);
2299 heikki.linnakangas 1101 EUB :
2299 heikki.linnakangas 1102 UBC 0 : if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
1103 0 : elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
1104 : prev,
2299 heikki.linnakangas 1105 EUB : TapeBlockGetTrailer(lt->buffer)->next,
1106 : lt->curBlockNumber);
1107 :
2299 heikki.linnakangas 1108 UIC 0 : lt->nbytes = TapeBlockPayloadSize;
1109 0 : lt->curBlockNumber = prev;
1110 0 : lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1111 :
1112 0 : seekpos += TapeBlockPayloadSize;
8576 tgl 1113 EUB : }
2299 heikki.linnakangas 1114 :
1115 : /*
1116 : * 'seekpos' can now be greater than 'size', because it points to the
1117 : * beginning the target block. The difference is the position within the
1118 : * page.
1119 : */
2299 heikki.linnakangas 1120 UIC 0 : lt->pos = seekpos - size;
1121 0 : return size;
1122 : }
1123 :
1124 : /*
1125 : * Seek to an arbitrary position in a logical tape.
8576 tgl 1126 ECB : *
1127 : * *Only* a frozen-for-read tape can be seeked.
1128 : *
2299 heikki.linnakangas 1129 : * Must be called with a block/offset previously returned by
1130 : * LogicalTapeTell().
1131 : */
1132 : void
538 heikki.linnakangas 1133 GBC 3096 : LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
1134 : {
8576 tgl 1135 CBC 3096 : Assert(lt->frozen);
2299 heikki.linnakangas 1136 GIC 3096 : Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
2379 heikki.linnakangas 1137 CBC 3096 : Assert(lt->buffer_size == BLCKSZ);
8576 tgl 1138 ECB :
1151 jdavis 1139 CBC 3096 : if (lt->buffer == NULL)
538 heikki.linnakangas 1140 LBC 0 : ltsInitReadBuffer(lt);
1141 :
2299 heikki.linnakangas 1142 GIC 3096 : if (blocknum != lt->curBlockNumber)
8576 tgl 1143 ECB : {
100 peter 1144 GNC 39 : ltsReadBlock(lt->tapeSet, blocknum, lt->buffer);
2299 heikki.linnakangas 1145 CBC 39 : lt->curBlockNumber = blocknum;
1146 39 : lt->nbytes = TapeBlockPayloadSize;
2299 heikki.linnakangas 1147 GIC 39 : lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
1148 : }
1149 :
1150 3096 : if (offset > lt->nbytes)
2299 heikki.linnakangas 1151 UIC 0 : elog(ERROR, "invalid tape seek position");
8576 tgl 1152 GIC 3096 : lt->pos = offset;
1153 3096 : }
1154 :
8576 tgl 1155 ECB : /*
1156 : * Obtain current position in a form suitable for a later LogicalTapeSeek.
1157 : *
8576 tgl 1158 EUB : * NOTE: it'd be OK to do this during write phase with intention of using
1159 : * the position for a seek after freezing. Not clear if anyone needs that.
8576 tgl 1160 ECB : */
1161 : void
538 heikki.linnakangas 1162 GIC 4404 : LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
8576 tgl 1163 ECB : {
1151 jdavis 1164 GIC 4404 : if (lt->buffer == NULL)
538 heikki.linnakangas 1165 LBC 0 : ltsInitReadBuffer(lt);
1151 jdavis 1166 ECB :
1892 rhaas 1167 CBC 4404 : Assert(lt->offsetBlockNumber == 0L);
1168 :
1169 : /* With a larger buffer, 'pos' wouldn't be the same as offset within page */
2379 heikki.linnakangas 1170 GIC 4404 : Assert(lt->buffer_size == BLCKSZ);
1171 :
8576 tgl 1172 4404 : *blocknum = lt->curBlockNumber;
1173 4404 : *offset = lt->pos;
1174 4404 : }
1175 :
6382 tgl 1176 ECB : /*
1177 : * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
936 jdavis 1178 : *
1179 : * This should not be called while there are open write buffers; otherwise it
1180 : * may not account for buffered data.
1181 : */
1182 : long
6382 tgl 1183 GIC 13873 : LogicalTapeSetBlocks(LogicalTapeSet *lts)
1184 : {
936 jdavis 1185 13873 : return lts->nBlocksWritten - lts->nHoleBlocks;
1186 : }
|