Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * sharedtuplestore.c
4 : * Simple mechanism for sharing tuples between backends.
5 : *
6 : * This module contains a shared temporary tuple storage mechanism providing
7 : * a parallel-aware subset of the features of tuplestore.c. Multiple backends
8 : * can write to a SharedTuplestore, and then multiple backends can later scan
9 : * the stored tuples. Currently, the only scan type supported is a parallel
10 : * scan where each backend reads an arbitrary subset of the tuples that were
11 : * written.
12 : *
13 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
14 : * Portions Copyright (c) 1994, Regents of the University of California
15 : *
16 : * IDENTIFICATION
17 : * src/backend/utils/sort/sharedtuplestore.c
18 : *
19 : *-------------------------------------------------------------------------
20 : */
21 :
22 : #include "postgres.h"
23 :
24 : #include "access/htup.h"
25 : #include "access/htup_details.h"
26 : #include "miscadmin.h"
27 : #include "storage/buffile.h"
28 : #include "storage/lwlock.h"
29 : #include "storage/sharedfileset.h"
30 : #include "utils/sharedtuplestore.h"
31 :
/*
 * Chunk geometry.  Each chunk occupies STS_CHUNK_PAGES pages of the file.
 * The page count was picked (somewhat arbitrarily) to equal the size of
 * HASH_CHUNK, so that Parallel Hash pulls new chunks of tuples from disk at
 * roughly the rate at which it allocates new chunks of memory to load them
 * into.
 *
 * Every chunk starts with a small header (the fixed fields of
 * SharedTuplestoreChunk); whatever is left is available for tuple data.
 */
#define STS_CHUNK_PAGES 4
#define STS_CHUNK_HEADER_SIZE (offsetof(SharedTuplestoreChunk, data))
#define STS_CHUNK_DATA_SIZE ((STS_CHUNK_PAGES) * BLCKSZ - STS_CHUNK_HEADER_SIZE)
41 :
42 : /* Chunk written to disk. */
43 : typedef struct SharedTuplestoreChunk
44 : {
45 : int ntuples; /* Number of tuples in this chunk. */
46 : int overflow; /* If overflow, how many including this one? */
47 : char data[FLEXIBLE_ARRAY_MEMBER];
48 : } SharedTuplestoreChunk;
49 :
50 : /* Per-participant shared state. */
51 : typedef struct SharedTuplestoreParticipant
52 : {
53 : LWLock lock;
54 : BlockNumber read_page; /* Page number for next read. */
55 : BlockNumber npages; /* Number of pages written. */
56 : bool writing; /* Used only for assertions. */
57 : } SharedTuplestoreParticipant;
58 :
59 : /* The control object that lives in shared memory. */
60 : struct SharedTuplestore
61 : {
62 : int nparticipants; /* Number of participants that can write. */
63 : int flags; /* Flag bits from SHARED_TUPLESTORE_XXX */
64 : size_t meta_data_size; /* Size of per-tuple header. */
65 : char name[NAMEDATALEN]; /* A name for this tuplestore. */
66 :
67 : /* Followed by per-participant shared state. */
68 : SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
69 : };
70 :
71 : /* Per-participant state that lives in backend-local memory. */
72 : struct SharedTuplestoreAccessor
73 : {
74 : int participant; /* My participant number. */
75 : SharedTuplestore *sts; /* The shared state. */
76 : SharedFileSet *fileset; /* The SharedFileSet holding files. */
77 : MemoryContext context; /* Memory context for buffers. */
78 :
79 : /* State for reading. */
80 : int read_participant; /* The current participant to read from. */
81 : BufFile *read_file; /* The current file to read from. */
82 : int read_ntuples_available; /* The number of tuples in chunk. */
83 : int read_ntuples; /* How many tuples have we read from chunk? */
84 : size_t read_bytes; /* How many bytes have we read from chunk? */
85 : char *read_buffer; /* A buffer for loading tuples. */
86 : size_t read_buffer_size;
87 : BlockNumber read_next_page; /* Lowest block we'll consider reading. */
88 :
89 : /* State for writing. */
90 : SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
91 : BufFile *write_file; /* The current file to write to. */
92 : BlockNumber write_page; /* The next page to write to. */
93 : char *write_pointer; /* Current write pointer within chunk. */
94 : char *write_end; /* One past the end of the current chunk. */
95 : };
96 :
97 : static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
98 : int participant);
99 :
100 : /*
101 : * Return the amount of shared memory required to hold SharedTuplestore for a
102 : * given number of participants.
103 : */
104 : size_t
1938 andres 105 CBC 2302 : sts_estimate(int participants)
106 : {
107 4604 : return offsetof(SharedTuplestore, participants) +
108 2302 : sizeof(SharedTuplestoreParticipant) * participants;
109 : }
110 :
111 : /*
112 : * Initialize a SharedTuplestore in existing shared memory. There must be
113 : * space for sts_estimate(participants) bytes. If flags includes the value
114 : * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
115 : * eagerly (but this isn't yet implemented).
116 : *
117 : * Tuples that are stored may optionally carry a piece of fixed sized
118 : * meta-data which will be retrieved along with the tuple. This is useful for
119 : * the hash values used in multi-batch hash joins, but could have other
120 : * applications.
121 : *
122 : * The caller must supply a SharedFileSet, which is essentially a directory
123 : * that will be cleaned up automatically, and a name which must be unique
124 : * across all SharedTuplestores created in the same SharedFileSet.
125 : */
126 : SharedTuplestoreAccessor *
127 972 : sts_initialize(SharedTuplestore *sts, int participants,
128 : int my_participant_number,
129 : size_t meta_data_size,
130 : int flags,
131 : SharedFileSet *fileset,
132 : const char *name)
133 : {
134 : SharedTuplestoreAccessor *accessor;
135 : int i;
136 :
137 972 : Assert(my_participant_number < participants);
138 :
139 972 : sts->nparticipants = participants;
140 972 : sts->meta_data_size = meta_data_size;
141 972 : sts->flags = flags;
142 :
143 972 : if (strlen(name) > sizeof(sts->name) - 1)
1938 andres 144 UBC 0 : elog(ERROR, "SharedTuplestore name too long");
1938 andres 145 CBC 972 : strcpy(sts->name, name);
146 :
147 : /*
148 : * Limit meta-data so it + tuple size always fits into a single chunk.
149 : * sts_puttuple() and sts_read_tuple() could be made to support scenarios
150 : * where that's not the case, but it's not currently required. If so,
151 : * meta-data size probably should be made variable, too.
152 : */
153 972 : if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
1938 andres 154 UBC 0 : elog(ERROR, "meta-data too long");
155 :
1938 andres 156 CBC 3682 : for (i = 0; i < participants; ++i)
157 : {
158 2710 : LWLockInitialize(&sts->participants[i].lock,
159 : LWTRANCHE_SHARED_TUPLESTORE);
160 2710 : sts->participants[i].read_page = 0;
73 tmunro 161 2710 : sts->participants[i].npages = 0;
1938 andres 162 2710 : sts->participants[i].writing = false;
163 : }
164 :
165 972 : accessor = palloc0(sizeof(SharedTuplestoreAccessor));
166 972 : accessor->participant = my_participant_number;
167 972 : accessor->sts = sts;
168 972 : accessor->fileset = fileset;
169 972 : accessor->context = CurrentMemoryContext;
170 :
171 972 : return accessor;
172 : }
173 :
174 : /*
175 : * Attach to a SharedTuplestore that has been initialized by another backend,
176 : * so that this backend can read and write tuples.
177 : */
178 : SharedTuplestoreAccessor *
179 1221 : sts_attach(SharedTuplestore *sts,
180 : int my_participant_number,
181 : SharedFileSet *fileset)
182 : {
183 : SharedTuplestoreAccessor *accessor;
184 :
185 1221 : Assert(my_participant_number < sts->nparticipants);
186 :
187 1221 : accessor = palloc0(sizeof(SharedTuplestoreAccessor));
188 1221 : accessor->participant = my_participant_number;
189 1221 : accessor->sts = sts;
190 1221 : accessor->fileset = fileset;
191 1221 : accessor->context = CurrentMemoryContext;
192 :
193 1221 : return accessor;
194 : }
195 :
196 : static void
197 1964 : sts_flush_chunk(SharedTuplestoreAccessor *accessor)
198 : {
199 : size_t size;
200 :
201 1964 : size = STS_CHUNK_PAGES * BLCKSZ;
1027 tmunro 202 1964 : BufFileWrite(accessor->write_file, accessor->write_chunk, size);
1938 andres 203 1964 : memset(accessor->write_chunk, 0, size);
204 1964 : accessor->write_pointer = &accessor->write_chunk->data[0];
205 1964 : accessor->sts->participants[accessor->participant].npages +=
206 : STS_CHUNK_PAGES;
207 1964 : }
208 :
209 : /*
210 : * Finish writing tuples. This must be called by all backends that have
211 : * written data before any backend begins reading it.
212 : */
213 : void
214 3507 : sts_end_write(SharedTuplestoreAccessor *accessor)
215 : {
216 3507 : if (accessor->write_file != NULL)
217 : {
218 1193 : sts_flush_chunk(accessor);
219 1193 : BufFileClose(accessor->write_file);
220 1193 : pfree(accessor->write_chunk);
221 1193 : accessor->write_chunk = NULL;
222 1193 : accessor->write_file = NULL;
223 1193 : accessor->sts->participants[accessor->participant].writing = false;
224 : }
225 3507 : }
226 :
227 : /*
228 : * Prepare to rescan. Only one participant must call this. After it returns,
229 : * all participants may call sts_begin_parallel_scan() and then loop over
230 : * sts_parallel_scan_next(). This function must not be called concurrently
231 : * with a scan, and synchronization to avoid that is the caller's
232 : * responsibility.
233 : */
234 : void
1938 andres 235 UBC 0 : sts_reinitialize(SharedTuplestoreAccessor *accessor)
236 : {
237 : int i;
238 :
239 : /*
240 : * Reset the shared read head for all participants' files. Also set the
241 : * initial chunk size to the minimum (any increases from that size will be
242 : * recorded in chunk_expansion_log).
243 : */
244 0 : for (i = 0; i < accessor->sts->nparticipants; ++i)
245 : {
246 0 : accessor->sts->participants[i].read_page = 0;
247 : }
248 0 : }
249 :
250 : /*
251 : * Begin scanning the contents in parallel.
252 : */
253 : void
1938 andres 254 CBC 894 : sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
255 : {
256 : int i PG_USED_FOR_ASSERTS_ONLY;
257 :
258 : /* End any existing scan that was in progress. */
259 894 : sts_end_parallel_scan(accessor);
260 :
261 : /*
262 : * Any backend that might have written into this shared tuplestore must
263 : * have called sts_end_write(), so that all buffers are flushed and the
264 : * files have stopped growing.
265 : */
266 3410 : for (i = 0; i < accessor->sts->nparticipants; ++i)
267 2516 : Assert(!accessor->sts->participants[i].writing);
268 :
269 : /*
270 : * We will start out reading the file that THIS backend wrote. There may
271 : * be some caching locality advantage to that.
272 : */
273 894 : accessor->read_participant = accessor->participant;
274 894 : accessor->read_file = NULL;
275 894 : accessor->read_next_page = 0;
276 894 : }
277 :
278 : /*
279 : * Finish a parallel scan, freeing associated backend-local resources.
280 : */
281 : void
282 4431 : sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
283 : {
284 : /*
285 : * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
286 : * we'd probably need a reference count of current parallel scanners so we
287 : * could safely do it only when the reference count reaches zero.
288 : */
289 4431 : if (accessor->read_file != NULL)
290 : {
1938 andres 291 UBC 0 : BufFileClose(accessor->read_file);
292 0 : accessor->read_file = NULL;
293 : }
1938 andres 294 CBC 4431 : }
295 :
296 : /*
297 : * Write a tuple. If a meta-data size was provided to sts_initialize, then a
298 : * pointer to meta data of that size must be provided.
299 : */
300 : void
301 1250649 : sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
302 : MinimalTuple tuple)
303 : {
304 : size_t size;
305 :
306 : /* Do we have our own file yet? */
307 1250649 : if (accessor->write_file == NULL)
308 : {
309 : SharedTuplestoreParticipant *participant;
310 : char name[MAXPGPATH];
311 :
312 : /* Create one. Only this backend will write into it. */
313 1193 : sts_filename(name, accessor, accessor->participant);
587 akapila 314 1193 : accessor->write_file =
315 1193 : BufFileCreateFileSet(&accessor->fileset->fs, name);
316 :
317 : /* Set up the shared state for this backend's file. */
1938 andres 318 1193 : participant = &accessor->sts->participants[accessor->participant];
319 1193 : participant->writing = true; /* for assertions only */
320 : }
321 :
322 : /* Do we have space? */
323 1250649 : size = accessor->sts->meta_data_size + tuple->t_len;
73 tmunro 324 1250649 : if (accessor->write_pointer + size > accessor->write_end)
325 : {
1938 andres 326 1856 : if (accessor->write_chunk == NULL)
327 : {
328 : /* First time through. Allocate chunk. */
329 1193 : accessor->write_chunk = (SharedTuplestoreChunk *)
330 1193 : MemoryContextAllocZero(accessor->context,
331 : STS_CHUNK_PAGES * BLCKSZ);
332 1193 : accessor->write_chunk->ntuples = 0;
333 1193 : accessor->write_pointer = &accessor->write_chunk->data[0];
334 1193 : accessor->write_end = (char *)
335 1193 : accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
336 : }
337 : else
338 : {
339 : /* See if flushing helps. */
340 663 : sts_flush_chunk(accessor);
341 : }
342 :
343 : /* It may still not be enough in the case of a gigantic tuple. */
73 tmunro 344 1856 : if (accessor->write_pointer + size > accessor->write_end)
345 : {
346 : size_t written;
347 :
348 : /*
349 : * We'll write the beginning of the oversized tuple, and then
350 : * write the rest in some number of 'overflow' chunks.
351 : *
352 : * sts_initialize() verifies that the size of the tuple +
353 : * meta-data always fits into a chunk. Because the chunk has been
354 : * flushed above, we can be sure to have all of a chunk's usable
355 : * space available.
356 : */
1938 andres 357 12 : Assert(accessor->write_pointer + accessor->sts->meta_data_size +
358 : sizeof(uint32) < accessor->write_end);
359 :
360 : /* Write the meta-data as one chunk. */
361 12 : if (accessor->sts->meta_data_size > 0)
362 12 : memcpy(accessor->write_pointer, meta_data,
363 12 : accessor->sts->meta_data_size);
364 :
365 : /*
366 : * Write as much of the tuple as we can fit. This includes the
367 : * tuple's size at the start.
368 : */
369 12 : written = accessor->write_end - accessor->write_pointer -
370 12 : accessor->sts->meta_data_size;
371 12 : memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
372 : tuple, written);
373 12 : ++accessor->write_chunk->ntuples;
374 12 : size -= accessor->sts->meta_data_size;
375 12 : size -= written;
376 : /* Now write as many overflow chunks as we need for the rest. */
377 120 : while (size > 0)
378 : {
379 : size_t written_this_chunk;
380 :
381 108 : sts_flush_chunk(accessor);
382 :
383 : /*
384 : * How many overflow chunks to go? This will allow readers to
385 : * skip all of them at once instead of reading each one.
386 : */
387 108 : accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
388 : STS_CHUNK_DATA_SIZE;
389 108 : written_this_chunk =
390 108 : Min(accessor->write_end - accessor->write_pointer, size);
391 108 : memcpy(accessor->write_pointer, (char *) tuple + written,
392 : written_this_chunk);
393 108 : accessor->write_pointer += written_this_chunk;
394 108 : size -= written_this_chunk;
395 108 : written += written_this_chunk;
396 : }
397 12 : return;
398 : }
399 : }
400 :
401 : /* Copy meta-data and tuple into buffer. */
402 1250637 : if (accessor->sts->meta_data_size > 0)
403 1250637 : memcpy(accessor->write_pointer, meta_data,
404 1250637 : accessor->sts->meta_data_size);
405 1250637 : memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
406 1250637 : tuple->t_len);
407 1250637 : accessor->write_pointer += size;
408 1250637 : ++accessor->write_chunk->ntuples;
409 : }
410 :
411 : static MinimalTuple
412 1250649 : sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
413 : {
414 : MinimalTuple tuple;
415 : uint32 size;
416 : size_t remaining_size;
417 : size_t this_chunk_size;
418 : char *destination;
419 :
420 : /*
421 : * We'll keep track of bytes read from this chunk so that we can detect an
422 : * overflowing tuple and switch to reading overflow pages.
423 : */
424 1250649 : if (accessor->sts->meta_data_size > 0)
425 : {
83 peter 426 GNC 1250649 : BufFileReadExact(accessor->read_file, meta_data, accessor->sts->meta_data_size);
1938 andres 427 GIC 1250649 : accessor->read_bytes += accessor->sts->meta_data_size;
1938 andres 428 ECB : }
83 peter 429 GNC 1250649 : BufFileReadExact(accessor->read_file, &size, sizeof(size));
1938 andres 430 CBC 1250649 : accessor->read_bytes += sizeof(size);
1938 andres 431 GIC 1250649 : if (size > accessor->read_buffer_size)
1938 andres 432 ECB : {
433 : size_t new_read_buffer_size;
434 :
1938 andres 435 CBC 721 : if (accessor->read_buffer != NULL)
1938 andres 436 LBC 0 : pfree(accessor->read_buffer);
1938 andres 437 CBC 721 : new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
1938 andres 438 GIC 721 : accessor->read_buffer =
439 721 : MemoryContextAlloc(accessor->context, new_read_buffer_size);
1938 andres 440 CBC 721 : accessor->read_buffer_size = new_read_buffer_size;
441 : }
1938 andres 442 GIC 1250649 : remaining_size = size - sizeof(uint32);
443 1250649 : this_chunk_size = Min(remaining_size,
444 : BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
1938 andres 445 CBC 1250649 : destination = accessor->read_buffer + sizeof(uint32);
83 peter 446 GNC 1250649 : BufFileReadExact(accessor->read_file, destination, this_chunk_size);
1938 andres 447 CBC 1250649 : accessor->read_bytes += this_chunk_size;
1938 andres 448 GIC 1250649 : remaining_size -= this_chunk_size;
449 1250649 : destination += this_chunk_size;
1938 andres 450 CBC 1250649 : ++accessor->read_ntuples;
1938 andres 451 ECB :
452 : /* Check if we need to read any overflow chunks. */
1938 andres 453 CBC 1250757 : while (remaining_size > 0)
454 : {
455 : /* We are now positioned at the start of an overflow chunk. */
456 : SharedTuplestoreChunk chunk_header;
457 :
83 peter 458 GNC 108 : BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
1938 andres 459 CBC 108 : accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
1938 andres 460 GIC 108 : if (chunk_header.overflow == 0)
1938 andres 461 LBC 0 : ereport(ERROR,
462 : (errcode_for_file_access(),
463 : errmsg("unexpected chunk in shared tuplestore temporary file"),
464 : errdetail_internal("Expected overflow chunk.")));
1938 andres 465 GIC 108 : accessor->read_next_page += STS_CHUNK_PAGES;
466 108 : this_chunk_size = Min(remaining_size,
467 : BLCKSZ * STS_CHUNK_PAGES -
1938 andres 468 ECB : STS_CHUNK_HEADER_SIZE);
83 peter 469 GNC 108 : BufFileReadExact(accessor->read_file, destination, this_chunk_size);
1938 andres 470 GIC 108 : accessor->read_bytes += this_chunk_size;
1938 andres 471 CBC 108 : remaining_size -= this_chunk_size;
472 108 : destination += this_chunk_size;
473 :
474 : /*
1938 andres 475 ECB : * These will be used to count regular tuples following the oversized
476 : * tuple that spilled into this overflow chunk.
477 : */
1938 andres 478 GIC 108 : accessor->read_ntuples = 0;
1938 andres 479 CBC 108 : accessor->read_ntuples_available = chunk_header.ntuples;
1938 andres 480 ECB : }
481 :
1938 andres 482 CBC 1250649 : tuple = (MinimalTuple) accessor->read_buffer;
1938 andres 483 GIC 1250649 : tuple->t_len = size;
484 :
1938 andres 485 CBC 1250649 : return tuple;
486 : }
1938 andres 487 ECB :
488 : /*
489 : * Get the next tuple in the current parallel scan.
490 : */
491 : MinimalTuple
1938 andres 492 CBC 1251472 : sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
493 : {
494 : SharedTuplestoreParticipant *p;
495 : BlockNumber read_page;
496 : bool eof;
1938 andres 497 ECB :
498 : for (;;)
499 : {
500 : /* Can we read more tuples from the current chunk? */
1938 andres 501 CBC 1254811 : if (accessor->read_ntuples < accessor->read_ntuples_available)
502 1250649 : return sts_read_tuple(accessor, meta_data);
1938 andres 503 ECB :
504 : /* Find the location of a new chunk to read. */
1938 andres 505 GIC 4162 : p = &accessor->sts->participants[accessor->read_participant];
506 :
507 4162 : LWLockAcquire(&p->lock, LW_EXCLUSIVE);
1938 andres 508 ECB : /* We can skip directly past overflow pages we know about. */
1938 andres 509 GBC 4162 : if (p->read_page < accessor->read_next_page)
1938 andres 510 GIC 12 : p->read_page = accessor->read_next_page;
511 4162 : eof = p->read_page >= p->npages;
512 4162 : if (!eof)
1938 andres 513 ECB : {
514 : /* Claim the next chunk. */
1938 andres 515 GIC 1856 : read_page = p->read_page;
516 : /* Advance the read head for the next reader. */
517 1856 : p->read_page += STS_CHUNK_PAGES;
518 1856 : accessor->read_next_page = p->read_page;
1938 andres 519 ECB : }
1938 andres 520 GIC 4162 : LWLockRelease(&p->lock);
1938 andres 521 EUB :
1938 andres 522 GBC 4162 : if (!eof)
1938 andres 523 EUB : {
524 : SharedTuplestoreChunk chunk_header;
1938 andres 525 ECB :
526 : /* Make sure we have the file open. */
1938 andres 527 CBC 1856 : if (accessor->read_file == NULL)
528 : {
529 : char name[MAXPGPATH];
530 :
1938 andres 531 GIC 1226 : sts_filename(name, accessor, accessor->read_participant);
532 1226 : accessor->read_file =
584 akapila 533 CBC 1226 : BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
534 : false);
1938 andres 535 ECB : }
536 :
537 : /* Seek and load the chunk header. */
1938 andres 538 GIC 1856 : if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
1938 andres 539 UIC 0 : ereport(ERROR,
540 : (errcode_for_file_access(),
541 : errmsg("could not seek to block %u in shared tuplestore temporary file",
542 : read_page)));
83 peter 543 GNC 1856 : BufFileReadExact(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE);
544 :
545 : /*
546 : * If this is an overflow chunk, we skip it and any following
1938 andres 547 ECB : * overflow chunks all at once.
548 : */
1938 andres 549 GIC 1856 : if (chunk_header.overflow > 0)
550 : {
1938 andres 551 UIC 0 : accessor->read_next_page = read_page +
552 0 : chunk_header.overflow * STS_CHUNK_PAGES;
553 0 : continue;
1938 andres 554 ECB : }
555 :
1938 andres 556 CBC 1856 : accessor->read_ntuples = 0;
557 1856 : accessor->read_ntuples_available = chunk_header.ntuples;
1938 andres 558 GIC 1856 : accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
559 :
560 : /* Go around again, so we can get a tuple from this chunk. */
561 : }
562 : else
563 : {
564 2306 : if (accessor->read_file != NULL)
565 : {
566 1226 : BufFileClose(accessor->read_file);
567 1226 : accessor->read_file = NULL;
568 : }
569 :
570 : /*
571 : * Try the next participant's file. If we've gone full circle,
572 : * we're done.
573 : */
574 2306 : accessor->read_participant = (accessor->read_participant + 1) %
575 2306 : accessor->sts->nparticipants;
576 2306 : if (accessor->read_participant == accessor->participant)
577 823 : break;
578 1483 : accessor->read_next_page = 0;
579 :
580 : /* Go around again, so we can get a chunk from this file. */
581 : }
582 : }
583 :
584 823 : return NULL;
585 : }
586 :
587 : /*
588 : * Create the name used for the BufFile that a given participant will write.
589 : */
590 : static void
591 2419 : sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
592 : {
593 2419 : snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
594 2419 : }
|