Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * buffile.c
4 : * Management of large buffered temporary files.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/storage/file/buffile.c
11 : *
12 : * NOTES:
13 : *
14 : * BufFiles provide a very incomplete emulation of stdio atop virtual Files
15 : * (as managed by fd.c). Currently, we only support the buffered-I/O
16 : * aspect of stdio: a read or write of the low-level File occurs only
17 : * when the buffer is filled or emptied. This is an even bigger win
18 : * for virtual Files than for ordinary kernel files, since reducing the
19 : * frequency with which a virtual File is touched reduces "thrashing"
20 : * of opening/closing file descriptors.
21 : *
22 : * Note that BufFile structs are allocated with palloc(), and therefore
23 : * will go away automatically at query/transaction end. Since the underlying
24 : * virtual Files are made with OpenTemporaryFile, all resources for
25 : * the file are certain to be cleaned up even if processing is aborted
26 : * by ereport(ERROR). The data structures required are made in the
27 : * palloc context that was current when the BufFile was created, and
28 : * any external resources such as temp files are owned by the ResourceOwner
29 : * that was current at that time.
30 : *
31 : * BufFile also supports temporary files that exceed the OS file size limit
32 : * (by opening multiple fd.c temporary files). This is an essential feature
33 : * for sorts and hashjoins on large amounts of data.
34 : *
35 : * BufFile supports temporary files that can be shared with other backends, as
36 : * infrastructure for parallel execution. Such files need to be created as a
37 : * member of a SharedFileSet that all participants are attached to.
38 : *
39 : * BufFile also supports temporary files that are used by a single backend
40 : * when the corresponding files need to survive across transactions and
41 : * need to be opened and closed multiple times. Such files need to be created
42 : * as a member of a FileSet.
43 : *-------------------------------------------------------------------------
44 : */
45 :
46 : #include "postgres.h"
47 :
48 : #include "commands/tablespace.h"
49 : #include "executor/instrument.h"
50 : #include "miscadmin.h"
51 : #include "pgstat.h"
52 : #include "storage/buf_internals.h"
53 : #include "storage/buffile.h"
54 : #include "storage/fd.h"
55 : #include "utils/resowner.h"
56 :
/*
 * BufFiles are split into segments of one gigabyte each, independently of
 * RELSEG_SIZE, so that a large BufFile can be spread across several
 * tablespaces when more than one is available.
 *
 * MAX_PHYSICAL_FILESIZE is the segment size in bytes; BUFFILE_SEG_SIZE is
 * the same limit expressed in BLCKSZ-sized blocks.
 */
#define MAX_PHYSICAL_FILESIZE	0x40000000
#define BUFFILE_SEG_SIZE		(MAX_PHYSICAL_FILESIZE / BLCKSZ)
64 :
65 : /*
66 : * This data structure represents a buffered file that consists of one or
67 : * more physical files (each accessed through a virtual file descriptor
68 : * managed by fd.c).
69 : */
70 : struct BufFile
71 : {
72 : int numFiles; /* number of physical files in set */
73 : /* all files except the last have length exactly MAX_PHYSICAL_FILESIZE */
74 : File *files; /* palloc'd array with numFiles entries */
75 :
76 : bool isInterXact; /* keep open over transactions? */
77 : bool dirty; /* does buffer need to be written? */
78 : bool readOnly; /* has the file been set to read only? */
79 :
80 : FileSet *fileset; /* space for fileset based segment files */
81 : const char *name; /* name of fileset based BufFile */
82 :
83 : /*
84 : * resowner is the ResourceOwner to use for underlying temp files. (We
85 : * don't need to remember the memory context we're using explicitly,
86 : * because after creation we only repalloc our arrays larger.)
87 : */
88 : ResourceOwner resowner;
89 :
90 : /*
91 : * "current pos" is position of start of buffer within the logical file.
92 : * Position as seen by user of BufFile is (curFile, curOffset + pos).
93 : */
94 : int curFile; /* file index (0..n) part of current pos */
95 : off_t curOffset; /* offset part of current pos */
96 : int pos; /* next read/write position in buffer */
97 : int nbytes; /* total # of valid bytes in buffer */
98 :
99 : /*
100 : * XXX Should ideally us PGIOAlignedBlock, but might need a way to avoid
101 : * wasting per-file alignment padding when some users create many
102 : * files.
103 : */
104 : PGAlignedBlock buffer;
105 : };
106 :
107 : static BufFile *makeBufFileCommon(int nfiles);
108 : static BufFile *makeBufFile(File firstfile);
109 : static void extendBufFile(BufFile *file);
110 : static void BufFileLoadBuffer(BufFile *file);
111 : static void BufFileDumpBuffer(BufFile *file);
112 : static void BufFileFlush(BufFile *file);
113 : static File MakeNewFileSetSegment(BufFile *buffile, int segment);
114 :
115 : /*
116 : * Create BufFile and perform the common initialization.
117 : */
118 : static BufFile *
1758 ishii 119 GIC 5116 : makeBufFileCommon(int nfiles)
120 : {
8397 bruce 121 5116 : BufFile *file = (BufFile *) palloc(sizeof(BufFile));
122 :
1758 ishii 123 5116 : file->numFiles = nfiles;
5789 tgl 124 5116 : file->isInterXact = false;
8576 tgl 125 CBC 5116 : file->dirty = false;
3446 tgl 126 GIC 5116 : file->resowner = CurrentResourceOwner;
8576 tgl 127 CBC 5116 : file->curFile = 0;
11 peter 128 GNC 5116 : file->curOffset = 0;
8576 tgl 129 CBC 5116 : file->pos = 0;
130 5116 : file->nbytes = 0;
1758 ishii 131 ECB :
1758 ishii 132 CBC 5116 : return file;
1758 ishii 133 ECB : }
134 :
135 : /*
136 : * Create a BufFile given the first underlying physical file.
137 : * NOTE: caller must set isInterXact if appropriate.
138 : */
139 : static BufFile *
1758 ishii 140 GIC 1783 : makeBufFile(File firstfile)
141 : {
142 1783 : BufFile *file = makeBufFileCommon(1);
143 :
144 1783 : file->files = (File *) palloc(sizeof(File));
145 1783 : file->files[0] = firstfile;
1955 andres 146 CBC 1783 : file->readOnly = false;
1955 andres 147 GIC 1783 : file->fileset = NULL;
1955 andres 148 CBC 1783 : file->name = NULL;
149 :
8579 tgl 150 1783 : return file;
8579 tgl 151 ECB : }
152 :
153 : /*
154 : * Add another component temp file.
155 : */
156 : static void
8576 tgl 157 UIC 0 : extendBufFile(BufFile *file)
158 : {
159 : File pfile;
160 : ResourceOwner oldowner;
161 :
162 : /* Be sure to associate the file with the BufFile's resource owner */
3446 tgl 163 UBC 0 : oldowner = CurrentResourceOwner;
3446 tgl 164 UIC 0 : CurrentResourceOwner = file->resowner;
165 :
1955 andres 166 0 : if (file->fileset == NULL)
167 0 : pfile = OpenTemporaryFile(file->isInterXact);
168 : else
587 akapila 169 UBC 0 : pfile = MakeNewFileSetSegment(file, file->numFiles);
1955 andres 170 EUB :
8579 tgl 171 UIC 0 : Assert(pfile >= 0);
8579 tgl 172 EUB :
3446 tgl 173 UBC 0 : CurrentResourceOwner = oldowner;
174 :
8579 175 0 : file->files = (File *) repalloc(file->files,
8397 bruce 176 UIC 0 : (file->numFiles + 1) * sizeof(File));
8579 tgl 177 UBC 0 : file->files[file->numFiles] = pfile;
8579 tgl 178 UIC 0 : file->numFiles++;
8579 tgl 179 UBC 0 : }
180 :
8579 tgl 181 EUB : /*
182 : * Create a BufFile for a new temporary file (which will expand to become
183 : * multiple temporary files if more than MAX_PHYSICAL_FILESIZE bytes are
184 : * written to it).
7285 185 : *
186 : * If interXact is true, the temp file will not be automatically deleted
187 : * at end of transaction.
188 : *
189 : * Note: if interXact is true, the caller had better be calling us in a
190 : * memory context, and with a resource owner, that will survive across
191 : * transaction boundaries.
192 : */
193 : BufFile *
5785 tgl 194 GIC 1783 : BufFileCreateTemp(bool interXact)
195 : {
196 : BufFile *file;
197 : File pfile;
198 :
199 : /*
1422 tgl 200 ECB : * Ensure that temp tablespaces are set up for OpenTemporaryFile to use.
201 : * Possibly the caller will have done this already, but it seems useful to
202 : * double-check here. Failure to do this at all would result in the temp
203 : * files always getting placed in the default tablespace, which is a
204 : * pretty hard-to-detect bug. Callers may prefer to do it earlier if they
205 : * want to be sure that any required catalog access is done in some other
206 : * resource context.
207 : */
1422 tgl 208 GIC 1783 : PrepareTempTablespaces();
209 :
5785 210 1783 : pfile = OpenTemporaryFile(interXact);
8579 211 1783 : Assert(pfile >= 0);
212 :
8576 213 1783 : file = makeBufFile(pfile);
7285 tgl 214 CBC 1783 : file->isInterXact = interXact;
215 :
8576 216 1783 : return file;
8579 tgl 217 ECB : }
218 :
1955 andres 219 : /*
220 : * Build the name for a given segment of a given BufFile.
221 : */
222 : static void
587 akapila 223 GIC 7326 : FileSetSegmentName(char *name, const char *buffile_name, int segment)
224 : {
1955 andres 225 7326 : snprintf(name, MAXPGPATH, "%s.%d", buffile_name, segment);
226 7326 : }
227 :
228 : /*
587 akapila 229 ECB : * Create a new segment file backing a fileset based BufFile.
230 : */
1955 andres 231 : static File
587 akapila 232 CBC 1439 : MakeNewFileSetSegment(BufFile *buffile, int segment)
233 : {
234 : char name[MAXPGPATH];
235 : File file;
236 :
237 : /*
1943 andres 238 ECB : * It is possible that there are files left over from before a crash
239 : * restart with the same name. In order for BufFileOpenFileSet() not to
240 : * get confused about how many segments there are, we'll unlink the next
241 : * segment number if it already exists.
242 : */
587 akapila 243 GIC 1439 : FileSetSegmentName(name, buffile->name, segment + 1);
244 1439 : FileSetDelete(buffile->fileset, name, true);
245 :
246 : /* Create the new segment. */
247 1439 : FileSetSegmentName(name, buffile->name, segment);
248 1439 : file = FileSetCreate(buffile->fileset, name);
1955 andres 249 ECB :
587 akapila 250 : /* FileSetCreate would've errored out */
1955 andres 251 GIC 1439 : Assert(file > 0);
252 :
1955 andres 253 CBC 1439 : return file;
1955 andres 254 ECB : }
255 :
256 : /*
257 : * Create a BufFile that can be discovered and opened read-only by other
258 : * backends that are attached to the same SharedFileSet using the same name.
259 : *
260 : * The naming scheme for fileset based BufFiles is left up to the calling code.
261 : * The name will appear as part of one or more filenames on disk, and might
262 : * provide clues to administrators about which subsystem is generating
263 : * temporary file data. Since each SharedFileSet object is backed by one or
264 : * more uniquely named temporary directory, names don't conflict with
265 : * unrelated SharedFileSet objects.
266 : */
267 : BufFile *
587 akapila 268 GIC 1439 : BufFileCreateFileSet(FileSet *fileset, const char *name)
269 : {
270 : BufFile *file;
271 :
1758 ishii 272 1439 : file = makeBufFileCommon(1);
1955 andres 273 1439 : file->fileset = fileset;
1955 andres 274 CBC 1439 : file->name = pstrdup(name);
1955 andres 275 GIC 1439 : file->files = (File *) palloc(sizeof(File));
587 akapila 276 1439 : file->files[0] = MakeNewFileSetSegment(file, 0);
1955 andres 277 1439 : file->readOnly = false;
1955 andres 278 ECB :
1955 andres 279 CBC 1439 : return file;
1955 andres 280 ECB : }
281 :
282 : /*
283 : * Open a file that was previously created in another backend (or this one)
284 : * with BufFileCreateFileSet in the same FileSet using the same name.
285 : * The backend that created the file must have called BufFileClose() or
286 : * BufFileExportFileSet() to make sure that it is ready to be opened by other
287 : * backends and render it read-only. If missing_ok is true, which indicates
288 : * that missing files can be safely ignored, then return NULL if the BufFile
289 : * with the given name is not found, otherwise, throw an error.
290 : */
291 : BufFile *
584 akapila 292 GIC 2165 : BufFileOpenFileSet(FileSet *fileset, const char *name, int mode,
293 : bool missing_ok)
294 : {
295 : BufFile *file;
296 : char segment_name[MAXPGPATH];
1955 andres 297 2165 : Size capacity = 16;
1759 ishii 298 ECB : File *files;
1955 andres 299 GIC 2165 : int nfiles = 0;
300 :
301 2165 : files = palloc(sizeof(File) * capacity);
302 :
1955 andres 303 ECB : /*
304 : * We don't know how many segments there are, so we'll probe the
305 : * filesystem to find out.
306 : */
307 : for (;;)
308 : {
309 : /* See if we need to expand our file segment array. */
1955 andres 310 GIC 4059 : if (nfiles + 1 > capacity)
311 : {
1955 andres 312 UIC 0 : capacity *= 2;
313 0 : files = repalloc(files, sizeof(File) * capacity);
314 : }
315 : /* Try to load a segment. */
587 akapila 316 CBC 4059 : FileSetSegmentName(segment_name, name, nfiles);
587 akapila 317 GIC 4059 : files[nfiles] = FileSetOpen(fileset, segment_name, mode);
1955 andres 318 GBC 4059 : if (files[nfiles] <= 0)
319 2165 : break;
1955 andres 320 GIC 1894 : ++nfiles;
321 :
1955 andres 322 CBC 1894 : CHECK_FOR_INTERRUPTS();
1955 andres 323 ECB : }
324 :
325 : /*
326 : * If we didn't find any files at all, then no BufFile exists with this
327 : * name.
328 : */
1955 andres 329 GIC 2165 : if (nfiles == 0)
330 : {
331 : /* free the memory */
584 akapila 332 271 : pfree(files);
333 :
334 271 : if (missing_ok)
584 akapila 335 CBC 271 : return NULL;
336 :
1943 andres 337 UIC 0 : ereport(ERROR,
1943 andres 338 ECB : (errcode_for_file_access(),
339 : errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m",
1593 pg 340 : segment_name, name)));
584 akapila 341 : }
342 :
1758 ishii 343 GBC 1894 : file = makeBufFileCommon(nfiles);
1955 andres 344 GIC 1894 : file->files = files;
578 michael 345 1894 : file->readOnly = (mode == O_RDONLY);
1955 andres 346 1894 : file->fileset = fileset;
347 1894 : file->name = pstrdup(name);
348 :
1955 andres 349 CBC 1894 : return file;
1955 andres 350 ECB : }
351 :
352 : /*
587 akapila 353 : * Delete a BufFile that was created by BufFileCreateFileSet in the given
354 : * FileSet using the given name.
1955 andres 355 : *
356 : * It is not necessary to delete files explicitly with this function. It is
357 : * provided only as a way to delete files proactively, rather than waiting for
358 : * the FileSet to be cleaned up.
359 : *
360 : * Only one backend should attempt to delete a given name, and should know
361 : * that it exists and has been exported or closed otherwise missing_ok should
362 : * be passed true.
363 : */
364 : void
584 akapila 365 GIC 350 : BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
366 : {
367 : char segment_name[MAXPGPATH];
1955 andres 368 350 : int segment = 0;
369 350 : bool found = false;
370 :
1955 andres 371 ECB : /*
372 : * We don't know how many segments the file has. We'll keep deleting
373 : * until we run out. If we don't manage to find even an initial segment,
374 : * raise an error.
375 : */
376 : for (;;)
377 : {
587 akapila 378 GIC 389 : FileSetSegmentName(segment_name, name, segment);
379 389 : if (!FileSetDelete(fileset, segment_name, true))
1955 andres 380 350 : break;
381 39 : found = true;
382 39 : ++segment;
383 :
1955 andres 384 CBC 39 : CHECK_FOR_INTERRUPTS();
1955 andres 385 ECB : }
386 :
584 akapila 387 CBC 350 : if (!found && !missing_ok)
587 akapila 388 LBC 0 : elog(ERROR, "could not delete unknown BufFile \"%s\"", name);
1955 andres 389 GIC 350 : }
1955 andres 390 ECB :
391 : /*
392 : * BufFileExportFileSet --- flush and make read-only, in preparation for sharing.
393 : */
1955 andres 394 EUB : void
587 akapila 395 CBC 206 : BufFileExportFileSet(BufFile *file)
396 : {
397 : /* Must be a file belonging to a FileSet. */
1955 andres 398 GIC 206 : Assert(file->fileset != NULL);
399 :
400 : /* It's probably a bug if someone calls this twice. */
1955 andres 401 CBC 206 : Assert(!file->readOnly);
402 :
1955 andres 403 GIC 206 : BufFileFlush(file);
1955 andres 404 CBC 206 : file->readOnly = true;
1955 andres 405 GIC 206 : }
406 :
8579 tgl 407 ECB : /*
408 : * Close a BufFile
409 : *
410 : * Like fclose(), this also implicitly FileCloses the underlying File.
411 : */
412 : void
8579 tgl 413 GIC 5036 : BufFileClose(BufFile *file)
414 : {
415 : int i;
416 :
417 : /* flush any unwritten data */
418 5036 : BufFileFlush(file);
1961 tgl 419 ECB : /* close and delete the underlying file(s) */
8576 tgl 420 GIC 10143 : for (i = 0; i < file->numFiles; i++)
421 5107 : FileClose(file->files[i]);
422 : /* release the buffer space */
423 5036 : pfree(file->files);
8579 tgl 424 CBC 5036 : pfree(file);
8579 tgl 425 GIC 5036 : }
8579 tgl 426 ECB :
8576 427 : /*
428 : * BufFileLoadBuffer
8579 429 : *
430 : * Load some data into buffer, if possible, starting from curOffset.
431 : * At call, must have dirty = false, pos and nbytes = 0.
432 : * On exit, nbytes is number of bytes loaded.
433 : */
434 : static void
8579 tgl 435 GIC 54301 : BufFileLoadBuffer(BufFile *file)
436 : {
437 : File thisfile;
438 : instr_time io_start;
439 : instr_time io_time;
440 :
8579 tgl 441 ECB : /*
442 : * Advance to next component file if necessary and possible.
443 : */
8579 tgl 444 GIC 54301 : if (file->curOffset >= MAX_PHYSICAL_FILESIZE &&
8397 bruce 445 UIC 0 : file->curFile + 1 < file->numFiles)
446 : {
8579 tgl 447 0 : file->curFile++;
11 peter 448 UNC 0 : file->curOffset = 0;
449 : }
8397 bruce 450 ECB :
366 michael 451 GBC 54301 : thisfile = file->files[file->curFile];
452 :
453 54301 : if (track_io_timing)
366 michael 454 UBC 0 : INSTR_TIME_SET_CURRENT(io_start);
455 : else
79 andres 456 GNC 54301 : INSTR_TIME_SET_ZERO(io_start);
457 :
458 : /*
8576 tgl 459 ECB : * Read whatever we can get, up to a full bufferload.
460 : */
2213 rhaas 461 CBC 108602 : file->nbytes = FileRead(thisfile,
1681 tgl 462 GBC 54301 : file->buffer.data,
463 : sizeof(file->buffer),
1614 tmunro 464 ECB : file->curOffset,
465 : WAIT_EVENT_BUFFILE_READ);
8579 tgl 466 GIC 54301 : if (file->nbytes < 0)
467 : {
8579 tgl 468 UIC 0 : file->nbytes = 0;
1027 tmunro 469 LBC 0 : ereport(ERROR,
1027 tmunro 470 ECB : (errcode_for_file_access(),
471 : errmsg("could not read file \"%s\": %m",
472 : FilePathName(thisfile))));
473 : }
474 :
366 michael 475 GIC 54301 : if (track_io_timing)
366 michael 476 EUB : {
366 michael 477 UBC 0 : INSTR_TIME_SET_CURRENT(io_time);
10 andres 478 UNC 0 : INSTR_TIME_ACCUM_DIFF(pgBufferUsage.temp_blk_read_time, io_time, io_start);
479 : }
480 :
481 : /* we choose not to advance curOffset here */
5317 tgl 482 ECB :
1986 rhaas 483 GIC 54301 : if (file->nbytes > 0)
1986 rhaas 484 GBC 52717 : pgBufferUsage.temp_blks_read++;
8579 tgl 485 54301 : }
486 :
487 : /*
488 : * BufFileDumpBuffer
489 : *
8579 tgl 490 ECB : * Dump buffer contents starting at curOffset.
491 : * At call, should have dirty = true, nbytes > 0.
492 : * On exit, dirty is cleared if successful write, and curOffset is advanced.
493 : */
494 : static void
8579 tgl 495 GIC 59021 : BufFileDumpBuffer(BufFile *file)
496 : {
497 59021 : int wpos = 0;
498 : int bytestowrite;
499 : File thisfile;
500 :
501 : /*
8397 bruce 502 ECB : * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
503 : * crosses a component-file boundary; so we need a loop.
8579 tgl 504 : */
8579 tgl 505 GIC 118042 : while (wpos < file->nbytes)
506 : {
507 : off_t availbytes;
508 : instr_time io_start;
509 : instr_time io_time;
510 :
511 : /*
8579 tgl 512 ECB : * Advance to next component file if necessary and possible.
513 : */
1970 andres 514 GIC 59021 : if (file->curOffset >= MAX_PHYSICAL_FILESIZE)
515 : {
8397 bruce 516 UIC 0 : while (file->curFile + 1 >= file->numFiles)
8576 tgl 517 0 : extendBufFile(file);
8579 518 0 : file->curFile++;
11 peter 519 UNC 0 : file->curOffset = 0;
520 : }
8397 bruce 521 ECB :
522 : /*
1961 tgl 523 EUB : * Determine how much we need to write into this file.
8579 524 : */
8579 tgl 525 GBC 59021 : bytestowrite = file->nbytes - wpos;
1970 andres 526 59021 : availbytes = MAX_PHYSICAL_FILESIZE - file->curOffset;
527 :
1970 andres 528 GIC 59021 : if ((off_t) bytestowrite > availbytes)
1970 andres 529 UIC 0 : bytestowrite = (int) availbytes;
530 :
8576 tgl 531 GIC 59021 : thisfile = file->files[file->curFile];
366 michael 532 ECB :
366 michael 533 CBC 59021 : if (track_io_timing)
366 michael 534 UIC 0 : INSTR_TIME_SET_CURRENT(io_start);
535 : else
79 andres 536 GNC 59021 : INSTR_TIME_SET_ZERO(io_start);
366 michael 537 ECB :
2213 rhaas 538 GBC 59021 : bytestowrite = FileWrite(thisfile,
1681 tgl 539 GIC 59021 : file->buffer.data + wpos,
2213 rhaas 540 ECB : bytestowrite,
541 : file->curOffset,
542 : WAIT_EVENT_BUFFILE_WRITE);
8579 tgl 543 GBC 59021 : if (bytestowrite <= 0)
1027 tmunro 544 UIC 0 : ereport(ERROR,
1027 tmunro 545 ECB : (errcode_for_file_access(),
546 : errmsg("could not write to file \"%s\": %m",
547 : FilePathName(thisfile))));
366 michael 548 :
366 michael 549 GIC 59021 : if (track_io_timing)
550 : {
366 michael 551 UIC 0 : INSTR_TIME_SET_CURRENT(io_time);
10 andres 552 UNC 0 : INSTR_TIME_ACCUM_DIFF(pgBufferUsage.temp_blk_write_time, io_time, io_start);
553 : }
554 :
8579 tgl 555 GIC 59021 : file->curOffset += bytestowrite;
556 59021 : wpos += bytestowrite;
5317 tgl 557 ECB :
4863 rhaas 558 GIC 59021 : pgBufferUsage.temp_blks_written++;
8579 tgl 559 EUB : }
8579 tgl 560 GBC 59021 : file->dirty = false;
561 :
562 : /*
6385 bruce 563 ECB : * At this point, curOffset has been advanced to the end of the buffer,
564 : * ie, its original value + nbytes. We need to make it point to the
565 : * logical file position, ie, original value + pos, in case that is less
566 : * (as could happen due to a small backwards seek in a dirty buffer!)
567 : */
8579 tgl 568 CBC 59021 : file->curOffset -= (file->nbytes - file->pos);
8579 tgl 569 GIC 59021 : if (file->curOffset < 0) /* handle possible segment crossing */
570 : {
8579 tgl 571 UIC 0 : file->curFile--;
572 0 : Assert(file->curFile >= 0);
573 0 : file->curOffset += MAX_PHYSICAL_FILESIZE;
574 : }
575 :
8397 bruce 576 ECB : /*
6385 577 : * Now we can set the buffer empty without changing the logical position
578 : */
8579 tgl 579 GBC 59021 : file->pos = 0;
580 59021 : file->nbytes = 0;
581 59021 : }
582 :
583 : /*
584 : * BufFileRead variants
585 : *
586 : * Like fread() except we assume 1-byte element size and report I/O errors via
1027 tmunro 587 ECB : * ereport().
588 : *
589 : * If 'exact' is true, then an error is also raised if the number of bytes
590 : * read is not exactly 'size' (no short reads). If 'exact' and 'eofOK' are
591 : * true, then reading zero bytes is ok.
8579 tgl 592 : */
593 : static size_t
83 peter 594 GNC 16167292 : BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
595 : {
596 16167292 : size_t start_size = size;
8579 tgl 597 GIC 16167292 : size_t nread = 0;
598 : size_t nthistime;
599 :
1027 tmunro 600 16167292 : BufFileFlush(file);
601 :
8579 tgl 602 32348235 : while (size > 0)
603 : {
604 16182527 : if (file->pos >= file->nbytes)
605 : {
606 : /* Try to load more data into buffer. */
8579 tgl 607 CBC 54301 : file->curOffset += file->pos;
8579 tgl 608 GIC 54301 : file->pos = 0;
8579 tgl 609 CBC 54301 : file->nbytes = 0;
610 54301 : BufFileLoadBuffer(file);
8579 tgl 611 GIC 54301 : if (file->nbytes <= 0)
612 1584 : break; /* no more data available */
8579 tgl 613 ECB : }
614 :
8579 tgl 615 CBC 16180943 : nthistime = file->nbytes - file->pos;
8579 tgl 616 GIC 16180943 : if (nthistime > size)
8579 tgl 617 CBC 16129816 : nthistime = size;
8579 tgl 618 GIC 16180943 : Assert(nthistime > 0);
619 :
1681 tgl 620 CBC 16180943 : memcpy(ptr, file->buffer.data + file->pos, nthistime);
8579 tgl 621 ECB :
8579 tgl 622 CBC 16180943 : file->pos += nthistime;
100 peter 623 GNC 16180943 : ptr = (char *) ptr + nthistime;
8579 tgl 624 CBC 16180943 : size -= nthistime;
625 16180943 : nread += nthistime;
626 : }
627 :
83 peter 628 GNC 16167292 : if (exact &&
629 1584 : (nread != start_size && !(nread == 0 && eofOK)))
83 peter 630 UNC 0 : ereport(ERROR,
631 : errcode_for_file_access(),
632 : file->name ?
633 : errmsg("could not read from file set \"%s\": read only %zu of %zu bytes",
634 : file->name, nread, start_size) :
635 : errmsg("could not read from temporary file: read only %zu of %zu bytes",
636 : nread, start_size));
637 :
8579 tgl 638 CBC 16167292 : return nread;
8579 tgl 639 ECB : }
640 :
641 : /*
642 : * Legacy interface where the caller needs to check for end of file or short
643 : * reads.
644 : */
645 : size_t
83 peter 646 UNC 0 : BufFileRead(BufFile *file, void *ptr, size_t size)
647 : {
648 0 : return BufFileReadCommon(file, ptr, size, false, false);
649 : }
650 :
651 : /*
652 : * Require read of exactly the specified size.
653 : */
654 : void
83 peter 655 GNC 9988299 : BufFileReadExact(BufFile *file, void *ptr, size_t size)
656 : {
657 9988299 : BufFileReadCommon(file, ptr, size, true, false);
658 9988299 : }
659 :
660 : /*
661 : * Require read of exactly the specified size, but optionally allow end of
662 : * file (in which case 0 is returned).
663 : */
664 : size_t
665 6178993 : BufFileReadMaybeEOF(BufFile *file, void *ptr, size_t size, bool eofOK)
666 : {
667 6178993 : return BufFileReadCommon(file, ptr, size, true, eofOK);
668 : }
669 :
8576 tgl 670 ECB : /*
671 : * BufFileWrite
8579 672 : *
673 : * Like fwrite() except we assume 1-byte element size and report errors via
1027 tmunro 674 : * ereport().
8579 tgl 675 : */
1027 tmunro 676 : void
100 peter 677 GNC 13197936 : BufFileWrite(BufFile *file, const void *ptr, size_t size)
678 : {
679 : size_t nthistime;
8579 tgl 680 ECB :
1955 andres 681 CBC 13197936 : Assert(!file->readOnly);
1955 andres 682 EUB :
8579 tgl 683 GIC 26418619 : while (size > 0)
684 : {
685 13220683 : if (file->pos >= BLCKSZ)
686 : {
687 : /* Buffer full, dump it out */
688 38068 : if (file->dirty)
689 37600 : BufFileDumpBuffer(file);
8579 tgl 690 ECB : else
691 : {
692 : /* Hmm, went directly from reading to writing? */
8579 tgl 693 GIC 468 : file->curOffset += file->pos;
694 468 : file->pos = 0;
695 468 : file->nbytes = 0;
696 : }
697 : }
8579 tgl 698 EUB :
8579 tgl 699 GIC 13220683 : nthistime = BLCKSZ - file->pos;
8579 tgl 700 GBC 13220683 : if (nthistime > size)
8579 tgl 701 GIC 13163779 : nthistime = size;
702 13220683 : Assert(nthistime > 0);
703 :
1681 704 13220683 : memcpy(file->buffer.data + file->pos, ptr, nthistime);
705 :
8579 706 13220683 : file->dirty = true;
8579 tgl 707 CBC 13220683 : file->pos += nthistime;
8579 tgl 708 GIC 13220683 : if (file->nbytes < file->pos)
8579 tgl 709 CBC 13218748 : file->nbytes = file->pos;
100 peter 710 GNC 13220683 : ptr = (const char *) ptr + nthistime;
8579 tgl 711 GIC 13220683 : size -= nthistime;
712 : }
713 13197936 : }
714 :
715 : /*
716 : * BufFileFlush
8579 tgl 717 ECB : *
718 : * Like fflush(), except that I/O errors are reported with ereport().
719 : */
720 : static void
8579 tgl 721 GIC 16201345 : BufFileFlush(BufFile *file)
722 : {
723 16201345 : if (file->dirty)
724 21421 : BufFileDumpBuffer(file);
725 :
1027 tmunro 726 16201345 : Assert(!file->dirty);
8579 tgl 727 16201345 : }
728 :
8576 tgl 729 ECB : /*
730 : * BufFileSeek
731 : *
732 : * Like fseek(), except that target position needs two values in order to
1961 733 : * work when logical filesize exceeds maximum value representable by off_t.
734 : * We do not support relative seeks across more than that, however.
1027 tmunro 735 : * I/O errors are reported by ereport().
736 : *
8576 tgl 737 : * Result is 0 if OK, EOF if not. Logical position is not moved if an
738 : * impossible seek is attempted.
739 : */
8579 740 : int
5508 tgl 741 CBC 58492 : BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
742 : {
743 : int newFile;
744 : off_t newOffset;
8397 bruce 745 ECB :
8579 tgl 746 CBC 58492 : switch (whence)
8579 tgl 747 ECB : {
8579 tgl 748 GIC 58144 : case SEEK_SET:
8573 749 58144 : if (fileno < 0)
8579 tgl 750 UIC 0 : return EOF;
8579 tgl 751 CBC 58144 : newFile = fileno;
752 58144 : newOffset = offset;
753 58144 : break;
754 15 : case SEEK_CUR:
755 :
8579 tgl 756 ECB : /*
757 : * Relative seek considers only the signed offset, ignoring
1363 michael 758 : * fileno. Note that large offsets (> 1 GB) risk overflow in this
5508 tgl 759 : * add, unless we have 64-bit off_t.
8579 760 : */
8579 tgl 761 CBC 15 : newFile = file->curFile;
762 15 : newOffset = (file->curOffset + file->pos) + offset;
763 15 : break;
8579 tgl 764 GIC 333 : case SEEK_END:
956 akapila 765 ECB :
766 : /*
767 : * The file size of the last file gives us the end offset of that
768 : * file.
769 : */
956 akapila 770 GIC 333 : newFile = file->numFiles - 1;
771 333 : newOffset = FileSize(file->files[file->numFiles - 1]);
772 333 : if (newOffset < 0)
956 akapila 773 LBC 0 : ereport(ERROR,
774 : (errcode_for_file_access(),
956 akapila 775 ECB : errmsg("could not determine size of temporary file \"%s\" from BufFile \"%s\": %m",
776 : FilePathName(file->files[file->numFiles - 1]),
777 : file->name)));
8579 tgl 778 CBC 333 : break;
8579 tgl 779 LBC 0 : default:
7199 tgl 780 UIC 0 : elog(ERROR, "invalid whence: %d", whence);
781 : return EOF;
782 : }
8579 tgl 783 GIC 58492 : while (newOffset < 0)
784 : {
8579 tgl 785 UIC 0 : if (--newFile < 0)
786 0 : return EOF;
787 0 : newOffset += MAX_PHYSICAL_FILESIZE;
788 : }
8579 tgl 789 GIC 58492 : if (newFile == file->curFile &&
790 58421 : newOffset >= file->curOffset &&
791 42591 : newOffset <= file->curOffset + file->nbytes)
792 : {
8579 tgl 793 ECB : /*
794 : * Seek is to a point within existing buffer; we can just adjust
795 : * pos-within-buffer, without flushing buffer. Note this is OK
796 : * whether reading or writing, but buffer remains dirty if we were
797 : * writing.
798 : */
8579 tgl 799 GIC 29681 : file->pos = (int) (newOffset - file->curOffset);
8579 tgl 800 CBC 29681 : return 0;
8579 tgl 801 ECB : }
8579 tgl 802 EUB : /* Otherwise, must reposition buffer, so flush any dirty data */
1027 tmunro 803 CBC 28811 : BufFileFlush(file);
8397 bruce 804 ECB :
8573 tgl 805 : /*
8397 bruce 806 : * At this point and no sooner, check for seek past last segment. The
807 : * above flush could have created a new segment, so checking sooner would
808 : * not work (at least not with this code).
809 : */
810 :
811 : /* convert seek to "start of next seg" to "end of last seg" */
1970 andres 812 GIC 28811 : if (newFile == file->numFiles && newOffset == 0)
8573 tgl 813 ECB : {
1970 andres 814 LBC 0 : newFile--;
815 0 : newOffset = MAX_PHYSICAL_FILESIZE;
1970 andres 816 ECB : }
1970 andres 817 GIC 28811 : while (newOffset > MAX_PHYSICAL_FILESIZE)
818 : {
1970 andres 819 UIC 0 : if (++newFile >= file->numFiles)
820 0 : return EOF;
821 0 : newOffset -= MAX_PHYSICAL_FILESIZE;
8573 tgl 822 ECB : }
8573 tgl 823 CBC 28811 : if (newFile >= file->numFiles)
8573 tgl 824 LBC 0 : return EOF;
8573 tgl 825 EUB : /* Seek is OK! */
8579 tgl 826 GIC 28811 : file->curFile = newFile;
827 28811 : file->curOffset = newOffset;
828 28811 : file->pos = 0;
829 28811 : file->nbytes = 0;
8579 tgl 830 CBC 28811 : return 0;
8579 tgl 831 EUB : }
832 :
833 : void
5508 tgl 834 GIC 88619 : BufFileTell(BufFile *file, int *fileno, off_t *offset)
8579 tgl 835 ECB : {
8579 tgl 836 GIC 88619 : *fileno = file->curFile;
8579 tgl 837 GBC 88619 : *offset = file->curOffset + file->pos;
838 88619 : }
8576 tgl 839 EUB :
840 : /*
8576 tgl 841 ECB : * BufFileSeekBlock --- block-oriented seek
842 : *
843 : * Performs absolute seek to the start of the n'th BLCKSZ-sized block of
844 : * the file. Note that users of this interface will fail if their files
845 : * exceed BLCKSZ * LONG_MAX bytes, but that is quite a lot; we don't work
846 : * with tables bigger than that, either...
847 : *
848 : * Result is 0 if OK, EOF if not. Logical position is not moved if an
849 : * impossible seek is attempted.
850 : */
851 : int
8576 tgl 852 CBC 56461 : BufFileSeekBlock(BufFile *file, long blknum)
853 : {
8576 tgl 854 GIC 112922 : return BufFileSeek(file,
5508 tgl 855 CBC 56461 : (int) (blknum / BUFFILE_SEG_SIZE),
5508 tgl 856 GIC 56461 : (off_t) (blknum % BUFFILE_SEG_SIZE) * BLCKSZ,
857 : SEEK_SET);
858 : }
859 :
#ifdef NOT_USED
/*
 * BufFileTellBlock --- block-oriented tell
 *
 * Returns the number of the BLCKSZ-sized block containing the current
 * logical position.  Any fractional part of a block in the current seek
 * position is ignored.
 */
long
BufFileTellBlock(BufFile *file)
{
	long		blknum;

	/* whole blocks preceding the current position within this segment */
	blknum = (file->curOffset + file->pos) / BLCKSZ;

	/*
	 * Add the blocks contained in all earlier segments.  Widen curFile to
	 * long before multiplying: otherwise the product is computed at the
	 * operands' own width (presumably int for both curFile and
	 * BUFFILE_SEG_SIZE) and could overflow before being added to blknum.
	 */
	blknum += (long) file->curFile * BUFFILE_SEG_SIZE;
	return blknum;
}

#endif
877 :
1892 rhaas 878 ECB : /*
587 akapila 879 : * Return the current fileset based BufFile size.
1803 heikki.linnakangas 880 : *
881 : * Counts any holes left behind by BufFileAppend as part of the size.
1593 pg 882 : * ereport()s on failure.
883 : */
884 : int64
1892 rhaas 885 GIC 142 : BufFileSize(BufFile *file)
1892 rhaas 886 ECB : {
887 : int64 lastFileSize;
1803 heikki.linnakangas 888 :
1593 pg 889 CBC 142 : Assert(file->fileset != NULL);
1593 pg 890 ECB :
891 : /* Get the size of the last physical file. */
1614 tmunro 892 GIC 142 : lastFileSize = FileSize(file->files[file->numFiles - 1]);
1803 heikki.linnakangas 893 142 : if (lastFileSize < 0)
1593 pg 894 UIC 0 : ereport(ERROR,
895 : (errcode_for_file_access(),
896 : errmsg("could not determine size of temporary file \"%s\" from BufFile \"%s\": %m",
897 : FilePathName(file->files[file->numFiles - 1]),
898 : file->name)));
899 :
1606 tmunro 900 GIC 142 : return ((file->numFiles - 1) * (int64) MAX_PHYSICAL_FILESIZE) +
901 : lastFileSize;
902 : }
903 :
1892 rhaas 904 ECB : /*
905 : * Append the contents of source file (managed within fileset) to
587 akapila 906 : * end of target file (managed within same fileset).
1892 rhaas 907 : *
908 : * Note that operation subsumes ownership of underlying resources from
909 : * "source". Caller should never call BufFileClose against source having
910 : * called here first. Resource owners for source and target must match,
911 : * too.
912 : *
913 : * This operation works by manipulating lists of segment files, so the
914 : * file content is always appended at a MAX_PHYSICAL_FILESIZE-aligned
915 : * boundary, typically creating empty holes before the boundary. These
916 : * areas do not contain any interesting data, and cannot be read from by
917 : * caller.
918 : *
919 : * Returns the block number within target where the contents of source
920 : * begins. Caller should apply this as an offset when working off block
921 : * positions that are in terms of the original BufFile space.
922 : */
923 : long
1892 rhaas 924 GIC 71 : BufFileAppend(BufFile *target, BufFile *source)
925 : {
926 71 : long startBlock = target->numFiles * BUFFILE_SEG_SIZE;
927 71 : int newNumFiles = target->numFiles + source->numFiles;
928 : int i;
929 :
930 71 : Assert(target->fileset != NULL);
931 71 : Assert(source->readOnly);
932 71 : Assert(!source->dirty);
933 71 : Assert(source->fileset != NULL);
934 :
935 71 : if (target->resowner != source->resowner)
1892 rhaas 936 UIC 0 : elog(ERROR, "could not append BufFile with non-matching resource owner");
1892 rhaas 937 ECB :
1892 rhaas 938 GIC 71 : target->files = (File *)
939 71 : repalloc(target->files, sizeof(File) * newNumFiles);
940 142 : for (i = target->numFiles; i < newNumFiles; i++)
1892 rhaas 941 CBC 71 : target->files[i] = source->files[i - target->numFiles];
1892 rhaas 942 GIC 71 : target->numFiles = newNumFiles;
943 :
1892 rhaas 944 CBC 71 : return startBlock;
1892 rhaas 945 ECB : }
956 akapila 946 EUB :
947 : /*
948 : * Truncate a BufFile created by BufFileCreateFileSet up to the given fileno
949 : * and the offset.
950 : */
951 : void
587 akapila 952 CBC 9 : BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset)
953 : {
956 akapila 954 GIC 9 : int numFiles = file->numFiles;
955 9 : int newFile = fileno;
956 9 : off_t newOffset = file->curOffset;
957 : char segment_name[MAXPGPATH];
958 : int i;
959 :
960 : /*
961 : * Loop over all the files up to the given fileno and remove the files
962 : * that are greater than the fileno and truncate the given file up to the
963 : * offset. Note that we also remove the given fileno if the offset is 0
964 : * provided it is not the first file in which we truncate it.
965 : */
966 18 : for (i = file->numFiles - 1; i >= fileno; i--)
967 : {
968 9 : if ((i != fileno || offset == 0) && i != 0)
969 : {
587 akapila 970 UIC 0 : FileSetSegmentName(segment_name, file->name, i);
956 971 0 : FileClose(file->files[i]);
587 972 0 : if (!FileSetDelete(file->fileset, segment_name, true))
956 973 0 : ereport(ERROR,
974 : (errcode_for_file_access(),
975 : errmsg("could not delete fileset \"%s\": %m",
956 akapila 976 ECB : segment_name)));
956 akapila 977 UIC 0 : numFiles--;
956 akapila 978 LBC 0 : newOffset = MAX_PHYSICAL_FILESIZE;
956 akapila 979 ECB :
980 : /*
981 : * This is required to indicate that we have deleted the given
982 : * fileno.
983 : */
956 akapila 984 LBC 0 : if (i == fileno)
985 0 : newFile--;
986 : }
956 akapila 987 ECB : else
956 akapila 988 EUB : {
956 akapila 989 GIC 9 : if (FileTruncate(file->files[i], offset,
956 akapila 990 ECB : WAIT_EVENT_BUFFILE_TRUNCATE) < 0)
956 akapila 991 LBC 0 : ereport(ERROR,
956 akapila 992 ECB : (errcode_for_file_access(),
993 : errmsg("could not truncate file \"%s\": %m",
994 : FilePathName(file->files[i]))));
956 akapila 995 GIC 9 : newOffset = offset;
956 akapila 996 ECB : }
997 : }
998 :
956 akapila 999 GIC 9 : file->numFiles = numFiles;
1000 :
1001 : /*
1002 : * If the truncate point is within existing buffer then we can just adjust
1003 : * pos within buffer.
956 akapila 1004 ECB : */
956 akapila 1005 GIC 9 : if (newFile == file->curFile &&
956 akapila 1006 CBC 9 : newOffset >= file->curOffset &&
1007 9 : newOffset <= file->curOffset + file->nbytes)
956 akapila 1008 ECB : {
1009 : /* No need to reset the current pos if the new pos is greater. */
956 akapila 1010 UIC 0 : if (newOffset <= file->curOffset + file->pos)
1011 0 : file->pos = (int) (newOffset - file->curOffset);
1012 :
1013 : /* Adjust the nbytes for the current buffer. */
1014 0 : file->nbytes = (int) (newOffset - file->curOffset);
1015 : }
956 akapila 1016 GIC 9 : else if (newFile == file->curFile &&
1017 9 : newOffset < file->curOffset)
956 akapila 1018 ECB : {
1019 : /*
1020 : * The truncate point is within the existing file but prior to the
1021 : * current position, so we can forget the current buffer and reset the
956 akapila 1022 EUB : * current position.
1023 : */
956 akapila 1024 UBC 0 : file->curOffset = newOffset;
1025 0 : file->pos = 0;
956 akapila 1026 UIC 0 : file->nbytes = 0;
1027 : }
956 akapila 1028 GIC 9 : else if (newFile < file->curFile)
956 akapila 1029 EUB : {
1030 : /*
1031 : * The truncate point is prior to the current file, so need to reset
1032 : * the current position accordingly.
1033 : */
956 akapila 1034 UIC 0 : file->curFile = newFile;
1035 0 : file->curOffset = newOffset;
956 akapila 1036 UBC 0 : file->pos = 0;
1037 0 : file->nbytes = 0;
1038 : }
1039 : /* Nothing to do, if the truncate point is beyond current file. */
956 akapila 1040 GIC 9 : }
|