/*-------------------------------------------------------------------------
 *
 * nodeHashjoin.c
 *	  Routines to handle hash join nodes
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeHashjoin.c
 *
 * PARALLELISM
 *
 * Hash joins can participate in parallel query execution in several ways.  A
 * parallel-oblivious hash join is one where the node is unaware that it is
 * part of a parallel plan.  In this case, a copy of the inner plan is used to
 * build a copy of the hash table in every backend, and the outer plan could
 * either be built from a partial or complete path, so that the results of the
 * hash join are correspondingly either partial or complete.  A parallel-aware
 * hash join is one that behaves differently, coordinating work between
 * backends, and appears as Parallel Hash Join in EXPLAIN output.  A Parallel
 * Hash Join always appears with a Parallel Hash node.
 *
 * Parallel-aware hash joins use the same per-backend state machine to track
 * progress through the hash join algorithm as parallel-oblivious hash joins.
 * In a parallel-aware hash join, there is also a shared state machine that
 * co-operating backends use to synchronize their local state machines and
 * program counters.  The shared state machine is managed with a Barrier IPC
 * primitive.  When all attached participants arrive at a barrier, the phase
 * advances and all waiting participants are released.
 *
 * When a participant begins working on a parallel hash join, it must first
 * figure out how much progress has already been made, because participants
 * don't wait for each other to begin.  For this reason there are switch
 * statements at key points in the code where we have to synchronize our local
 * state machine with the phase, and then jump to the correct part of the
 * algorithm so that we can get started.
 *
 * One barrier called build_barrier is used to coordinate the hashing phases.
 * The phase is represented by an integer which begins at zero and increments
 * one by one, but in the code it is referred to by symbolic names as follows.
 * An asterisk indicates a phase that is performed by a single arbitrarily
 * chosen process.
 *
 *   PHJ_BUILD_ELECT                 -- initial state
 *   PHJ_BUILD_ALLOCATE*             -- one sets up the batches and table 0
 *   PHJ_BUILD_HASH_INNER            -- all hash the inner rel
 *   PHJ_BUILD_HASH_OUTER            -- (multi-batch only) all hash the outer
 *   PHJ_BUILD_RUN                   -- building done, probing can begin
 *   PHJ_BUILD_FREE*                 -- all work complete, one frees batches
 *
 * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
 * be used repeatedly as required to coordinate expansions in the number of
 * batches or buckets.  Their phases are as follows:
 *
 *   PHJ_GROW_BATCHES_ELECT          -- initial state
 *   PHJ_GROW_BATCHES_REALLOCATE*    -- one allocates new batches
 *   PHJ_GROW_BATCHES_REPARTITION    -- all repartition
 *   PHJ_GROW_BATCHES_DECIDE*        -- one detects skew and cleans up
 *   PHJ_GROW_BATCHES_FINISH         -- finished one growth cycle
 *
 *   PHJ_GROW_BUCKETS_ELECT          -- initial state
 *   PHJ_GROW_BUCKETS_REALLOCATE*    -- one allocates new buckets
 *   PHJ_GROW_BUCKETS_REINSERT       -- all insert tuples
 *
 * If the planner got the number of batches and buckets right, those won't be
 * necessary, but on the other hand we might finish up needing to expand the
 * buckets or batches multiple times while hashing the inner relation to stay
 * within our memory budget and load factor target.  For that reason it's a
 * separate pair of barriers using circular phases.
 *
 * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
 * because we need to divide the outer relation into batches up front in order
 * to be able to process batches entirely independently.  In contrast, the
 * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
 * batches whenever it encounters them while scanning and probing, which it
 * can do because it processes batches in serial order.
 *
 * Once PHJ_BUILD_RUN is reached, backends then split up and process
 * different batches, or gang up and work together on probing batches if there
 * aren't enough to go around.  For each batch there is a separate barrier
 * with the following phases:
 *
 *  PHJ_BATCH_ELECT          -- initial state
 *  PHJ_BATCH_ALLOCATE*      -- one allocates buckets
 *  PHJ_BATCH_LOAD           -- all load the hash table from disk
 *  PHJ_BATCH_PROBE          -- all probe
 *  PHJ_BATCH_SCAN*          -- one does right/right-anti/full unmatched scan
 *  PHJ_BATCH_FREE*          -- one frees memory
 *
 * Batch 0 is a special case, because it starts out in phase
 * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
 * PHJ_BUILD_HASH_INNER so we can skip loading.
 *
 * Initially we try to plan for a single-batch hash join using the combined
 * hash_mem of all participants to create a large shared hash table.  If that
 * turns out either at planning or execution time to be impossible then we
 * fall back to regular hash_mem sized hash tables.
 *
 * To avoid deadlocks, we never wait for any barrier unless it is known that
 * all other backends attached to it are actively executing the node or have
 * finished.  Practically, that means that we never emit a tuple while attached
 * to a barrier, unless the barrier has reached a phase that means that no
 * process will wait on it again.  We emit tuples while attached to the build
 * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
 * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
 * respectively without waiting, using BarrierArriveAndDetach() and
 * BarrierArriveAndDetachExceptLast() respectively.  The last to detach
 * receives a different return value so that it knows that it's safe to
 * clean up.  Any straggler process that attaches after that phase is reached
 * will see that it's too late to participate or access the relevant shared
 * memory objects.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/sharedtuplestore.h"


/*
 * States of the ExecHashJoin state machine
 */
#define HJ_BUILD_HASHTABLE		1
#define HJ_NEED_NEW_OUTER		2
#define HJ_SCAN_BUCKET			3
#define HJ_FILL_OUTER_TUPLE		4
#define HJ_FILL_INNER_TUPLES	5
#define HJ_NEED_NEW_BATCH		6

/* Returns true if doing null-fill on outer relation */
#define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)
/* Returns true if doing null-fill on inner relation */
#define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)

static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
												 HashJoinState *hjstate,
												 uint32 *hashvalue);
static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
														 HashJoinState *hjstate,
														 uint32 *hashvalue);
static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
												 BufFile *file,
												 uint32 *hashvalue,
												 TupleTableSlot *tupleSlot);
static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);


/* ----------------------------------------------------------------
 *		ExecHashJoinImpl
 *
 *		This function implements the Hybrid Hashjoin algorithm.  It is marked
 *		with an always-inline attribute so that ExecHashJoin() and
 *		ExecParallelHashJoin() can inline it.  Compilers that respect the
 *		attribute should create versions specialized for parallel == true and
 *		parallel == false with unnecessary branches removed.
 *
 *		Note: the relation we build hash table on is the "inner"
 *			  the other one is "outer".
 * ----------------------------------------------------------------
 */
static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
	HashJoinState *node = castNode(HashJoinState, pstate);
	PlanState  *outerNode;
	HashState  *hashNode;
	ExprState  *joinqual;
	ExprState  *otherqual;
	ExprContext *econtext;
	HashJoinTable hashtable;
	TupleTableSlot *outerTupleSlot;
	uint32		hashvalue;
	int			batchno;
	ParallelHashJoinState *parallel_state;

	/*
	 * get information from HashJoin node
	 */
	joinqual = node->js.joinqual;
	otherqual = node->js.ps.qual;
	hashNode = (HashState *) innerPlanState(node);
	outerNode = outerPlanState(node);
	hashtable = node->hj_HashTable;
	econtext = node->js.ps.ps_ExprContext;
	parallel_state = hashNode->parallel_state;

	/*
	 * Reset per-tuple memory context to free any expression evaluation
	 * storage allocated in the previous tuple cycle.
	 */
	ResetExprContext(econtext);

	/*
	 * run the hash join state machine
	 */
	for (;;)
	{
		/*
		 * It's possible to iterate this loop many times before returning a
		 * tuple, in some pathological cases such as needing to move much of
		 * the current batch to a later batch.  So let's check for interrupts
		 * each time through.
		 */
		CHECK_FOR_INTERRUPTS();

		switch (node->hj_JoinState)
		{
			case HJ_BUILD_HASHTABLE:

				/*
				 * First time through: build hash table for inner relation.
				 */
				Assert(hashtable == NULL);

				/*
				 * If the outer relation is completely empty, and it's not
				 * right/right-anti/full join, we can quit without building
				 * the hash table.  However, for an inner join it is only a
				 * win to check this when the outer relation's startup cost is
				 * less than the projected cost of building the hash table.
				 * Otherwise it's best to build the hash table first and see
				 * if the inner relation is empty.  (When it's a left join, we
				 * should always make this check, since we aren't going to be
				 * able to skip the join on the strength of an empty inner
				 * relation anyway.)
				 *
				 * If we are rescanning the join, we make use of information
				 * gained on the previous scan: don't bother to try the
				 * prefetch if the previous scan found the outer relation
				 * nonempty.  This is not 100% reliable since with new
				 * parameters the outer relation might yield different
				 * results, but it's a good heuristic.
				 *
				 * The only way to make the check is to try to fetch a tuple
				 * from the outer plan node.  If we succeed, we have to stash
				 * it away for later consumption by ExecHashJoinOuterGetTuple.
				 */
				if (HJ_FILL_INNER(node))
				{
					/* no chance to not build the hash table */
					node->hj_FirstOuterTupleSlot = NULL;
				}
				else if (parallel)
				{
					/*
					 * The empty-outer optimization is not implemented for
					 * shared hash tables, because no one participant can
					 * determine that there are no outer tuples, and it's not
					 * yet clear that it's worth the synchronization overhead
					 * of reaching consensus to figure that out.  So we have
					 * to build the hash table.
					 */
					node->hj_FirstOuterTupleSlot = NULL;
				}
				else if (HJ_FILL_OUTER(node) ||
						 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
						  !node->hj_OuterNotEmpty))
				{
					node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
					if (TupIsNull(node->hj_FirstOuterTupleSlot))
					{
						node->hj_OuterNotEmpty = false;
						return NULL;
					}
					else
						node->hj_OuterNotEmpty = true;
				}
				else
					node->hj_FirstOuterTupleSlot = NULL;

				/*
				 * Create the hash table.  If using Parallel Hash, then
				 * whoever gets here first will create the hash table and any
				 * later arrivals will merely attach to it.
				 */
				hashtable = ExecHashTableCreate(hashNode,
												node->hj_HashOperators,
												node->hj_Collations,
												HJ_FILL_INNER(node));
				node->hj_HashTable = hashtable;

				/*
				 * Execute the Hash node, to build the hash table.  If using
				 * Parallel Hash, then we'll try to help hashing unless we
				 * arrived too late.
				 */
				hashNode->hashtable = hashtable;
				(void) MultiExecProcNode((PlanState *) hashNode);

				/*
				 * If the inner relation is completely empty, and we're not
				 * doing a left outer join, we can quit without scanning the
				 * outer relation.
				 */
				if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
				{
					if (parallel)
					{
						/*
						 * Advance the build barrier to PHJ_BUILD_RUN before
						 * proceeding so we can negotiate resource cleanup.
						 */
						Barrier    *build_barrier = &parallel_state->build_barrier;

						while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
							BarrierArriveAndWait(build_barrier, 0);
					}
					return NULL;
				}

				/*
				 * need to remember whether nbatch has increased since we
				 * began scanning the outer relation
				 */
				hashtable->nbatch_outstart = hashtable->nbatch;

				/*
				 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
				 * tuple above, because ExecHashJoinOuterGetTuple will
				 * immediately set it again.)
				 */
				node->hj_OuterNotEmpty = false;

				if (parallel)
				{
					Barrier    *build_barrier;

					build_barrier = &parallel_state->build_barrier;
					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
						   BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
						   BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
					if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
					{
						/*
						 * If multi-batch, we need to hash the outer relation
						 * up front.
						 */
						if (hashtable->nbatch > 1)
							ExecParallelHashJoinPartitionOuter(node);
						BarrierArriveAndWait(build_barrier,
											 WAIT_EVENT_HASH_BUILD_HASH_OUTER);
					}
					else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE)
					{
						/*
						 * If we attached so late that the job is finished and
						 * the batch state has been freed, we can return
						 * immediately.
						 */
						return NULL;
					}

					/* Each backend should now select a batch to work on. */
					Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
					hashtable->curbatch = -1;
					node->hj_JoinState = HJ_NEED_NEW_BATCH;

					continue;
				}
				else
					node->hj_JoinState = HJ_NEED_NEW_OUTER;

				/* FALL THRU */

			case HJ_NEED_NEW_OUTER:

				/*
				 * We don't have an outer tuple, try to get the next one
				 */
				if (parallel)
					outerTupleSlot =
						ExecParallelHashJoinOuterGetTuple(outerNode, node,
														  &hashvalue);
				else
					outerTupleSlot =
						ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);

				if (TupIsNull(outerTupleSlot))
				{
					/* end of batch, or maybe whole join */
					if (HJ_FILL_INNER(node))
					{
						/* set up to scan for unmatched inner tuples */
						if (parallel)
						{
							/*
							 * Only one process is currently allowed to handle
							 * each batch's unmatched tuples, in a parallel
							 * join.
							 */
							if (ExecParallelPrepHashTableForUnmatched(node))
								node->hj_JoinState = HJ_FILL_INNER_TUPLES;
							else
								node->hj_JoinState = HJ_NEED_NEW_BATCH;
						}
						else
						{
							ExecPrepHashTableForUnmatched(node);
							node->hj_JoinState = HJ_FILL_INNER_TUPLES;
						}
					}
					else
						node->hj_JoinState = HJ_NEED_NEW_BATCH;
					continue;
				}

				econtext->ecxt_outertuple = outerTupleSlot;
				node->hj_MatchedOuter = false;

				/*
				 * Find the corresponding bucket for this tuple in the main
				 * hash table or skew hash table.
				 */
				node->hj_CurHashValue = hashvalue;
				ExecHashGetBucketAndBatch(hashtable, hashvalue,
										  &node->hj_CurBucketNo, &batchno);
				node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
																 hashvalue);
				node->hj_CurTuple = NULL;

				/*
				 * The tuple might not belong to the current batch (where
				 * "current batch" includes the skew buckets if any).
				 */
				if (batchno != hashtable->curbatch &&
					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
				{
					bool		shouldFree;
					MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
																	  &shouldFree);

					/*
					 * Need to postpone this outer tuple to a later batch.
					 * Save it in the corresponding outer-batch file.
					 */
					Assert(parallel_state == NULL);
					Assert(batchno > hashtable->curbatch);
					ExecHashJoinSaveTuple(mintuple, hashvalue,
										  &hashtable->outerBatchFile[batchno]);

					if (shouldFree)
						heap_free_minimal_tuple(mintuple);

					/* Loop around, staying in HJ_NEED_NEW_OUTER state */
					continue;
				}

				/* OK, let's scan the bucket for matches */
				node->hj_JoinState = HJ_SCAN_BUCKET;

				/* FALL THRU */

			case HJ_SCAN_BUCKET:

				/*
				 * Scan the selected hash bucket for matches to current outer
				 */
				if (parallel)
				{
					if (!ExecParallelScanHashBucket(node, econtext))
					{
						/* out of matches; check for possible outer-join fill */
						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
						continue;
					}
				}
				else
				{
					if (!ExecScanHashBucket(node, econtext))
					{
						/* out of matches; check for possible outer-join fill */
						node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
						continue;
					}
				}

				/*
				 * We've got a match, but still need to test non-hashed quals.
				 * ExecScanHashBucket already set up all the state needed to
				 * call ExecQual.
				 *
				 * If we pass the qual, then save state for next call and have
				 * ExecProject form the projection, store it in the tuple
				 * table, and return the slot.
				 *
				 * Only the joinquals determine tuple match status, but all
				 * quals must pass to actually return the tuple.
				 */
				if (joinqual == NULL || ExecQual(joinqual, econtext))
				{
					node->hj_MatchedOuter = true;

					/*
					 * This is really only needed if HJ_FILL_INNER(node), but
					 * we'll avoid the branch and just set it always.
					 */
					if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
						HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));

					/* In an antijoin, we never return a matched tuple */
					if (node->js.jointype == JOIN_ANTI)
					{
						node->hj_JoinState = HJ_NEED_NEW_OUTER;
						continue;
					}

					/*
					 * In a right-antijoin, we never return a matched tuple.
					 * And we need to stay on the current outer tuple to
					 * continue scanning the inner side for matches.
					 */
					if (node->js.jointype == JOIN_RIGHT_ANTI)
						continue;

					/*
					 * If we only need to join to the first matching inner
					 * tuple, then consider returning this one, but after that
					 * continue with next outer tuple.
					 */
					if (node->js.single_match)
						node->hj_JoinState = HJ_NEED_NEW_OUTER;

					if (otherqual == NULL || ExecQual(otherqual, econtext))
						return ExecProject(node->js.ps.ps_ProjInfo);
					else
						InstrCountFiltered2(node, 1);
				}
				else
					InstrCountFiltered1(node, 1);
				break;

			case HJ_FILL_OUTER_TUPLE:

				/*
				 * The current outer tuple has run out of matches, so check
				 * whether to emit a dummy outer-join tuple.  Whether we emit
				 * one or not, the next state is NEED_NEW_OUTER.
				 */
				node->hj_JoinState = HJ_NEED_NEW_OUTER;

				if (!node->hj_MatchedOuter &&
					HJ_FILL_OUTER(node))
				{
					/*
					 * Generate a fake join tuple with nulls for the inner
					 * tuple, and return it if it passes the non-join quals.
					 */
					econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;

					if (otherqual == NULL || ExecQual(otherqual, econtext))
						return ExecProject(node->js.ps.ps_ProjInfo);
					else
						InstrCountFiltered2(node, 1);
				}
				break;

			case HJ_FILL_INNER_TUPLES:

				/*
				 * We have finished a batch, but we are doing
				 * right/right-anti/full join, so any unmatched inner tuples
				 * in the hashtable have to be emitted before we continue to
				 * the next batch.
				 */
				if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
					  : ExecScanHashTableForUnmatched(node, econtext)))
				{
					/* no more unmatched tuples */
					node->hj_JoinState = HJ_NEED_NEW_BATCH;
					continue;
				}

				/*
				 * Generate a fake join tuple with nulls for the outer tuple,
				 * and return it if it passes the non-join quals.
				 */
				econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;

				if (otherqual == NULL || ExecQual(otherqual, econtext))
					return ExecProject(node->js.ps.ps_ProjInfo);
				else
					InstrCountFiltered2(node, 1);
				break;

			case HJ_NEED_NEW_BATCH:

				/*
				 * Try to advance to next batch.  Done if there are no more.
				 */
				if (parallel)
				{
					if (!ExecParallelHashJoinNewBatch(node))
						return NULL;	/* end of parallel-aware join */
				}
				else
				{
					if (!ExecHashJoinNewBatch(node))
						return NULL;	/* end of parallel-oblivious join */
				}
				node->hj_JoinState = HJ_NEED_NEW_OUTER;
				break;

			default:
				elog(ERROR, "unrecognized hashjoin state: %d",
					 (int) node->hj_JoinState);
		}
	}
}

/* ----------------------------------------------------------------
 *		ExecHashJoin
 *
 *		Parallel-oblivious version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *			/* return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
	/*
	 * On sufficiently smart compilers this should be inlined with the
	 * parallel-aware branches removed.
	 */
	return ExecHashJoinImpl(pstate, false);
}

/* ----------------------------------------------------------------
 *		ExecParallelHashJoin
 *
 *		Parallel-aware version.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *			/* return: a tuple or NULL */
ExecParallelHashJoin(PlanState *pstate)
{
	/*
	 * On sufficiently smart compilers this should be inlined with the
	 * parallel-oblivious branches removed.
	 */
	return ExecHashJoinImpl(pstate, true);
}

/* ----------------------------------------------------------------
 *		ExecInitHashJoin
 *
 *		Init routine for HashJoin node.
 * ----------------------------------------------------------------
 */
HashJoinState *
ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
{
	HashJoinState *hjstate;
	Plan	   *outerNode;
	Hash	   *hashNode;
	TupleDesc	outerDesc,
				innerDesc;
	const TupleTableSlotOps *ops;

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	/*
	 * create state structure
	 */
	hjstate = makeNode(HashJoinState);
	hjstate->js.ps.plan = (Plan *) node;
	hjstate->js.ps.state = estate;

	/*
	 * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
	 * where this function may be replaced with a parallel version, if we
	 * managed to launch a parallel query.
	 */
	hjstate->js.ps.ExecProcNode = ExecHashJoin;
	hjstate->js.jointype = node->join.jointype;

	/*
	 * Miscellaneous initialization
	 *
	 * create expression context for node
	 */
	ExecAssignExprContext(estate, &hjstate->js.ps);

	/*
	 * initialize child nodes
	 *
	 * Note: we could suppress the REWIND flag for the inner input, which
	 * would amount to betting that the hash will be a single batch.  Not
	 * clear if this would be a win or not.
	 */
	outerNode = outerPlan(node);
	hashNode = (Hash *) innerPlan(node);

	outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
	outerDesc = ExecGetResultType(outerPlanState(hjstate));
	innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
	innerDesc = ExecGetResultType(innerPlanState(hjstate));

	/*
	 * Initialize result slot, type and projection.
	 */
	ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
	ExecAssignProjectionInfo(&hjstate->js.ps, NULL);

	/*
	 * tuple table initialization
	 */
	ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
	hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
														ops);

	/*
	 * detect whether we need only consider the first matching inner tuple
	 */
	hjstate->js.single_match = (node->join.inner_unique ||
								node->join.jointype == JOIN_SEMI);

	/* set up null tuples for outer joins, if needed */
	switch (node->join.jointype)
	{
		case JOIN_INNER:
		case JOIN_SEMI:
			break;
		case JOIN_LEFT:
		case JOIN_ANTI:
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
			break;
		case JOIN_RIGHT:
		case JOIN_RIGHT_ANTI:
			hjstate->hj_NullOuterTupleSlot =
				ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
			break;
		case JOIN_FULL:
			hjstate->hj_NullOuterTupleSlot =
				ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
			hjstate->hj_NullInnerTupleSlot =
				ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
			break;
		default:
			elog(ERROR, "unrecognized join type: %d",
				 (int) node->join.jointype);
	}

	/*
	 * now for some voodoo.  our temporary tuple slot is actually the result
	 * tuple slot of the Hash node (which is our inner plan).  we can do this
	 * because Hash nodes don't return tuples via ExecProcNode() -- instead
	 * the hash join node uses ExecScanHashBucket() to get at the contents of
	 * the hash table.  -cim 6/9/91
	 */
	{
		HashState  *hashstate = (HashState *) innerPlanState(hjstate);
		TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;

		hjstate->hj_HashTupleSlot = slot;
	}

	/*
	 * initialize child expressions
	 */
	hjstate->js.ps.qual =
		ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
	hjstate->js.joinqual =
		ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
	hjstate->hashclauses =
		ExecInitQual(node->hashclauses, (PlanState *) hjstate);

	/*
	 * initialize hash-specific info
	 */
	hjstate->hj_HashTable = NULL;
	hjstate->hj_FirstOuterTupleSlot = NULL;

	hjstate->hj_CurHashValue = 0;
	hjstate->hj_CurBucketNo = 0;
	hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
	hjstate->hj_CurTuple = NULL;

	hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
												 (PlanState *) hjstate);
	hjstate->hj_HashOperators = node->hashoperators;
	hjstate->hj_Collations = node->hashcollations;

	hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
	hjstate->hj_MatchedOuter = false;
	hjstate->hj_OuterNotEmpty = false;

	return hjstate;
}

/* ----------------------------------------------------------------
 *		ExecEndHashJoin
 *
 *		clean up routine for HashJoin node
 * ----------------------------------------------------------------
 */
void
ExecEndHashJoin(HashJoinState *node)
{
	/*
	 * Free hash table
	 */
	if (node->hj_HashTable)
	{
		ExecHashTableDestroy(node->hj_HashTable);
		node->hj_HashTable = NULL;
	}

	/*
	 * Free the exprcontext
	 */
	ExecFreeExprContext(&node->js.ps);

	/*
	 * clean out the tuple table
	 */
	ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
	ExecClearTuple(node->hj_OuterTupleSlot);
	ExecClearTuple(node->hj_HashTupleSlot);

	/*
	 * clean up subtrees
	 */
	ExecEndNode(outerPlanState(node));
	ExecEndNode(innerPlanState(node));
}
842 :
843 : /*
844 : * ExecHashJoinOuterGetTuple
845 ECB : *
846 : * get the next outer tuple for a parallel oblivious hashjoin: either by
847 : * executing the outer plan node in the first pass, or from the temp
848 : * files for the hashjoin batches.
849 : *
850 : * Returns a null slot if no more outer tuples (within the current batch).
851 : *
852 : * On success, the tuple's hash value is stored at *hashvalue --- this is
853                 :  * either freshly computed or re-read from the temp file.
854 : */
855 : static TupleTableSlot *
856 GIC 7642703 : ExecHashJoinOuterGetTuple(PlanState *outerNode,
857 ECB : HashJoinState *hjstate,
858 : uint32 *hashvalue)
859 : {
860 GIC 7642703 : HashJoinTable hashtable = hjstate->hj_HashTable;
861 7642703 : int curbatch = hashtable->curbatch;
862 : TupleTableSlot *slot;
863 :
864 7642703 : if (curbatch == 0) /* if it is the first pass */
865 : {
866 : /*
867 : * Check to see if first outer tuple was already fetched by
868 : * ExecHashJoin() and not used yet.
869 : */
870 6906855 : slot = hjstate->hj_FirstOuterTupleSlot;
871 6906855 : if (!TupIsNull(slot))
872 6152 : hjstate->hj_FirstOuterTupleSlot = NULL;
873 : else
874 CBC 6900703 : slot = ExecProcNode(outerNode);
875 :
876 GIC 6907262 : while (!TupIsNull(slot))
877 : {
878 ECB : /*
879 : * We have to compute the tuple's hash value.
880 : */
881 GIC 6897924 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
882 ECB :
883 GIC 6897924 : econtext->ecxt_outertuple = slot;
884 6897924 : if (ExecHashGetHashValue(hashtable, econtext,
885 : hjstate->hj_OuterHashKeys,
886 : true, /* outer tuple */
887 6897924 : HJ_FILL_OUTER(hjstate),
888 ECB : hashvalue))
889 : {
890 : /* remember outer relation is not empty for possible rescan */
891 GIC 6897517 : hjstate->hj_OuterNotEmpty = true;
892 ECB :
893 GIC 6897517 : return slot;
894 ECB : }
895 :
896 : /*
897 : * That tuple couldn't match because of a NULL, so discard it and
898 : * continue with the next one.
899 : */
900 GIC 407 : slot = ExecProcNode(outerNode);
901 ECB : }
902 : }
903 GIC 735848 : else if (curbatch < hashtable->nbatch)
904 : {
905 CBC 735848 : BufFile *file = hashtable->outerBatchFile[curbatch];
906 :
907 : /*
908 : * In outer-join cases, we could get here even though the batch file
909 ECB : * is empty.
910 : */
911 CBC 735848 : if (file == NULL)
912 UIC 0 : return NULL;
913 :
914 GIC 735848 : slot = ExecHashJoinGetSavedTuple(hjstate,
915 : file,
916 : hashvalue,
917 : hjstate->hj_OuterTupleSlot);
918 CBC 735848 : if (!TupIsNull(slot))
919 GIC 735096 : return slot;
920 : }
921 ECB :
922 : /* End of this batch */
923 CBC 10090 : return NULL;
924 : }
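The "consume the cached first tuple before pulling from the plan" pattern at the top of ExecHashJoinOuterGetTuple() can be distilled as follows. This is an illustrative sketch only: `OuterSource` and `next_outer_tuple()` are hypothetical stand-ins for `hj_FirstOuterTupleSlot` and `ExecProcNode()`, not PostgreSQL APIs.

```c
#include <assert.h>
#include <stddef.h>

/*
 * ExecHashJoin() peeks at the first outer tuple (to detect an empty outer
 * relation before building the hash table) and stashes it; the get-tuple
 * routine must consume that cache exactly once before pulling new tuples
 * from the outer plan.  All names here are illustrative.
 */
typedef struct OuterSource
{
	const char *cached_first;	/* stand-in for hj_FirstOuterTupleSlot */
	const char **plan_tuples;	/* stand-in for the outer plan's output */
	size_t		plan_pos;
} OuterSource;

static const char *
next_outer_tuple(OuterSource *src)
{
	if (src->cached_first != NULL)
	{
		const char *t = src->cached_first;

		src->cached_first = NULL;	/* consume the cache exactly once */
		return t;
	}
	/* NULL entry means the plan is exhausted */
	return src->plan_tuples[src->plan_pos++];
}
```
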
925 :
926 : /*
927 : * ExecHashJoinOuterGetTuple variant for the parallel case.
928 : */
929 ECB : static TupleTableSlot *
930 GBC 1080511 : ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
931 : HashJoinState *hjstate,
932 ECB : uint32 *hashvalue)
933 : {
934 GIC 1080511 : HashJoinTable hashtable = hjstate->hj_HashTable;
935 1080511 : int curbatch = hashtable->curbatch;
936 ECB : TupleTableSlot *slot;
937 :
938 : /*
939 : * In the Parallel Hash case we only run the outer plan directly for
940 : * single-batch hash joins. Otherwise we have to go to batch files, even
941 : * for batch 0.
942 : */
943 GIC 1080511 : if (curbatch == 0 && hashtable->nbatch == 1)
944 : {
945 480071 : slot = ExecProcNode(outerNode);
946 :
947 480071 : while (!TupIsNull(slot))
948 ECB : {
949 GIC 480000 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
950 :
951 480000 : econtext->ecxt_outertuple = slot;
952 CBC 480000 : if (ExecHashGetHashValue(hashtable, econtext,
953 ECB : hjstate->hj_OuterHashKeys,
954 : true, /* outer tuple */
955 GIC 480000 : HJ_FILL_OUTER(hjstate),
956 : hashvalue))
957 480000 : return slot;
958 :
959 : /*
960 : * That tuple couldn't match because of a NULL, so discard it and
961 ECB : * continue with the next one.
962 : */
963 LBC 0 : slot = ExecProcNode(outerNode);
964 : }
965 ECB : }
966 GIC 600440 : else if (curbatch < hashtable->nbatch)
967 ECB : {
968 : MinimalTuple tuple;
969 :
970 CBC 600440 : tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
971 : hashvalue);
972 GIC 600440 : if (tuple != NULL)
973 ECB : {
974 GIC 600012 : ExecForceStoreMinimalTuple(tuple,
975 ECB : hjstate->hj_OuterTupleSlot,
976 : false);
977 GIC 600012 : slot = hjstate->hj_OuterTupleSlot;
978 600012 : return slot;
979 : }
980 : else
981 GBC 428 : ExecClearTuple(hjstate->hj_OuterTupleSlot);
982 : }
983 :
984 ECB : /* End of this batch */
985 GNC 499 : hashtable->batches[curbatch].outer_eof = true;
986 :
987 GIC 499 : return NULL;
988 : }
989 :
990 ECB : /*
991 : * ExecHashJoinNewBatch
992 : * switch to a new hashjoin batch
993 : *
994 : * Returns true if successful, false if there are no more batches.
995 : */
996 : static bool
997 CBC 10087 : ExecHashJoinNewBatch(HashJoinState *hjstate)
998 ECB : {
999 GIC 10087 : HashJoinTable hashtable = hjstate->hj_HashTable;
1000 : int nbatch;
1001 ECB : int curbatch;
1002 : BufFile *innerFile;
1003 : TupleTableSlot *slot;
1004 : uint32 hashvalue;
1005 :
1006 GIC 10087 : nbatch = hashtable->nbatch;
1007 CBC 10087 : curbatch = hashtable->curbatch;
1008 :
1009 GIC 10087 : if (curbatch > 0)
1010 : {
1011 : /*
1012 : * We no longer need the previous outer batch file; close it right
1013 : * away to free disk space.
1014 : */
1015 752 : if (hashtable->outerBatchFile[curbatch])
1016 752 : BufFileClose(hashtable->outerBatchFile[curbatch]);
1017 CBC 752 : hashtable->outerBatchFile[curbatch] = NULL;
1018 : }
1019 ECB : else /* we just finished the first batch */
1020 : {
1021 : /*
1022 : * Reset some of the skew optimization state variables, since we no
1023 : * longer need to consider skew tuples after the first batch. The
1024 : * memory context reset we are about to do will release the skew
1025 : * hashtable itself.
1026 : */
1027 CBC 9335 : hashtable->skewEnabled = false;
1028 GIC 9335 : hashtable->skewBucket = NULL;
1029 CBC 9335 : hashtable->skewBucketNums = NULL;
1030 GIC 9335 : hashtable->nSkewBuckets = 0;
1031 9335 : hashtable->spaceUsedSkew = 0;
1032 : }
1033 :
1034 : /*
1035 ECB : * We can always skip over any batches that are completely empty on both
1036 : * sides. We can sometimes skip over batches that are empty on only one
1037 : * side, but there are exceptions:
1038 : *
1039 : * 1. In a left/full outer join, we have to process outer batches even if
1040 : * the inner batch is empty. Similarly, in a right/right-anti/full outer
1041 : * join, we have to process inner batches even if the outer batch is
1042 : * empty.
1043 : *
1044 : * 2. If we have increased nbatch since the initial estimate, we have to
1045 : * scan inner batches since they might contain tuples that need to be
1046 : * reassigned to later inner batches.
1047 : *
1048 : * 3. Similarly, if we have increased nbatch since starting the outer
1049 : * scan, we have to rescan outer batches in case they contain tuples that
1050 : * need to be reassigned.
1051 : */
1052 CBC 10087 : curbatch++;
1053 GIC 10087 : while (curbatch < nbatch &&
1054 752 : (hashtable->outerBatchFile[curbatch] == NULL ||
1055 752 : hashtable->innerBatchFile[curbatch] == NULL))
1056 : {
1057 UIC 0 : if (hashtable->outerBatchFile[curbatch] &&
1058 0 : HJ_FILL_OUTER(hjstate))
1059 0 : break; /* must process due to rule 1 */
1060 0 : if (hashtable->innerBatchFile[curbatch] &&
1061 0 : HJ_FILL_INNER(hjstate))
1062 0 : break; /* must process due to rule 1 */
1063 0 : if (hashtable->innerBatchFile[curbatch] &&
1064 0 : nbatch != hashtable->nbatch_original)
1065 0 : break; /* must process due to rule 2 */
1066 0 : if (hashtable->outerBatchFile[curbatch] &&
1067 0 : nbatch != hashtable->nbatch_outstart)
1068 0 : break; /* must process due to rule 3 */
1069 : /* We can ignore this batch. */
1070 : /* Release associated temp files right away. */
1071 0 : if (hashtable->innerBatchFile[curbatch])
1072 0 : BufFileClose(hashtable->innerBatchFile[curbatch]);
1073 LBC 0 : hashtable->innerBatchFile[curbatch] = NULL;
1074 0 : if (hashtable->outerBatchFile[curbatch])
1075 0 : BufFileClose(hashtable->outerBatchFile[curbatch]);
1076 0 : hashtable->outerBatchFile[curbatch] = NULL;
1077 UIC 0 : curbatch++;
1078 EUB : }
1079 :
1080 GBC 10087 : if (curbatch >= nbatch)
1081 9335 : return false; /* no more batches */
1082 EUB :
1083 GBC 752 : hashtable->curbatch = curbatch;
1084 EUB :
1085 : /*
1086 : * Reload the hash table with the new inner batch (which could be empty)
1087 : */
1088 GBC 752 : ExecHashTableReset(hashtable);
1089 EUB :
1090 GIC 752 : innerFile = hashtable->innerBatchFile[curbatch];
1091 :
1092 GBC 752 : if (innerFile != NULL)
1093 EUB : {
1094 GNC 752 : if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
1095 UBC 0 : ereport(ERROR,
1096 EUB : (errcode_for_file_access(),
1097 : errmsg("could not rewind hash-join temporary file")));
1098 :
1099 GIC 1669353 : while ((slot = ExecHashJoinGetSavedTuple(hjstate,
1100 : innerFile,
1101 ECB : &hashvalue,
1102 : hjstate->hj_HashTupleSlot)))
1103 : {
1104 : /*
1105 : * NOTE: some tuples may be sent to future batches. Also, it is
1106 : * possible for hashtable->nbatch to be increased here!
1107 : */
1108 GIC 1668601 : ExecHashTableInsert(hashtable, slot, hashvalue);
1109 ECB : }
1110 :
1111 : /*
1112 : * after we build the hash table, the inner batch file is no longer
1113 : * needed
1114 : */
1115 CBC 752 : BufFileClose(innerFile);
1116 GBC 752 : hashtable->innerBatchFile[curbatch] = NULL;
1117 : }
1118 :
1119 : /*
1120 ECB : * Rewind outer batch file (if present), so that we can start reading it.
1121 : */
1122 GIC 752 : if (hashtable->outerBatchFile[curbatch] != NULL)
1123 : {
1124 GNC 752 : if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
1125 UIC 0 : ereport(ERROR,
1126 : (errcode_for_file_access(),
1127 : errmsg("could not rewind hash-join temporary file")));
1128 : }
1129 ECB :
1130 GIC 752 : return true;
1131 : }
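The three "must process" rules in ExecHashJoinNewBatch()'s batch-skipping loop can be restated as a single predicate. The struct and helper below are hypothetical names for illustration; they mirror the checks on `outerBatchFile`, `innerBatchFile`, `HJ_FILL_OUTER/INNER`, `nbatch_original`, and `nbatch_outstart`, but are not PostgreSQL code.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * A batch empty on one side may only be skipped when none of the three
 * "must process" rules fire:
 *   1. fill-outer joins must process any outer file; fill-inner joins
 *      (right/right-anti/full) must process any inner file;
 *   2. if nbatch grew since the build, inner files may hold tuples that
 *      belong to later batches;
 *   3. if nbatch grew since the outer scan started, likewise for outer.
 */
typedef struct BatchInfo
{
	bool		has_outer_file; /* outer batch file exists (non-empty) */
	bool		has_inner_file; /* inner batch file exists (non-empty) */
	bool		fill_outer;		/* left/full outer join */
	bool		fill_inner;		/* right/right-anti/full join */
	int			nbatch;			/* current number of batches */
	int			nbatch_original;	/* nbatch when hash table was built */
	int			nbatch_outstart;	/* nbatch when outer scan started */
} BatchInfo;

static bool
batch_must_be_processed(const BatchInfo *b)
{
	if (b->has_outer_file && b->has_inner_file)
		return true;			/* non-empty on both sides */
	if (b->has_outer_file && b->fill_outer)
		return true;			/* rule 1, outer side */
	if (b->has_inner_file && b->fill_inner)
		return true;			/* rule 1, inner side */
	if (b->has_inner_file && b->nbatch != b->nbatch_original)
		return true;			/* rule 2 */
	if (b->has_outer_file && b->nbatch != b->nbatch_outstart)
		return true;			/* rule 3 */
	return false;				/* completely skippable */
}
```
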
1132 :
1133 : /*
1134 : * Choose a batch to work on, and attach to it. Returns true if successful,
1135 : * false if there are no more batches.
1136 ECB : */
1137 : static bool
1138 GIC 688 : ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
1139 : {
1140 688 : HashJoinTable hashtable = hjstate->hj_HashTable;
1141 : int start_batchno;
1142 : int batchno;
1143 ECB :
1144 : /*
1145 : * If we were already attached to a batch, remember not to bother checking
1146 EUB : * it again, and detach from it (possibly freeing the hash table if we are
1147 : * last to detach).
1148 : */
1149 GIC 688 : if (hashtable->curbatch >= 0)
1150 : {
1151 CBC 486 : hashtable->batches[hashtable->curbatch].done = true;
1152 GIC 486 : ExecHashTableDetachBatch(hashtable);
1153 : }
1154 :
1155 : /*
1156 : * Search for a batch that isn't done. We use an atomic counter to start
1157 : * our search at a different batch in every participant when there are
1158 : * more batches than participants.
1159 ECB : */
1160 GIC 688 : batchno = start_batchno =
1161 CBC 688 : pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
1162 GIC 688 : hashtable->nbatch;
1163 : do
1164 : {
1165 : uint32 hashvalue;
1166 : MinimalTuple tuple;
1167 : TupleTableSlot *slot;
1168 :
1169 1799 : if (!hashtable->batches[batchno].done)
1170 ECB : {
1171 : SharedTuplestoreAccessor *inner_tuples;
1172 CBC 948 : Barrier *batch_barrier =
1173 948 : &hashtable->batches[batchno].shared->batch_barrier;
1174 :
1175 GIC 948 : switch (BarrierAttach(batch_barrier))
1176 : {
1177 GNC 327 : case PHJ_BATCH_ELECT:
1178 :
1179 : /* One backend allocates the hash table. */
1180 GIC 327 : if (BarrierArriveAndWait(batch_barrier,
1181 ECB : WAIT_EVENT_HASH_BATCH_ELECT))
1182 CBC 327 : ExecParallelHashTableAlloc(hashtable, batchno);
1183 ECB : /* Fall through. */
1184 :
1185 : case PHJ_BATCH_ALLOCATE:
1186 : /* Wait for allocation to complete. */
1187 GIC 328 : BarrierArriveAndWait(batch_barrier,
1188 : WAIT_EVENT_HASH_BATCH_ALLOCATE);
1189 : /* Fall through. */
1190 ECB :
1191 GNC 342 : case PHJ_BATCH_LOAD:
1192 : /* Start (or join in) loading tuples. */
1193 CBC 342 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1194 342 : inner_tuples = hashtable->batches[batchno].inner_tuples;
1195 GIC 342 : sts_begin_parallel_scan(inner_tuples);
1196 CBC 542379 : while ((tuple = sts_parallel_scan_next(inner_tuples,
1197 : &hashvalue)))
1198 ECB : {
1199 GIC 542037 : ExecForceStoreMinimalTuple(tuple,
1200 : hjstate->hj_HashTupleSlot,
1201 ECB : false);
1202 GIC 542037 : slot = hjstate->hj_HashTupleSlot;
1203 CBC 542037 : ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
1204 : hashvalue);
1205 : }
1206 GIC 342 : sts_end_parallel_scan(inner_tuples);
1207 342 : BarrierArriveAndWait(batch_barrier,
1208 ECB : WAIT_EVENT_HASH_BATCH_LOAD);
1209 : /* Fall through. */
1210 :
1211 GNC 499 : case PHJ_BATCH_PROBE:
1212 ECB :
1213 : /*
1214 : * This batch is ready to probe. Return control to
1215 : * caller. We stay attached to batch_barrier so that the
1216 : * hash table stays alive until everyone's finished
1217 : * probing it, but no participant is allowed to wait at
1218 : * this barrier again (or else a deadlock could occur).
1219 : * All attached participants must eventually detach from
1220 : * the barrier and one worker must advance the phase so
1221 : * that the final phase is reached.
1222 : */
1223 CBC 499 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1224 499 : sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
1225 :
1226 GNC 499 : return true;
1227 2 : case PHJ_BATCH_SCAN:
1228 :
1229 : /*
1230 : * In principle, we could help scan for unmatched tuples,
1231 : * since that phase is already underway (the thing we
1232 : * can't do under current deadlock-avoidance rules is wait
1233 : * for others to arrive at PHJ_BATCH_SCAN, because
1234 : * PHJ_BATCH_PROBE emits tuples, but in this case we just
1235 : * got here without waiting). That is not yet done. For
1236 : * now, we just detach and go around again. We have to
1237 : * use ExecHashTableDetachBatch() because there's a small
1238 : * chance we'll be the last to detach, and then we're
1239 : * responsible for freeing memory.
1240 : */
1241 2 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1242 2 : hashtable->batches[batchno].done = true;
1243 2 : ExecHashTableDetachBatch(hashtable);
1244 2 : break;
1245 :
1246 447 : case PHJ_BATCH_FREE:
1247 ECB :
1248 : /*
1249 : * Already done. Detach and go around again (if any
1250 : * remain).
1251 : */
1252 GIC 447 : BarrierDetach(batch_barrier);
1253 447 : hashtable->batches[batchno].done = true;
1254 447 : hashtable->curbatch = -1;
1255 447 : break;
1256 :
1257 UIC 0 : default:
1258 0 : elog(ERROR, "unexpected batch phase %d",
1259 : BarrierPhase(batch_barrier));
1260 : }
1261 : }
1262 GIC 1300 : batchno = (batchno + 1) % hashtable->nbatch;
1263 CBC 1300 : } while (batchno != start_batchno);
1264 ECB :
1265 GIC 189 : return false;
1266 ECB : }
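The batch-distribution trick at the top of ExecParallelHashJoinNewBatch() can be sketched with C11 atomics: each participant bumps a shared counter and takes the result modulo nbatch, so concurrent workers begin their search for an unfinished batch at different places. PostgreSQL actually uses `pg_atomic_fetch_add_u32()` on the shared state's `distributor` counter; the plain `atomic_uint` here is a stand-in.

```c
#include <assert.h>
#include <stdatomic.h>

/*
 * Shared counter; zero-initialized.  In PostgreSQL this lives in DSM
 * (ParallelHashJoinState->distributor) so all participants see it.
 */
static atomic_uint distributor;

static unsigned int
choose_start_batch(unsigned int nbatch)
{
	/* fetch-and-add returns the value before the increment */
	return atomic_fetch_add(&distributor, 1) % nbatch;
}
```

Successive callers (whether one backend or many) receive 0, 1, 2, ... modulo nbatch, spreading the starting points round-robin.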
1267 :
1268 : /*
1269 : * ExecHashJoinSaveTuple
1270 : * save a tuple to a batch file.
1271 : *
1272 : * The data recorded in the file for each tuple is its hash value,
1273 : * then the tuple in MinimalTuple format.
1274 : *
1275 : * Note: it is important always to call this in the regular executor
1276 : * context, not in a shorter-lived context; else the temp file buffers
1277 : * will get messed up.
1278 : */
1279 : void
1280 GIC 2403697 : ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
1281 ECB : BufFile **fileptr)
1282 : {
1283 CBC 2403697 : BufFile *file = *fileptr;
1284 ECB :
1285 GIC 2403697 : if (file == NULL)
1286 ECB : {
1287 : /* First write to this batch file, so open it. */
1288 GIC 1504 : file = BufFileCreateTemp(false);
1289 1504 : *fileptr = file;
1290 : }
1291 :
1292 GNC 2403697 : BufFileWrite(file, &hashvalue, sizeof(uint32));
1293 2403697 : BufFileWrite(file, tuple, tuple->t_len);
1294 CBC 2403697 : }
1295 ECB :
1296 : /*
1297 EUB : * ExecHashJoinGetSavedTuple
1298 : * read the next tuple from a batch file. Return NULL if no more.
1299 : *
1300 : * On success, *hashvalue is set to the tuple's hash value, and the tuple
1301 : * itself is stored in the given slot.
1302 ECB : */
1303 : static TupleTableSlot *
1304 GIC 2405201 : ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
1305 ECB : BufFile *file,
1306 : uint32 *hashvalue,
1307 : TupleTableSlot *tupleSlot)
1308 : {
1309 : uint32 header[2];
1310 : size_t nread;
1311 : MinimalTuple tuple;
1312 :
1313 : /*
1314 : * We check for interrupts here because this is typically taken as an
1315 : * alternative code path to an ExecProcNode() call, which would include
1316 : * such a check.
1317 : */
1318 GIC 2405201 : CHECK_FOR_INTERRUPTS();
1319 :
1320 ECB : /*
1321 : * Since both the hash value and the MinimalTuple length word are uint32,
1322 : * we can read them both in one BufFileRead() call without any type
1323 : * cheating.
1324 : */
1325 GNC 2405201 : nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
1326 GIC 2405201 : if (nread == 0) /* end of file */
1327 : {
1328 CBC 1504 : ExecClearTuple(tupleSlot);
1329 1504 : return NULL;
1330 : }
1331 GIC 2403697 : *hashvalue = header[0];
1332 2403697 : tuple = (MinimalTuple) palloc(header[1]);
1333 2403697 : tuple->t_len = header[1];
1334 GNC 2403697 : BufFileReadExact(file,
1335 : (char *) tuple + sizeof(uint32),
1336 2403697 : header[1] - sizeof(uint32));
1337 GIC 2403697 : ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
1338 2403697 : return tupleSlot;
1339 : }
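The batch-file record format shared by ExecHashJoinSaveTuple() and ExecHashJoinGetSavedTuple() is: a uint32 hash value, then the tuple, whose own first uint32 is its total length (t_len). Reading two uint32 words therefore yields both the hash value and the number of bytes still to read. The sketch below mimics that layout with plain stdio instead of BufFile; the function names are illustrative, not PostgreSQL's.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static void
save_record(FILE *f, uint32_t hashvalue, const void *tuple, uint32_t t_len)
{
	/* record = hash value, then the tuple (which begins with t_len) */
	fwrite(&hashvalue, sizeof(uint32_t), 1, f);
	fwrite(tuple, t_len, 1, f);
}

/* Returns a malloc'd tuple and sets *hashvalue; NULL at end of file. */
static void *
load_record(FILE *f, uint32_t *hashvalue)
{
	uint32_t	header[2];
	char	   *tuple;

	/* hash value and t_len are both uint32, so read them together */
	if (fread(header, sizeof(uint32_t), 2, f) != 2)
		return NULL;			/* end of file */
	*hashvalue = header[0];
	tuple = malloc(header[1]);
	memcpy(tuple, &header[1], sizeof(uint32_t)); /* restore t_len word */
	if (fread(tuple + sizeof(uint32_t), 1,
			  header[1] - sizeof(uint32_t), f) !=
		header[1] - sizeof(uint32_t))
	{
		free(tuple);
		return NULL;			/* truncated record */
	}
	return tuple;
}
```
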
1340 :
1341 :
1342 : void
1343 1143 : ExecReScanHashJoin(HashJoinState *node)
1344 : {
1345 GNC 1143 : PlanState *outerPlan = outerPlanState(node);
1346 1143 : PlanState *innerPlan = innerPlanState(node);
1347 :
1348 : /*
1349 : * In a multi-batch join, we currently have to do rescans the hard way,
1350 : * primarily because batch temp files may have already been released. But
1351 ECB : * if it's a single-batch join, and there is no parameter change for the
1352 : * inner subnode, then we can just re-use the existing hash table without
1353 : * rebuilding it.
1354 : */
1355 GIC 1143 : if (node->hj_HashTable != NULL)
1356 : {
1357 662 : if (node->hj_HashTable->nbatch == 1 &&
1358 GNC 662 : innerPlan->chgParam == NULL)
1359 ECB : {
1360 : /*
1361 : * Okay to reuse the hash table; needn't rescan inner, either.
1362 : *
1363 : * However, if it's a right/right-anti/full join, we'd better
1364 : * reset the inner-tuple match flags contained in the table.
1365 : */
1366 CBC 251 : if (HJ_FILL_INNER(node))
1367 3 : ExecHashTableResetMatchFlags(node->hj_HashTable);
1368 :
1369 ECB : /*
1370 : * Also, we need to reset our state about the emptiness of the
1371 : * outer relation, so that the new scan of the outer will update
1372 : * it correctly if it turns out to be empty this time. (There's no
1373 : * harm in clearing it now because ExecHashJoin won't need the
1374 : * info. In the other cases, where the hash table doesn't exist
1375 : * or we are destroying it, we leave this state alone because
1376 : * ExecHashJoin will need it the first time through.)
1377 : */
1378 CBC 251 : node->hj_OuterNotEmpty = false;
1379 ECB :
1380 : /* ExecHashJoin can skip the BUILD_HASHTABLE step */
1381 GIC 251 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
1382 : }
1383 : else
1384 : {
1385 : /* must destroy and rebuild hash table */
1386 GNC 411 : HashState *hashNode = castNode(HashState, innerPlan);
1387 :
1388 CBC 411 : Assert(hashNode->hashtable == node->hj_HashTable);
1389 : /* accumulate stats from old hash table, if wanted */
1390 ECB : /* (this should match ExecShutdownHash) */
1391 CBC 411 : if (hashNode->ps.instrument && !hashNode->hinstrument)
1392 UIC 0 : hashNode->hinstrument = (HashInstrumentation *)
1393 0 : palloc0(sizeof(HashInstrumentation));
1394 GIC 411 : if (hashNode->hinstrument)
1395 UIC 0 : ExecHashAccumInstrumentation(hashNode->hinstrument,
1396 : hashNode->hashtable);
1397 : /* for safety, be sure to clear child plan node's pointer too */
1398 GIC 411 : hashNode->hashtable = NULL;
1399 ECB :
1400 CBC 411 : ExecHashTableDestroy(node->hj_HashTable);
1401 GIC 411 : node->hj_HashTable = NULL;
1402 411 : node->hj_JoinState = HJ_BUILD_HASHTABLE;
1403 :
1404 : /*
1405 : * if chgParam of subnode is not null then plan will be re-scanned
1406 : * by first ExecProcNode.
1407 : */
1408 GNC 411 : if (innerPlan->chgParam == NULL)
1409 UNC 0 : ExecReScan(innerPlan);
1410 : }
1411 ECB : }
1412 :
1413 : /* Always reset intra-tuple state */
1414 CBC 1143 : node->hj_CurHashValue = 0;
1415 GIC 1143 : node->hj_CurBucketNo = 0;
1416 1143 : node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
1417 1143 : node->hj_CurTuple = NULL;
1418 :
1419 CBC 1143 : node->hj_MatchedOuter = false;
1420 GIC 1143 : node->hj_FirstOuterTupleSlot = NULL;
1421 ECB :
1422 : /*
1423 : * if chgParam of subnode is not null then plan will be re-scanned by
1424 : * first ExecProcNode.
1425 EUB : */
1426 GNC 1143 : if (outerPlan->chgParam == NULL)
1427 826 : ExecReScan(outerPlan);
1428 GBC 1143 : }
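The reuse decision in ExecReScanHashJoin() boils down to one condition: an existing hash table survives a rescan only for a single-batch join whose inner subplan has no changed parameters; otherwise it is destroyed and rebuilt. A hypothetical restatement (the helper name is not PostgreSQL's):

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Mirrors the test in ExecReScanHashJoin(): reuse requires an existing
 * hash table, nbatch == 1 (multi-batch temp files may already be gone),
 * and no parameter change on the inner subnode (chgParam == NULL).
 */
static bool
can_reuse_hash_table(bool have_hash_table, int nbatch,
					 bool inner_params_changed)
{
	return have_hash_table && nbatch == 1 && !inner_params_changed;
}
```

When reuse succeeds, the join state jumps straight to HJ_NEED_NEW_OUTER; note that right/right-anti/full joins must additionally reset the inner-tuple match flags, which the predicate above does not capture.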
1429 :
1430 : void
1431 CBC 12754 : ExecShutdownHashJoin(HashJoinState *node)
1432 : {
1433 12754 : if (node->hj_HashTable)
1434 ECB : {
1435 : /*
1436 : * Detach from shared state before DSM memory goes away. This makes
1437 : * sure that we don't have any pointers into DSM memory by the time
1438 : * ExecEndHashJoin runs.
1439 : */
1440 GIC 9424 : ExecHashTableDetachBatch(node->hj_HashTable);
1441 CBC 9424 : ExecHashTableDetach(node->hj_HashTable);
1442 EUB : }
1443 GIC 12754 : }
1444 :
1445 : static void
1446 87 : ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
1447 ECB : {
1448 CBC 87 : PlanState *outerState = outerPlanState(hjstate);
1449 87 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1450 87 : HashJoinTable hashtable = hjstate->hj_HashTable;
1451 : TupleTableSlot *slot;
1452 ECB : uint32 hashvalue;
1453 : int i;
1454 :
1455 GIC 87 : Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
1456 :
1457 : /* Execute outer plan, writing all tuples to shared tuplestores. */
1458 : for (;;)
1459 ECB : {
1460 CBC 600099 : slot = ExecProcNode(outerState);
1461 600099 : if (TupIsNull(slot))
1462 : break;
1463 GIC 600012 : econtext->ecxt_outertuple = slot;
1464 CBC 600012 : if (ExecHashGetHashValue(hashtable, econtext,
1465 : hjstate->hj_OuterHashKeys,
1466 ECB : true, /* outer tuple */
1467 GIC 600012 : HJ_FILL_OUTER(hjstate),
1468 : &hashvalue))
1469 : {
1470 : int batchno;
1471 : int bucketno;
1472 : bool shouldFree;
1473 CBC 600012 : MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1474 ECB :
1475 GIC 600012 : ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1476 ECB : &batchno);
1477 GIC 600012 : sts_puttuple(hashtable->batches[batchno].outer_tuples,
1478 : &hashvalue, mintup);
1479 ECB :
1480 GIC 600012 : if (shouldFree)
1481 CBC 600012 : heap_free_minimal_tuple(mintup);
1482 ECB : }
1483 CBC 600012 : CHECK_FOR_INTERRUPTS();
1484 : }
1485 :
1486 : /* Make sure all outer partitions are readable by any backend. */
1487 GIC 811 : for (i = 0; i < hashtable->nbatch; ++i)
1488 CBC 724 : sts_end_write(hashtable->batches[i].outer_tuples);
1489 GIC 87 : }
1490 :
1491 : void
1492 57 : ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
1493 ECB : {
1494 CBC 57 : shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
1495 GIC 57 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1496 CBC 57 : }
1497 ECB :
1498 : void
1499 GIC 57 : ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1500 ECB : {
1501 GIC 57 : int plan_node_id = state->js.ps.plan->plan_node_id;
1502 : HashState *hashNode;
1503 : ParallelHashJoinState *pstate;
1504 :
1505 : /*
1506 ECB : * Disable shared hash table mode if we failed to create a real DSM
1507 : * segment, because that means that we don't have a DSA area to work with.
1508 : */
1509 GIC 57 : if (pcxt->seg == NULL)
1510 LBC 0 : return;
1511 :
1512 GIC 57 : ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1513 ECB :
1514 : /*
1515 : * Set up the state needed to coordinate access to the shared hash
1516 : * table(s), using the plan node ID as the toc key.
1517 : */
1518 GIC 57 : pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
1519 57 : shm_toc_insert(pcxt->toc, plan_node_id, pstate);
1520 ECB :
1521 : /*
1522 : * Set up the shared hash join state with no batches initially.
1523 : * ExecHashTableCreate() will prepare at least one later and set nbatch
1524 : * and space_allowed.
1525 : */
1526 GIC 57 : pstate->nbatch = 0;
1527 CBC 57 : pstate->space_allowed = 0;
1528 57 : pstate->batches = InvalidDsaPointer;
1529 57 : pstate->old_batches = InvalidDsaPointer;
1530 GIC 57 : pstate->nbuckets = 0;
1531 57 : pstate->growth = PHJ_GROWTH_OK;
1532 CBC 57 : pstate->chunk_work_queue = InvalidDsaPointer;
1533 GIC 57 : pg_atomic_init_u32(&pstate->distributor, 0);
1534 CBC 57 : pstate->nparticipants = pcxt->nworkers + 1;
1535 GIC 57 : pstate->total_tuples = 0;
1536 57 : LWLockInitialize(&pstate->lock,
1537 : LWTRANCHE_PARALLEL_HASH_JOIN);
1538 57 : BarrierInit(&pstate->build_barrier, 0);
1539 57 : BarrierInit(&pstate->grow_batches_barrier, 0);
1540 57 : BarrierInit(&pstate->grow_buckets_barrier, 0);
1541 :
1542 ECB : /* Set up the space we'll use for shared temporary files. */
1543 GBC 57 : SharedFileSetInit(&pstate->fileset, pcxt->seg);
1544 :
1545 ECB : /* Initialize the shared state in the hash node. */
1546 GIC 57 : hashNode = (HashState *) innerPlanState(state);
1547 57 : hashNode->parallel_state = pstate;
1548 : }
1549 :
1550 : /* ----------------------------------------------------------------
1551 ECB : * ExecHashJoinReInitializeDSM
1552 : *
1553 : * Reset shared state before beginning a fresh scan.
1554 : * ----------------------------------------------------------------
1555 : */
1556 : void
1557 GNC 24 : ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1558 : {
1559 CBC 24 : int plan_node_id = state->js.ps.plan->plan_node_id;
1560 ECB : ParallelHashJoinState *pstate =
1561 GNC 24 : shm_toc_lookup(pcxt->toc, plan_node_id, false);
1562 ECB :
1563 : /*
1564 : * It would be possible to reuse the shared hash table in single-batch
1565 : * cases by resetting and then fast-forwarding build_barrier to
1566 : * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
1567 : * currently shared hash tables are already freed by now (by the last
1568 : * participant to detach from the batch). We could consider keeping it
1569 : * around for single-batch joins. We'd also need to adjust
1570 : * finalize_plan() so that it doesn't record a dummy dependency for
1571 : * Parallel Hash nodes, preventing the rescan optimization. For now we
1572 : * don't try.
1573 : */
1574 :
1575 : /* Detach, freeing any remaining shared memory. */
1576 CBC 24 : if (state->hj_HashTable != NULL)
1577 : {
1578 UIC 0 : ExecHashTableDetachBatch(state->hj_HashTable);
1579 LBC 0 : ExecHashTableDetach(state->hj_HashTable);
1580 ECB : }
1581 :
1582 : /* Clear any shared batch files. */
1583 GIC 24 : SharedFileSetDeleteAll(&pstate->fileset);
1584 :
1585 : /* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
1586 24 : BarrierInit(&pstate->build_barrier, 0);
1587 24 : }
1588 :
1589 : void
1590 CBC 147 : ExecHashJoinInitializeWorker(HashJoinState *state,
1591 : ParallelWorkerContext *pwcxt)
1592 ECB : {
1593 : HashState *hashNode;
1594 CBC 147 : int plan_node_id = state->js.ps.plan->plan_node_id;
1595 : ParallelHashJoinState *pstate =
1596 GIC 147 : shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1597 :
1598 : /* Attach to the space for shared temporary files. */
1599 147 : SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1600 :
1601 : /* Attach to the shared state in the hash node. */
1602 147 : hashNode = (HashState *) innerPlanState(state);
1603 147 : hashNode->parallel_state = pstate;
1604 :
1605 147 : ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1606 147 : }
|