Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * tuplestore.c
4 : : * Generalized routines for temporary tuple storage.
5 : : *
6 : : * This module handles temporary storage of tuples for purposes such
7 : : * as Materialize nodes, hashjoin batch files, etc. It is essentially
8 : : * a dumbed-down version of tuplesort.c; it does no sorting of tuples
9 : : * but can only store and regurgitate a sequence of tuples. However,
10 : : * because no sort is required, it is allowed to start reading the sequence
11 : : * before it has all been written. This is particularly useful for cursors,
12 : : * because it allows random access within the already-scanned portion of
13 : : * a query without having to process the underlying scan to completion.
14 : : * Also, it is possible to support multiple independent read pointers.
15 : : *
16 : : * A temporary file is used to handle the data if it exceeds the
17 : : * space limit specified by the caller.
18 : : *
19 : : * The (approximate) amount of memory allowed to the tuplestore is specified
20 : : * in kilobytes by the caller. We absorb tuples and simply store them in an
21 : : * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
22 : : * maxKBytes, we dump all the tuples into a temp file and then read from that
23 : : * when needed.
24 : : *
25 : : * Upon creation, a tuplestore supports a single read pointer, numbered 0.
26 : : * Additional read pointers can be created using tuplestore_alloc_read_pointer.
27 : : * Mark/restore behavior is supported by copying read pointers.
28 : : *
29 : : * When the caller requests backward-scan capability, we write the temp file
30 : : * in a format that allows either forward or backward scan. Otherwise, only
31 : : * forward scan is allowed. A request for backward scan must be made before
32 : : * putting any tuples into the tuplestore. Rewind is normally allowed but
33 : : * can be turned off via tuplestore_set_eflags; turning off rewind for all
34 : : * read pointers enables truncation of the tuplestore at the oldest read point
35 : : * for minimal memory usage. (The caller must explicitly call tuplestore_trim
36 : : * at appropriate times for truncation to actually happen.)
37 : : *
38 : : * Note: in TSS_WRITEFILE state, the temp file's seek position is the
39 : : * current write position, and the write-position variables in the tuplestore
40 : : * aren't kept up to date. Similarly, in TSS_READFILE state the temp file's
41 : : * seek position is the active read pointer's position, and that read pointer
42 : : * isn't kept up to date. We update the appropriate variables using BufFileTell()
43 : : * before switching to the other state or activating a different read pointer.
44 : : *
45 : : *
46 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
47 : : * Portions Copyright (c) 1994, Regents of the University of California
48 : : *
49 : : * IDENTIFICATION
50 : : * src/backend/utils/sort/tuplestore.c
51 : : *
52 : : *-------------------------------------------------------------------------
53 : : */
54 : :
55 : : #include "postgres.h"
56 : :
57 : : #include <limits.h>
58 : :
59 : : #include "access/htup_details.h"
60 : : #include "commands/tablespace.h"
61 : : #include "executor/executor.h"
62 : : #include "miscadmin.h"
63 : : #include "storage/buffile.h"
64 : : #include "utils/memutils.h"
65 : : #include "utils/resowner.h"
66 : :
67 : :
/*
 * Lifecycle states of a Tuplestore object.  Only states that persist
 * between calls of the Tuplestore routines are listed here.
 */
typedef enum
{
	TSS_INMEM,					/* every tuple is still held in memory */
	TSS_WRITEFILE,				/* temp file in use, positioned for writing */
	TSS_READFILE,				/* temp file in use, positioned for reading */
} TupStoreStatus;
78 : :
/*
 * Per-read-pointer state.
 *
 * Which fields carry the read position depends on the tuplestore's status:
 *
 *	INMEM:		"current" is the position, for every read pointer.
 *	WRITEFILE:	"file"/"offset" are the position, for every read pointer.
 *	READFILE:	inactive pointers use "file"/"offset"; the active pointer's
 *				position is implicitly the temp file's seek position.
 *
 * Exception: once eof_reached is set, the pointer is implicitly positioned
 * at the write position and current/file/offset are no longer maintained.
 * That saves having to touch every read pointer on each write.
 */
typedef struct
{
	int			eflags;			/* capability flags */
	bool		eof_reached;	/* read has reached EOF */
	int			current;		/* next array index to read */
	int			file;			/* temp file# */
	off_t		offset;			/* byte offset in file */
} TSReadPointer;
99 : :
100 : : /*
101 : : * Private state of a Tuplestore operation.
102 : : */
103 : : struct Tuplestorestate
104 : : {
105 : : TupStoreStatus status; /* enumerated value as shown above */
106 : : int eflags; /* capability flags (OR of pointers' flags) */
107 : : bool backward; /* store extra length words in file? */
108 : : bool interXact; /* keep open through transactions? */
109 : : bool truncated; /* tuplestore_trim has removed tuples? */
110 : : int64 availMem; /* remaining memory available, in bytes */
111 : : int64 allowedMem; /* total memory allowed, in bytes */
112 : : int64 tuples; /* number of tuples added */
113 : : BufFile *myfile; /* underlying file, or NULL if none */
114 : : MemoryContext context; /* memory context for holding tuples */
115 : : ResourceOwner resowner; /* resowner for holding temp files */
116 : :
117 : : /*
118 : : * These function pointers decouple the routines that must know what kind
119 : : * of tuple we are handling from the routines that don't need to know it.
120 : : * They are set up by the tuplestore_begin_xxx routines.
121 : : *
122 : : * (Although tuplestore.c currently only supports heap tuples, I've copied
123 : : * this part of tuplesort.c so that extension to other kinds of objects
124 : : * will be easy if it's ever needed.)
125 : : *
126 : : * Function to copy a supplied input tuple into palloc'd space. (NB: we
127 : : * assume that a single pfree() is enough to release the tuple later, so
128 : : * the representation must be "flat" in one palloc chunk.) state->availMem
129 : : * must be decreased by the amount of space used.
130 : : */
131 : : void *(*copytup) (Tuplestorestate *state, void *tup);
132 : :
133 : : /*
134 : : * Function to write a stored tuple onto tape. The representation of the
135 : : * tuple on tape need not be the same as it is in memory; requirements on
136 : : * the tape representation are given below. After writing the tuple,
137 : : * pfree() it, and increase state->availMem by the amount of memory space
138 : : * thereby released.
139 : : */
140 : : void (*writetup) (Tuplestorestate *state, void *tup);
141 : :
142 : : /*
143 : : * Function to read a stored tuple from tape back into memory. 'len' is
144 : : * the already-read length of the stored tuple. Create and return a
145 : : * palloc'd copy, and decrease state->availMem by the amount of memory
146 : : * space consumed.
147 : : */
148 : : void *(*readtup) (Tuplestorestate *state, unsigned int len);
149 : :
150 : : /*
151 : : * This array holds pointers to tuples in memory if we are in state INMEM.
152 : : * In states WRITEFILE and READFILE it's not used.
153 : : *
154 : : * When memtupdeleted > 0, the first memtupdeleted pointers are already
155 : : * released due to a tuplestore_trim() operation, but we haven't expended
156 : : * the effort to slide the remaining pointers down. These unused pointers
157 : : * are set to NULL to catch any invalid accesses. Note that memtupcount
158 : : * includes the deleted pointers.
159 : : */
160 : : void **memtuples; /* array of pointers to palloc'd tuples */
161 : : int memtupdeleted; /* the first N slots are currently unused */
162 : : int memtupcount; /* number of tuples currently present */
163 : : int memtupsize; /* allocated length of memtuples array */
164 : : bool growmemtuples; /* memtuples' growth still underway? */
165 : :
166 : : /*
167 : : * These variables are used to keep track of the current positions.
168 : : *
169 : : * In state WRITEFILE, the current file seek position is the write point;
170 : : * in state READFILE, the write position is remembered in writepos_xxx.
171 : : * (The write position is the same as EOF, but since BufFileSeek doesn't
172 : : * currently implement SEEK_END, we have to remember it explicitly.)
173 : : */
174 : : TSReadPointer *readptrs; /* array of read pointers */
175 : : int activeptr; /* index of the active read pointer */
176 : : int readptrcount; /* number of pointers currently valid */
177 : : int readptrsize; /* allocated length of readptrs array */
178 : :
179 : : int writepos_file; /* file# (valid if READFILE state) */
180 : : off_t writepos_offset; /* offset (valid if READFILE state) */
181 : : };
182 : :
/* Dispatch through the per-store callbacks held in the state struct */
#define COPYTUP(state,tup)	((*(state)->copytup) (state, tup))
#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
#define READTUP(state,len)	((*(state)->readtup) (state, len))

/* Memory accounting against state->availMem; a negative balance is a deficit */
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))
189 : :
190 : : /*--------------------
191 : : *
192 : : * NOTES about on-tape representation of tuples:
193 : : *
194 : : * We require the first "unsigned int" of a stored tuple to be the total size
195 : : * on-tape of the tuple, including itself (so it is never zero).
196 : : * The remainder of the stored tuple
197 : : * may or may not match the in-memory representation of the tuple ---
198 : : * any conversion needed is the job of the writetup and readtup routines.
199 : : *
200 : : * If state->backward is true, then the stored representation of
201 : : * the tuple must be followed by another "unsigned int" that is a copy of the
202 : : * length --- so the total tape space used is actually sizeof(unsigned int)
203 : : * more than the stored length value. This allows read-backwards. When
204 : : * state->backward is not set, the write/read routines may omit the extra
205 : : * length word.
206 : : *
207 : : * writetup is expected to write both length words as well as the tuple
208 : : * data. When readtup is called, the tape is positioned just after the
209 : : * front length word; readtup must read the tuple data and advance past
210 : : * the back length word (if present).
211 : : *
212 : : * The write/read routines can make use of the tuple description data
213 : : * stored in the Tuplestorestate record, if needed. They are also expected
214 : : * to adjust state->availMem by the amount of memory space (not tape space!)
215 : : * released or consumed. There is no error return from either writetup
216 : : * or readtup; they should ereport() on failure.
217 : : *
218 : : *
219 : : * NOTES about memory consumption calculations:
220 : : *
221 : : * We count space allocated for tuples against the maxKBytes limit,
222 : : * plus the space used by the variable-size array memtuples.
223 : : * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
224 : : * We don't worry about the size of the read pointer array, either.
225 : : *
226 : : * Note that we count actual space used (as shown by GetMemoryChunkSpace)
227 : : * rather than the originally-requested size. This is important since
228 : : * palloc can add substantial overhead. It's not a complete answer since
229 : : * we won't count any wasted space in palloc allocation blocks, but it's
230 : : * a lot better than what we were doing before 7.3.
231 : : *
232 : : *--------------------
233 : : */
234 : :
235 : :
236 : : static Tuplestorestate *tuplestore_begin_common(int eflags,
237 : : bool interXact,
238 : : int maxKBytes);
239 : : static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
240 : : static void dumptuples(Tuplestorestate *state);
241 : : static unsigned int getlen(Tuplestorestate *state, bool eofOK);
242 : : static void *copytup_heap(Tuplestorestate *state, void *tup);
243 : : static void writetup_heap(Tuplestorestate *state, void *tup);
244 : : static void *readtup_heap(Tuplestorestate *state, unsigned int len);
245 : :
246 : :
247 : : /*
248 : : * tuplestore_begin_xxx
249 : : *
250 : : * Initialize for a tuple store operation.
251 : : */
252 : : static Tuplestorestate *
6173 tgl@sss.pgh.pa.us 253 :CBC 123887 : tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
254 : : {
255 : : Tuplestorestate *state;
256 : :
7823 bruce@momjian.us 257 : 123887 : state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
258 : :
7707 tgl@sss.pgh.pa.us 259 : 123887 : state->status = TSS_INMEM;
6173 260 : 123887 : state->eflags = eflags;
7656 261 : 123887 : state->interXact = interXact;
5587 262 : 123887 : state->truncated = false;
4105 263 : 123887 : state->allowedMem = maxKBytes * 1024L;
264 : 123887 : state->availMem = state->allowedMem;
8701 265 : 123887 : state->myfile = NULL;
5220 heikki.linnakangas@i 266 : 123887 : state->context = CurrentMemoryContext;
267 : 123887 : state->resowner = CurrentResourceOwner;
268 : :
4874 tgl@sss.pgh.pa.us 269 : 123887 : state->memtupdeleted = 0;
8701 270 : 123887 : state->memtupcount = 0;
2571 kgrittn@postgresql.o 271 : 123887 : state->tuples = 0;
272 : :
273 : : /*
274 : : * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
275 : : * see comments in grow_memtuples().
276 : : */
3176 tgl@sss.pgh.pa.us 277 : 123887 : state->memtupsize = Max(16384 / sizeof(void *),
278 : : ALLOCSET_SEPARATE_THRESHOLD / sizeof(void *) + 1);
279 : :
4105 280 : 123887 : state->growmemtuples = true;
8701 281 : 123887 : state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
282 : :
7916 283 : 123887 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
284 : :
5674 285 : 123887 : state->activeptr = 0;
286 : 123887 : state->readptrcount = 1;
287 : 123887 : state->readptrsize = 8; /* arbitrary */
288 : 123887 : state->readptrs = (TSReadPointer *)
289 : 123887 : palloc(state->readptrsize * sizeof(TSReadPointer));
290 : :
291 : 123887 : state->readptrs[0].eflags = eflags;
292 : 123887 : state->readptrs[0].eof_reached = false;
293 : 123887 : state->readptrs[0].current = 0;
294 : :
8701 295 : 123887 : return state;
296 : : }
297 : :
298 : : /*
299 : : * tuplestore_begin_heap
300 : : *
301 : : * Create a new tuplestore; other types of tuple stores (other than
302 : : * "heap" tuple stores, for heap tuples) are possible, but not presently
303 : : * implemented.
304 : : *
305 : : * randomAccess: if true, both forward and backward accesses to the
306 : : * tuple store are allowed.
307 : : *
308 : : * interXact: if true, the files used for on-disk storage persist beyond the
309 : : * end of the current transaction. NOTE: It's the caller's responsibility to
310 : : * create such a tuplestore in a memory context and resource owner that will
311 : : * also survive transaction boundaries, and to ensure the tuplestore is closed
312 : : * when it's no longer wanted.
313 : : *
314 : : * maxKBytes: how much data to store in memory (any data beyond this
315 : : * amount is paged to disk). When in doubt, use work_mem.
316 : : */
317 : : Tuplestorestate *
7656 318 : 123887 : tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
319 : : {
320 : : Tuplestorestate *state;
321 : : int eflags;
322 : :
323 : : /*
324 : : * This interpretation of the meaning of randomAccess is compatible with
325 : : * the pre-8.3 behavior of tuplestores.
326 : : */
6173 327 : 123887 : eflags = randomAccess ?
5674 328 [ + + ]: 123887 : (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND) :
329 : : (EXEC_FLAG_REWIND);
330 : :
6173 331 : 123887 : state = tuplestore_begin_common(eflags, interXact, maxKBytes);
332 : :
8701 333 : 123887 : state->copytup = copytup_heap;
334 : 123887 : state->writetup = writetup_heap;
335 : 123887 : state->readtup = readtup_heap;
336 : :
337 : 123887 : return state;
338 : : }
339 : :
340 : : /*
341 : : * tuplestore_set_eflags
342 : : *
343 : : * Set the capability flags for read pointer 0 at a finer grain than is
344 : : * allowed by tuplestore_begin_xxx. This must be called before inserting
345 : : * any data into the tuplestore.
346 : : *
347 : : * eflags is a bitmask following the meanings used for executor node
348 : : * startup flags (see executor.h). tuplestore pays attention to these bits:
349 : : * EXEC_FLAG_REWIND need rewind to start
350 : : * EXEC_FLAG_BACKWARD need backward fetch
351 : : * If tuplestore_set_eflags is not called, REWIND is allowed, and BACKWARD
352 : : * is set per "randomAccess" in the tuplestore_begin_xxx call.
353 : : *
354 : : * NOTE: setting BACKWARD without REWIND means the pointer can read backwards,
355 : : * but not further than the truncation point (the furthest-back read pointer
356 : : * position at the time of the last tuplestore_trim call).
357 : : */
358 : : void
6173 359 : 3990 : tuplestore_set_eflags(Tuplestorestate *state, int eflags)
360 : : {
361 : : int i;
362 : :
5674 363 [ + - - + ]: 3990 : if (state->status != TSS_INMEM || state->memtupcount != 0)
5674 tgl@sss.pgh.pa.us 364 [ # # ]:UBC 0 : elog(ERROR, "too late to call tuplestore_set_eflags");
365 : :
5674 tgl@sss.pgh.pa.us 366 :CBC 3990 : state->readptrs[0].eflags = eflags;
367 [ - + ]: 3990 : for (i = 1; i < state->readptrcount; i++)
5674 tgl@sss.pgh.pa.us 368 :UBC 0 : eflags |= state->readptrs[i].eflags;
6173 tgl@sss.pgh.pa.us 369 :CBC 3990 : state->eflags = eflags;
370 : 3990 : }
371 : :
372 : : /*
373 : : * tuplestore_alloc_read_pointer - allocate another read pointer.
374 : : *
375 : : * Returns the pointer's index.
376 : : *
377 : : * The new pointer initially copies the position of read pointer 0.
378 : : * It can have its own eflags, but if any data has been inserted into
379 : : * the tuplestore, these eflags must not represent an increase in
380 : : * requirements.
381 : : */
382 : : int
5674 383 : 6121 : tuplestore_alloc_read_pointer(Tuplestorestate *state, int eflags)
384 : : {
385 : : /* Check for possible increase of requirements */
386 [ + - + + ]: 6121 : if (state->status != TSS_INMEM || state->memtupcount != 0)
387 : : {
388 [ - + ]: 307 : if ((state->eflags | eflags) != state->eflags)
5674 tgl@sss.pgh.pa.us 389 [ # # ]:UBC 0 : elog(ERROR, "too late to require new tuplestore eflags");
390 : : }
391 : :
392 : : /* Make room for another read pointer if needed */
5674 tgl@sss.pgh.pa.us 393 [ + + ]:CBC 6121 : if (state->readptrcount >= state->readptrsize)
394 : : {
5421 bruce@momjian.us 395 : 6 : int newcnt = state->readptrsize * 2;
396 : :
5674 tgl@sss.pgh.pa.us 397 : 6 : state->readptrs = (TSReadPointer *)
398 : 6 : repalloc(state->readptrs, newcnt * sizeof(TSReadPointer));
399 : 6 : state->readptrsize = newcnt;
400 : : }
401 : :
402 : : /* And set it up */
403 : 6121 : state->readptrs[state->readptrcount] = state->readptrs[0];
404 : 6121 : state->readptrs[state->readptrcount].eflags = eflags;
405 : :
406 : 6121 : state->eflags |= eflags;
407 : :
408 : 6121 : return state->readptrcount++;
409 : : }
410 : :
411 : : /*
412 : : * tuplestore_clear
413 : : *
414 : : * Delete all the contents of a tuplestore, and reset its read pointers
415 : : * to the start.
416 : : */
417 : : void
5671 418 : 58514 : tuplestore_clear(Tuplestorestate *state)
419 : : {
420 : : int i;
421 : : TSReadPointer *readptr;
422 : :
423 [ - + ]: 58514 : if (state->myfile)
5671 tgl@sss.pgh.pa.us 424 :UBC 0 : BufFileClose(state->myfile);
5671 tgl@sss.pgh.pa.us 425 :CBC 58514 : state->myfile = NULL;
426 [ + - ]: 58514 : if (state->memtuples)
427 : : {
4874 428 [ + + ]: 122816 : for (i = state->memtupdeleted; i < state->memtupcount; i++)
429 : : {
5671 430 : 64302 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
431 : 64302 : pfree(state->memtuples[i]);
432 : : }
433 : : }
434 : 58514 : state->status = TSS_INMEM;
5587 435 : 58514 : state->truncated = false;
4874 436 : 58514 : state->memtupdeleted = 0;
5671 437 : 58514 : state->memtupcount = 0;
2571 kgrittn@postgresql.o 438 : 58514 : state->tuples = 0;
5671 tgl@sss.pgh.pa.us 439 : 58514 : readptr = state->readptrs;
440 [ + + ]: 117370 : for (i = 0; i < state->readptrcount; readptr++, i++)
441 : : {
442 : 58856 : readptr->eof_reached = false;
443 : 58856 : readptr->current = 0;
444 : : }
445 : 58514 : }
446 : :
447 : : /*
448 : : * tuplestore_end
449 : : *
450 : : * Release resources and clean up.
451 : : */
452 : : void
8701 453 : 105818 : tuplestore_end(Tuplestorestate *state)
454 : : {
455 : : int i;
456 : :
457 [ + + ]: 105818 : if (state->myfile)
458 : 58 : BufFileClose(state->myfile);
459 [ + - ]: 105818 : if (state->memtuples)
460 : : {
4874 461 [ + + ]: 4753247 : for (i = state->memtupdeleted; i < state->memtupcount; i++)
8701 462 : 4647429 : pfree(state->memtuples[i]);
463 : 105818 : pfree(state->memtuples);
464 : : }
5674 465 : 105818 : pfree(state->readptrs);
6100 neilc@samurai.com 466 : 105818 : pfree(state);
8701 tgl@sss.pgh.pa.us 467 : 105818 : }
468 : :
469 : : /*
470 : : * tuplestore_select_read_pointer - make the specified read pointer active
471 : : */
472 : : void
5674 473 : 1904744 : tuplestore_select_read_pointer(Tuplestorestate *state, int ptr)
474 : : {
475 : : TSReadPointer *readptr;
476 : : TSReadPointer *oldptr;
477 : :
478 [ + - - + ]: 1904744 : Assert(ptr >= 0 && ptr < state->readptrcount);
479 : :
480 : : /* No work if already active */
481 [ + + ]: 1904744 : if (ptr == state->activeptr)
482 : 424115 : return;
483 : :
5668 484 : 1480629 : readptr = &state->readptrs[ptr];
485 : 1480629 : oldptr = &state->readptrs[state->activeptr];
486 : :
5674 487 [ + - - ]: 1480629 : switch (state->status)
488 : : {
489 : 1480629 : case TSS_INMEM:
490 : : case TSS_WRITEFILE:
491 : : /* no work */
492 : 1480629 : break;
5674 tgl@sss.pgh.pa.us 493 :UBC 0 : case TSS_READFILE:
494 : :
495 : : /*
496 : : * First, save the current read position in the pointer about to
497 : : * become inactive.
498 : : */
5668 499 [ # # ]: 0 : if (!oldptr->eof_reached)
500 : 0 : BufFileTell(state->myfile,
501 : : &oldptr->file,
502 : : &oldptr->offset);
503 : :
504 : : /*
505 : : * We have to make the temp file's seek position equal to the
506 : : * logical position of the new read pointer. In eof_reached
507 : : * state, that's the EOF, which we have available from the saved
508 : : * write position.
509 : : */
5674 510 [ # # ]: 0 : if (readptr->eof_reached)
511 : : {
512 [ # # ]: 0 : if (BufFileSeek(state->myfile,
513 : : state->writepos_file,
514 : : state->writepos_offset,
515 : : SEEK_SET) != 0)
3594 516 [ # # ]: 0 : ereport(ERROR,
517 : : (errcode_for_file_access(),
518 : : errmsg("could not seek in tuplestore temporary file")));
519 : : }
520 : : else
521 : : {
5674 522 [ # # ]: 0 : if (BufFileSeek(state->myfile,
523 : : readptr->file,
524 : : readptr->offset,
525 : : SEEK_SET) != 0)
3594 526 [ # # ]: 0 : ereport(ERROR,
527 : : (errcode_for_file_access(),
528 : : errmsg("could not seek in tuplestore temporary file")));
529 : : }
5674 530 : 0 : break;
531 : 0 : default:
532 [ # # ]: 0 : elog(ERROR, "invalid tuplestore state");
533 : : break;
534 : : }
535 : :
5674 tgl@sss.pgh.pa.us 536 :CBC 1480629 : state->activeptr = ptr;
537 : : }
538 : :
539 : : /*
540 : : * tuplestore_tuple_count
541 : : *
542 : : * Returns the number of tuples added since creation or the last
543 : : * tuplestore_clear().
544 : : */
545 : : int64
2571 kgrittn@postgresql.o 546 : 3170 : tuplestore_tuple_count(Tuplestorestate *state)
547 : : {
548 : 3170 : return state->tuples;
549 : : }
550 : :
551 : : /*
552 : : * tuplestore_ateof
553 : : *
554 : : * Returns the active read pointer's eof_reached state.
555 : : */
556 : : bool
7707 tgl@sss.pgh.pa.us 557 : 1197823 : tuplestore_ateof(Tuplestorestate *state)
558 : : {
5674 559 : 1197823 : return state->readptrs[state->activeptr].eof_reached;
560 : : }
561 : :
562 : : /*
563 : : * Grow the memtuples[] array, if possible within our memory constraint. We
564 : : * must not exceed INT_MAX tuples in memory or the caller-provided memory
565 : : * limit. Return true if we were able to enlarge the array, false if not.
566 : : *
567 : : * Normally, at each increment we double the size of the array. When doing
568 : : * that would exceed a limit, we attempt one last, smaller increase (and then
569 : : * clear the growmemtuples flag so we don't try any more). That allows us to
570 : : * use memory as fully as permitted; sticking to the pure doubling rule could
571 : : * result in almost half going unused. Because availMem moves around with
572 : : * tuple addition/removal, we need some rule to prevent making repeated small
573 : : * increases in memtupsize, which would just be useless thrashing. The
574 : : * growmemtuples flag accomplishes that and also prevents useless
575 : : * recalculations in this function.
576 : : */
577 : : static bool
4105 578 : 956 : grow_memtuples(Tuplestorestate *state)
579 : : {
580 : : int newmemtupsize;
581 : 956 : int memtupsize = state->memtupsize;
3937 noah@leadboat.com 582 : 956 : int64 memNowUsed = state->allowedMem - state->availMem;
583 : :
584 : : /* Forget it if we've already maxed out memtuples, per comment above */
4105 tgl@sss.pgh.pa.us 585 [ + + ]: 956 : if (!state->growmemtuples)
586 : 27 : return false;
587 : :
588 : : /* Select new value of memtupsize */
589 [ + + ]: 929 : if (memNowUsed <= state->availMem)
590 : : {
591 : : /*
592 : : * We've used no more than half of allowedMem; double our usage,
593 : : * clamping at INT_MAX tuples.
594 : : */
3944 noah@leadboat.com 595 [ + - ]: 895 : if (memtupsize < INT_MAX / 2)
596 : 895 : newmemtupsize = memtupsize * 2;
597 : : else
598 : : {
3944 noah@leadboat.com 599 :UBC 0 : newmemtupsize = INT_MAX;
600 : 0 : state->growmemtuples = false;
601 : : }
602 : : }
603 : : else
604 : : {
605 : : /*
606 : : * This will be the last increment of memtupsize. Abandon doubling
607 : : * strategy and instead increase as much as we safely can.
608 : : *
609 : : * To stay within allowedMem, we can't increase memtupsize by more
610 : : * than availMem / sizeof(void *) elements. In practice, we want to
611 : : * increase it by considerably less, because we need to leave some
612 : : * space for the tuples to which the new array slots will refer. We
613 : : * assume the new tuples will be about the same size as the tuples
614 : : * we've already seen, and thus we can extrapolate from the space
615 : : * consumption so far to estimate an appropriate new size for the
616 : : * memtuples array. The optimal value might be higher or lower than
617 : : * this estimate, but it's hard to know that in advance. We again
618 : : * clamp at INT_MAX tuples.
619 : : *
620 : : * This calculation is safe against enlarging the array so much that
621 : : * LACKMEM becomes true, because the memory currently used includes
622 : : * the present array; thus, there would be enough allowedMem for the
623 : : * new array elements even if no other memory were currently used.
624 : : *
625 : : * We do the arithmetic in float8, because otherwise the product of
626 : : * memtupsize and allowedMem could overflow. Any inaccuracy in the
627 : : * result should be insignificant; but even if we computed a
628 : : * completely insane result, the checks below will prevent anything
629 : : * really bad from happening.
630 : : */
631 : : double grow_ratio;
632 : :
4105 tgl@sss.pgh.pa.us 633 :CBC 34 : grow_ratio = (double) state->allowedMem / (double) memNowUsed;
3944 noah@leadboat.com 634 [ + - ]: 34 : if (memtupsize * grow_ratio < INT_MAX)
635 : 34 : newmemtupsize = (int) (memtupsize * grow_ratio);
636 : : else
3944 noah@leadboat.com 637 :UBC 0 : newmemtupsize = INT_MAX;
638 : :
639 : : /* We won't make any further enlargement attempts */
4105 tgl@sss.pgh.pa.us 640 :CBC 34 : state->growmemtuples = false;
641 : : }
642 : :
643 : : /* Must enlarge array by at least one element, else report failure */
644 [ - + ]: 929 : if (newmemtupsize <= memtupsize)
4105 tgl@sss.pgh.pa.us 645 :UBC 0 : goto noalloc;
646 : :
647 : : /*
648 : : * On a 32-bit machine, allowedMem could exceed MaxAllocHugeSize. Clamp
649 : : * to ensure our request won't be rejected. Note that we can easily
650 : : * exhaust address space before facing this outcome. (This is presently
651 : : * impossible due to guc.c's MAX_KILOBYTES limitation on work_mem, but
652 : : * don't rely on that at this distance.)
653 : : */
3944 noah@leadboat.com 654 [ - + ]:CBC 929 : if ((Size) newmemtupsize >= MaxAllocHugeSize / sizeof(void *))
655 : : {
3944 noah@leadboat.com 656 :UBC 0 : newmemtupsize = (int) (MaxAllocHugeSize / sizeof(void *));
4105 tgl@sss.pgh.pa.us 657 : 0 : state->growmemtuples = false; /* can't grow any more */
658 : : }
659 : :
660 : : /*
661 : : * We need to be sure that we do not cause LACKMEM to become true, else
662 : : * the space management algorithm will go nuts. The code above should
663 : : * never generate a dangerous request, but to be safe, check explicitly
664 : : * that the array growth fits within availMem. (We could still cause
665 : : * LACKMEM if the memory chunk overhead associated with the memtuples
666 : : * array were to increase. That shouldn't happen because we chose the
667 : : * initial array size large enough to ensure that palloc will be treating
668 : : * both old and new arrays as separate chunks. But we'll check LACKMEM
669 : : * explicitly below just in case.)
670 : : */
3937 noah@leadboat.com 671 [ - + ]:CBC 929 : if (state->availMem < (int64) ((newmemtupsize - memtupsize) * sizeof(void *)))
4105 tgl@sss.pgh.pa.us 672 :UBC 0 : goto noalloc;
673 : :
674 : : /* OK, do it */
4105 tgl@sss.pgh.pa.us 675 :CBC 929 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
676 : 929 : state->memtupsize = newmemtupsize;
677 : 929 : state->memtuples = (void **)
3944 noah@leadboat.com 678 : 929 : repalloc_huge(state->memtuples,
679 : 929 : state->memtupsize * sizeof(void *));
4105 tgl@sss.pgh.pa.us 680 : 929 : USEMEM(state, GetMemoryChunkSpace(state->memtuples));
681 [ - + ]: 929 : if (LACKMEM(state))
3176 tgl@sss.pgh.pa.us 682 [ # # ]:UBC 0 : elog(ERROR, "unexpected out-of-memory situation in tuplestore");
4105 tgl@sss.pgh.pa.us 683 :CBC 929 : return true;
684 : :
4105 tgl@sss.pgh.pa.us 685 :UBC 0 : noalloc:
686 : : /* If for any reason we didn't realloc, shut off future attempts */
687 : 0 : state->growmemtuples = false;
688 : 0 : return false;
689 : : }
690 : :
691 : : /*
692 : : * Accept one tuple and append it to the tuplestore.
693 : : *
694 : : * Note that the input tuple is always copied; the caller need not save it.
695 : : *
696 : : * If the active read pointer is currently "at EOF", it remains so (the read
697 : : * pointer implicitly advances along with the write pointer); otherwise the
698 : : * read pointer is unchanged. Non-active read pointers do not move, which
699 : : * means they are certain to not be "at EOF" immediately after puttuple.
700 : : * This curious-seeming behavior is for the convenience of nodeMaterial.c and
701 : : * nodeCtescan.c, which would otherwise need to do extra pointer repositioning
702 : : * steps.
703 : : *
704 : : * tuplestore_puttupleslot() is a convenience routine to collect data from
705 : : * a TupleTableSlot without an extra copy operation.
706 : : */
707 : : void
6501 tgl@sss.pgh.pa.us 708 :CBC 975501 : tuplestore_puttupleslot(Tuplestorestate *state,
709 : : TupleTableSlot *slot)
710 : : {
711 : : MinimalTuple tuple;
5220 heikki.linnakangas@i 712 : 975501 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
713 : :
714 : : /*
715 : : * Form a MinimalTuple in working memory
716 : : */
6501 tgl@sss.pgh.pa.us 717 : 975501 : tuple = ExecCopySlotMinimalTuple(slot);
718 : 975501 : USEMEM(state, GetMemoryChunkSpace(tuple));
719 : :
720 : 975501 : tuplestore_puttuple_common(state, (void *) tuple);
721 : :
5220 heikki.linnakangas@i 722 : 975501 : MemoryContextSwitchTo(oldcxt);
6501 tgl@sss.pgh.pa.us 723 : 975501 : }
724 : :
725 : : /*
726 : : * "Standard" case to copy from a HeapTuple. This is actually now somewhat
727 : : * deprecated, but not worth getting rid of in view of the number of callers.
728 : : */
729 : : void
730 : 791673 : tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
731 : : {
5220 heikki.linnakangas@i 732 : 791673 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
733 : :
734 : : /*
735 : : * Copy the tuple. (Must do this even in WRITEFILE case. Note that
736 : : * COPYTUP includes USEMEM, so we needn't do that here.)
737 : : */
8701 tgl@sss.pgh.pa.us 738 : 791673 : tuple = COPYTUP(state, tuple);
739 : :
6501 740 : 791673 : tuplestore_puttuple_common(state, (void *) tuple);
741 : :
5220 heikki.linnakangas@i 742 : 791673 : MemoryContextSwitchTo(oldcxt);
6501 tgl@sss.pgh.pa.us 743 : 791673 : }
744 : :
745 : : /*
746 : : * Similar to tuplestore_puttuple(), but work from values + nulls arrays.
747 : : * This avoids an extra tuple-construction operation.
748 : : */
749 : : void
5864 neilc@samurai.com 750 : 7638275 : tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc,
751 : : const Datum *values, const bool *isnull)
752 : : {
753 : : MinimalTuple tuple;
5220 heikki.linnakangas@i 754 : 7638275 : MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
755 : :
5864 neilc@samurai.com 756 : 7638275 : tuple = heap_form_minimal_tuple(tdesc, values, isnull);
4687 tgl@sss.pgh.pa.us 757 : 7638275 : USEMEM(state, GetMemoryChunkSpace(tuple));
758 : :
5864 neilc@samurai.com 759 : 7638275 : tuplestore_puttuple_common(state, (void *) tuple);
760 : :
5161 bruce@momjian.us 761 : 7638275 : MemoryContextSwitchTo(oldcxt);
5864 neilc@samurai.com 762 : 7638275 : }
763 : :
764 : : static void
6501 tgl@sss.pgh.pa.us 765 : 9405449 : tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
766 : : {
767 : : TSReadPointer *readptr;
768 : : int i;
769 : : ResourceOwner oldowner;
770 : :
2571 kgrittn@postgresql.o 771 : 9405449 : state->tuples++;
772 : :
8701 tgl@sss.pgh.pa.us 773 [ + + - - ]: 9405449 : switch (state->status)
774 : : {
7707 775 : 6829403 : case TSS_INMEM:
776 : :
777 : : /*
778 : : * Update read pointers as needed; see API spec above.
779 : : */
5671 780 : 6829403 : readptr = state->readptrs;
781 [ + + ]: 14791214 : for (i = 0; i < state->readptrcount; readptr++, i++)
782 : : {
783 [ + + + + ]: 7961811 : if (readptr->eof_reached && i != state->activeptr)
784 : : {
785 : 227 : readptr->eof_reached = false;
786 : 227 : readptr->current = state->memtupcount;
787 : : }
788 : : }
789 : :
790 : : /*
791 : : * Grow the array as needed. Note that we try to grow the array
792 : : * when there is still one free slot remaining --- if we fail,
793 : : * there'll still be room to store the incoming tuple, and then
794 : : * we'll switch to tape-based operation.
795 : : */
6616 796 [ + + ]: 6829403 : if (state->memtupcount >= state->memtupsize - 1)
797 : : {
4105 798 : 956 : (void) grow_memtuples(state);
799 [ - + ]: 956 : Assert(state->memtupcount < state->memtupsize);
800 : : }
801 : :
802 : : /* Stash the tuple in the in-memory array */
8701 803 : 6829403 : state->memtuples[state->memtupcount++] = tuple;
804 : :
805 : : /*
806 : : * Done if we still fit in available memory and have array slots.
807 : : */
6616 808 [ + + + + ]: 6829403 : if (state->memtupcount < state->memtupsize && !LACKMEM(state))
8701 809 : 6829343 : return;
810 : :
811 : : /*
812 : : * Nope; time to switch to tape-based operation. Make sure that
813 : : * the temp file(s) are created in suitable temp tablespaces.
814 : : */
6156 815 : 60 : PrepareTempTablespaces();
816 : :
817 : : /* associate the file with the store's resource owner */
5220 heikki.linnakangas@i 818 : 60 : oldowner = CurrentResourceOwner;
819 : 60 : CurrentResourceOwner = state->resowner;
820 : :
6156 tgl@sss.pgh.pa.us 821 : 60 : state->myfile = BufFileCreateTemp(state->interXact);
822 : :
5220 heikki.linnakangas@i 823 : 60 : CurrentResourceOwner = oldowner;
824 : :
825 : : /*
826 : : * Freeze the decision about whether trailing length words will be
827 : : * used. We can't change this choice once data is on tape, even
828 : : * though callers might drop the requirement.
829 : : */
5674 tgl@sss.pgh.pa.us 830 : 60 : state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
8701 831 : 60 : state->status = TSS_WRITEFILE;
832 : 60 : dumptuples(state);
833 : 60 : break;
834 : 2576046 : case TSS_WRITEFILE:
835 : :
836 : : /*
837 : : * Update read pointers as needed; see API spec above. Note:
838 : : * BufFileTell is quite cheap, so not worth trying to avoid
839 : : * multiple calls.
840 : : */
5671 841 : 2576046 : readptr = state->readptrs;
842 [ + + ]: 5152092 : for (i = 0; i < state->readptrcount; readptr++, i++)
843 : : {
844 [ - + - - ]: 2576046 : if (readptr->eof_reached && i != state->activeptr)
845 : : {
5671 tgl@sss.pgh.pa.us 846 :UBC 0 : readptr->eof_reached = false;
847 : 0 : BufFileTell(state->myfile,
848 : : &readptr->file,
849 : : &readptr->offset);
850 : : }
851 : : }
852 : :
8701 tgl@sss.pgh.pa.us 853 :CBC 2576046 : WRITETUP(state, tuple);
854 : 2576046 : break;
7707 tgl@sss.pgh.pa.us 855 :UBC 0 : case TSS_READFILE:
856 : :
857 : : /*
858 : : * Switch from reading to writing.
859 : : */
5674 860 [ # # ]: 0 : if (!state->readptrs[state->activeptr].eof_reached)
7707 861 : 0 : BufFileTell(state->myfile,
5674 862 : 0 : &state->readptrs[state->activeptr].file,
863 : 0 : &state->readptrs[state->activeptr].offset);
7707 864 [ # # ]: 0 : if (BufFileSeek(state->myfile,
865 : : state->writepos_file, state->writepos_offset,
866 : : SEEK_SET) != 0)
3594 867 [ # # ]: 0 : ereport(ERROR,
868 : : (errcode_for_file_access(),
869 : : errmsg("could not seek in tuplestore temporary file")));
7707 870 : 0 : state->status = TSS_WRITEFILE;
871 : :
872 : : /*
873 : : * Update read pointers as needed; see API spec above.
874 : : */
5671 875 : 0 : readptr = state->readptrs;
876 [ # # ]: 0 : for (i = 0; i < state->readptrcount; readptr++, i++)
877 : : {
878 [ # # # # ]: 0 : if (readptr->eof_reached && i != state->activeptr)
879 : : {
880 : 0 : readptr->eof_reached = false;
881 : 0 : readptr->file = state->writepos_file;
882 : 0 : readptr->offset = state->writepos_offset;
883 : : }
884 : : }
885 : :
7707 886 : 0 : WRITETUP(state, tuple);
8701 887 : 0 : break;
888 : 0 : default:
7569 889 [ # # ]: 0 : elog(ERROR, "invalid tuplestore state");
890 : : break;
891 : : }
892 : : }
893 : :
894 : : /*
895 : : * Fetch the next tuple in either forward or back direction.
896 : : * Returns NULL if no more tuples. If should_free is set, the
897 : : * caller must pfree the returned tuple when done with it.
898 : : *
899 : : * Backward scan is only allowed if randomAccess was set true or
900 : : * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
901 : : */
902 : : static void *
8701 tgl@sss.pgh.pa.us 903 :CBC 10380120 : tuplestore_gettuple(Tuplestorestate *state, bool forward,
904 : : bool *should_free)
905 : : {
5674 906 : 10380120 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
907 : : unsigned int tuplen;
908 : : void *tup;
909 : :
910 [ + + - + ]: 10380120 : Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
911 : :
8701 912 [ + + + - ]: 10380120 : switch (state->status)
913 : : {
7707 914 : 6790216 : case TSS_INMEM:
8701 915 : 6790216 : *should_free = false;
916 [ + + ]: 6790216 : if (forward)
917 : : {
5674 918 [ + + ]: 6702578 : if (readptr->eof_reached)
919 : 90 : return NULL;
920 [ + + ]: 6702488 : if (readptr->current < state->memtupcount)
921 : : {
922 : : /* We have another tuple, so return it */
923 : 6532425 : return state->memtuples[readptr->current++];
924 : : }
925 : 170063 : readptr->eof_reached = true;
8701 926 : 170063 : return NULL;
927 : : }
928 : : else
929 : : {
930 : : /*
931 : : * if all tuples are fetched already then we return last
932 : : * tuple, else tuple before last returned.
933 : : */
5674 934 [ + + ]: 87638 : if (readptr->eof_reached)
935 : : {
936 : 1298 : readptr->current = state->memtupcount;
937 : 1298 : readptr->eof_reached = false;
938 : : }
939 : : else
940 : : {
4874 941 [ - + ]: 86340 : if (readptr->current <= state->memtupdeleted)
942 : : {
5587 tgl@sss.pgh.pa.us 943 [ # # ]:UBC 0 : Assert(!state->truncated);
8701 944 : 0 : return NULL;
945 : : }
5421 bruce@momjian.us 946 :CBC 86340 : readptr->current--; /* last returned tuple */
947 : : }
4874 tgl@sss.pgh.pa.us 948 [ + + ]: 87638 : if (readptr->current <= state->memtupdeleted)
949 : : {
5587 950 [ - + ]: 15 : Assert(!state->truncated);
5674 951 : 15 : return NULL;
952 : : }
953 : 87623 : return state->memtuples[readptr->current - 1];
954 : : }
955 : : break;
956 : :
7707 957 : 59 : case TSS_WRITEFILE:
958 : : /* Skip state change if we'll just return NULL */
5674 959 [ - + - - ]: 59 : if (readptr->eof_reached && forward)
7707 tgl@sss.pgh.pa.us 960 :UBC 0 : return NULL;
961 : :
962 : : /*
963 : : * Switch from writing to reading.
964 : : */
7707 tgl@sss.pgh.pa.us 965 :CBC 59 : BufFileTell(state->myfile,
966 : : &state->writepos_file, &state->writepos_offset);
5674 967 [ + - ]: 59 : if (!readptr->eof_reached)
7707 968 [ - + ]: 59 : if (BufFileSeek(state->myfile,
969 : : readptr->file, readptr->offset,
970 : : SEEK_SET) != 0)
3594 tgl@sss.pgh.pa.us 971 [ # # ]:UBC 0 : ereport(ERROR,
972 : : (errcode_for_file_access(),
973 : : errmsg("could not seek in tuplestore temporary file")));
7707 tgl@sss.pgh.pa.us 974 :CBC 59 : state->status = TSS_READFILE;
975 : : /* FALLTHROUGH */
976 : :
8701 977 : 3589904 : case TSS_READFILE:
978 : 3589904 : *should_free = true;
979 [ + + ]: 3589904 : if (forward)
980 : : {
981 [ + + ]: 3589898 : if ((tuplen = getlen(state, true)) != 0)
982 : : {
983 : 3589845 : tup = READTUP(state, tuplen);
984 : 3589845 : return tup;
985 : : }
986 : : else
987 : : {
5674 988 : 53 : readptr->eof_reached = true;
8701 989 : 53 : return NULL;
990 : : }
991 : : }
992 : :
993 : : /*
994 : : * Backward.
995 : : *
996 : : * if all tuples are fetched already then we return last tuple,
997 : : * else tuple before last returned.
998 : : *
999 : : * Back up to fetch previously-returned tuple's ending length
1000 : : * word. If seek fails, assume we are at start of file.
1001 : : */
7707 1002 [ - + ]: 6 : if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
1003 : : SEEK_CUR) != 0)
1004 : : {
1005 : : /* even a failed backwards fetch gets you out of eof state */
5674 tgl@sss.pgh.pa.us 1006 :UBC 0 : readptr->eof_reached = false;
5587 1007 [ # # ]: 0 : Assert(!state->truncated);
7707 1008 : 0 : return NULL;
1009 : : }
7707 tgl@sss.pgh.pa.us 1010 :CBC 6 : tuplen = getlen(state, false);
1011 : :
5674 1012 [ + + ]: 6 : if (readptr->eof_reached)
1013 : : {
1014 : 3 : readptr->eof_reached = false;
1015 : : /* We will return the tuple returned before returning NULL */
1016 : : }
1017 : : else
1018 : : {
1019 : : /*
1020 : : * Back up to get ending length word of tuple before it.
1021 : : */
8701 1022 [ - + ]: 3 : if (BufFileSeek(state->myfile, 0,
6756 bruce@momjian.us 1023 : 3 : -(long) (tuplen + 2 * sizeof(unsigned int)),
1024 : : SEEK_CUR) != 0)
1025 : : {
1026 : : /*
1027 : : * If that fails, presumably the prev tuple is the first
1028 : : * in the file. Back up so that it becomes next to read
1029 : : * in forward direction (not obviously right, but that is
1030 : : * what in-memory case does).
1031 : : */
8701 tgl@sss.pgh.pa.us 1032 [ # # ]:UBC 0 : if (BufFileSeek(state->myfile, 0,
6756 bruce@momjian.us 1033 : 0 : -(long) (tuplen + sizeof(unsigned int)),
1034 : : SEEK_CUR) != 0)
3594 tgl@sss.pgh.pa.us 1035 [ # # ]: 0 : ereport(ERROR,
1036 : : (errcode_for_file_access(),
1037 : : errmsg("could not seek in tuplestore temporary file")));
5587 1038 [ # # ]: 0 : Assert(!state->truncated);
8701 1039 : 0 : return NULL;
1040 : : }
7707 tgl@sss.pgh.pa.us 1041 :CBC 3 : tuplen = getlen(state, false);
1042 : : }
1043 : :
1044 : : /*
1045 : : * Now we have the length of the prior tuple, back up and read it.
1046 : : * Note: READTUP expects we are positioned after the initial
1047 : : * length word of the tuple, so back up to that point.
1048 : : */
8701 1049 [ - + ]: 6 : if (BufFileSeek(state->myfile, 0,
8424 bruce@momjian.us 1050 : 6 : -(long) tuplen,
1051 : : SEEK_CUR) != 0)
3594 tgl@sss.pgh.pa.us 1052 [ # # ]:UBC 0 : ereport(ERROR,
1053 : : (errcode_for_file_access(),
1054 : : errmsg("could not seek in tuplestore temporary file")));
8701 tgl@sss.pgh.pa.us 1055 :CBC 6 : tup = READTUP(state, tuplen);
1056 : 6 : return tup;
1057 : :
8701 tgl@sss.pgh.pa.us 1058 :UBC 0 : default:
7569 1059 [ # # ]: 0 : elog(ERROR, "invalid tuplestore state");
1060 : : return NULL; /* keep compiler quiet */
1061 : : }
1062 : : }
1063 : :
1064 : : /*
1065 : : * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
1066 : : *
1067 : : * If successful, put tuple in slot and return true; else, clear the slot
1068 : : * and return false.
1069 : : *
1070 : : * If copy is true, the slot receives a copied tuple (allocated in current
1071 : : * memory context) that will stay valid regardless of future manipulations of
1072 : : * the tuplestore's state. If copy is false, the slot may just receive a
1073 : : * pointer to a tuple held within the tuplestore. The latter is more
1074 : : * efficient but the slot contents may be corrupted if additional writes to
1075 : : * the tuplestore occur. (If using tuplestore_trim, see comments therein.)
1076 : : */
1077 : : bool
6501 tgl@sss.pgh.pa.us 1078 :CBC 10293988 : tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
1079 : : bool copy, TupleTableSlot *slot)
1080 : : {
1081 : : MinimalTuple tuple;
1082 : : bool should_free;
1083 : :
1084 : 10293988 : tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);
1085 : :
1086 [ + + ]: 10293988 : if (tuple)
1087 : : {
5497 1088 [ + + + - ]: 10125041 : if (copy && !should_free)
1089 : : {
1090 : 874622 : tuple = heap_copy_minimal_tuple(tuple);
1091 : 874622 : should_free = true;
1092 : : }
6501 1093 : 10125041 : ExecStoreMinimalTuple(tuple, slot, should_free);
1094 : 10125041 : return true;
1095 : : }
1096 : : else
1097 : : {
1098 : 168947 : ExecClearTuple(slot);
1099 : 168947 : return false;
1100 : : }
1101 : : }
1102 : :
1103 : : /*
1104 : : * tuplestore_advance - exported function to adjust position without fetching
1105 : : *
1106 : : * We could optimize this case to avoid palloc/pfree overhead, but for the
1107 : : * moment it doesn't seem worthwhile.
1108 : : */
1109 : : bool
1110 : 86126 : tuplestore_advance(Tuplestorestate *state, bool forward)
1111 : : {
1112 : : void *tuple;
1113 : : bool should_free;
1114 : :
1115 : 86126 : tuple = tuplestore_gettuple(state, forward, &should_free);
1116 : :
1117 [ + + ]: 86126 : if (tuple)
1118 : : {
1119 [ - + ]: 84852 : if (should_free)
6501 tgl@sss.pgh.pa.us 1120 :UBC 0 : pfree(tuple);
6501 tgl@sss.pgh.pa.us 1121 :CBC 84852 : return true;
1122 : : }
1123 : : else
1124 : : {
1125 : 1274 : return false;
1126 : : }
1127 : : }
1128 : :
1129 : : /*
1130 : : * Advance over N tuples in either forward or back direction,
1131 : : * without returning any data. N<=0 is a no-op.
1132 : : * Returns true if successful, false if ran out of tuples.
1133 : : */
1134 : : bool
3654 1135 : 640603 : tuplestore_skiptuples(Tuplestorestate *state, int64 ntuples, bool forward)
1136 : : {
1137 : 640603 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
1138 : :
1139 [ + + - + ]: 640603 : Assert(forward || (readptr->eflags & EXEC_FLAG_BACKWARD));
1140 : :
1141 [ + + ]: 640603 : if (ntuples <= 0)
1142 : 9 : return true;
1143 : :
1144 [ + + ]: 640594 : switch (state->status)
1145 : : {
1146 : 640591 : case TSS_INMEM:
1147 [ + + ]: 640591 : if (forward)
1148 : : {
1149 [ - + ]: 639227 : if (readptr->eof_reached)
3654 tgl@sss.pgh.pa.us 1150 :UBC 0 : return false;
3654 tgl@sss.pgh.pa.us 1151 [ + + ]:CBC 639227 : if (state->memtupcount - readptr->current >= ntuples)
1152 : : {
1153 : 639182 : readptr->current += ntuples;
1154 : 639182 : return true;
1155 : : }
1156 : 45 : readptr->current = state->memtupcount;
1157 : 45 : readptr->eof_reached = true;
1158 : 45 : return false;
1159 : : }
1160 : : else
1161 : : {
1162 [ - + ]: 1364 : if (readptr->eof_reached)
1163 : : {
3654 tgl@sss.pgh.pa.us 1164 :UBC 0 : readptr->current = state->memtupcount;
1165 : 0 : readptr->eof_reached = false;
1166 : 0 : ntuples--;
1167 : : }
3654 tgl@sss.pgh.pa.us 1168 [ + - ]:CBC 1364 : if (readptr->current - state->memtupdeleted > ntuples)
1169 : : {
1170 : 1364 : readptr->current -= ntuples;
1171 : 1364 : return true;
1172 : : }
3654 tgl@sss.pgh.pa.us 1173 [ # # ]:UBC 0 : Assert(!state->truncated);
1174 : 0 : readptr->current = state->memtupdeleted;
1175 : 0 : return false;
1176 : : }
1177 : : break;
1178 : :
3654 tgl@sss.pgh.pa.us 1179 :CBC 3 : default:
1180 : : /* We don't currently try hard to optimize other cases */
1181 [ + + ]: 9 : while (ntuples-- > 0)
1182 : : {
1183 : : void *tuple;
1184 : : bool should_free;
1185 : :
1186 : 6 : tuple = tuplestore_gettuple(state, forward, &should_free);
1187 : :
1188 [ - + ]: 6 : if (tuple == NULL)
3654 tgl@sss.pgh.pa.us 1189 :UBC 0 : return false;
3654 tgl@sss.pgh.pa.us 1190 [ + - ]:CBC 6 : if (should_free)
1191 : 6 : pfree(tuple);
1192 [ - + ]: 6 : CHECK_FOR_INTERRUPTS();
1193 : : }
1194 : 3 : return true;
1195 : : }
1196 : : }
1197 : :
1198 : : /*
1199 : : * dumptuples - remove tuples from memory and write to tape
1200 : : *
1201 : : * As a side effect, we must convert each read pointer's position from
1202 : : * "current" to file/offset format. But eof_reached pointers don't
1203 : : * need to change state.
1204 : : */
1205 : : static void
8701 1206 : 60 : dumptuples(Tuplestorestate *state)
1207 : : {
1208 : : int i;
1209 : :
4874 1210 : 60 : for (i = state->memtupdeleted;; i++)
7707 1211 : 1672667 : {
5674 1212 : 1672727 : TSReadPointer *readptr = state->readptrs;
1213 : : int j;
1214 : :
1215 [ + + ]: 3345454 : for (j = 0; j < state->readptrcount; readptr++, j++)
1216 : : {
1217 [ + + + - ]: 1672727 : if (i == readptr->current && !readptr->eof_reached)
1218 : 60 : BufFileTell(state->myfile,
1219 : : &readptr->file, &readptr->offset);
1220 : : }
7707 1221 [ + + ]: 1672727 : if (i >= state->memtupcount)
1222 : 60 : break;
8701 1223 : 1672667 : WRITETUP(state, state->memtuples[i]);
1224 : : }
4874 1225 : 60 : state->memtupdeleted = 0;
8701 1226 : 60 : state->memtupcount = 0;
1227 : 60 : }
1228 : :
1229 : : /*
1230 : : * tuplestore_rescan - rewind the active read pointer to start
1231 : : */
1232 : : void
1233 : 138585 : tuplestore_rescan(Tuplestorestate *state)
1234 : : {
5674 1235 : 138585 : TSReadPointer *readptr = &state->readptrs[state->activeptr];
1236 : :
1237 [ - + ]: 138585 : Assert(readptr->eflags & EXEC_FLAG_REWIND);
5587 1238 [ - + ]: 138585 : Assert(!state->truncated);
1239 : :
8701 1240 [ + + - - ]: 138585 : switch (state->status)
1241 : : {
7707 1242 : 138526 : case TSS_INMEM:
5674 1243 : 138526 : readptr->eof_reached = false;
1244 : 138526 : readptr->current = 0;
7707 1245 : 138526 : break;
1246 : 59 : case TSS_WRITEFILE:
5674 1247 : 59 : readptr->eof_reached = false;
1248 : 59 : readptr->file = 0;
382 peter@eisentraut.org 1249 : 59 : readptr->offset = 0;
8701 tgl@sss.pgh.pa.us 1250 : 59 : break;
8701 tgl@sss.pgh.pa.us 1251 :UBC 0 : case TSS_READFILE:
5674 1252 : 0 : readptr->eof_reached = false;
382 peter@eisentraut.org 1253 [ # # ]: 0 : if (BufFileSeek(state->myfile, 0, 0, SEEK_SET) != 0)
3594 tgl@sss.pgh.pa.us 1254 [ # # ]: 0 : ereport(ERROR,
1255 : : (errcode_for_file_access(),
1256 : : errmsg("could not seek in tuplestore temporary file")));
8701 1257 : 0 : break;
1258 : 0 : default:
7569 1259 [ # # ]: 0 : elog(ERROR, "invalid tuplestore state");
1260 : : break;
1261 : : }
8701 tgl@sss.pgh.pa.us 1262 :CBC 138585 : }
1263 : :
1264 : : /*
1265 : : * tuplestore_copy_read_pointer - copy a read pointer's state to another
1266 : : */
1267 : : void
5674 1268 : 30463 : tuplestore_copy_read_pointer(Tuplestorestate *state,
1269 : : int srcptr, int destptr)
1270 : : {
1271 : 30463 : TSReadPointer *sptr = &state->readptrs[srcptr];
1272 : 30463 : TSReadPointer *dptr = &state->readptrs[destptr];
1273 : :
1274 [ + - - + ]: 30463 : Assert(srcptr >= 0 && srcptr < state->readptrcount);
1275 [ + - - + ]: 30463 : Assert(destptr >= 0 && destptr < state->readptrcount);
1276 : :
1277 : : /* Assigning to self is a no-op */
1278 [ - + ]: 30463 : if (srcptr == destptr)
5674 tgl@sss.pgh.pa.us 1279 :UBC 0 : return;
1280 : :
5674 tgl@sss.pgh.pa.us 1281 [ - + ]:CBC 30463 : if (dptr->eflags != sptr->eflags)
1282 : : {
1283 : : /* Possible change of overall eflags, so copy and then recompute */
1284 : : int eflags;
1285 : : int i;
1286 : :
5674 tgl@sss.pgh.pa.us 1287 :UBC 0 : *dptr = *sptr;
1288 : 0 : eflags = state->readptrs[0].eflags;
1289 [ # # ]: 0 : for (i = 1; i < state->readptrcount; i++)
1290 : 0 : eflags |= state->readptrs[i].eflags;
1291 : 0 : state->eflags = eflags;
1292 : : }
1293 : : else
5674 tgl@sss.pgh.pa.us 1294 :CBC 30463 : *dptr = *sptr;
1295 : :
8701 1296 [ + - - ]: 30463 : switch (state->status)
1297 : : {
7707 1298 : 30463 : case TSS_INMEM:
1299 : : case TSS_WRITEFILE:
1300 : : /* no work */
8701 1301 : 30463 : break;
8701 tgl@sss.pgh.pa.us 1302 :UBC 0 : case TSS_READFILE:
1303 : :
1304 : : /*
1305 : : * This case is a bit tricky since the active read pointer's
1306 : : * position corresponds to the seek point, not what is in its
1307 : : * variables. Assigning to the active requires a seek, and
1308 : : * assigning from the active requires a tell, except when
1309 : : * eof_reached.
1310 : : */
5674 1311 [ # # ]: 0 : if (destptr == state->activeptr)
1312 : : {
1313 [ # # ]: 0 : if (dptr->eof_reached)
1314 : : {
1315 [ # # ]: 0 : if (BufFileSeek(state->myfile,
1316 : : state->writepos_file,
1317 : : state->writepos_offset,
1318 : : SEEK_SET) != 0)
3594 1319 [ # # ]: 0 : ereport(ERROR,
1320 : : (errcode_for_file_access(),
1321 : : errmsg("could not seek in tuplestore temporary file")));
1322 : : }
1323 : : else
1324 : : {
5674 1325 [ # # ]: 0 : if (BufFileSeek(state->myfile,
1326 : : dptr->file, dptr->offset,
1327 : : SEEK_SET) != 0)
3594 1328 [ # # ]: 0 : ereport(ERROR,
1329 : : (errcode_for_file_access(),
1330 : : errmsg("could not seek in tuplestore temporary file")));
1331 : : }
1332 : : }
5674 1333 [ # # ]: 0 : else if (srcptr == state->activeptr)
1334 : : {
1335 [ # # ]: 0 : if (!dptr->eof_reached)
1336 : 0 : BufFileTell(state->myfile,
1337 : : &dptr->file,
1338 : : &dptr->offset);
1339 : : }
8701 1340 : 0 : break;
1341 : 0 : default:
7569 1342 [ # # ]: 0 : elog(ERROR, "invalid tuplestore state");
1343 : : break;
1344 : : }
1345 : : }
1346 : :
1347 : : /*
1348 : : * tuplestore_trim - remove all no-longer-needed tuples
1349 : : *
1350 : : * Calling this function authorizes the tuplestore to delete all tuples
1351 : : * before the oldest read pointer, if no read pointer is marked as requiring
1352 : : * REWIND capability.
1353 : : *
1354 : : * Note: this is obviously safe if no pointer has BACKWARD capability either.
1355 : : * If a pointer is marked as BACKWARD but not REWIND capable, it means that
1356 : : * the pointer can be moved backward but not before the oldest other read
1357 : : * pointer.
1358 : : */
1359 : : void
5674 tgl@sss.pgh.pa.us 1360 :CBC 413399 : tuplestore_trim(Tuplestorestate *state)
1361 : : {
1362 : : int oldest;
1363 : : int nremove;
1364 : : int i;
1365 : :
1366 : : /*
1367 : : * Truncation is disallowed if any read pointer requires rewind
1368 : : * capability.
1369 : : */
5587 1370 [ - + ]: 413399 : if (state->eflags & EXEC_FLAG_REWIND)
5674 tgl@sss.pgh.pa.us 1371 :UBC 0 : return;
1372 : :
1373 : : /*
1374 : : * We don't bother trimming temp files since it usually would mean more
1375 : : * work than just letting them sit in kernel buffers until they age out.
1376 : : */
6173 tgl@sss.pgh.pa.us 1377 [ - + ]:CBC 413399 : if (state->status != TSS_INMEM)
6173 tgl@sss.pgh.pa.us 1378 :UBC 0 : return;
1379 : :
1380 : : /* Find the oldest read pointer */
5674 tgl@sss.pgh.pa.us 1381 :CBC 413399 : oldest = state->memtupcount;
1382 [ + + ]: 1782168 : for (i = 0; i < state->readptrcount; i++)
1383 : : {
1384 [ + + ]: 1368769 : if (!state->readptrs[i].eof_reached)
1385 : 1354161 : oldest = Min(oldest, state->readptrs[i].current);
1386 : : }
1387 : :
1388 : : /*
1389 : : * Note: you might think we could remove all the tuples before the oldest
1390 : : * "current", since that one is the next to be returned. However, since
1391 : : * tuplestore_gettuple returns a direct pointer to our internal copy of
1392 : : * the tuple, it's likely that the caller has still got the tuple just
1393 : : * before "current" referenced in a slot. So we keep one extra tuple
1394 : : * before the oldest "current". (Strictly speaking, we could require such
1395 : : * callers to use the "copy" flag to tuplestore_gettupleslot, but for
1396 : : * efficiency we allow this one case to not use "copy".)
1397 : : */
1398 : 413399 : nremove = oldest - 1;
6173 1399 [ + + ]: 413399 : if (nremove <= 0)
1400 : 3551 : return; /* nothing to do */
1401 : :
4874 1402 [ - + ]: 409848 : Assert(nremove >= state->memtupdeleted);
6173 1403 [ - + ]: 409848 : Assert(nremove <= state->memtupcount);
1404 : :
1405 : : /* Release no-longer-needed tuples */
4874 1406 [ + + ]: 820222 : for (i = state->memtupdeleted; i < nremove; i++)
1407 : : {
6173 1408 : 410374 : FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
1409 : 410374 : pfree(state->memtuples[i]);
4874 1410 : 410374 : state->memtuples[i] = NULL;
1411 : : }
1412 : 409848 : state->memtupdeleted = nremove;
1413 : :
1414 : : /* mark tuplestore as truncated (used for Assert crosschecks only) */
1415 : 409848 : state->truncated = true;
1416 : :
1417 : : /*
1418 : : * If nremove is less than 1/8th memtupcount, just stop here, leaving the
1419 : : * "deleted" slots as NULL. This prevents us from expending O(N^2) time
1420 : : * repeatedly memmove-ing a large pointer array. The worst case space
1421 : : * wastage is pretty small, since it's just pointers and not whole tuples.
1422 : : */
1423 [ + + ]: 409848 : if (nremove < state->memtupcount / 8)
1424 : 56340 : return;
1425 : :
1426 : : /*
1427 : : * Slide the array down and readjust pointers.
1428 : : *
1429 : : * In mergejoin's current usage, it's demonstrable that there will always
1430 : : * be exactly one non-removed tuple; so optimize that case.
1431 : : */
6173 1432 [ + + ]: 353508 : if (nremove + 1 == state->memtupcount)
1433 : 315126 : state->memtuples[0] = state->memtuples[nremove];
1434 : : else
1435 : 38382 : memmove(state->memtuples, state->memtuples + nremove,
1436 : 38382 : (state->memtupcount - nremove) * sizeof(void *));
1437 : :
4874 1438 : 353508 : state->memtupdeleted = 0;
6173 1439 : 353508 : state->memtupcount -= nremove;
5674 1440 [ + + ]: 1535853 : for (i = 0; i < state->readptrcount; i++)
1441 : : {
1442 [ + + ]: 1182345 : if (!state->readptrs[i].eof_reached)
1443 : 1180220 : state->readptrs[i].current -= nremove;
1444 : : }
1445 : : }
1446 : :
1447 : : /*
1448 : : * tuplestore_in_memory
1449 : : *
1450 : : * Returns true if the tuplestore has not spilled to disk.
1451 : : *
1452 : : * XXX exposing this is a violation of modularity ... should get rid of it.
1453 : : */
1454 : : bool
5586 1455 : 776691 : tuplestore_in_memory(Tuplestorestate *state)
1456 : : {
1457 : 776691 : return (state->status == TSS_INMEM);
1458 : : }
1459 : :
1460 : :
1461 : : /*
1462 : : * Tape interface routines
1463 : : */
1464 : :
1465 : : static unsigned int
8701 1466 : 3589907 : getlen(Tuplestorestate *state, bool eofOK)
1467 : : {
1468 : : unsigned int len;
1469 : : size_t nbytes;
1470 : :
454 peter@eisentraut.org 1471 : 3589907 : nbytes = BufFileReadMaybeEOF(state->myfile, &len, sizeof(len), eofOK);
1472 [ + + ]: 3589907 : if (nbytes == 0)
1473 : 53 : return 0;
1474 : : else
7707 tgl@sss.pgh.pa.us 1475 : 3589854 : return len;
1476 : : }
1477 : :
1478 : :
1479 : : /*
1480 : : * Routines specialized for HeapTuple case
1481 : : *
1482 : : * The stored form is actually a MinimalTuple, but for largely historical
1483 : : * reasons we allow COPYTUP to work from a HeapTuple.
1484 : : *
1485 : : * Since MinimalTuple already has length in its first word, we don't need
1486 : : * to write that separately.
1487 : : */
1488 : :
1489 : : static void *
8701 1490 : 791673 : copytup_heap(Tuplestorestate *state, void *tup)
1491 : : {
1492 : : MinimalTuple tuple;
1493 : :
6501 1494 : 791673 : tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
7916 1495 : 791673 : USEMEM(state, GetMemoryChunkSpace(tuple));
1496 : 791673 : return (void *) tuple;
1497 : : }
1498 : :
1499 : : static void
8701 1500 : 4248713 : writetup_heap(Tuplestorestate *state, void *tup)
1501 : : {
6501 1502 : 4248713 : MinimalTuple tuple = (MinimalTuple) tup;
1503 : :
1504 : : /* the part of the MinimalTuple we'll write: */
5647 1505 : 4248713 : char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1506 : 4248713 : unsigned int tupbodylen = tuple->t_len - MINIMAL_TUPLE_DATA_OFFSET;
1507 : :
1508 : : /* total on-disk footprint: */
1509 : 4248713 : unsigned int tuplen = tupbodylen + sizeof(int);
1510 : :
493 peter@eisentraut.org 1511 : 4248713 : BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
1512 : 4248713 : BufFileWrite(state->myfile, tupbody, tupbodylen);
5674 tgl@sss.pgh.pa.us 1513 [ + + ]: 4248713 : if (state->backward) /* need trailing length word? */
493 peter@eisentraut.org 1514 : 30000 : BufFileWrite(state->myfile, &tuplen, sizeof(tuplen));
1515 : :
7916 tgl@sss.pgh.pa.us 1516 : 4248713 : FREEMEM(state, GetMemoryChunkSpace(tuple));
6501 1517 : 4248713 : heap_free_minimal_tuple(tuple);
8701 1518 : 4248713 : }
1519 : :
1520 : : static void *
1521 : 3589851 : readtup_heap(Tuplestorestate *state, unsigned int len)
1522 : : {
5647 1523 : 3589851 : unsigned int tupbodylen = len - sizeof(int);
1524 : 3589851 : unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
1525 : 3589851 : MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
1526 : 3589851 : char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
1527 : :
7916 1528 : 3589851 : USEMEM(state, GetMemoryChunkSpace(tuple));
1529 : : /* read in the tuple proper */
5647 1530 : 3589851 : tuple->t_len = tuplen;
454 peter@eisentraut.org 1531 : 3589851 : BufFileReadExact(state->myfile, tupbody, tupbodylen);
5674 tgl@sss.pgh.pa.us 1532 [ + + ]: 3589851 : if (state->backward) /* need trailing length word? */
454 peter@eisentraut.org 1533 : 30009 : BufFileReadExact(state->myfile, &tuplen, sizeof(tuplen));
8701 tgl@sss.pgh.pa.us 1534 : 3589851 : return (void *) tuple;
1535 : : }
|