Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeHashjoin.c
4 : * Routines to handle hash join nodes
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeHashjoin.c
12 : *
13 : * PARALLELISM
14 : *
15 : * Hash joins can participate in parallel query execution in several ways. A
16 : * parallel-oblivious hash join is one where the node is unaware that it is
17 : * part of a parallel plan. In this case, a copy of the inner plan is used to
18 : * build a copy of the hash table in every backend, and the outer plan could
19 : * either be built from a partial or complete path, so that the results of the
20 : * hash join are correspondingly either partial or complete. A parallel-aware
21 : * hash join is one that behaves differently, coordinating work between
22 : * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
23 : * Hash Join always appears with a Parallel Hash node.
24 : *
25 : * Parallel-aware hash joins use the same per-backend state machine to track
26 : * progress through the hash join algorithm as parallel-oblivious hash joins.
27 : * In a parallel-aware hash join, there is also a shared state machine that
28 : * co-operating backends use to synchronize their local state machines and
29 : * program counters. The shared state machine is managed with a Barrier IPC
30 : * primitive. When all attached participants arrive at a barrier, the phase
31 : * advances and all waiting participants are released.
32 : *
33 : * When a participant begins working on a parallel hash join, it must first
34 : * figure out how much progress has already been made, because participants
35 : * don't wait for each other to begin. For this reason there are switch
36 : * statements at key points in the code where we have to synchronize our local
37 : * state machine with the phase, and then jump to the correct part of the
38 : * algorithm so that we can get started.
39 : *
40 : * One barrier called build_barrier is used to coordinate the hashing phases.
41 : * The phase is represented by an integer which begins at zero and increments
42 : * one by one, but in the code it is referred to by symbolic names as follows.
43 : * An asterisk indicates a phase that is performed by a single arbitrarily
44 : * chosen process.
45 : *
46 : * PHJ_BUILD_ELECT -- initial state
47 : * PHJ_BUILD_ALLOCATE* -- one sets up the batches and table 0
48 : * PHJ_BUILD_HASH_INNER -- all hash the inner rel
49 : * PHJ_BUILD_HASH_OUTER -- (multi-batch only) all hash the outer
50 : * PHJ_BUILD_RUN -- building done, probing can begin
51 : * PHJ_BUILD_FREE* -- all work complete, one frees batches
52 : *
53 : * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
54 : * be used repeatedly as required to coordinate expansions in the number of
55 : * batches or buckets. Their phases are as follows:
56 : *
57 : * PHJ_GROW_BATCHES_ELECT -- initial state
58 : * PHJ_GROW_BATCHES_REALLOCATE* -- one allocates new batches
59 : * PHJ_GROW_BATCHES_REPARTITION -- all repartition
60 : * PHJ_GROW_BATCHES_DECIDE* -- one detects skew and cleans up
61 : * PHJ_GROW_BATCHES_FINISH -- finished one growth cycle
62 : *
63 : * PHJ_GROW_BUCKETS_ELECT -- initial state
64 : * PHJ_GROW_BUCKETS_REALLOCATE* -- one allocates new buckets
65 : * PHJ_GROW_BUCKETS_REINSERT -- all insert tuples
66 : *
67 : * If the planner got the number of batches and buckets right, those won't be
68 : * necessary, but on the other hand we might finish up needing to expand the
69 : * buckets or batches multiple times while hashing the inner relation to stay
70 : * within our memory budget and load factor target. For that reason it's a
71 : * separate pair of barriers using circular phases.
72 : *
73 : * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
74 : * because we need to divide the outer relation into batches up front in order
75 : * to be able to process batches entirely independently. In contrast, the
76 : * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
77 : * batches whenever it encounters them while scanning and probing, which it
78 : * can do because it processes batches in serial order.
79 : *
80 : * Once PHJ_BUILD_RUN is reached, backends then split up and process
81 : * different batches, or gang up and work together on probing batches if there
82 : * aren't enough to go around. For each batch there is a separate barrier
83 : * with the following phases:
84 : *
85 : * PHJ_BATCH_ELECT -- initial state
86 : * PHJ_BATCH_ALLOCATE* -- one allocates buckets
87 : * PHJ_BATCH_LOAD -- all load the hash table from disk
88 : * PHJ_BATCH_PROBE -- all probe
89 : * PHJ_BATCH_SCAN* -- one does right/right-anti/full unmatched scan
90 : * PHJ_BATCH_FREE* -- one frees memory
91 : *
92 : * Batch 0 is a special case, because it starts out in phase
93 : * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
94 : * PHJ_BUILD_HASH_INNER so we can skip loading.
95 : *
96 : * Initially we try to plan for a single-batch hash join using the combined
97 : * hash_mem of all participants to create a large shared hash table. If that
98 : * turns out either at planning or execution time to be impossible then we
99 : * fall back to regular hash_mem sized hash tables.
100 : *
101 : * To avoid deadlocks, we never wait for any barrier unless it is known that
102 : * all other backends attached to it are actively executing the node or have
103 : * finished. Practically, that means that we never emit a tuple while attached
104 : * to a barrier, unless the barrier has reached a phase that means that no
105 : * process will wait on it again. We emit tuples while attached to the build
106 : * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
107 : * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
108 : * respectively without waiting, using BarrierArriveAndDetach() and
109 : * BarrierArriveAndDetachExceptLast() respectively. The last to detach
110 : * receives a different return value so that it knows that it's safe to
111 : * clean up. Any straggler process that attaches after that phase is reached
112 : * will see that it's too late to participate or access the relevant shared
113 : * memory objects.
114 : *
115 : *-------------------------------------------------------------------------
116 : */
117 :
118 : #include "postgres.h"
119 :
120 : #include "access/htup_details.h"
121 : #include "access/parallel.h"
122 : #include "executor/executor.h"
123 : #include "executor/hashjoin.h"
124 : #include "executor/nodeHash.h"
125 : #include "executor/nodeHashjoin.h"
126 : #include "miscadmin.h"
127 : #include "pgstat.h"
128 : #include "utils/memutils.h"
129 : #include "utils/sharedtuplestore.h"
130 :
131 :
132 : /*
133 : * States of the ExecHashJoin state machine
134 : */
135 : #define HJ_BUILD_HASHTABLE 1
136 : #define HJ_NEED_NEW_OUTER 2
137 : #define HJ_SCAN_BUCKET 3
138 : #define HJ_FILL_OUTER_TUPLE 4
139 : #define HJ_FILL_INNER_TUPLES 5
140 : #define HJ_NEED_NEW_BATCH 6
141 :
142 : /* Returns true if doing null-fill on outer relation */
143 : #define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
144 : /* Returns true if doing null-fill on inner relation */
145 : #define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
146 :
147 : static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
148 : HashJoinState *hjstate,
149 : uint32 *hashvalue);
150 : static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
151 : HashJoinState *hjstate,
152 : uint32 *hashvalue);
153 : static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
154 : BufFile *file,
155 : uint32 *hashvalue,
156 : TupleTableSlot *tupleSlot);
157 : static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
158 : static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
159 : static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);
160 :
161 :
162 : /* ----------------------------------------------------------------
163 : * ExecHashJoinImpl
164 : *
165 : * This function implements the Hybrid Hashjoin algorithm. It is marked
166 : * with an always-inline attribute so that ExecHashJoin() and
167 : * ExecParallelHashJoin() can inline it. Compilers that respect the
168 : * attribute should create versions specialized for parallel == true and
169 : * parallel == false with unnecessary branches removed.
170 : *
171 : * Note: the relation we build hash table on is the "inner"
172 : * the other one is "outer".
173 : * ----------------------------------------------------------------
174 : */
175 : static pg_attribute_always_inline TupleTableSlot *
1936 andres 176 GIC 5030235 : ExecHashJoinImpl(PlanState *pstate, bool parallel)
177 : {
2092 178 5030235 : HashJoinState *node = castNode(HashJoinState, pstate);
179 : PlanState *outerNode;
180 : HashState *hashNode;
2217 andres 181 ECB : ExprState *joinqual;
182 : ExprState *otherqual;
9344 bruce 183 : ExprContext *econtext;
184 : HashJoinTable hashtable;
185 : TupleTableSlot *outerTupleSlot;
186 : uint32 hashvalue;
187 : int batchno;
188 : ParallelHashJoinState *parallel_state;
189 :
190 : /*
191 : * get information from HashJoin node
192 : */
7430 tgl 193 GIC 5030235 : joinqual = node->js.joinqual;
194 5030235 : otherqual = node->js.ps.qual;
195 5030235 : hashNode = (HashState *) innerPlanState(node);
196 5030235 : outerNode = outerPlanState(node);
197 5030235 : hashtable = node->hj_HashTable;
7430 tgl 198 CBC 5030235 : econtext = node->js.ps.ps_ExprContext;
1936 andres 199 5030235 : parallel_state = hashNode->parallel_state;
9770 scrappy 200 ECB :
8053 bruce 201 : /*
202 : * Reset per-tuple memory context to free any expression evaluation
2271 andres 203 : * storage allocated in the previous tuple cycle.
8263 tgl 204 : */
8263 tgl 205 GIC 5030235 : ResetExprContext(econtext);
206 :
207 : /*
208 : * run the hash join state machine
209 : */
4483 tgl 210 ECB : for (;;)
211 : {
212 : /*
213 : * It's possible to iterate this loop many times before returning a
214 : * tuple, in some pathological cases such as needing to move much of
215 : * the current batch to a later batch. So let's check for interrupts
216 : * each time through.
217 : */
2084 andres 218 GIC 18182291 : CHECK_FOR_INTERRUPTS();
219 :
4483 tgl 220 18182291 : switch (node->hj_JoinState)
221 : {
222 11658 : case HJ_BUILD_HASHTABLE:
4382 bruce 223 ECB :
224 : /*
4483 tgl 225 : * First time through: build hash table for inner relation.
226 : */
4483 tgl 227 CBC 11658 : Assert(hashtable == NULL);
228 :
229 : /*
230 : * If the outer relation is completely empty, and it's not
231 : * right/right-anti/full join, we can quit without building
232 : * the hash table. However, for an inner join it is only a
233 : * win to check this when the outer relation's startup cost is
234 : * less than the projected cost of building the hash table.
235 : * Otherwise it's best to build the hash table first and see
236 : * if the inner relation is empty. (When it's a left join, we
237 : * should always make this check, since we aren't going to be
238 : * able to skip the join on the strength of an empty inner
239 : * relation anyway.)
240 : *
241 : * If we are rescanning the join, we make use of information
242 : * gained on the previous scan: don't bother to try the
243 : * prefetch if the previous scan found the outer relation
244 : * nonempty. This is not 100% reliable since with new
245 : * parameters the outer relation might yield different
246 : * results, but it's a good heuristic.
247 : *
248 : * The only way to make the check is to try to fetch a tuple
249 : * from the outer plan node. If we succeed, we have to stash
250 : * it away for later consumption by ExecHashJoinOuterGetTuple.
251 : */
4483 tgl 252 GIC 11658 : if (HJ_FILL_INNER(node))
253 : {
254 : /* no chance to not build the hash table */
255 2505 : node->hj_FirstOuterTupleSlot = NULL;
256 : }
1936 andres 257 CBC 9153 : else if (parallel)
258 : {
259 : /*
1936 andres 260 ECB : * The empty-outer optimization is not implemented for
261 : * shared hash tables, because no one participant can
262 : * determine that there are no outer tuples, and it's not
263 : * yet clear that it's worth the synchronization overhead
264 : * of reaching consensus to figure that out. So we have
265 : * to build the hash table.
266 : */
1936 andres 267 GIC 162 : node->hj_FirstOuterTupleSlot = NULL;
268 : }
4483 tgl 269 8991 : else if (HJ_FILL_OUTER(node) ||
270 6998 : (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
271 6599 : !node->hj_OuterNotEmpty))
4483 tgl 272 ECB : {
4483 tgl 273 GIC 8225 : node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
4483 tgl 274 CBC 8225 : if (TupIsNull(node->hj_FirstOuterTupleSlot))
4483 tgl 275 ECB : {
4483 tgl 276 CBC 1772 : node->hj_OuterNotEmpty = false;
4483 tgl 277 GIC 1772 : return NULL;
4483 tgl 278 ECB : }
279 : else
4483 tgl 280 GIC 6453 : node->hj_OuterNotEmpty = true;
4483 tgl 281 ECB : }
282 : else
4483 tgl 283 GIC 766 : node->hj_FirstOuterTupleSlot = NULL;
284 :
4483 tgl 285 ECB : /*
286 : * Create the hash table. If using Parallel Hash, then
287 : * whoever gets here first will create the hash table and any
1936 andres 288 : * later arrivals will merely attach to it.
289 : */
1936 andres 290 GIC 9886 : hashtable = ExecHashTableCreate(hashNode,
291 : node->hj_HashOperators,
292 : node->hj_Collations,
4483 tgl 293 9886 : HJ_FILL_INNER(node));
294 9886 : node->hj_HashTable = hashtable;
8053 bruce 295 ECB :
296 : /*
297 : * Execute the Hash node, to build the hash table. If using
1936 andres 298 : * Parallel Hash, then we'll try to help hashing unless we
299 : * arrived too late.
300 : */
4483 tgl 301 GIC 9886 : hashNode->hashtable = hashtable;
302 9886 : (void) MultiExecProcNode((PlanState *) hashNode);
303 :
304 : /*
305 : * If the inner relation is completely empty, and we're not
4483 tgl 306 ECB : * doing a left outer join, we can quit without scanning the
307 : * outer relation.
308 : */
4483 tgl 309 GIC 9886 : if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
310 : {
19 tmunro 311 521 : if (parallel)
312 : {
313 : /*
314 : * Advance the build barrier to PHJ_BUILD_RUN before
315 : * proceeding so we can negotiate resource cleanup.
316 : */
19 tmunro 317 UIC 0 : Barrier *build_barrier = ¶llel_state->build_barrier;
318 :
17 tmunro 319 UNC 0 : while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
19 tmunro 320 UIC 0 : BarrierArriveAndWait(build_barrier, 0);
19 tmunro 321 EUB : }
4483 tgl 322 GIC 521 : return NULL;
19 tmunro 323 EUB : }
6341 tgl 324 :
325 : /*
4382 bruce 326 ECB : * need to remember whether nbatch has increased since we
327 : * began scanning the outer relation
328 : */
4483 tgl 329 GIC 9365 : hashtable->nbatch_outstart = hashtable->nbatch;
330 :
331 : /*
332 : * Reset OuterNotEmpty for scan. (It's OK if we fetched a
4483 tgl 333 ECB : * tuple above, because ExecHashJoinOuterGetTuple will
334 : * immediately set it again.)
335 : */
4483 tgl 336 GIC 9365 : node->hj_OuterNotEmpty = false;
337 :
1936 andres 338 9365 : if (parallel)
339 189 : {
1936 andres 340 ECB : Barrier *build_barrier;
341 :
1936 andres 342 CBC 189 : build_barrier = ¶llel_state->build_barrier;
17 tmunro 343 GNC 189 : Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
344 : BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
345 : BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
346 189 : if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
1936 andres 347 ECB : {
348 : /*
349 : * If multi-batch, we need to hash the outer relation
350 : * up front.
351 : */
1936 andres 352 GIC 151 : if (hashtable->nbatch > 1)
353 87 : ExecParallelHashJoinPartitionOuter(node);
354 151 : BarrierArriveAndWait(build_barrier,
355 : WAIT_EVENT_HASH_BUILD_HASH_OUTER);
1936 andres 356 ECB : }
17 tmunro 357 GNC 38 : else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE)
19 tmunro 358 ECB : {
359 : /*
360 : * If we attached so late that the job is finished and
361 : * the batch state has been freed, we can return
362 : * immediately.
363 : */
19 tmunro 364 UIC 0 : return NULL;
365 : }
366 :
367 : /* Each backend should now select a batch to work on. */
17 tmunro 368 GNC 189 : Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
1936 andres 369 GIC 189 : hashtable->curbatch = -1;
370 189 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
371 :
1936 andres 372 CBC 189 : continue;
1936 andres 373 ECB : }
374 : else
1936 andres 375 GIC 9176 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
6405 tgl 376 ECB :
377 : /* FALL THRU */
378 :
4483 tgl 379 CBC 8723214 : case HJ_NEED_NEW_OUTER:
380 :
381 : /*
382 : * We don't have an outer tuple, try to get the next one
6405 tgl 383 ECB : */
1936 andres 384 GIC 8723214 : if (parallel)
385 : outerTupleSlot =
386 1080511 : ExecParallelHashJoinOuterGetTuple(outerNode, node,
387 : &hashvalue);
1936 andres 388 ECB : else
389 : outerTupleSlot =
1936 andres 390 CBC 7642703 : ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
391 :
4483 tgl 392 GIC 8723214 : if (TupIsNull(outerTupleSlot))
393 : {
4483 tgl 394 ECB : /* end of batch, or maybe whole join */
4483 tgl 395 GIC 10589 : if (HJ_FILL_INNER(node))
4483 tgl 396 ECB : {
397 : /* set up to scan for unmatched inner tuples */
9 tmunro 398 GNC 2427 : if (parallel)
399 : {
400 : /*
401 : * Only one process is currently allow to handle
402 : * each batch's unmatched tuples, in a parallel
403 : * join.
404 : */
405 43 : if (ExecParallelPrepHashTableForUnmatched(node))
406 30 : node->hj_JoinState = HJ_FILL_INNER_TUPLES;
407 : else
408 13 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
409 : }
410 : else
411 : {
412 2384 : ExecPrepHashTableForUnmatched(node);
413 2384 : node->hj_JoinState = HJ_FILL_INNER_TUPLES;
414 : }
415 : }
416 : else
4483 tgl 417 CBC 8162 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
4483 tgl 418 GIC 10589 : continue;
419 : }
420 :
421 8712625 : econtext->ecxt_outertuple = outerTupleSlot;
422 8712625 : node->hj_MatchedOuter = false;
423 :
4483 tgl 424 ECB : /*
425 : * Find the corresponding bucket for this tuple in the main
426 : * hash table or skew hash table.
427 : */
4483 tgl 428 GIC 8712625 : node->hj_CurHashValue = hashvalue;
429 8712625 : ExecHashGetBucketAndBatch(hashtable, hashvalue,
430 : &node->hj_CurBucketNo, &batchno);
4483 tgl 431 CBC 8712625 : node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
4483 tgl 432 ECB : hashvalue);
4483 tgl 433 GIC 8712625 : node->hj_CurTuple = NULL;
434 :
435 : /*
4483 tgl 436 ECB : * The tuple might not belong to the current batch (where
437 : * "current batch" includes the skew buckets if any).
438 : */
4483 tgl 439 GIC 8712625 : if (batchno != hashtable->curbatch &&
4483 tgl 440 CBC 735696 : node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
441 735096 : {
442 : bool shouldFree;
1606 andres 443 GIC 735096 : MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
444 : &shouldFree);
445 :
446 : /*
4483 tgl 447 ECB : * Need to postpone this outer tuple to a later batch.
448 : * Save it in the corresponding outer-batch file.
449 : */
1936 andres 450 CBC 735096 : Assert(parallel_state == NULL);
4483 tgl 451 GIC 735096 : Assert(batchno > hashtable->curbatch);
1606 andres 452 CBC 735096 : ExecHashJoinSaveTuple(mintuple, hashvalue,
2118 tgl 453 GIC 735096 : &hashtable->outerBatchFile[batchno]);
454 :
1606 andres 455 735096 : if (shouldFree)
456 735096 : heap_free_minimal_tuple(mintuple);
457 :
4483 tgl 458 ECB : /* Loop around, staying in HJ_NEED_NEW_OUTER state */
4483 tgl 459 CBC 735096 : continue;
4483 tgl 460 ECB : }
461 :
462 : /* OK, let's scan the bucket for matches */
4483 tgl 463 GIC 7977529 : node->hj_JoinState = HJ_SCAN_BUCKET;
464 :
465 : /* FALL THRU */
466 :
467 11062550 : case HJ_SCAN_BUCKET:
468 :
4483 tgl 469 ECB : /*
470 : * Scan the selected hash bucket for matches to current outer
471 : */
1936 andres 472 CBC 11062550 : if (parallel)
473 : {
474 2100024 : if (!ExecParallelScanHashBucket(node, econtext))
1936 andres 475 ECB : {
476 : /* out of matches; check for possible outer-join fill */
1936 andres 477 GIC 1080012 : node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
1936 andres 478 CBC 1080012 : continue;
479 : }
480 : }
481 : else
1936 andres 482 ECB : {
1936 andres 483 GIC 8962526 : if (!ExecScanHashBucket(node, econtext))
484 : {
485 : /* out of matches; check for possible outer-join fill */
1936 andres 486 CBC 4844799 : node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
1936 andres 487 GIC 4844799 : continue;
488 : }
489 : }
490 :
5350 tgl 491 ECB : /*
492 : * We've got a match, but still need to test non-hashed quals.
4483 493 : * ExecScanHashBucket already set up all the state needed to
494 : * call ExecQual.
495 : *
496 : * If we pass the qual, then save state for next call and have
497 : * ExecProject form the projection, store it in the tuple
498 : * table, and return the slot.
499 : *
500 : * Only the joinquals determine tuple match status, but all
501 : * quals must pass to actually return the tuple.
5350 502 : */
2217 andres 503 GIC 5137739 : if (joinqual == NULL || ExecQual(joinqual, econtext))
504 : {
4483 tgl 505 CBC 5062757 : node->hj_MatchedOuter = true;
1168 tmunro 506 ECB :
507 :
508 : /*
509 : * This is really only needed if HJ_FILL_INNER(node), but
510 : * we'll avoid the branch and just set it always.
511 : */
9 tmunro 512 GNC 5062757 : if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
1168 tmunro 513 CBC 3070831 : HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
514 :
515 : /* In an antijoin, we never return a matched tuple */
4483 tgl 516 GIC 5062757 : if (node->js.jointype == JOIN_ANTI)
517 : {
518 772331 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
4483 tgl 519 CBC 772331 : continue;
4483 tgl 520 ECB : }
521 :
522 : /*
523 : * In a right-antijoin, we never return a matched tuple.
524 : * And we need to stay on the current outer tuple to
525 : * continue scanning the inner side for matches.
526 : */
4 tgl 527 GNC 4290426 : if (node->js.jointype == JOIN_RIGHT_ANTI)
528 14426 : continue;
529 :
530 : /*
2193 tgl 531 ECB : * If we only need to join to the first matching inner
532 : * tuple, then consider returning this one, but after that
533 : * continue with next outer tuple.
4483 534 : */
2193 tgl 535 GIC 4276000 : if (node->js.single_match)
4483 536 1280353 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
537 :
2217 andres 538 4276000 : if (otherqual == NULL || ExecQual(otherqual, econtext))
2271 539 4186260 : return ExecProject(node->js.ps.ps_ProjInfo);
540 : else
4217 tgl 541 89740 : InstrCountFiltered2(node, 1);
7377 tgl 542 ECB : }
4217 543 : else
4217 tgl 544 GIC 74982 : InstrCountFiltered1(node, 1);
4483 545 164722 : break;
546 :
547 5924811 : case HJ_FILL_OUTER_TUPLE:
548 :
549 : /*
4483 tgl 550 ECB : * The current outer tuple has run out of matches, so check
4382 bruce 551 : * whether to emit a dummy outer-join tuple. Whether we emit
552 : * one or not, the next state is NEED_NEW_OUTER.
5350 tgl 553 : */
4483 tgl 554 CBC 5924811 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
555 :
556 5924811 : if (!node->hj_MatchedOuter &&
4483 tgl 557 GIC 3573372 : HJ_FILL_OUTER(node))
558 : {
4483 tgl 559 ECB : /*
560 : * Generate a fake join tuple with nulls for the inner
561 : * tuple, and return it if it passes the non-join quals.
562 : */
4483 tgl 563 GIC 818897 : econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
564 :
2217 andres 565 818897 : if (otherqual == NULL || ExecQual(otherqual, econtext))
2271 566 402058 : return ExecProject(node->js.ps.ps_ProjInfo);
567 : else
4217 tgl 568 416839 : InstrCountFiltered2(node, 1);
4483 tgl 569 ECB : }
4483 tgl 570 GIC 5522753 : break;
4483 tgl 571 ECB :
4483 tgl 572 CBC 435988 : case HJ_FILL_INNER_TUPLES:
573 :
574 : /*
575 : * We have finished a batch, but we are doing
576 : * right/right-anti/full join, so any unmatched inner tuples
577 : * in the hashtable have to be emitted before we continue to
578 : * the next batch.
8244 tgl 579 ECB : */
9 tmunro 580 GNC 811946 : if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
581 375958 : : ExecScanHashTableForUnmatched(node, econtext)))
4483 tgl 582 ECB : {
583 : /* no more unmatched tuples */
4483 tgl 584 GIC 2411 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
4483 tgl 585 CBC 2411 : continue;
586 : }
8244 tgl 587 ECB :
588 : /*
4483 589 : * Generate a fake join tuple with nulls for the outer tuple,
590 : * and return it if it passes the non-join quals.
591 : */
4483 tgl 592 GIC 433577 : econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
593 :
2217 andres 594 433577 : if (otherqual == NULL || ExecQual(otherqual, econtext))
2271 595 430100 : return ExecProject(node->js.ps.ps_ProjInfo);
596 : else
4217 tgl 597 CBC 3477 : InstrCountFiltered2(node, 1);
4483 598 3477 : break;
599 :
4483 tgl 600 GIC 10775 : case HJ_NEED_NEW_BATCH:
4382 bruce 601 ECB :
4483 tgl 602 : /*
603 : * Try to advance to next batch. Done if there are no more.
604 : */
1936 andres 605 GIC 10775 : if (parallel)
606 : {
607 688 : if (!ExecParallelHashJoinNewBatch(node))
608 189 : return NULL; /* end of parallel-aware join */
1936 andres 609 ECB : }
610 : else
611 : {
1936 andres 612 CBC 10087 : if (!ExecHashJoinNewBatch(node))
1936 andres 613 GIC 9335 : return NULL; /* end of parallel-oblivious join */
1936 andres 614 ECB : }
4483 tgl 615 CBC 1251 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
4483 tgl 616 GIC 1251 : break;
4483 tgl 617 ECB :
4483 tgl 618 UIC 0 : default:
619 0 : elog(ERROR, "unrecognized hashjoin state: %d",
620 : (int) node->hj_JoinState);
621 : }
9345 bruce 622 ECB : }
623 : }
9770 scrappy 624 :
1936 andres 625 : /* ----------------------------------------------------------------
626 : * ExecHashJoin
627 : *
628 : * Parallel-oblivious version.
629 : * ----------------------------------------------------------------
630 : */
631 : static TupleTableSlot * /* return: a tuple or NULL */
1936 andres 632 CBC 3890034 : ExecHashJoin(PlanState *pstate)
1936 andres 633 ECB : {
634 : /*
1936 andres 635 EUB : * On sufficiently smart compilers this should be inlined with the
636 : * parallel-aware branches removed.
637 : */
1936 andres 638 GIC 3890034 : return ExecHashJoinImpl(pstate, false);
639 : }
640 :
641 : /* ----------------------------------------------------------------
642 : * ExecParallelHashJoin
643 : *
644 : * Parallel-aware version.
645 : * ----------------------------------------------------------------
646 : */
647 : static TupleTableSlot * /* return: a tuple or NULL */
648 1140201 : ExecParallelHashJoin(PlanState *pstate)
1936 andres 649 ECB : {
650 : /*
651 : * On sufficiently smart compilers this should be inlined with the
652 : * parallel-oblivious branches removed.
653 : */
1936 andres 654 GIC 1140201 : return ExecHashJoinImpl(pstate, true);
1936 andres 655 ECB : }
656 :
657 : /* ----------------------------------------------------------------
658 : * ExecInitHashJoin
659 : *
660 : * Init routine for HashJoin node.
661 : * ----------------------------------------------------------------
662 : */
663 : HashJoinState *
6249 tgl 664 GIC 14214 : ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
9770 scrappy 665 ECB : {
666 : HashJoinState *hjstate;
667 : Plan *outerNode;
668 : Hash *hashNode;
669 : TupleDesc outerDesc,
670 : innerDesc;
1606 andres 671 : const TupleTableSlotOps *ops;
672 :
673 : /* check for unsupported flags */
6249 tgl 674 GIC 14214 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
675 :
676 : /*
677 : * create state structure
678 : */
9345 bruce 679 14214 : hjstate = makeNode(HashJoinState);
7430 tgl 680 14214 : hjstate->js.ps.plan = (Plan *) node;
7430 tgl 681 CBC 14214 : hjstate->js.ps.state = estate;
682 :
683 : /*
684 : * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
685 : * where this function may be replaced with a parallel version, if we
686 : * managed to launch a parallel query.
687 : */
2092 andres 688 GIC 14214 : hjstate->js.ps.ExecProcNode = ExecHashJoin;
1878 689 14214 : hjstate->js.jointype = node->join.jointype;
690 :
8053 bruce 691 ECB : /*
692 : * Miscellaneous initialization
693 : *
694 : * create expression context for node
695 : */
7430 tgl 696 CBC 14214 : ExecAssignExprContext(estate, &hjstate->js.ps);
7430 tgl 697 ECB :
8053 bruce 698 : /*
699 : * initialize child nodes
700 : *
701 : * Note: we could suppress the REWIND flag for the inner input, which
702 : * would amount to betting that the hash will be a single batch. Not
703 : * clear if this would be a win or not.
704 : */
7430 tgl 705 CBC 14214 : outerNode = outerPlan(node);
706 14214 : hashNode = (Hash *) innerPlan(node);
707 :
6249 tgl 708 GIC 14214 : outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
1878 andres 709 14214 : outerDesc = ExecGetResultType(outerPlanState(hjstate));
6249 tgl 710 14214 : innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
1878 andres 711 14214 : innerDesc = ExecGetResultType(innerPlanState(hjstate));
712 :
1878 andres 713 ECB : /*
714 : * Initialize result slot, type and projection.
715 : */
1606 andres 716 GIC 14214 : ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
1878 717 14214 : ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
718 :
719 : /*
720 : * tuple table initialization
721 : */
1606 andres 722 CBC 14214 : ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
723 14214 : hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
724 : ops);
8244 tgl 725 ECB :
2193 726 : /*
727 : * detect whether we need only consider the first matching inner tuple
728 : */
2193 tgl 729 GIC 21208 : hjstate->js.single_match = (node->join.inner_unique ||
730 6994 : node->join.jointype == JOIN_SEMI);
731 :
732 : /* set up null tuples for outer joins, if needed */
8244 tgl 733 CBC 14214 : switch (node->join.jointype)
8244 tgl 734 ECB : {
8244 tgl 735 GIC 9014 : case JOIN_INNER:
736 : case JOIN_SEMI:
737 9014 : break;
738 2167 : case JOIN_LEFT:
5351 tgl 739 ECB : case JOIN_ANTI:
8244 tgl 740 CBC 2167 : hjstate->hj_NullInnerTupleSlot =
1606 andres 741 GIC 2167 : ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
8244 tgl 742 2167 : break;
4483 743 2524 : case JOIN_RIGHT:
744 : case JOIN_RIGHT_ANTI:
745 2524 : hjstate->hj_NullOuterTupleSlot =
1606 andres 746 2524 : ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
4483 tgl 747 CBC 2524 : break;
748 509 : case JOIN_FULL:
4483 tgl 749 GIC 509 : hjstate->hj_NullOuterTupleSlot =
1606 andres 750 509 : ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
4483 tgl 751 CBC 509 : hjstate->hj_NullInnerTupleSlot =
1606 andres 752 GIC 509 : ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
4483 tgl 753 CBC 509 : break;
8244 tgl 754 UIC 0 : default:
7202 tgl 755 LBC 0 : elog(ERROR, "unrecognized join type: %d",
8244 tgl 756 ECB : (int) node->join.jointype);
757 : }
758 :
8053 bruce 759 : /*
6385 760 : * now for some voodoo. our temporary tuple slot is actually the result
4483 tgl 761 : * tuple slot of the Hash node (which is our inner plan). we can do this
762 : * because Hash nodes don't return tuples via ExecProcNode() -- instead
6385 bruce 763 : * the hash join node uses ExecScanHashBucket() to get at the contents of
3260 764 : * the hash table. -cim 6/9/91
9345 765 : */
766 : {
7430 tgl 767 CBC 14214 : HashState *hashstate = (HashState *) innerPlanState(hjstate);
768 14214 : TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
9345 bruce 769 ECB :
9345 bruce 770 CBC 14214 : hjstate->hj_HashTupleSlot = slot;
9345 bruce 771 ECB : }
9345 bruce 772 EUB :
8053 773 : /*
774 : * initialize child expressions
775 : */
1878 andres 776 GIC 14214 : hjstate->js.ps.qual =
777 14214 : ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
778 14214 : hjstate->js.joinqual =
779 14214 : ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
780 14214 : hjstate->hashclauses =
781 14214 : ExecInitQual(node->hashclauses, (PlanState *) hjstate);
782 :
783 : /*
784 : * initialize hash-specific info
9345 bruce 785 ECB : */
7032 neilc 786 CBC 14214 : hjstate->hj_HashTable = NULL;
6405 tgl 787 GIC 14214 : hjstate->hj_FirstOuterTupleSlot = NULL;
7254 tgl 788 ECB :
6608 tgl 789 GIC 14214 : hjstate->hj_CurHashValue = 0;
8727 790 14214 : hjstate->hj_CurBucketNo = 0;
5132 791 14214 : hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
7032 neilc 792 14214 : hjstate->hj_CurTuple = NULL;
793 :
1346 andres 794 CBC 14214 : hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
1346 andres 795 ECB : (PlanState *) hjstate);
1346 andres 796 CBC 14214 : hjstate->hj_HashOperators = node->hashoperators;
797 14214 : hjstate->hj_Collations = node->hashcollations;
9345 bruce 798 ECB :
4483 tgl 799 CBC 14214 : hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
8244 tgl 800 GIC 14214 : hjstate->hj_MatchedOuter = false;
6341 801 14214 : hjstate->hj_OuterNotEmpty = false;
802 :
7430 803 14214 : return hjstate;
9770 scrappy 804 ECB : }
805 :
806 : /* ----------------------------------------------------------------
9345 bruce 807 : * ExecEndHashJoin
9770 scrappy 808 : *
9345 bruce 809 : * clean up routine for HashJoin node
9770 scrappy 810 : * ----------------------------------------------------------------
811 : */
void
ExecEndHashJoin(HashJoinState *node)
{
	/*
	 * Free hash table.  (In the parallel case it may already have been
	 * released by ExecShutdownHashJoin -- TODO confirm; the NULL test makes
	 * this safe either way.)
	 */
	if (node->hj_HashTable)
	{
		ExecHashTableDestroy(node->hj_HashTable);
		node->hj_HashTable = NULL;
	}

	/*
	 * Free the exprcontext
	 */
	ExecFreeExprContext(&node->js.ps);

	/*
	 * clean out the tuple table: drop any pins/references held by the
	 * result, outer, and hash tuple slots
	 */
	ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
	ExecClearTuple(node->hj_OuterTupleSlot);
	ExecClearTuple(node->hj_HashTupleSlot);

	/*
	 * clean up subtrees (outer first, then the Hash node on the inner side)
	 */
	ExecEndNode(outerPlanState(node));
	ExecEndNode(innerPlanState(node));
}
842 :
843 : /*
844 : * ExecHashJoinOuterGetTuple
9770 scrappy 845 ECB : *
846 : * get the next outer tuple for a parallel oblivious hashjoin: either by
847 : * executing the outer plan node in the first pass, or from the temp
848 : * files for the hashjoin batches.
849 : *
4483 tgl 850 : * Returns a null slot if no more outer tuples (within the current batch).
851 : *
852 : * On success, the tuple's hash value is stored at *hashvalue --- this is
853 : * either originally computed, or re-read from the temp file.
854 : */
static TupleTableSlot *
ExecHashJoinOuterGetTuple(PlanState *outerNode,
						  HashJoinState *hjstate,
						  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;

	if (curbatch == 0)			/* if it is the first pass */
	{
		/*
		 * Check to see if first outer tuple was already fetched by
		 * ExecHashJoin() (to detect an empty outer relation) and not used
		 * yet; if so, consume it now rather than pulling a fresh tuple.
		 */
		slot = hjstate->hj_FirstOuterTupleSlot;
		if (!TupIsNull(slot))
			hjstate->hj_FirstOuterTupleSlot = NULL;
		else
			slot = ExecProcNode(outerNode);

		while (!TupIsNull(slot))
		{
			/*
			 * We have to compute the tuple's hash value.
			 */
			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

			econtext->ecxt_outertuple = slot;
			if (ExecHashGetHashValue(hashtable, econtext,
									 hjstate->hj_OuterHashKeys,
									 true,	/* outer tuple */
									 HJ_FILL_OUTER(hjstate),
									 hashvalue))
			{
				/* remember outer relation is not empty for possible rescan */
				hjstate->hj_OuterNotEmpty = true;

				return slot;
			}

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
			slot = ExecProcNode(outerNode);
		}
	}
	else if (curbatch < hashtable->nbatch)
	{
		/* Later passes read saved outer tuples back from the batch file. */
		BufFile    *file = hashtable->outerBatchFile[curbatch];

		/*
		 * In outer-join cases, we could get here even though the batch file
		 * is empty.
		 */
		if (file == NULL)
			return NULL;

		/* hash value was saved alongside the tuple; it is re-read here */
		slot = ExecHashJoinGetSavedTuple(hjstate,
										 file,
										 hashvalue,
										 hjstate->hj_OuterTupleSlot);
		if (!TupIsNull(slot))
			return slot;
	}

	/* End of this batch */
	return NULL;
}
925 :
926 : /*
927 : * ExecHashJoinOuterGetTuple variant for the parallel case.
928 : */
static TupleTableSlot *
ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
								  HashJoinState *hjstate,
								  uint32 *hashvalue)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	TupleTableSlot *slot;

	/*
	 * In the Parallel Hash case we only run the outer plan directly for
	 * single-batch hash joins.  Otherwise we have to go to batch files, even
	 * for batch 0.
	 */
	if (curbatch == 0 && hashtable->nbatch == 1)
	{
		slot = ExecProcNode(outerNode);

		while (!TupIsNull(slot))
		{
			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;

			econtext->ecxt_outertuple = slot;
			if (ExecHashGetHashValue(hashtable, econtext,
									 hjstate->hj_OuterHashKeys,
									 true,	/* outer tuple */
									 HJ_FILL_OUTER(hjstate),
									 hashvalue))
				return slot;

			/*
			 * That tuple couldn't match because of a NULL, so discard it and
			 * continue with the next one.
			 */
			slot = ExecProcNode(outerNode);
		}
	}
	else if (curbatch < hashtable->nbatch)
	{
		MinimalTuple tuple;

		/* Multi-batch: pull tuples from the shared outer tuplestore. */
		tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
									   hashvalue);
		if (tuple != NULL)
		{
			/* copy not needed; slot does not take ownership (shouldFree=false) */
			ExecForceStoreMinimalTuple(tuple,
									   hjstate->hj_OuterTupleSlot,
									   false);
			slot = hjstate->hj_OuterTupleSlot;
			return slot;
		}
		else
			ExecClearTuple(hjstate->hj_OuterTupleSlot);
	}

	/*
	 * End of this batch: record that this participant has exhausted the
	 * outer side, so the batch's probe phase can be wrapped up.
	 */
	hashtable->batches[curbatch].outer_eof = true;

	return NULL;
}
989 :
6608 tgl 990 ECB : /*
991 : * ExecHashJoinNewBatch
9345 bruce 992 : * switch to a new hashjoin batch
993 : *
4483 tgl 994 : * Returns true if successful, false if there are no more batches.
995 : */
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			nbatch;
	int			curbatch;
	BufFile    *innerFile;
	TupleTableSlot *slot;
	uint32		hashvalue;

	nbatch = hashtable->nbatch;
	curbatch = hashtable->curbatch;

	if (curbatch > 0)
	{
		/*
		 * We no longer need the previous outer batch file; close it right
		 * away to free disk space.
		 */
		if (hashtable->outerBatchFile[curbatch])
			BufFileClose(hashtable->outerBatchFile[curbatch]);
		hashtable->outerBatchFile[curbatch] = NULL;
	}
	else						/* we just finished the first batch */
	{
		/*
		 * Reset some of the skew optimization state variables, since we no
		 * longer need to consider skew tuples after the first batch.  The
		 * memory context reset we are about to do will release the skew
		 * hashtable itself.
		 */
		hashtable->skewEnabled = false;
		hashtable->skewBucket = NULL;
		hashtable->skewBucketNums = NULL;
		hashtable->nSkewBuckets = 0;
		hashtable->spaceUsedSkew = 0;
	}

	/*
	 * We can always skip over any batches that are completely empty on both
	 * sides.  We can sometimes skip over batches that are empty on only one
	 * side, but there are exceptions:
	 *
	 * 1. In a left/full outer join, we have to process outer batches even if
	 * the inner batch is empty.  Similarly, in a right/right-anti/full outer
	 * join, we have to process inner batches even if the outer batch is
	 * empty.
	 *
	 * 2. If we have increased nbatch since the initial estimate, we have to
	 * scan inner batches since they might contain tuples that need to be
	 * reassigned to later inner batches.
	 *
	 * 3. Similarly, if we have increased nbatch since starting the outer
	 * scan, we have to rescan outer batches in case they contain tuples that
	 * need to be reassigned.
	 */
	curbatch++;
	while (curbatch < nbatch &&
		   (hashtable->outerBatchFile[curbatch] == NULL ||
			hashtable->innerBatchFile[curbatch] == NULL))
	{
		if (hashtable->outerBatchFile[curbatch] &&
			HJ_FILL_OUTER(hjstate))
			break;				/* must process due to rule 1 */
		if (hashtable->innerBatchFile[curbatch] &&
			HJ_FILL_INNER(hjstate))
			break;				/* must process due to rule 1 */
		if (hashtable->innerBatchFile[curbatch] &&
			nbatch != hashtable->nbatch_original)
			break;				/* must process due to rule 2 */
		if (hashtable->outerBatchFile[curbatch] &&
			nbatch != hashtable->nbatch_outstart)
			break;				/* must process due to rule 3 */
		/* We can ignore this batch. */
		/* Release associated temp files right away. */
		if (hashtable->innerBatchFile[curbatch])
			BufFileClose(hashtable->innerBatchFile[curbatch]);
		hashtable->innerBatchFile[curbatch] = NULL;
		if (hashtable->outerBatchFile[curbatch])
			BufFileClose(hashtable->outerBatchFile[curbatch]);
		hashtable->outerBatchFile[curbatch] = NULL;
		curbatch++;
	}

	if (curbatch >= nbatch)
		return false;			/* no more batches */

	hashtable->curbatch = curbatch;

	/*
	 * Reload the hash table with the new inner batch (which could be empty)
	 */
	ExecHashTableReset(hashtable);

	innerFile = hashtable->innerBatchFile[curbatch];

	if (innerFile != NULL)
	{
		/* rewind to the start of the saved inner tuples */
		if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not rewind hash-join temporary file")));

		while ((slot = ExecHashJoinGetSavedTuple(hjstate,
												 innerFile,
												 &hashvalue,
												 hjstate->hj_HashTupleSlot)))
		{
			/*
			 * NOTE: some tuples may be sent to future batches.  Also, it is
			 * possible for hashtable->nbatch to be increased here!
			 */
			ExecHashTableInsert(hashtable, slot, hashvalue);
		}

		/*
		 * after we build the hash table, the inner batch file is no longer
		 * needed
		 */
		BufFileClose(innerFile);
		hashtable->innerBatchFile[curbatch] = NULL;
	}

	/*
	 * Rewind outer batch file (if present), so that we can start reading it.
	 */
	if (hashtable->outerBatchFile[curbatch] != NULL)
	{
		if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not rewind hash-join temporary file")));
	}

	return true;
}
1132 :
1133 : /*
1134 : * Choose a batch to work on, and attach to it. Returns true if successful,
1135 : * false if there are no more batches.
1936 andres 1136 ECB : */
static bool
ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			start_batchno;
	int			batchno;

	/*
	 * If we were already attached to a batch, remember not to bother checking
	 * it again, and detach from it (possibly freeing the hash table if we are
	 * last to detach).
	 */
	if (hashtable->curbatch >= 0)
	{
		hashtable->batches[hashtable->curbatch].done = true;
		ExecHashTableDetachBatch(hashtable);
	}

	/*
	 * Search for a batch that isn't done.  We use an atomic counter to start
	 * our search at a different batch in every participant when there are
	 * more batches than participants.
	 */
	batchno = start_batchno =
		pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
		hashtable->nbatch;
	do
	{
		uint32		hashvalue;
		MinimalTuple tuple;
		TupleTableSlot *slot;

		if (!hashtable->batches[batchno].done)
		{
			SharedTuplestoreAccessor *inner_tuples;
			Barrier    *batch_barrier =
				&hashtable->batches[batchno].shared->batch_barrier;

			/*
			 * The phase we attach in tells us how much work has already been
			 * done for this batch; fall through the remaining phases in
			 * order.
			 */
			switch (BarrierAttach(batch_barrier))
			{
				case PHJ_BATCH_ELECT:

					/* One backend allocates the hash table. */
					if (BarrierArriveAndWait(batch_barrier,
											 WAIT_EVENT_HASH_BATCH_ELECT))
						ExecParallelHashTableAlloc(hashtable, batchno);
					/* Fall through. */

				case PHJ_BATCH_ALLOCATE:
					/* Wait for allocation to complete. */
					BarrierArriveAndWait(batch_barrier,
										 WAIT_EVENT_HASH_BATCH_ALLOCATE);
					/* Fall through. */

				case PHJ_BATCH_LOAD:
					/* Start (or join in) loading tuples. */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					inner_tuples = hashtable->batches[batchno].inner_tuples;
					sts_begin_parallel_scan(inner_tuples);
					while ((tuple = sts_parallel_scan_next(inner_tuples,
														   &hashvalue)))
					{
						ExecForceStoreMinimalTuple(tuple,
												   hjstate->hj_HashTupleSlot,
												   false);
						slot = hjstate->hj_HashTupleSlot;
						ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
																hashvalue);
					}
					sts_end_parallel_scan(inner_tuples);
					BarrierArriveAndWait(batch_barrier,
										 WAIT_EVENT_HASH_BATCH_LOAD);
					/* Fall through. */

				case PHJ_BATCH_PROBE:

					/*
					 * This batch is ready to probe.  Return control to
					 * caller. We stay attached to batch_barrier so that the
					 * hash table stays alive until everyone's finished
					 * probing it, but no participant is allowed to wait at
					 * this barrier again (or else a deadlock could occur).
					 * All attached participants must eventually detach from
					 * the barrier and one worker must advance the phase so
					 * that the final phase is reached.
					 */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);

					return true;
				case PHJ_BATCH_SCAN:

					/*
					 * In principle, we could help scan for unmatched tuples,
					 * since that phase is already underway (the thing we
					 * can't do under current deadlock-avoidance rules is wait
					 * for others to arrive at PHJ_BATCH_SCAN, because
					 * PHJ_BATCH_PROBE emits tuples, but in this case we just
					 * got here without waiting).  That is not yet done.  For
					 * now, we just detach and go around again.  We have to
					 * use ExecHashTableDetachBatch() because there's a small
					 * chance we'll be the last to detach, and then we're
					 * responsible for freeing memory.
					 */
					ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
					hashtable->batches[batchno].done = true;
					ExecHashTableDetachBatch(hashtable);
					break;

				case PHJ_BATCH_FREE:

					/*
					 * Already done.  Detach and go around again (if any
					 * remain).
					 */
					BarrierDetach(batch_barrier);
					hashtable->batches[batchno].done = true;
					hashtable->curbatch = -1;
					break;

				default:
					elog(ERROR, "unexpected batch phase %d",
						 BarrierPhase(batch_barrier));
			}
		}
		batchno = (batchno + 1) % hashtable->nbatch;
	} while (batchno != start_batchno);

	/* We made a full circuit without finding an unfinished batch. */
	return false;
}
1267 :
1268 : /*
1269 : * ExecHashJoinSaveTuple
1270 : * save a tuple to a batch file.
1271 : *
1272 : * The data recorded in the file for each tuple is its hash value,
1273 : * then the tuple in MinimalTuple format.
1274 : *
1275 : * Note: it is important always to call this in the regular executor
1276 : * context, not in a shorter-lived context; else the temp file buffers
1277 : * will get messed up.
1278 : */
void
ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
					  BufFile **fileptr)
{
	BufFile    *file = *fileptr;

	if (file == NULL)
	{
		/* First write to this batch file, so open it. */
		file = BufFileCreateTemp(false);
		*fileptr = file;
	}

	/*
	 * Record layout read back by ExecHashJoinGetSavedTuple: the uint32 hash
	 * value followed by the MinimalTuple (whose first word is t_len).
	 */
	BufFileWrite(file, &hashvalue, sizeof(uint32));
	BufFileWrite(file, tuple, tuple->t_len);
}
9186 vadim4o 1295 ECB :
1296 : /*
6608 tgl 1297 EUB : * ExecHashJoinGetSavedTuple
3260 bruce 1298 : * read the next tuple from a batch file. Return NULL if no more.
1299 : *
1300 : * On success, *hashvalue is set to the tuple's hash value, and the tuple
1301 : * itself is stored in the given slot.
6608 tgl 1302 ECB : */
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
						  BufFile *file,
						  uint32 *hashvalue,
						  TupleTableSlot *tupleSlot)
{
	uint32		header[2];		/* [0] = hash value, [1] = tuple t_len */
	size_t		nread;
	MinimalTuple tuple;

	/*
	 * We check for interrupts here because this is typically taken as an
	 * alternative code path to an ExecProcNode() call, which would include
	 * such a check.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Since both the hash value and the MinimalTuple length word are uint32,
	 * we can read them both in one BufFileRead() call without any type
	 * cheating.
	 */
	nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
	if (nread == 0)				/* end of file */
	{
		ExecClearTuple(tupleSlot);
		return NULL;
	}
	*hashvalue = header[0];
	/* reconstruct the tuple: t_len occupies its first uint32 */
	tuple = (MinimalTuple) palloc(header[1]);
	tuple->t_len = header[1];
	BufFileReadExact(file,
					 (char *) tuple + sizeof(uint32),
					 header[1] - sizeof(uint32));
	/* slot takes ownership of the palloc'd tuple (shouldFree = true) */
	ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
	return tupleSlot;
}
1340 :
1341 :
/*
 * ExecReScanHashJoin
 *		rescan the hash join node, reusing the built hash table if possible
 */
void
ExecReScanHashJoin(HashJoinState *node)
{
	PlanState  *outerPlan = outerPlanState(node);
	PlanState  *innerPlan = innerPlanState(node);

	/*
	 * In a multi-batch join, we currently have to do rescans the hard way,
	 * primarily because batch temp files may have already been released. But
	 * if it's a single-batch join, and there is no parameter change for the
	 * inner subnode, then we can just re-use the existing hash table without
	 * rebuilding it.
	 */
	if (node->hj_HashTable != NULL)
	{
		if (node->hj_HashTable->nbatch == 1 &&
			innerPlan->chgParam == NULL)
		{
			/*
			 * Okay to reuse the hash table; needn't rescan inner, either.
			 *
			 * However, if it's a right/right-anti/full join, we'd better
			 * reset the inner-tuple match flags contained in the table.
			 */
			if (HJ_FILL_INNER(node))
				ExecHashTableResetMatchFlags(node->hj_HashTable);

			/*
			 * Also, we need to reset our state about the emptiness of the
			 * outer relation, so that the new scan of the outer will update
			 * it correctly if it turns out to be empty this time. (There's no
			 * harm in clearing it now because ExecHashJoin won't need the
			 * info.  In the other cases, where the hash table doesn't exist
			 * or we are destroying it, we leave this state alone because
			 * ExecHashJoin will need it the first time through.)
			 */
			node->hj_OuterNotEmpty = false;

			/* ExecHashJoin can skip the BUILD_HASHTABLE step */
			node->hj_JoinState = HJ_NEED_NEW_OUTER;
		}
		else
		{
			/* must destroy and rebuild hash table */
			HashState  *hashNode = castNode(HashState, innerPlan);

			Assert(hashNode->hashtable == node->hj_HashTable);
			/* accumulate stats from old hash table, if wanted */
			/* (this should match ExecShutdownHash) */
			if (hashNode->ps.instrument && !hashNode->hinstrument)
				hashNode->hinstrument = (HashInstrumentation *)
					palloc0(sizeof(HashInstrumentation));
			if (hashNode->hinstrument)
				ExecHashAccumInstrumentation(hashNode->hinstrument,
											 hashNode->hashtable);
			/* for safety, be sure to clear child plan node's pointer too */
			hashNode->hashtable = NULL;

			ExecHashTableDestroy(node->hj_HashTable);
			node->hj_HashTable = NULL;
			node->hj_JoinState = HJ_BUILD_HASHTABLE;

			/*
			 * if chgParam of subnode is not null then plan will be re-scanned
			 * by first ExecProcNode.
			 */
			if (innerPlan->chgParam == NULL)
				ExecReScan(innerPlan);
		}
	}

	/* Always reset intra-tuple state */
	node->hj_CurHashValue = 0;
	node->hj_CurBucketNo = 0;
	node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
	node->hj_CurTuple = NULL;

	node->hj_MatchedOuter = false;
	node->hj_FirstOuterTupleSlot = NULL;

	/*
	 * if chgParam of subnode is not null then plan will be re-scanned by
	 * first ExecProcNode.
	 */
	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}
1429 :
1430 : void
1936 andres 1431 CBC 12754 : ExecShutdownHashJoin(HashJoinState *node)
1432 : {
1433 12754 : if (node->hj_HashTable)
1936 andres 1434 ECB : {
1435 : /*
1436 : * Detach from shared state before DSM memory goes away. This makes
1437 : * sure that we don't have any pointers into DSM memory by the time
1438 : * ExecEndHashJoin runs.
1439 : */
1936 andres 1440 GIC 9424 : ExecHashTableDetachBatch(node->hj_HashTable);
1936 andres 1441 CBC 9424 : ExecHashTableDetach(node->hj_HashTable);
1936 andres 1442 EUB : }
1936 andres 1443 GIC 12754 : }
1444 :
static void
ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
{
	PlanState  *outerState = outerPlanState(hjstate);
	ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
	HashJoinTable hashtable = hjstate->hj_HashTable;
	TupleTableSlot *slot;
	uint32		hashvalue;
	int			i;

	/* the batch-0 shortcut tuple is never used in the parallel path */
	Assert(hjstate->hj_FirstOuterTupleSlot == NULL);

	/* Execute outer plan, writing all tuples to shared tuplestores. */
	for (;;)
	{
		slot = ExecProcNode(outerState);
		if (TupIsNull(slot))
			break;
		econtext->ecxt_outertuple = slot;
		/* tuples whose hash keys are NULL (and can't match) are dropped */
		if (ExecHashGetHashValue(hashtable, econtext,
								 hjstate->hj_OuterHashKeys,
								 true,	/* outer tuple */
								 HJ_FILL_OUTER(hjstate),
								 &hashvalue))
		{
			int			batchno;
			int			bucketno;
			bool		shouldFree;
			MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);

			/* route the tuple to its batch's shared tuplestore */
			ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
									  &batchno);
			sts_puttuple(hashtable->batches[batchno].outer_tuples,
						 &hashvalue, mintup);

			if (shouldFree)
				heap_free_minimal_tuple(mintup);
		}
		CHECK_FOR_INTERRUPTS();
	}

	/* Make sure all outer partitions are readable by any backend. */
	for (i = 0; i < hashtable->nbatch; ++i)
		sts_end_write(hashtable->batches[i].outer_tuples);
}
1490 :
/*
 * ExecHashJoinEstimate
 *		reserve DSM space for the shared Parallel Hash Join state
 */
void
ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
{
	/* one chunk for the ParallelHashJoinState, stored under one toc key */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}
1936 andres 1497 ECB :
/*
 * ExecHashJoinInitializeDSM
 *		set up the shared Parallel Hash Join state in the DSM segment
 */
void
ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	HashState  *hashNode;
	ParallelHashJoinState *pstate;

	/*
	 * Disable shared hash table mode if we failed to create a real DSM
	 * segment, because that means that we don't have a DSA area to work with.
	 */
	if (pcxt->seg == NULL)
		return;

	ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);

	/*
	 * Set up the state needed to coordinate access to the shared hash
	 * table(s), using the plan node ID as the toc key.
	 */
	pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
	shm_toc_insert(pcxt->toc, plan_node_id, pstate);

	/*
	 * Set up the shared hash join state with no batches initially.
	 * ExecHashTableCreate() will prepare at least one later and set nbatch
	 * and space_allowed.
	 */
	pstate->nbatch = 0;
	pstate->space_allowed = 0;
	pstate->batches = InvalidDsaPointer;
	pstate->old_batches = InvalidDsaPointer;
	pstate->nbuckets = 0;
	pstate->growth = PHJ_GROWTH_OK;
	pstate->chunk_work_queue = InvalidDsaPointer;
	pg_atomic_init_u32(&pstate->distributor, 0);
	pstate->nparticipants = pcxt->nworkers + 1;	/* workers plus the leader */
	pstate->total_tuples = 0;
	LWLockInitialize(&pstate->lock,
					 LWTRANCHE_PARALLEL_HASH_JOIN);
	BarrierInit(&pstate->build_barrier, 0);
	BarrierInit(&pstate->grow_batches_barrier, 0);
	BarrierInit(&pstate->grow_buckets_barrier, 0);

	/* Set up the space we'll use for shared temporary files. */
	SharedFileSetInit(&pstate->fileset, pcxt->seg);

	/* Initialize the shared state in the hash node. */
	hashNode = (HashState *) innerPlanState(state);
	hashNode->parallel_state = pstate;
}
1549 :
1550 : /* ----------------------------------------------------------------
1936 andres 1551 ECB : * ExecHashJoinReInitializeDSM
1552 : *
1553 : * Reset shared state before beginning a fresh scan.
1554 : * ----------------------------------------------------------------
1555 : */
void
ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
{
	int			plan_node_id = state->js.ps.plan->plan_node_id;
	ParallelHashJoinState *pstate =
		shm_toc_lookup(pcxt->toc, plan_node_id, false);

	/*
	 * It would be possible to reuse the shared hash table in single-batch
	 * cases by resetting and then fast-forwarding build_barrier to
	 * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
	 * currently shared hash tables are already freed by now (by the last
	 * participant to detach from the batch).  We could consider keeping it
	 * around for single-batch joins.  We'd also need to adjust
	 * finalize_plan() so that it doesn't record a dummy dependency for
	 * Parallel Hash nodes, preventing the rescan optimization.  For now we
	 * don't try.
	 */

	/* Detach, freeing any remaining shared memory. */
	if (state->hj_HashTable != NULL)
	{
		ExecHashTableDetachBatch(state->hj_HashTable);
		ExecHashTableDetach(state->hj_HashTable);
	}

	/* Clear any shared batch files. */
	SharedFileSetDeleteAll(&pstate->fileset);

	/* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
	BarrierInit(&pstate->build_barrier, 0);
}
1588 :
1589 : void
1936 andres 1590 CBC 147 : ExecHashJoinInitializeWorker(HashJoinState *state,
1591 : ParallelWorkerContext *pwcxt)
1936 andres 1592 ECB : {
1593 : HashState *hashNode;
1936 andres 1594 CBC 147 : int plan_node_id = state->js.ps.plan->plan_node_id;
1595 : ParallelHashJoinState *pstate =
1936 andres 1596 GIC 147 : shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1597 :
1598 : /* Attach to the space for shared temporary files. */
1599 147 : SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1600 :
1601 : /* Attach to the shared state in the hash node. */
1602 147 : hashNode = (HashState *) innerPlanState(state);
1603 147 : hashNode->parallel_state = pstate;
1604 :
1605 147 : ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1606 147 : }
|