Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * nodeHashjoin.c
4 : : * Routines to handle hash join nodes
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/executor/nodeHashjoin.c
12 : : *
13 : : * HASH JOIN
14 : : *
15 : : * This is based on the "hybrid hash join" algorithm described briefly on the
16 : : * following page
17 : : *
18 : : * https://en.wikipedia.org/wiki/Hash_join#Hybrid_hash_join
19 : : *
20 : : * and in detail in the referenced paper:
21 : : *
22 : : * "An Adaptive Hash Join Algorithm for Multiuser Environments"
23 : : * Hansjörg Zeller; Jim Gray (1990). Proceedings of the 16th VLDB conference.
24 : : * Brisbane: 186–197.
25 : : *
26 : : * If the inner side tuples of a hash join do not fit in memory, the hash join
27 : : * can be executed in multiple batches.
28 : : *
29 : : * If the statistics on the inner side relation are accurate, the planner
30 : : * chooses a multi-batch strategy and estimates the number of batches.
31 : : *
32 : : * The query executor measures the real size of the hashtable and increases the
33 : : * number of batches if the hashtable grows too large.
34 : : *
35 : : * The number of batches is always a power of two, so an increase in the number
36 : : * of batches doubles it.
37 : : *
38 : : * Serial hash join measures batch size lazily -- waiting until it is loading a
39 : : * batch to determine if it will fit in memory. While inserting tuples into the
40 : : * hashtable, serial hash join will, if adding a tuple would exceed work_mem,
41 : : * dump out the hashtable and reassign its tuples either to other batch files
42 : : * or to the current batch resident in the hashtable.
43 : : *
44 : : * Parallel hash join, on the other hand, completes all changes to the number
45 : : * of batches during the build phase. If it increases the number of batches, it
46 : : * dumps out all the tuples from all batches and reassigns them to entirely new
47 : : * batch files. Then it checks every batch to ensure it will fit in the space
48 : : * budget for the query.
49 : : *
50 : : * In both parallel and serial hash join, the executor currently makes a best
51 : : * effort. If a particular batch will not fit in memory, it tries doubling the
52 : : * number of batches. If after a batch increase, there is a batch which
53 : : * retained all or none of its tuples, the executor disables growth in the
54 : : * number of batches globally. After growth is disabled, all batches that would
55 : : * have previously triggered an increase in the number of batches instead
56 : : * exceed the space allowed.
57 : : *
58 : : * PARALLELISM
59 : : *
60 : : * Hash joins can participate in parallel query execution in several ways. A
61 : : * parallel-oblivious hash join is one where the node is unaware that it is
62 : : * part of a parallel plan. In this case, a copy of the inner plan is used to
63 : : * build a copy of the hash table in every backend, and the outer plan could
64 : : * either be built from a partial or complete path, so that the results of the
65 : : * hash join are correspondingly either partial or complete. A parallel-aware
66 : : * hash join is one that behaves differently, coordinating work between
67 : : * backends, and appears as Parallel Hash Join in EXPLAIN output. A Parallel
68 : : * Hash Join always appears with a Parallel Hash node.
69 : : *
70 : : * Parallel-aware hash joins use the same per-backend state machine to track
71 : : * progress through the hash join algorithm as parallel-oblivious hash joins.
72 : : * In a parallel-aware hash join, there is also a shared state machine that
73 : : * co-operating backends use to synchronize their local state machines and
74 : : * program counters. The shared state machine is managed with a Barrier IPC
75 : : * primitive. When all attached participants arrive at a barrier, the phase
76 : : * advances and all waiting participants are released.
77 : : *
78 : : * When a participant begins working on a parallel hash join, it must first
79 : : * figure out how much progress has already been made, because participants
80 : : * don't wait for each other to begin. For this reason there are switch
81 : : * statements at key points in the code where we have to synchronize our local
82 : : * state machine with the phase, and then jump to the correct part of the
83 : : * algorithm so that we can get started.
84 : : *
85 : : * One barrier called build_barrier is used to coordinate the hashing phases.
86 : : * The phase is represented by an integer which begins at zero and increments
87 : : * one by one, but in the code it is referred to by symbolic names as follows.
88 : : * An asterisk indicates a phase that is performed by a single arbitrarily
89 : : * chosen process.
90 : : *
91 : : * PHJ_BUILD_ELECT -- initial state
92 : : * PHJ_BUILD_ALLOCATE* -- one sets up the batches and table 0
93 : : * PHJ_BUILD_HASH_INNER -- all hash the inner rel
94 : : * PHJ_BUILD_HASH_OUTER -- (multi-batch only) all hash the outer
95 : : * PHJ_BUILD_RUN -- building done, probing can begin
96 : : * PHJ_BUILD_FREE* -- all work complete, one frees batches
97 : : *
98 : : * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
99 : : * be used repeatedly as required to coordinate expansions in the number of
100 : : * batches or buckets. Their phases are as follows:
101 : : *
102 : : * PHJ_GROW_BATCHES_ELECT -- initial state
103 : : * PHJ_GROW_BATCHES_REALLOCATE* -- one allocates new batches
104 : : * PHJ_GROW_BATCHES_REPARTITION -- all repartition
105 : : * PHJ_GROW_BATCHES_DECIDE* -- one detects skew and cleans up
106 : : * PHJ_GROW_BATCHES_FINISH -- finished one growth cycle
107 : : *
108 : : * PHJ_GROW_BUCKETS_ELECT -- initial state
109 : : * PHJ_GROW_BUCKETS_REALLOCATE* -- one allocates new buckets
110 : : * PHJ_GROW_BUCKETS_REINSERT -- all insert tuples
111 : : *
112 : : * If the planner got the number of batches and buckets right, those won't be
113 : : * necessary, but on the other hand we might finish up needing to expand the
114 : : * buckets or batches multiple times while hashing the inner relation to stay
115 : : * within our memory budget and load factor target. For that reason it's a
116 : : * separate pair of barriers using circular phases.
117 : : *
118 : : * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
119 : : * because we need to divide the outer relation into batches up front in order
120 : : * to be able to process batches entirely independently. In contrast, the
121 : : * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
122 : : * batches whenever it encounters them while scanning and probing, which it
123 : : * can do because it processes batches in serial order.
124 : : *
125 : : * Once PHJ_BUILD_RUN is reached, backends then split up and process
126 : : * different batches, or gang up and work together on probing batches if there
127 : : * aren't enough to go around. For each batch there is a separate barrier
128 : : * with the following phases:
129 : : *
130 : : * PHJ_BATCH_ELECT -- initial state
131 : : * PHJ_BATCH_ALLOCATE* -- one allocates buckets
132 : : * PHJ_BATCH_LOAD -- all load the hash table from disk
133 : : * PHJ_BATCH_PROBE -- all probe
134 : : * PHJ_BATCH_SCAN* -- one does right/right-anti/full unmatched scan
135 : : * PHJ_BATCH_FREE* -- one frees memory
136 : : *
137 : : * Batch 0 is a special case, because it starts out in phase
138 : : * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
139 : : * PHJ_BUILD_HASH_INNER so we can skip loading.
140 : : *
141 : : * Initially we try to plan for a single-batch hash join using the combined
142 : : * hash_mem of all participants to create a large shared hash table. If that
143 : : * turns out either at planning or execution time to be impossible then we
144 : : * fall back to regular hash_mem sized hash tables.
145 : : *
146 : : * To avoid deadlocks, we never wait for any barrier unless it is known that
147 : : * all other backends attached to it are actively executing the node or have
148 : : * finished. Practically, that means that we never emit a tuple while attached
149 : : * to a barrier, unless the barrier has reached a phase that means that no
150 : : * process will wait on it again. We emit tuples while attached to the build
151 : : * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
152 : : * PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
153 : : * respectively without waiting, using BarrierArriveAndDetach() and
154 : : * BarrierArriveAndDetachExceptLast() respectively. The last to detach
155 : : * receives a different return value so that it knows that it's safe to
156 : : * clean up. Any straggler process that attaches after that phase is reached
157 : : * will see that it's too late to participate or access the relevant shared
158 : : * memory objects.
159 : : *
160 : : *-------------------------------------------------------------------------
161 : : */
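
As a rough illustration of the barrier behaviour described in the header comment above, the following single-threaded toy model shows how a phase counter advances once every attached participant has arrived, and how the last participant to detach learns that cleanup is safe. `ToyBarrier` and its functions are hypothetical names made up for this sketch. They are not PostgreSQL's Barrier API, and the real primitive blocks waiting processes, which this model does not attempt.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical toy model; not PostgreSQL's Barrier implementation. */
typedef struct ToyBarrier
{
	int			phase;			/* current phase, starts at zero */
	int			participants;	/* number of attached participants */
	int			arrived;		/* participants that arrived in this phase */
} ToyBarrier;

/* Attach a participant and report the phase it must synchronize with. */
static int
toy_barrier_attach(ToyBarrier *b)
{
	b->participants++;
	return b->phase;
}

/*
 * Record one arrival.  Returns true for the arrival that completes the
 * phase (the "elected" participant); the phase then advances by one.
 */
static bool
toy_barrier_arrive(ToyBarrier *b)
{
	assert(b->participants > 0);
	if (++b->arrived == b->participants)
	{
		b->arrived = 0;
		b->phase++;
		return true;
	}
	return false;
}

/*
 * Detach without waiting.  Returns true only for the last participant to
 * detach, which tells it that cleanup is safe.
 */
static bool
toy_barrier_arrive_and_detach(ToyBarrier *b)
{
	assert(b->participants > 0);
	b->participants--;
	if (b->participants == 0)
	{
		b->arrived = 0;
		b->phase++;
		return true;
	}
	/* If everyone still attached has already arrived, advance the phase. */
	if (b->arrived == b->participants)
	{
		b->arrived = 0;
		b->phase++;
	}
	return false;
}
```

A participant that attaches late reads the phase returned by `toy_barrier_attach()` and jumps to the matching part of the algorithm, which is the role the switch statements on the barrier phase play in the real code.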
162 : :
163 : : #include "postgres.h"
164 : :
165 : : #include "access/htup_details.h"
166 : : #include "access/parallel.h"
167 : : #include "executor/executor.h"
168 : : #include "executor/hashjoin.h"
169 : : #include "executor/nodeHash.h"
170 : : #include "executor/nodeHashjoin.h"
171 : : #include "miscadmin.h"
172 : : #include "utils/sharedtuplestore.h"
173 : : #include "utils/wait_event.h"
174 : :
175 : :
176 : : /*
177 : : * States of the ExecHashJoin state machine
178 : : */
179 : : #define HJ_BUILD_HASHTABLE 1
180 : : #define HJ_NEED_NEW_OUTER 2
181 : : #define HJ_SCAN_BUCKET 3
182 : : #define HJ_FILL_OUTER_TUPLE 4
183 : : #define HJ_FILL_INNER_TUPLES 5
184 : : #define HJ_NEED_NEW_BATCH 6
185 : :
186 : : /* Returns true if doing null-fill on outer relation */
187 : : #define HJ_FILL_OUTER(hjstate) ((hjstate)->hj_NullInnerTupleSlot != NULL)
188 : : /* Returns true if doing null-fill on inner relation */
189 : : #define HJ_FILL_INNER(hjstate) ((hjstate)->hj_NullOuterTupleSlot != NULL)
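
The two macros above only test whether a null-tuple slot was created, so their result depends on which slots ExecInitHashJoin sets up for each join type. The sketch below restates that mapping as plain predicates. `ToyJoinType` and the `toy_fill_*` functions are hypothetical names used only for illustration; the mapping follows the join-type switch in ExecInitHashJoin.

```c
#include <stdbool.h>

/* Hypothetical stand-in for the join types handled by this node. */
typedef enum ToyJoinType
{
	TOY_JOIN_INNER,
	TOY_JOIN_SEMI,
	TOY_JOIN_LEFT,
	TOY_JOIN_ANTI,
	TOY_JOIN_RIGHT,
	TOY_JOIN_RIGHT_ANTI,
	TOY_JOIN_FULL
} ToyJoinType;

/* True when unmatched outer tuples are emitted with a null inner side. */
static bool
toy_fill_outer(ToyJoinType jt)
{
	return jt == TOY_JOIN_LEFT || jt == TOY_JOIN_ANTI || jt == TOY_JOIN_FULL;
}

/* True when unmatched inner tuples are emitted with a null outer side. */
static bool
toy_fill_inner(ToyJoinType jt)
{
	return jt == TOY_JOIN_RIGHT || jt == TOY_JOIN_RIGHT_ANTI ||
		jt == TOY_JOIN_FULL;
}
```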
190 : :
191 : : static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
192 : : HashJoinState *hjstate,
193 : : uint32 *hashvalue);
194 : : static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
195 : : HashJoinState *hjstate,
196 : : uint32 *hashvalue);
197 : : static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
198 : : BufFile *file,
199 : : uint32 *hashvalue,
200 : : TupleTableSlot *tupleSlot);
201 : : static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
202 : : static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
203 : : static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);
204 : :
205 : :
206 : : /* ----------------------------------------------------------------
207 : : * ExecHashJoinImpl
208 : : *
209 : : * This function implements the Hybrid Hashjoin algorithm. It is marked
210 : : * with an always-inline attribute so that ExecHashJoin() and
211 : : * ExecParallelHashJoin() can inline it. Compilers that respect the
212 : : * attribute should create versions specialized for parallel == true and
213 : : * parallel == false with unnecessary branches removed.
214 : : *
215 : : * Note: the relation we build the hash table on is the "inner";
216 : : * the other one is "outer".
217 : : * ----------------------------------------------------------------
218 : : */
219 : : static pg_attribute_always_inline TupleTableSlot *
2307 andres@anarazel.de 220 :CBC 4615208 : ExecHashJoinImpl(PlanState *pstate, bool parallel)
221 : : {
2463 222 : 4615208 : HashJoinState *node = castNode(HashJoinState, pstate);
223 : : PlanState *outerNode;
224 : : HashState *hashNode;
225 : : ExprState *joinqual;
226 : : ExprState *otherqual;
227 : : ExprContext *econtext;
228 : : HashJoinTable hashtable;
229 : : TupleTableSlot *outerTupleSlot;
230 : : uint32 hashvalue;
231 : : int batchno;
232 : : ParallelHashJoinState *parallel_state;
233 : :
234 : : /*
235 : : * get information from HashJoin node
236 : : */
7801 tgl@sss.pgh.pa.us 237 : 4615208 : joinqual = node->js.joinqual;
238 : 4615208 : otherqual = node->js.ps.qual;
239 : 4615208 : hashNode = (HashState *) innerPlanState(node);
240 : 4615208 : outerNode = outerPlanState(node);
241 : 4615208 : hashtable = node->hj_HashTable;
242 : 4615208 : econtext = node->js.ps.ps_ExprContext;
2307 andres@anarazel.de 243 : 4615208 : parallel_state = hashNode->parallel_state;
244 : :
245 : : /*
246 : : * Reset per-tuple memory context to free any expression evaluation
247 : : * storage allocated in the previous tuple cycle.
248 : : */
8634 tgl@sss.pgh.pa.us 249 : 4615208 : ResetExprContext(econtext);
250 : :
251 : : /*
252 : : * run the hash join state machine
253 : : */
254 : : for (;;)
255 : : {
256 : : /*
257 : : * It's possible to iterate this loop many times before returning a
258 : : * tuple, in some pathological cases such as needing to move much of
259 : : * the current batch to a later batch. So let's check for interrupts
260 : : * each time through.
261 : : */
2455 andres@anarazel.de 262 [ + + ]: 17739120 : CHECK_FOR_INTERRUPTS();
263 : :
4854 tgl@sss.pgh.pa.us 264 [ + + + + : 17739120 : switch (node->hj_JoinState)
+ + - ]
265 : : {
266 : 11941 : case HJ_BUILD_HASHTABLE:
267 : :
268 : : /*
269 : : * First time through: build hash table for inner relation.
270 : : */
271 [ - + ]: 11941 : Assert(hashtable == NULL);
272 : :
273 : : /*
274 : : * If the outer relation is completely empty, and it's not
275 : : * right/right-anti/full join, we can quit without building
276 : : * the hash table. However, for an inner join it is only a
277 : : * win to check this when the outer relation's startup cost is
278 : : * less than the projected cost of building the hash table.
279 : : * Otherwise it's best to build the hash table first and see
280 : : * if the inner relation is empty. (When it's a left join, we
281 : : * should always make this check, since we aren't going to be
282 : : * able to skip the join on the strength of an empty inner
283 : : * relation anyway.)
284 : : *
285 : : * If we are rescanning the join, we make use of information
286 : : * gained on the previous scan: don't bother to try the
287 : : * prefetch if the previous scan found the outer relation
288 : : * nonempty. This is not 100% reliable since with new
289 : : * parameters the outer relation might yield different
290 : : * results, but it's a good heuristic.
291 : : *
292 : : * The only way to make the check is to try to fetch a tuple
293 : : * from the outer plan node. If we succeed, we have to stash
294 : : * it away for later consumption by ExecHashJoinOuterGetTuple.
295 : : */
296 [ + + ]: 11941 : if (HJ_FILL_INNER(node))
297 : : {
298 : : /* no chance to not build the hash table */
299 : 2694 : node->hj_FirstOuterTupleSlot = NULL;
300 : : }
2307 andres@anarazel.de 301 [ + + ]: 9247 : else if (parallel)
302 : : {
303 : : /*
304 : : * The empty-outer optimization is not implemented for
305 : : * shared hash tables, because no one participant can
306 : : * determine that there are no outer tuples, and it's not
307 : : * yet clear that it's worth the synchronization overhead
308 : : * of reaching consensus to figure that out. So we have
309 : : * to build the hash table.
310 : : */
311 : 162 : node->hj_FirstOuterTupleSlot = NULL;
312 : : }
4854 tgl@sss.pgh.pa.us 313 [ + + ]: 9085 : else if (HJ_FILL_OUTER(node) ||
314 [ + + ]: 6655 : (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
315 [ + + ]: 6087 : !node->hj_OuterNotEmpty))
316 : : {
317 : 8067 : node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
318 [ + + + + ]: 8067 : if (TupIsNull(node->hj_FirstOuterTupleSlot))
319 : : {
320 : 1538 : node->hj_OuterNotEmpty = false;
321 : 1538 : return NULL;
322 : : }
323 : : else
324 : 6529 : node->hj_OuterNotEmpty = true;
325 : : }
326 : : else
327 : 1018 : node->hj_FirstOuterTupleSlot = NULL;
328 : :
329 : : /*
330 : : * Create the hash table. If using Parallel Hash, then
331 : : * whoever gets here first will create the hash table and any
332 : : * later arrivals will merely attach to it.
333 : : */
2307 andres@anarazel.de 334 : 10403 : hashtable = ExecHashTableCreate(hashNode,
335 : : node->hj_HashOperators,
336 : : node->hj_Collations,
4854 tgl@sss.pgh.pa.us 337 : 10403 : HJ_FILL_INNER(node));
338 : 10403 : node->hj_HashTable = hashtable;
339 : :
340 : : /*
341 : : * Execute the Hash node, to build the hash table. If using
342 : : * Parallel Hash, then we'll try to help hashing unless we
343 : : * arrived too late.
344 : : */
345 : 10403 : hashNode->hashtable = hashtable;
346 : 10403 : (void) MultiExecProcNode((PlanState *) hashNode);
347 : :
348 : : /*
349 : : * If the inner relation is completely empty, and we're not
350 : : * doing a left outer join, we can quit without scanning the
351 : : * outer relation.
352 : : */
353 [ + + + + ]: 10401 : if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
354 : : {
390 tmunro@postgresql.or 355 [ - + ]: 680 : if (parallel)
356 : : {
357 : : /*
358 : : * Advance the build barrier to PHJ_BUILD_RUN before
359 : : * proceeding so we can negotiate resource cleanup.
360 : : */
390 tmunro@postgresql.or 361 :UBC 0 : Barrier *build_barrier = &parallel_state->build_barrier;
362 : :
388 363 [ # # ]: 0 : while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
390 364 : 0 : BarrierArriveAndWait(build_barrier, 0);
365 : : }
4854 tgl@sss.pgh.pa.us 366 :CBC 680 : return NULL;
367 : : }
368 : :
369 : : /*
370 : : * need to remember whether nbatch has increased since we
371 : : * began scanning the outer relation
372 : : */
373 : 9721 : hashtable->nbatch_outstart = hashtable->nbatch;
374 : :
375 : : /*
376 : : * Reset OuterNotEmpty for scan. (It's OK if we fetched a
377 : : * tuple above, because ExecHashJoinOuterGetTuple will
378 : : * immediately set it again.)
379 : : */
380 : 9721 : node->hj_OuterNotEmpty = false;
381 : :
2307 andres@anarazel.de 382 [ + + ]: 9721 : if (parallel)
383 : 198 : {
384 : : Barrier *build_barrier;
385 : :
386 : 198 : build_barrier = &parallel_state->build_barrier;
388 tmunro@postgresql.or 387 [ + + - + : 198 : Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
- - ]
388 : : BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
389 : : BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
390 [ + + ]: 198 : if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
391 : : {
392 : : /*
393 : : * If multi-batch, we need to hash the outer relation
394 : : * up front.
395 : : */
2307 andres@anarazel.de 396 [ + + ]: 177 : if (hashtable->nbatch > 1)
397 : 93 : ExecParallelHashJoinPartitionOuter(node);
398 : 177 : BarrierArriveAndWait(build_barrier,
399 : : WAIT_EVENT_HASH_BUILD_HASH_OUTER);
400 : : }
388 tmunro@postgresql.or 401 [ - + ]: 21 : else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE)
402 : : {
403 : : /*
404 : : * If we attached so late that the job is finished and
405 : : * the batch state has been freed, we can return
406 : : * immediately.
407 : : */
390 tmunro@postgresql.or 408 :UBC 0 : return NULL;
409 : : }
410 : :
411 : : /* Each backend should now select a batch to work on. */
388 tmunro@postgresql.or 412 [ - + ]:CBC 198 : Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
2307 andres@anarazel.de 413 : 198 : hashtable->curbatch = -1;
414 : 198 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
415 : :
416 : 198 : continue;
417 : : }
418 : : else
419 : 9523 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
420 : :
421 : : /* FALL THRU */
422 : :
4854 tgl@sss.pgh.pa.us 423 : 8431575 : case HJ_NEED_NEW_OUTER:
424 : :
425 : : /*
426 : : * We don't have an outer tuple, try to get the next one
427 : : */
2307 andres@anarazel.de 428 [ + + ]: 8431575 : if (parallel)
429 : : outerTupleSlot =
430 : 1080588 : ExecParallelHashJoinOuterGetTuple(outerNode, node,
431 : : &hashvalue);
432 : : else
433 : : outerTupleSlot =
434 : 7350987 : ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
435 : :
4854 tgl@sss.pgh.pa.us 436 [ + + - + ]: 8431575 : if (TupIsNull(outerTupleSlot))
437 : : {
438 : : /* end of batch, or maybe whole join */
439 [ + + ]: 11205 : if (HJ_FILL_INNER(node))
440 : : {
441 : : /* set up to scan for unmatched inner tuples */
380 tmunro@postgresql.or 442 [ + + ]: 2571 : if (parallel)
443 : : {
444 : : /*
445 : : * Only one process is currently allowed to handle
446 : : * each batch's unmatched tuples, in a parallel
447 : : * join.
448 : : */
449 [ + + ]: 54 : if (ExecParallelPrepHashTableForUnmatched(node))
450 : 33 : node->hj_JoinState = HJ_FILL_INNER_TUPLES;
451 : : else
452 : 21 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
453 : : }
454 : : else
455 : : {
456 : 2517 : ExecPrepHashTableForUnmatched(node);
457 : 2517 : node->hj_JoinState = HJ_FILL_INNER_TUPLES;
458 : : }
459 : : }
460 : : else
4854 tgl@sss.pgh.pa.us 461 : 8634 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
462 : 11205 : continue;
463 : : }
464 : :
465 : 8420370 : econtext->ecxt_outertuple = outerTupleSlot;
466 : 8420370 : node->hj_MatchedOuter = false;
467 : :
468 : : /*
469 : : * Find the corresponding bucket for this tuple in the main
470 : : * hash table or skew hash table.
471 : : */
472 : 8420370 : node->hj_CurHashValue = hashvalue;
473 : 8420370 : ExecHashGetBucketAndBatch(hashtable, hashvalue,
474 : : &node->hj_CurBucketNo, &batchno);
475 : 8420370 : node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
476 : : hashvalue);
477 : 8420370 : node->hj_CurTuple = NULL;
478 : :
479 : : /*
480 : : * The tuple might not belong to the current batch (where
481 : : * "current batch" includes the skew buckets if any).
482 : : */
483 [ + + ]: 8420370 : if (batchno != hashtable->curbatch &&
484 [ + + ]: 735696 : node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
485 : 735096 : {
486 : : bool shouldFree;
1977 andres@anarazel.de 487 : 735096 : MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
488 : : &shouldFree);
489 : :
490 : : /*
491 : : * Need to postpone this outer tuple to a later batch.
492 : : * Save it in the corresponding outer-batch file.
493 : : */
2307 494 [ - + ]: 735096 : Assert(parallel_state == NULL);
4854 tgl@sss.pgh.pa.us 495 [ - + ]: 735096 : Assert(batchno > hashtable->curbatch);
1977 andres@anarazel.de 496 : 735096 : ExecHashJoinSaveTuple(mintuple, hashvalue,
331 tomas.vondra@postgre 497 : 735096 : &hashtable->outerBatchFile[batchno],
498 : : hashtable);
499 : :
1977 andres@anarazel.de 500 [ + - ]: 735096 : if (shouldFree)
501 : 735096 : heap_free_minimal_tuple(mintuple);
502 : :
503 : : /* Loop around, staying in HJ_NEED_NEW_OUTER state */
4854 tgl@sss.pgh.pa.us 504 : 735096 : continue;
505 : : }
506 : :
507 : : /* OK, let's scan the bucket for matches */
508 : 7685274 : node->hj_JoinState = HJ_SCAN_BUCKET;
509 : :
510 : : /* FALL THRU */
511 : :
512 : 10799090 : case HJ_SCAN_BUCKET:
513 : :
514 : : /*
515 : : * Scan the selected hash bucket for matches to current outer
516 : : */
2307 andres@anarazel.de 517 [ + + ]: 10799090 : if (parallel)
518 : : {
519 [ + + ]: 2100027 : if (!ExecParallelScanHashBucket(node, econtext))
520 : : {
521 : : /* out of matches; check for possible outer-join fill */
522 : 1080015 : node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
523 : 1080015 : continue;
524 : : }
525 : : }
526 : : else
527 : : {
528 [ + + ]: 8699063 : if (!ExecScanHashBucket(node, econtext))
529 : : {
530 : : /* out of matches; check for possible outer-join fill */
531 : 4863750 : node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
532 : 4863750 : continue;
533 : : }
534 : : }
535 : :
536 : : /*
537 : : * We've got a match, but still need to test non-hashed quals.
538 : : * ExecScanHashBucket already set up all the state needed to
539 : : * call ExecQual.
540 : : *
541 : : * If we pass the qual, then save state for next call and have
542 : : * ExecProject form the projection, store it in the tuple
543 : : * table, and return the slot.
544 : : *
545 : : * Only the joinquals determine tuple match status, but all
546 : : * quals must pass to actually return the tuple.
547 : : */
2588 548 [ + + + + ]: 4855325 : if (joinqual == NULL || ExecQual(joinqual, econtext))
549 : : {
4854 tgl@sss.pgh.pa.us 550 : 4778924 : node->hj_MatchedOuter = true;
551 : :
552 : :
553 : : /*
554 : : * This is really only needed if HJ_FILL_INNER(node), but
555 : : * we'll avoid the branch and just set it always.
556 : : */
380 tmunro@postgresql.or 557 [ + + ]: 4778924 : if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
1539 558 : 2870914 : HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
559 : :
560 : : /* In an antijoin, we never return a matched tuple */
4854 tgl@sss.pgh.pa.us 561 [ + + ]: 4778924 : if (node->js.jointype == JOIN_ANTI)
562 : : {
563 : 782187 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
564 : 782187 : continue;
565 : : }
566 : :
567 : : /*
568 : : * In a right-antijoin, we never return a matched tuple.
569 : : * And we need to stay on the current outer tuple to
570 : : * continue scanning the inner side for matches.
571 : : */
375 572 [ + + ]: 3996737 : if (node->js.jointype == JOIN_RIGHT_ANTI)
573 : 11270 : continue;
574 : :
575 : : /*
576 : : * If we only need to join to the first matching inner
577 : : * tuple, then consider returning this one, but after that
578 : : * continue with next outer tuple.
579 : : */
2564 580 [ + + ]: 3985467 : if (node->js.single_match)
4854 581 : 959279 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
582 : :
2588 andres@anarazel.de 583 [ + + + + ]: 3985467 : if (otherqual == NULL || ExecQual(otherqual, econtext))
2642 584 : 3893426 : return ExecProject(node->js.ps.ps_ProjInfo);
585 : : else
4588 tgl@sss.pgh.pa.us 586 [ - + ]: 92041 : InstrCountFiltered2(node, 1);
587 : : }
588 : : else
589 [ - + ]: 76401 : InstrCountFiltered1(node, 1);
4854 590 : 168442 : break;
591 : :
592 : 5943765 : case HJ_FILL_OUTER_TUPLE:
593 : :
594 : : /*
595 : : * The current outer tuple has run out of matches, so check
596 : : * whether to emit a dummy outer-join tuple. Whether we emit
597 : : * one or not, the next state is HJ_NEED_NEW_OUTER.
598 : : */
599 : 5943765 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
600 : :
601 [ + + ]: 5943765 : if (!node->hj_MatchedOuter &&
602 [ + + ]: 3560865 : HJ_FILL_OUTER(node))
603 : : {
604 : : /*
605 : : * Generate a fake join tuple with nulls for the inner
606 : : * tuple, and return it if it passes the non-join quals.
607 : : */
608 : 1016819 : econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
609 : :
2588 andres@anarazel.de 610 [ + + + + ]: 1016819 : if (otherqual == NULL || ExecQual(otherqual, econtext))
2642 611 : 479523 : return ExecProject(node->js.ps.ps_ProjInfo);
612 : : else
4588 tgl@sss.pgh.pa.us 613 [ - + ]: 537296 : InstrCountFiltered2(node, 1);
614 : : }
4854 615 : 5464242 : break;
616 : :
617 : 236149 : case HJ_FILL_INNER_TUPLES:
618 : :
619 : : /*
620 : : * We have finished a batch, but we are doing
621 : : * right/right-anti/full join, so any unmatched inner tuples
622 : : * in the hashtable have to be emitted before we continue to
623 : : * the next batch.
624 : : */
380 tmunro@postgresql.or 625 [ + + + + ]: 412262 : if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
626 : 176113 : : ExecScanHashTableForUnmatched(node, econtext)))
627 : : {
628 : : /* no more unmatched tuples */
4854 tgl@sss.pgh.pa.us 629 : 2544 : node->hj_JoinState = HJ_NEED_NEW_BATCH;
630 : 2544 : continue;
631 : : }
632 : :
633 : : /*
634 : : * Generate a fake join tuple with nulls for the outer tuple,
635 : : * and return it if it passes the non-join quals.
636 : : */
637 : 233605 : econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
638 : :
2588 andres@anarazel.de 639 [ + + + + ]: 233605 : if (otherqual == NULL || ExecQual(otherqual, econtext))
2642 640 : 230028 : return ExecProject(node->js.ps.ps_ProjInfo);
641 : : else
4588 tgl@sss.pgh.pa.us 642 [ - + ]: 3577 : InstrCountFiltered2(node, 1);
4854 643 : 3577 : break;
644 : :
645 : 11397 : case HJ_NEED_NEW_BATCH:
646 : :
647 : : /*
648 : : * Try to advance to next batch. Done if there are no more.
649 : : */
2307 andres@anarazel.de 650 [ + + ]: 11397 : if (parallel)
651 : : {
652 [ + + ]: 771 : if (!ExecParallelHashJoinNewBatch(node))
653 : 198 : return NULL; /* end of parallel-aware join */
654 : : }
655 : : else
656 : : {
657 [ + + ]: 10626 : if (!ExecHashJoinNewBatch(node))
658 : 9813 : return NULL; /* end of parallel-oblivious join */
659 : : }
4854 tgl@sss.pgh.pa.us 660 : 1386 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
661 : 1386 : break;
662 : :
4854 tgl@sss.pgh.pa.us 663 :UBC 0 : default:
664 [ # # ]: 0 : elog(ERROR, "unrecognized hashjoin state: %d",
665 : : (int) node->hj_JoinState);
666 : : }
667 : : }
668 : : }
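
The serial path in HJ_NEED_NEW_OUTER relies on a postponed tuple always belonging to a later batch, and the header comment notes that the number of batches is always a power of two. The sketch below shows one way those two facts fit together. It assumes that the low bits of the hash value select the bucket and the next bits select the batch. That layout and the `toy_*` names are assumptions made for illustration, and the real ExecHashGetBucketAndBatch() may differ in detail.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical simplification: nbuckets and nbatch are powers of two, the
 * low log2_nbuckets bits select the bucket and the next bits select the
 * batch.  Assumes 0 <= log2_nbuckets < 32.
 */
static void
toy_get_bucket_and_batch(uint32_t hashvalue, int log2_nbuckets, int nbatch,
						 int *bucketno, int *batchno)
{
	uint32_t	nbuckets = (uint32_t) 1 << log2_nbuckets;

	assert(nbatch > 0 && (nbatch & (nbatch - 1)) == 0);
	*bucketno = (int) (hashvalue & (nbuckets - 1));
	*batchno = (nbatch > 1)
		? (int) ((hashvalue >> log2_nbuckets) & (uint32_t) (nbatch - 1))
		: 0;
}

/*
 * After doubling nbatch, a tuple either stays in its batch or moves
 * "forward" by exactly the old batch count.  Returns the new batch number.
 */
static int
toy_batch_after_doubling(uint32_t hashvalue, int log2_nbuckets, int old_nbatch)
{
	int			bucketno;
	int			old_batchno;
	int			new_batchno;

	toy_get_bucket_and_batch(hashvalue, log2_nbuckets, old_nbatch,
							 &bucketno, &old_batchno);
	toy_get_bucket_and_batch(hashvalue, log2_nbuckets, old_nbatch * 2,
							 &bucketno, &new_batchno);
	assert(new_batchno == old_batchno ||
		   new_batchno == old_batchno + old_nbatch);
	return new_batchno;
}
```

Under this layout, doubling the batch count consults one more hash bit, so a tuple never moves to an earlier batch. If every tuple in a batch has the same value for that extra bit, the batch keeps all or none of its tuples, which is the situation in which the header comment says further batch growth is disabled.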
669 : :
670 : : /* ----------------------------------------------------------------
671 : : * ExecHashJoin
672 : : *
673 : : * Parallel-oblivious version.
674 : : * ----------------------------------------------------------------
675 : : */
676 : : static TupleTableSlot * /* return: a tuple or NULL */
2307 andres@anarazel.de 677 :CBC 3474992 : ExecHashJoin(PlanState *pstate)
678 : : {
679 : : /*
680 : : * On sufficiently smart compilers this should be inlined with the
681 : : * parallel-aware branches removed.
682 : : */
683 : 3474992 : return ExecHashJoinImpl(pstate, false);
684 : : }
685 : :
686 : : /* ----------------------------------------------------------------
687 : : * ExecParallelHashJoin
688 : : *
689 : : * Parallel-aware version.
690 : : * ----------------------------------------------------------------
691 : : */
692 : : static TupleTableSlot * /* return: a tuple or NULL */
693 : 1140216 : ExecParallelHashJoin(PlanState *pstate)
694 : : {
695 : : /*
696 : : * On sufficiently smart compilers this should be inlined with the
697 : : * parallel-oblivious branches removed.
698 : : */
699 : 1140216 : return ExecHashJoinImpl(pstate, true);
700 : : }
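
The pairing of ExecHashJoin() and ExecParallelHashJoin() with a shared ExecHashJoinImpl() follows a common specialization pattern: one inlinable implementation takes a boolean flag, and thin wrappers pass a compile-time constant so that the compiler can discard the unused branch in each copy. A minimal sketch with hypothetical `toy_*` names is shown below. It uses plain `static inline`, which is only a hint to the compiler, whereas the code above marks the implementation with pg_attribute_always_inline.

```c
#include <stdbool.h>

/* Hypothetical shared implementation; 'parallel' is constant per caller. */
static inline int
toy_join_impl(int input, bool parallel)
{
	if (parallel)
		return input * 2;		/* stand-in for the parallel-aware path */
	return input + 1;			/* stand-in for the parallel-oblivious path */
}

/* Thin wrappers that pin the flag so the dead branch can be removed. */
static int
toy_join(int input)
{
	return toy_join_impl(input, false);
}

static int
toy_parallel_join(int input)
{
	return toy_join_impl(input, true);
}
```

Because each wrapper has its own address, either one can be installed as a function pointer, in the same way that ExecInitHashJoin assigns ExecHashJoin to the node's ExecProcNode field.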
701 : :
702 : : /* ----------------------------------------------------------------
703 : : * ExecInitHashJoin
704 : : *
705 : : * Init routine for HashJoin node.
706 : : * ----------------------------------------------------------------
707 : : */
708 : : HashJoinState *
6620 tgl@sss.pgh.pa.us 709 : 14898 : ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
710 : : {
711 : : HashJoinState *hjstate;
712 : : Plan *outerNode;
713 : : Hash *hashNode;
714 : : TupleDesc outerDesc,
715 : : innerDesc;
716 : : const TupleTableSlotOps *ops;
717 : :
718 : : /* check for unsupported flags */
719 [ - + ]: 14898 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
720 : :
721 : : /*
722 : : * create state structure
723 : : */
9716 bruce@momjian.us 724 : 14898 : hjstate = makeNode(HashJoinState);
7801 tgl@sss.pgh.pa.us 725 : 14898 : hjstate->js.ps.plan = (Plan *) node;
726 : 14898 : hjstate->js.ps.state = estate;
727 : :
728 : : /*
729 : : * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
730 : : * where this function may be replaced with a parallel version, if we
731 : : * managed to launch a parallel query.
732 : : */
2463 andres@anarazel.de 733 : 14898 : hjstate->js.ps.ExecProcNode = ExecHashJoin;
2249 734 : 14898 : hjstate->js.jointype = node->join.jointype;
735 : :
736 : : /*
737 : : * Miscellaneous initialization
738 : : *
739 : : * create expression context for node
740 : : */
7801 tgl@sss.pgh.pa.us 741 : 14898 : ExecAssignExprContext(estate, &hjstate->js.ps);
742 : :
743 : : /*
744 : : * initialize child nodes
745 : : *
746 : : * Note: we could suppress the REWIND flag for the inner input, which
747 : : * would amount to betting that the hash will be a single batch. Not
748 : : * clear if this would be a win or not.
749 : : */
750 : 14898 : outerNode = outerPlan(node);
751 : 14898 : hashNode = (Hash *) innerPlan(node);
752 : :
6620 753 : 14898 : outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
2249 andres@anarazel.de 754 : 14898 : outerDesc = ExecGetResultType(outerPlanState(hjstate));
6620 tgl@sss.pgh.pa.us 755 : 14898 : innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
2249 andres@anarazel.de 756 : 14898 : innerDesc = ExecGetResultType(innerPlanState(hjstate));
757 : :
758 : : /*
759 : : * Initialize result slot, type and projection.
760 : : */
1977 761 : 14898 : ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
2249 762 : 14898 : ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
763 : :
764 : : /*
765 : : * tuple table initialization
766 : : */
1977 767 : 14898 : ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
768 : 14898 : hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
769 : : ops);
770 : :
771 : : /*
772 : : * detect whether we need only consider the first matching inner tuple
773 : : */
2564 tgl@sss.pgh.pa.us 774 [ + + ]: 22725 : hjstate->js.single_match = (node->join.inner_unique ||
775 [ + + ]: 7827 : node->join.jointype == JOIN_SEMI);
776 : :
777 : : /* set up null tuples for outer joins, if needed */
8615 778 [ + + + + : 14898 : switch (node->join.jointype)
- ]
779 : : {
780 : 9005 : case JOIN_INNER:
781 : : case JOIN_SEMI:
782 : 9005 : break;
783 : 2655 : case JOIN_LEFT:
784 : : case JOIN_ANTI:
785 : 2655 : hjstate->hj_NullInnerTupleSlot =
1977 andres@anarazel.de 786 : 2655 : ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
8615 tgl@sss.pgh.pa.us 787 : 2655 : break;
4854 788 : 2693 : case JOIN_RIGHT:
789 : : case JOIN_RIGHT_ANTI:
790 : 2693 : hjstate->hj_NullOuterTupleSlot =
1977 andres@anarazel.de 791 : 2693 : ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
4854 tgl@sss.pgh.pa.us 792 : 2693 : break;
793 : 545 : case JOIN_FULL:
794 : 545 : hjstate->hj_NullOuterTupleSlot =
1977 andres@anarazel.de 795 : 545 : ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
4854 tgl@sss.pgh.pa.us 796 : 545 : hjstate->hj_NullInnerTupleSlot =
1977 andres@anarazel.de 797 : 545 : ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
4854 tgl@sss.pgh.pa.us 798 : 545 : break;
8615 tgl@sss.pgh.pa.us 799 :UBC 0 : default:
7573 800 [ # # ]: 0 : elog(ERROR, "unrecognized join type: %d",
801 : : (int) node->join.jointype);
802 : : }
803 : :
804 : : /*
805 : : * Now for some voodoo. Our temporary tuple slot is actually the result
806 : : * tuple slot of the Hash node (which is our inner plan). We can do this
807 : : * because Hash nodes don't return tuples via ExecProcNode() -- instead
808 : : * the hash join node uses ExecScanHashBucket() to get at the contents of
809 : : * the hash table. -cim 6/9/91
810 : : */
811 : : {
7801 tgl@sss.pgh.pa.us 812 :CBC 14898 : HashState *hashstate = (HashState *) innerPlanState(hjstate);
813 : 14898 : TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
814 : :
9716 bruce@momjian.us 815 : 14898 : hjstate->hj_HashTupleSlot = slot;
816 : : }
817 : :
818 : : /*
819 : : * initialize child expressions
820 : : */
2249 andres@anarazel.de 821 : 14898 : hjstate->js.ps.qual =
822 : 14898 : ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
823 : 14898 : hjstate->js.joinqual =
824 : 14898 : ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
825 : 14898 : hjstate->hashclauses =
826 : 14898 : ExecInitQual(node->hashclauses, (PlanState *) hjstate);
827 : :
828 : : /*
829 : : * initialize hash-specific info
830 : : */
7403 neilc@samurai.com 831 : 14898 : hjstate->hj_HashTable = NULL;
6776 tgl@sss.pgh.pa.us 832 : 14898 : hjstate->hj_FirstOuterTupleSlot = NULL;
833 : :
6979 834 : 14898 : hjstate->hj_CurHashValue = 0;
9098 835 : 14898 : hjstate->hj_CurBucketNo = 0;
5503 836 : 14898 : hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
7403 neilc@samurai.com 837 : 14898 : hjstate->hj_CurTuple = NULL;
838 : :
1717 andres@anarazel.de 839 : 14898 : hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
840 : : (PlanState *) hjstate);
841 : 14898 : hjstate->hj_HashOperators = node->hashoperators;
842 : 14898 : hjstate->hj_Collations = node->hashcollations;
843 : :
4854 tgl@sss.pgh.pa.us 844 : 14898 : hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
8615 845 : 14898 : hjstate->hj_MatchedOuter = false;
6712 846 : 14898 : hjstate->hj_OuterNotEmpty = false;
847 : :
7801 848 : 14898 : return hjstate;
849 : : }
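/*
 * Illustration only (not part of nodeHashjoin.c): a minimal sketch of the
 * join-type decisions made in ExecInitHashJoin above, namely which null
 * tuple slots get created and when only the first inner match matters.
 * DemoJoinType, DemoJoinSetup and demo_join_setup are hypothetical names
 * invented for this sketch; the real code works on HashJoinState and
 * JoinType directly.
 */

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the join types handled in the switch above. */
typedef enum DemoJoinType
{
    DEMO_JOIN_INNER,
    DEMO_JOIN_SEMI,
    DEMO_JOIN_LEFT,
    DEMO_JOIN_ANTI,
    DEMO_JOIN_RIGHT,
    DEMO_JOIN_RIGHT_ANTI,
    DEMO_JOIN_FULL
} DemoJoinType;

/* Hypothetical summary of what the init routine decides. */
typedef struct DemoJoinSetup
{
    bool need_null_inner;   /* stands in for hj_NullInnerTupleSlot */
    bool need_null_outer;   /* stands in for hj_NullOuterTupleSlot */
    bool single_match;      /* stop after the first matching inner tuple */
} DemoJoinSetup;

static DemoJoinSetup
demo_join_setup(DemoJoinType jointype, bool inner_unique)
{
    DemoJoinSetup s = {false, false, false};

    s.single_match = inner_unique || jointype == DEMO_JOIN_SEMI;

    switch (jointype)
    {
        case DEMO_JOIN_INNER:
        case DEMO_JOIN_SEMI:
            break;              /* no null-extended output rows */
        case DEMO_JOIN_LEFT:
        case DEMO_JOIN_ANTI:
            s.need_null_inner = true;
            break;
        case DEMO_JOIN_RIGHT:
        case DEMO_JOIN_RIGHT_ANTI:
            s.need_null_outer = true;
            break;
        case DEMO_JOIN_FULL:
            s.need_null_outer = true;
            s.need_null_inner = true;
            break;
        default:
            assert(!"unrecognized join type");
    }
    return s;
}
```

/*
 * For example, demo_join_setup(DEMO_JOIN_FULL, false) asks for both null
 * slots, while an inner join with a unique inner side only sets
 * single_match.
 */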
850 : :
851 : : /* ----------------------------------------------------------------
852 : : * ExecEndHashJoin
853 : : *
854 : : * clean up routine for HashJoin node
855 : : * ----------------------------------------------------------------
856 : : */
857 : : void
858 : 14843 : ExecEndHashJoin(HashJoinState *node)
859 : : {
860 : : /*
861 : : * Free hash table
862 : : */
863 [ + + ]: 14843 : if (node->hj_HashTable)
864 : : {
865 : 9837 : ExecHashTableDestroy(node->hj_HashTable);
866 : 9837 : node->hj_HashTable = NULL;
867 : : }
868 : :
869 : : /*
870 : : * clean up subtrees
871 : : */
7791 872 : 14843 : ExecEndNode(outerPlanState(node));
873 : 14843 : ExecEndNode(innerPlanState(node));
10141 scrappy@hub.org 874 : 14843 : }
875 : :
876 : : /*
877 : : * ExecHashJoinOuterGetTuple
878 : : *
879 : : * get the next outer tuple for a parallel-oblivious hash join: either by
880 : : * executing the outer plan node in the first pass, or from the temp
881 : : * files for the hash join batches.
882 : : *
883 : : * Returns a null slot if no more outer tuples (within the current batch).
884 : : *
885 : : * On success, the tuple's hash value is stored at *hashvalue --- this is
886 : : * either originally computed, or re-read from the temp file.
887 : : */
888 : : static TupleTableSlot *
6979 tgl@sss.pgh.pa.us 889 : 7350987 : ExecHashJoinOuterGetTuple(PlanState *outerNode,
890 : : HashJoinState *hjstate,
891 : : uint32 *hashvalue)
892 : : {
9091 bruce@momjian.us 893 : 7350987 : HashJoinTable hashtable = hjstate->hj_HashTable;
894 : 7350987 : int curbatch = hashtable->curbatch;
895 : : TupleTableSlot *slot;
896 : :
6286 tgl@sss.pgh.pa.us 897 [ + + ]: 7350987 : if (curbatch == 0) /* if it is the first pass */
898 : : {
899 : : /*
900 : : * Check to see if first outer tuple was already fetched by
901 : : * ExecHashJoin() and not used yet.
902 : : */
6776 903 : 6615078 : slot = hjstate->hj_FirstOuterTupleSlot;
904 [ + + + - ]: 6615078 : if (!TupIsNull(slot))
905 : 6213 : hjstate->hj_FirstOuterTupleSlot = NULL;
906 : : else
907 : 6608865 : slot = ExecProcNode(outerNode);
908 : :
6286 909 [ + + + + ]: 6615485 : while (!TupIsNull(slot))
910 : : {
911 : : /*
912 : : * We have to compute the tuple's hash value.
913 : : */
6979 914 : 6605666 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
915 : :
916 : 6605666 : econtext->ecxt_outertuple = slot;
6286 917 [ + + ]: 6605666 : if (ExecHashGetHashValue(hashtable, econtext,
918 : : hjstate->hj_OuterHashKeys,
919 : : true, /* outer tuple */
4854 920 : 6605666 : HJ_FILL_OUTER(hjstate),
921 : : hashvalue))
922 : : {
923 : : /* remember outer relation is not empty for possible rescan */
6286 924 : 6605259 : hjstate->hj_OuterNotEmpty = true;
925 : :
926 : 6605259 : return slot;
927 : : }
928 : :
929 : : /*
930 : : * That tuple couldn't match because of a NULL, so discard it and
931 : : * continue with the next one.
932 : : */
933 : 407 : slot = ExecProcNode(outerNode);
934 : : }
935 : : }
4854 936 [ + - ]: 735909 : else if (curbatch < hashtable->nbatch)
937 : : {
938 : 735909 : BufFile *file = hashtable->outerBatchFile[curbatch];
939 : :
940 : : /*
941 : : * In outer-join cases, we could get here even though the batch file
942 : : * is empty.
943 : : */
944 [ - + ]: 735909 : if (file == NULL)
4854 tgl@sss.pgh.pa.us 945 :UBC 0 : return NULL;
946 : :
9098 tgl@sss.pgh.pa.us 947 :CBC 735909 : slot = ExecHashJoinGetSavedTuple(hjstate,
948 : : file,
949 : : hashvalue,
950 : : hjstate->hj_OuterTupleSlot);
9091 bruce@momjian.us 951 [ + + + - ]: 735909 : if (!TupIsNull(slot))
9098 tgl@sss.pgh.pa.us 952 : 735096 : return slot;
953 : : }
954 : :
955 : : /* End of this batch */
956 : 10632 : return NULL;
957 : : }
958 : :
959 : : /*
960 : : * ExecHashJoinOuterGetTuple variant for the parallel case.
961 : : */
962 : : static TupleTableSlot *
2307 andres@anarazel.de 963 : 1080588 : ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
964 : : HashJoinState *hjstate,
965 : : uint32 *hashvalue)
966 : : {
967 : 1080588 : HashJoinTable hashtable = hjstate->hj_HashTable;
968 : 1080588 : int curbatch = hashtable->curbatch;
969 : : TupleTableSlot *slot;
970 : :
971 : : /*
972 : : * In the Parallel Hash case we only run the outer plan directly for
973 : : * single-batch hash joins. Otherwise we have to go to batch files, even
974 : : * for batch 0.
975 : : */
976 [ + + + + ]: 1080588 : if (curbatch == 0 && hashtable->nbatch == 1)
977 : : {
978 : 480087 : slot = ExecProcNode(outerNode);
979 : :
980 [ + + + - ]: 480087 : while (!TupIsNull(slot))
981 : : {
982 : 480003 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
983 : :
984 : 480003 : econtext->ecxt_outertuple = slot;
985 [ + - ]: 480003 : if (ExecHashGetHashValue(hashtable, econtext,
986 : : hjstate->hj_OuterHashKeys,
987 : : true, /* outer tuple */
988 : 480003 : HJ_FILL_OUTER(hjstate),
989 : : hashvalue))
990 : 480003 : return slot;
991 : :
992 : : /*
993 : : * That tuple couldn't match because of a NULL, so discard it and
994 : : * continue with the next one.
995 : : */
2307 andres@anarazel.de 996 :UBC 0 : slot = ExecProcNode(outerNode);
997 : : }
998 : : }
2307 andres@anarazel.de 999 [ + - ]:CBC 600501 : else if (curbatch < hashtable->nbatch)
1000 : : {
1001 : : MinimalTuple tuple;
1002 : :
1003 : 600501 : tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
1004 : : hashvalue);
1005 [ + + ]: 600501 : if (tuple != NULL)
1006 : : {
1976 1007 : 600012 : ExecForceStoreMinimalTuple(tuple,
1008 : : hjstate->hj_OuterTupleSlot,
1009 : : false);
1010 : 600012 : slot = hjstate->hj_OuterTupleSlot;
2307 1011 : 600012 : return slot;
1012 : : }
1013 : : else
1014 : 489 : ExecClearTuple(hjstate->hj_OuterTupleSlot);
1015 : : }
1016 : :
1017 : : /* End of this batch */
380 tmunro@postgresql.or 1018 : 573 : hashtable->batches[curbatch].outer_eof = true;
1019 : :
2307 andres@anarazel.de 1020 : 573 : return NULL;
1021 : : }
1022 : :
1023 : : /*
1024 : : * ExecHashJoinNewBatch
1025 : : * switch to a new hashjoin batch
1026 : : *
1027 : : * Returns true if successful, false if there are no more batches.
1028 : : */
1029 : : static bool
9715 bruce@momjian.us 1030 : 10626 : ExecHashJoinNewBatch(HashJoinState *hjstate)
1031 : : {
9098 tgl@sss.pgh.pa.us 1032 : 10626 : HashJoinTable hashtable = hjstate->hj_HashTable;
1033 : : int nbatch;
1034 : : int curbatch;
1035 : : BufFile *innerFile;
1036 : : TupleTableSlot *slot;
1037 : : uint32 hashvalue;
1038 : :
6979 1039 : 10626 : nbatch = hashtable->nbatch;
1040 : 10626 : curbatch = hashtable->curbatch;
1041 : :
1042 [ + + ]: 10626 : if (curbatch > 0)
1043 : : {
1044 : : /*
1045 : : * We no longer need the previous outer batch file; close it right
1046 : : * away to free disk space.
1047 : : */
1048 [ + - ]: 813 : if (hashtable->outerBatchFile[curbatch])
1049 : 813 : BufFileClose(hashtable->outerBatchFile[curbatch]);
1050 : 813 : hashtable->outerBatchFile[curbatch] = NULL;
1051 : : }
1052 : : else /* we just finished the first batch */
1053 : : {
1054 : : /*
1055 : : * Reset some of the skew optimization state variables, since we no
1056 : : * longer need to consider skew tuples after the first batch. The
1057 : : * memory context reset we are about to do will release the skew
1058 : : * hashtable itself.
1059 : : */
5503 1060 : 9813 : hashtable->skewEnabled = false;
1061 : 9813 : hashtable->skewBucket = NULL;
1062 : 9813 : hashtable->skewBucketNums = NULL;
4854 1063 : 9813 : hashtable->nSkewBuckets = 0;
5503 1064 : 9813 : hashtable->spaceUsedSkew = 0;
1065 : : }
1066 : :
1067 : : /*
1068 : : * We can always skip over any batches that are completely empty on both
1069 : : * sides. We can sometimes skip over batches that are empty on only one
1070 : : * side, but there are exceptions:
1071 : : *
1072 : : * 1. In a left/full outer join, we have to process outer batches even if
1073 : : * the inner batch is empty. Similarly, in a right/right-anti/full outer
1074 : : * join, we have to process inner batches even if the outer batch is
1075 : : * empty.
1076 : : *
1077 : : * 2. If we have increased nbatch since the initial estimate, we have to
1078 : : * scan inner batches since they might contain tuples that need to be
1079 : : * reassigned to later inner batches.
1080 : : *
1081 : : * 3. Similarly, if we have increased nbatch since starting the outer
1082 : : * scan, we have to rescan outer batches in case they contain tuples that
1083 : : * need to be reassigned.
1084 : : */
6979 1085 : 10626 : curbatch++;
1086 [ + + ]: 10626 : while (curbatch < nbatch &&
1087 [ - + ]: 813 : (hashtable->outerBatchFile[curbatch] == NULL ||
1088 [ - + ]: 813 : hashtable->innerBatchFile[curbatch] == NULL))
1089 : : {
6979 tgl@sss.pgh.pa.us 1090 [ # # ]:UBC 0 : if (hashtable->outerBatchFile[curbatch] &&
4854 1091 [ # # ]: 0 : HJ_FILL_OUTER(hjstate))
1092 : 0 : break; /* must process due to rule 1 */
1093 [ # # ]: 0 : if (hashtable->innerBatchFile[curbatch] &&
1094 [ # # ]: 0 : HJ_FILL_INNER(hjstate))
6979 1095 : 0 : break; /* must process due to rule 1 */
1096 [ # # ]: 0 : if (hashtable->innerBatchFile[curbatch] &&
1097 [ # # ]: 0 : nbatch != hashtable->nbatch_original)
1098 : 0 : break; /* must process due to rule 2 */
1099 [ # # ]: 0 : if (hashtable->outerBatchFile[curbatch] &&
1100 [ # # ]: 0 : nbatch != hashtable->nbatch_outstart)
1101 : 0 : break; /* must process due to rule 3 */
1102 : : /* We can ignore this batch. */
1103 : : /* Release associated temp files right away. */
1104 [ # # ]: 0 : if (hashtable->innerBatchFile[curbatch])
1105 : 0 : BufFileClose(hashtable->innerBatchFile[curbatch]);
1106 : 0 : hashtable->innerBatchFile[curbatch] = NULL;
1107 [ # # ]: 0 : if (hashtable->outerBatchFile[curbatch])
1108 : 0 : BufFileClose(hashtable->outerBatchFile[curbatch]);
1109 : 0 : hashtable->outerBatchFile[curbatch] = NULL;
1110 : 0 : curbatch++;
1111 : : }
1112 : :
6979 tgl@sss.pgh.pa.us 1113 [ + + ]:CBC 10626 : if (curbatch >= nbatch)
4753 bruce@momjian.us 1114 : 9813 : return false; /* no more batches */
1115 : :
6979 tgl@sss.pgh.pa.us 1116 : 813 : hashtable->curbatch = curbatch;
1117 : :
1118 : : /*
1119 : : * Reload the hash table with the new inner batch (which could be empty)
1120 : : */
1121 : 813 : ExecHashTableReset(hashtable);
1122 : :
1123 : 813 : innerFile = hashtable->innerBatchFile[curbatch];
1124 : :
1125 [ + - ]: 813 : if (innerFile != NULL)
1126 : : {
382 peter@eisentraut.org 1127 [ - + ]: 813 : if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
6979 tgl@sss.pgh.pa.us 1128 [ # # ]:UBC 0 : ereport(ERROR,
1129 : : (errcode_for_file_access(),
1130 : : errmsg("could not rewind hash-join temporary file")));
1131 : :
6979 tgl@sss.pgh.pa.us 1132 [ + + ]:CBC 1810308 : while ((slot = ExecHashJoinGetSavedTuple(hjstate,
1133 : : innerFile,
1134 : : &hashvalue,
1135 : : hjstate->hj_HashTupleSlot)))
1136 : : {
1137 : : /*
1138 : : * NOTE: some tuples may be sent to future batches. Also, it is
1139 : : * possible for hashtable->nbatch to be increased here!
1140 : : */
6501 1141 : 1809495 : ExecHashTableInsert(hashtable, slot, hashvalue);
1142 : : }
1143 : :
1144 : : /*
1145 : : * after we build the hash table, the inner batch file is no longer
1146 : : * needed
1147 : : */
6979 1148 : 813 : BufFileClose(innerFile);
1149 : 813 : hashtable->innerBatchFile[curbatch] = NULL;
1150 : : }
1151 : :
1152 : : /*
1153 : : * Rewind outer batch file (if present), so that we can start reading it.
1154 : : */
4854 1155 [ + - ]: 813 : if (hashtable->outerBatchFile[curbatch] != NULL)
1156 : : {
382 peter@eisentraut.org 1157 [ - + ]: 813 : if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
4854 tgl@sss.pgh.pa.us 1158 [ # # ]:UBC 0 : ereport(ERROR,
1159 : : (errcode_for_file_access(),
1160 : : errmsg("could not rewind hash-join temporary file")));
1161 : : }
1162 : :
4854 tgl@sss.pgh.pa.us 1163 :CBC 813 : return true;
1164 : : }
1165 : :
1166 : : /*
1167 : : * Choose a batch to work on, and attach to it. Returns true if successful,
1168 : : * false if there are no more batches.
1169 : : */
1170 : : static bool
2307 andres@anarazel.de 1171 : 771 : ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
1172 : : {
1173 : 771 : HashJoinTable hashtable = hjstate->hj_HashTable;
1174 : : int start_batchno;
1175 : : int batchno;
1176 : :
1177 : : /*
1178 : : * If we were already attached to a batch, remember not to bother checking
1179 : : * it again, and detach from it (possibly freeing the hash table if we are
1180 : : * last to detach).
1181 : : */
1182 [ + + ]: 771 : if (hashtable->curbatch >= 0)
1183 : : {
1184 : 552 : hashtable->batches[hashtable->curbatch].done = true;
1185 : 552 : ExecHashTableDetachBatch(hashtable);
1186 : : }
1187 : :
1188 : : /*
1189 : : * Search for a batch that isn't done. We use an atomic counter to start
1190 : : * our search at a different batch in every participant when there are
1191 : : * more batches than participants.
1192 : : */
1193 : 771 : batchno = start_batchno =
1194 : 771 : pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
1195 : 771 : hashtable->nbatch;
1196 : : do
1197 : : {
1198 : : uint32 hashvalue;
1199 : : MinimalTuple tuple;
1200 : : TupleTableSlot *slot;
1201 : :
1202 [ + + ]: 2072 : if (!hashtable->batches[batchno].done)
1203 : : {
1204 : : SharedTuplestoreAccessor *inner_tuples;
1205 : 1077 : Barrier *batch_barrier =
331 tgl@sss.pgh.pa.us 1206 : 1077 : &hashtable->batches[batchno].shared->batch_barrier;
1207 : :
2307 andres@anarazel.de 1208 [ + + + + : 1077 : switch (BarrierAttach(batch_barrier))
- + - ]
1209 : : {
388 tmunro@postgresql.or 1210 : 387 : case PHJ_BATCH_ELECT:
1211 : :
1212 : : /* One backend allocates the hash table. */
2307 andres@anarazel.de 1213 [ + - ]: 387 : if (BarrierArriveAndWait(batch_barrier,
1214 : : WAIT_EVENT_HASH_BATCH_ELECT))
1215 : 387 : ExecParallelHashTableAlloc(hashtable, batchno);
1216 : : /* Fall through. */
1217 : :
1218 : : case PHJ_BATCH_ALLOCATE:
1219 : : /* Wait for allocation to complete. */
1220 : 388 : BarrierArriveAndWait(batch_barrier,
1221 : : WAIT_EVENT_HASH_BATCH_ALLOCATE);
1222 : : /* Fall through. */
1223 : :
388 tmunro@postgresql.or 1224 : 401 : case PHJ_BATCH_LOAD:
1225 : : /* Start (or join in) loading tuples. */
2307 andres@anarazel.de 1226 : 401 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1227 : 401 : inner_tuples = hashtable->batches[batchno].inner_tuples;
1228 : 401 : sts_begin_parallel_scan(inner_tuples);
1229 [ + + ]: 549341 : while ((tuple = sts_parallel_scan_next(inner_tuples,
1230 : : &hashvalue)))
1231 : : {
1976 1232 : 548940 : ExecForceStoreMinimalTuple(tuple,
1233 : : hjstate->hj_HashTupleSlot,
1234 : : false);
1235 : 548940 : slot = hjstate->hj_HashTupleSlot;
2307 1236 : 548940 : ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
1237 : : hashvalue);
1238 : : }
1239 : 401 : sts_end_parallel_scan(inner_tuples);
1240 : 401 : BarrierArriveAndWait(batch_barrier,
1241 : : WAIT_EVENT_HASH_BATCH_LOAD);
1242 : : /* Fall through. */
1243 : :
388 tmunro@postgresql.or 1244 : 573 : case PHJ_BATCH_PROBE:
1245 : :
1246 : : /*
1247 : : * This batch is ready to probe. Return control to
1248 : : * caller. We stay attached to batch_barrier so that the
1249 : : * hash table stays alive until everyone's finished
1250 : : * probing it, but no participant is allowed to wait at
1251 : : * this barrier again (or else a deadlock could occur).
1252 : : * All attached participants must eventually detach from
1253 : : * the barrier and one worker must advance the phase so
1254 : : * that the final phase is reached.
1255 : : */
2307 andres@anarazel.de 1256 : 573 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1257 : 573 : sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
1258 : :
1259 : 573 : return true;
380 tmunro@postgresql.or 1260 :UBC 0 : case PHJ_BATCH_SCAN:
1261 : :
1262 : : /*
1263 : : * In principle, we could help scan for unmatched tuples,
1264 : : * since that phase is already underway (the thing we
1265 : : * can't do under current deadlock-avoidance rules is wait
1266 : : * for others to arrive at PHJ_BATCH_SCAN, because
1267 : : * PHJ_BATCH_PROBE emits tuples, but in this case we just
1268 : : * got here without waiting). That is not yet done. For
1269 : : * now, we just detach and go around again. We have to
1270 : : * use ExecHashTableDetachBatch() because there's a small
1271 : : * chance we'll be the last to detach, and then we're
1272 : : * responsible for freeing memory.
1273 : : */
1274 : 0 : ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
1275 : 0 : hashtable->batches[batchno].done = true;
1276 : 0 : ExecHashTableDetachBatch(hashtable);
1277 : 0 : break;
1278 : :
388 tmunro@postgresql.or 1279 :CBC 504 : case PHJ_BATCH_FREE:
1280 : :
1281 : : /*
1282 : : * Already done. Detach and go around again (if any
1283 : : * remain).
1284 : : */
2307 andres@anarazel.de 1285 : 504 : BarrierDetach(batch_barrier);
1286 : 504 : hashtable->batches[batchno].done = true;
1287 : 504 : hashtable->curbatch = -1;
1288 : 504 : break;
1289 : :
2307 andres@anarazel.de 1290 :UBC 0 : default:
1291 [ # # ]: 0 : elog(ERROR, "unexpected batch phase %d",
1292 : : BarrierPhase(batch_barrier));
1293 : : }
1294 : : }
2307 andres@anarazel.de 1295 :CBC 1499 : batchno = (batchno + 1) % hashtable->nbatch;
1296 [ + + ]: 1499 : } while (batchno != start_batchno);
1297 : :
1298 : 198 : return false;
1299 : : }
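/*
 * Illustration only (not part of nodeHashjoin.c): a minimal sketch of the
 * "atomic counter picks the starting batch" search used by
 * ExecParallelHashJoinNewBatch above. It uses C11 <stdatomic.h> instead of
 * pg_atomic_fetch_add_u32(), and a plain bool array instead of the
 * per-batch barrier phases, so it shows only the round-robin search, not
 * the attach/elect/load protocol. demo_choose_batch is a hypothetical name.
 */

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Pick the next batch that is not marked done, starting from a position
 * handed out by a shared atomic counter so that concurrent callers tend to
 * start at different batches.  Returns the batch number, or -1 if every
 * batch is already done.
 */
static int
demo_choose_batch(atomic_uint *distributor, const bool *done, int nbatch)
{
    int start;
    int batchno;

    assert(nbatch > 0);

    /* Each caller gets a distinct ticket; wrap it into [0, nbatch). */
    start = (int) (atomic_fetch_add(distributor, 1) % (unsigned int) nbatch);
    batchno = start;

    do
    {
        if (!done[batchno])
            return batchno;
        batchno = (batchno + 1) % nbatch;
    } while (batchno != start);

    return -1;                  /* no more batches */
}
```

/*
 * With four batches and a fresh counter, successive calls start at batch
 * 0, 1, 2, ... and walk forward past any batch already marked done.
 */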
1300 : :
1301 : : /*
1302 : : * ExecHashJoinSaveTuple
1303 : : * save a tuple to a batch file.
1304 : : *
1305 : : * The data recorded in the file for each tuple is its hash value,
1306 : : * then the tuple in MinimalTuple format.
1307 : : *
1308 : : * fileptr points to a batch file in one of the hashtable arrays.
1309 : : *
1310 : : * The batch files (and their buffers) are allocated in the spill context
1311 : : * created for the hashtable.
1312 : : */
1313 : : void
6156 tgl@sss.pgh.pa.us 1314 : 2544591 : ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
1315 : : BufFile **fileptr, HashJoinTable hashtable)
1316 : : {
6756 bruce@momjian.us 1317 : 2544591 : BufFile *file = *fileptr;
1318 : :
1319 : : /*
1320 : : * The batch file is lazily created. If this is the first tuple written to
1321 : : * this batch, the batch file is created and its buffer is allocated in
1322 : : * the spillCxt context, NOT in the batchCxt.
1323 : : *
1324 : : * During the build phase, buffered files are created for inner batches.
1325 : : * Each batch's buffered file is closed (and its buffer freed) after the
1326 : : * batch is loaded into memory during the outer side scan. Therefore, it
1327 : : * is necessary to allocate the batch file buffer in a memory context
1328 : : * which outlives the batch itself.
1329 : : *
1330 : : * Also, we use spillCxt instead of hashCxt for a better accounting of the
1331 : : * spilling memory consumption.
1332 : : */
6979 tgl@sss.pgh.pa.us 1333 [ + + ]: 2544591 : if (file == NULL)
1334 : : {
331 1335 : 1626 : MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
1336 : :
6156 1337 : 1626 : file = BufFileCreateTemp(false);
6979 1338 : 1626 : *fileptr = file;
1339 : :
331 tomas.vondra@postgre 1340 : 1626 : MemoryContextSwitchTo(oldctx);
1341 : : }
1342 : :
493 peter@eisentraut.org 1343 : 2544591 : BufFileWrite(file, &hashvalue, sizeof(uint32));
1344 : 2544591 : BufFileWrite(file, tuple, tuple->t_len);
10141 scrappy@hub.org 1345 : 2544591 : }
1346 : :
1347 : : /*
1348 : : * ExecHashJoinGetSavedTuple
1349 : : * read the next tuple from a batch file. Return NULL if no more.
1350 : : *
1351 : : * On success, *hashvalue is set to the tuple's hash value, and the tuple
1352 : : * itself is stored in the given slot.
1353 : : */
1354 : : static TupleTableSlot *
6979 tgl@sss.pgh.pa.us 1355 : 2546217 : ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
1356 : : BufFile *file,
1357 : : uint32 *hashvalue,
1358 : : TupleTableSlot *tupleSlot)
1359 : : {
1360 : : uint32 header[2];
1361 : : size_t nread;
1362 : : MinimalTuple tuple;
1363 : :
1364 : : /*
1365 : : * We check for interrupts here because this is typically taken as an
1366 : : * alternative code path to an ExecProcNode() call, which would include
1367 : : * such a check.
1368 : : */
2615 1369 [ + + ]: 2546217 : CHECK_FOR_INTERRUPTS();
1370 : :
1371 : : /*
1372 : : * Since both the hash value and the MinimalTuple length word are uint32,
1373 : : * we can read them both in one BufFileReadMaybeEOF() call without any type
1374 : : * cheating.
1375 : : */
454 peter@eisentraut.org 1376 : 2546217 : nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
6402 bruce@momjian.us 1377 [ + + ]: 2546217 : if (nread == 0) /* end of file */
1378 : : {
6501 tgl@sss.pgh.pa.us 1379 : 1626 : ExecClearTuple(tupleSlot);
1380 : 1626 : return NULL;
1381 : : }
1382 : 2544591 : *hashvalue = header[0];
1383 : 2544591 : tuple = (MinimalTuple) palloc(header[1]);
1384 : 2544591 : tuple->t_len = header[1];
454 peter@eisentraut.org 1385 : 2544591 : BufFileReadExact(file,
1386 : : (char *) tuple + sizeof(uint32),
1387 : 2544591 : header[1] - sizeof(uint32));
1976 andres@anarazel.de 1388 : 2544591 : ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
1389 : 2544591 : return tupleSlot;
1390 : : }
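/*
 * Illustration only (not part of nodeHashjoin.c): a minimal sketch of the
 * batch-file record layout used by ExecHashJoinSaveTuple and
 * ExecHashJoinGetSavedTuple above: a uint32 hash value, then the tuple,
 * whose first word is its own total length. The sketch uses stdio and
 * malloc rather than BufFile and palloc, and DemoTuple is a hypothetical
 * stand-in for MinimalTuple; all demo_* names are invented here.
 */

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a MinimalTuple: length word, then payload. */
typedef struct DemoTuple
{
    uint32_t t_len;             /* total size in bytes, including t_len */
    char     data[];            /* NUL-terminated payload for this demo */
} DemoTuple;

static DemoTuple *
demo_make_tuple(const char *payload)
{
    size_t     n = strlen(payload) + 1;
    DemoTuple *t = malloc(sizeof(uint32_t) + n);

    assert(t != NULL);
    t->t_len = (uint32_t) (sizeof(uint32_t) + n);
    memcpy(t->data, payload, n);
    return t;
}

/* Write one record: the hash value first, then the whole tuple. */
static void
demo_save_tuple(FILE *f, uint32_t hashvalue, const DemoTuple *tuple)
{
    size_t ok;

    ok = fwrite(&hashvalue, sizeof(uint32_t), 1, f);
    assert(ok == 1);
    ok = fwrite(tuple, tuple->t_len, 1, f);
    assert(ok == 1);
}

/* Read one record; returns NULL at end of file.  Caller frees the result. */
static DemoTuple *
demo_get_saved_tuple(FILE *f, uint32_t *hashvalue)
{
    uint32_t   header[2];       /* [0] = hash value, [1] = tuple length */
    size_t     nread;
    DemoTuple *tuple;

    nread = fread(header, 1, sizeof(header), f);
    if (nread == 0)
        return NULL;            /* clean end of file */
    assert(nread == sizeof(header));

    *hashvalue = header[0];
    tuple = malloc(header[1]);
    assert(tuple != NULL);
    tuple->t_len = header[1];

    /* The length word was already consumed; read the rest of the tuple. */
    nread = fread((char *) tuple + sizeof(uint32_t), 1,
                  header[1] - sizeof(uint32_t), f);
    assert(nread == header[1] - sizeof(uint32_t));
    return tuple;
}
```

/*
 * Reading the two uint32 words in a single call works because the hash
 * value and the length word are adjacent in the file and share a type.
 */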
1391 : :
1392 : :
1393 : : void
5025 tgl@sss.pgh.pa.us 1394 : 1117 : ExecReScanHashJoin(HashJoinState *node)
1395 : : {
647 1396 : 1117 : PlanState *outerPlan = outerPlanState(node);
1397 : 1117 : PlanState *innerPlan = innerPlanState(node);
1398 : :
1399 : : /*
1400 : : * In a multi-batch join, we currently have to do rescans the hard way,
1401 : : * primarily because batch temp files may have already been released. But
1402 : : * if it's a single-batch join, and there is no parameter change for the
1403 : : * inner subnode, then we can just re-use the existing hash table without
1404 : : * rebuilding it.
1405 : : */
6712 1406 [ + + ]: 1117 : if (node->hj_HashTable != NULL)
1407 : : {
1408 [ + - ]: 910 : if (node->hj_HashTable->nbatch == 1 &&
647 1409 [ + + ]: 910 : innerPlan->chgParam == NULL)
1410 : : {
1411 : : /*
1412 : : * Okay to reuse the hash table; needn't rescan inner, either.
1413 : : *
1414 : : * However, if it's a right/right-anti/full join, we'd better
1415 : : * reset the inner-tuple match flags contained in the table.
1416 : : */
4854 1417 [ + + ]: 398 : if (HJ_FILL_INNER(node))
1418 : 6 : ExecHashTableResetMatchFlags(node->hj_HashTable);
1419 : :
1420 : : /*
1421 : : * Also, we need to reset our state about the emptiness of the
1422 : : * outer relation, so that the new scan of the outer will update
1423 : : * it correctly if it turns out to be empty this time. (There's no
1424 : : * harm in clearing it now because ExecHashJoin won't need the
1425 : : * info. In the other cases, where the hash table doesn't exist
1426 : : * or we are destroying it, we leave this state alone because
1427 : : * ExecHashJoin will need it the first time through.)
1428 : : */
6712 1429 : 398 : node->hj_OuterNotEmpty = false;
1430 : :
1431 : : /* ExecHashJoin can skip the BUILD_HASHTABLE step */
4854 1432 : 398 : node->hj_JoinState = HJ_NEED_NEW_OUTER;
1433 : : }
1434 : : else
1435 : : {
1436 : : /* must destroy and rebuild hash table */
647 1437 : 512 : HashState *hashNode = castNode(HashState, innerPlan);
1438 : :
1464 1439 [ - + ]: 512 : Assert(hashNode->hashtable == node->hj_HashTable);
1440 : : /* accumulate stats from old hash table, if wanted */
1441 : : /* (this should match ExecShutdownHash) */
1442 [ - + - - ]: 512 : if (hashNode->ps.instrument && !hashNode->hinstrument)
1464 tgl@sss.pgh.pa.us 1443 :UBC 0 : hashNode->hinstrument = (HashInstrumentation *)
1444 : 0 : palloc0(sizeof(HashInstrumentation));
1464 tgl@sss.pgh.pa.us 1445 [ - + ]:CBC 512 : if (hashNode->hinstrument)
1464 tgl@sss.pgh.pa.us 1446 :UBC 0 : ExecHashAccumInstrumentation(hashNode->hinstrument,
1447 : : hashNode->hashtable);
1448 : : /* for safety, be sure to clear child plan node's pointer too */
1464 tgl@sss.pgh.pa.us 1449 :CBC 512 : hashNode->hashtable = NULL;
1450 : :
6712 1451 : 512 : ExecHashTableDestroy(node->hj_HashTable);
1452 : 512 : node->hj_HashTable = NULL;
4854 1453 : 512 : node->hj_JoinState = HJ_BUILD_HASHTABLE;
1454 : :
1455 : : /*
1456 : : * If chgParam of the subnode is not null, the plan will be
1457 : : * re-scanned by the first ExecProcNode call, so skip it here.
1458 : : */
647 1459 [ - + ]: 512 : if (innerPlan->chgParam == NULL)
647 tgl@sss.pgh.pa.us 1460 :UBC 0 : ExecReScan(innerPlan);
1461 : : }
1462 : : }
1463 : :
1464 : : /* Always reset intra-tuple state */
6979 tgl@sss.pgh.pa.us 1465 :CBC 1117 : node->hj_CurHashValue = 0;
7801 1466 : 1117 : node->hj_CurBucketNo = 0;
5503 1467 : 1117 : node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
7403 neilc@samurai.com 1468 : 1117 : node->hj_CurTuple = NULL;
1469 : :
7801 tgl@sss.pgh.pa.us 1470 : 1117 : node->hj_MatchedOuter = false;
6712 1471 : 1117 : node->hj_FirstOuterTupleSlot = NULL;
1472 : :
1473 : : /*
1474 : : * If chgParam of the subnode is not null, the plan will be re-scanned
1475 : : * by the first ExecProcNode call, so skip the explicit rescan here.
1476 : : */
647 1477 [ + + ]: 1117 : if (outerPlan->chgParam == NULL)
1478 : 744 : ExecReScan(outerPlan);
9557 vadim4o@yahoo.com 1479 : 1117 : }
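/*
 * Illustration only (not part of nodeHashjoin.c): the hash-table decision
 * in ExecReScanHashJoin above reduces to three outcomes. The sketch below
 * restates it as a pure function. DemoRescanAction and demo_rescan_action
 * are hypothetical names; inner_param_changed is assumed to stand in for
 * "innerPlan->chgParam != NULL".
 */

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical outcomes of the rescan decision. */
typedef enum DemoRescanAction
{
    DEMO_NO_TABLE_YET,      /* nothing built; the build step runs as usual */
    DEMO_REUSE_TABLE,       /* keep the table, go straight to probing */
    DEMO_REBUILD_TABLE      /* destroy the table and build it again */
} DemoRescanAction;

static DemoRescanAction
demo_rescan_action(bool have_table, int nbatch, bool inner_param_changed)
{
    if (!have_table)
        return DEMO_NO_TABLE_YET;

    assert(nbatch >= 1);

    /*
     * Reuse is only possible for a single-batch join whose inner input
     * cannot have changed; multi-batch joins may already have released
     * their temp files.
     */
    if (nbatch == 1 && !inner_param_changed)
        return DEMO_REUSE_TABLE;

    return DEMO_REBUILD_TABLE;
}
```

/*
 * In the reuse case the real function additionally resets the match flags
 * for joins that must emit unmatched inner tuples and clears
 * hj_OuterNotEmpty; the sketch leaves those side effects out.
 */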
1480 : :
1481 : : void
2307 andres@anarazel.de 1482 : 13175 : ExecShutdownHashJoin(HashJoinState *node)
1483 : : {
1484 [ + + ]: 13175 : if (node->hj_HashTable)
1485 : : {
1486 : : /*
1487 : : * Detach from shared state before DSM memory goes away. This makes
1488 : : * sure that we don't have any pointers into DSM memory by the time
1489 : : * ExecEndHashJoin runs.
1490 : : */
1491 : 9826 : ExecHashTableDetachBatch(node->hj_HashTable);
1492 : 9826 : ExecHashTableDetach(node->hj_HashTable);
1493 : : }
1494 : 13175 : }
1495 : :
1496 : : static void
1497 : 93 : ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
1498 : : {
1499 : 93 : PlanState *outerState = outerPlanState(hjstate);
1500 : 93 : ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
1501 : 93 : HashJoinTable hashtable = hjstate->hj_HashTable;
1502 : : TupleTableSlot *slot;
1503 : : uint32 hashvalue;
1504 : : int i;
1505 : :
1506 [ - + ]: 93 : Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
1507 : :
1508 : : /* Execute outer plan, writing all tuples to shared tuplestores. */
1509 : : for (;;)
1510 : : {
1511 : 600105 : slot = ExecProcNode(outerState);
1512 [ + + + + ]: 600105 : if (TupIsNull(slot))
1513 : : break;
1514 : 600012 : econtext->ecxt_outertuple = slot;
1515 [ + - ]: 600012 : if (ExecHashGetHashValue(hashtable, econtext,
1516 : : hjstate->hj_OuterHashKeys,
1517 : : true, /* outer tuple */
1989 tmunro@postgresql.or 1518 : 600012 : HJ_FILL_OUTER(hjstate),
1519 : : &hashvalue))
1520 : : {
1521 : : int batchno;
1522 : : int bucketno;
1523 : : bool shouldFree;
1977 andres@anarazel.de 1524 : 600012 : MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
1525 : :
2307 1526 : 600012 : ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
1527 : : &batchno);
1528 : 600012 : sts_puttuple(hashtable->batches[batchno].outer_tuples,
1529 : : &hashvalue, mintup);
1530 : :
1977 1531 [ + - ]: 600012 : if (shouldFree)
1532 : 600012 : heap_free_minimal_tuple(mintup);
1533 : : }
2307 1534 [ - + ]: 600012 : CHECK_FOR_INTERRUPTS();
1535 : : }
1536 : :
1537 : : /* Make sure all outer partitions are readable by any backend. */
1538 [ + + ]: 981 : for (i = 0; i < hashtable->nbatch; ++i)
1539 : 888 : sts_end_write(hashtable->batches[i].outer_tuples);
1540 : 93 : }
1541 : :
1542 : : void
1543 : 60 : ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
1544 : : {
1545 : 60 : shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
1546 : 60 : shm_toc_estimate_keys(&pcxt->estimator, 1);
1547 : 60 : }
1548 : :
1549 : : void
1550 : 60 : ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1551 : : {
1552 : 60 : int plan_node_id = state->js.ps.plan->plan_node_id;
1553 : : HashState *hashNode;
1554 : : ParallelHashJoinState *pstate;
1555 : :
1556 : : /*
1557 : : * Disable shared hash table mode if we failed to create a real DSM
1558 : : * segment, because that means that we don't have a DSA area to work with.
1559 : : */
1560 [ - + ]: 60 : if (pcxt->seg == NULL)
2307 andres@anarazel.de 1561 :UBC 0 : return;
1562 : :
2307 andres@anarazel.de 1563 :CBC 60 : ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1564 : :
1565 : : /*
1566 : : * Set up the state needed to coordinate access to the shared hash
1567 : : * table(s), using the plan node ID as the toc key.
1568 : : */
1569 : 60 : pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
1570 : 60 : shm_toc_insert(pcxt->toc, plan_node_id, pstate);
1571 : :
1572 : : /*
1573 : : * Set up the shared hash join state with no batches initially.
1574 : : * ExecHashTableCreate() will prepare at least one later and set nbatch
1575 : : * and space_allowed.
1576 : : */
1577 : 60 : pstate->nbatch = 0;
1578 : 60 : pstate->space_allowed = 0;
1579 : 60 : pstate->batches = InvalidDsaPointer;
1580 : 60 : pstate->old_batches = InvalidDsaPointer;
1581 : 60 : pstate->nbuckets = 0;
1582 : 60 : pstate->growth = PHJ_GROWTH_OK;
1583 : 60 : pstate->chunk_work_queue = InvalidDsaPointer;
1584 : 60 : pg_atomic_init_u32(&pstate->distributor, 0);
1585 : 60 : pstate->nparticipants = pcxt->nworkers + 1;
1586 : 60 : pstate->total_tuples = 0;
1587 : 60 : LWLockInitialize(&pstate->lock,
1588 : : LWTRANCHE_PARALLEL_HASH_JOIN);
1589 : 60 : BarrierInit(&pstate->build_barrier, 0);
1590 : 60 : BarrierInit(&pstate->grow_batches_barrier, 0);
1591 : 60 : BarrierInit(&pstate->grow_buckets_barrier, 0);
1592 : :
1593 : : /* Set up the space we'll use for shared temporary files. */
1594 : 60 : SharedFileSetInit(&pstate->fileset, pcxt->seg);
1595 : :
1596 : : /* Initialize the shared state in the hash node. */
1597 : 60 : hashNode = (HashState *) innerPlanState(state);
1598 : 60 : hashNode->parallel_state = pstate;
1599 : : }
1600 : :
1601 : : /* ----------------------------------------------------------------
1602 : : * ExecHashJoinReInitializeDSM
1603 : : *
1604 : : * Reset shared state before beginning a fresh scan.
1605 : : * ----------------------------------------------------------------
1606 : : */
1607 : : void
573 pg@bowt.ie 1608 : 24 : ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
1609 : : {
2307 andres@anarazel.de 1610 : 24 : int plan_node_id = state->js.ps.plan->plan_node_id;
1611 : : ParallelHashJoinState *pstate =
331 tgl@sss.pgh.pa.us 1612 : 24 : shm_toc_lookup(pcxt->toc, plan_node_id, false);
1613 : :
1614 : : /*
1615 : : * It would be possible to reuse the shared hash table in single-batch
1616 : : * cases by resetting and then fast-forwarding build_barrier to
1617 : : * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
1618 : : * currently shared hash tables are already freed by now (by the last
1619 : : * participant to detach from the batch). We could consider keeping it
1620 : : * around for single-batch joins. We'd also need to adjust
1621 : : * finalize_plan() so that it doesn't record a dummy dependency for
1622 : : * Parallel Hash nodes, preventing the rescan optimization. For now we
1623 : : * don't try.
1624 : : */
1625 : :
1626 : : /* Detach, freeing any remaining shared memory. */
2307 andres@anarazel.de 1627 [ - + ]: 24 : if (state->hj_HashTable != NULL)
1628 : : {
2307 andres@anarazel.de 1629 :UBC 0 : ExecHashTableDetachBatch(state->hj_HashTable);
1630 : 0 : ExecHashTableDetach(state->hj_HashTable);
1631 : : }
1632 : :
1633 : : /* Clear any shared batch files. */
2307 andres@anarazel.de 1634 :CBC 24 : SharedFileSetDeleteAll(&pstate->fileset);
1635 : :
1636 : : /* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
1637 : 24 : BarrierInit(&pstate->build_barrier, 0);
1638 : 24 : }
1639 : :
1640 : : void
1641 : 153 : ExecHashJoinInitializeWorker(HashJoinState *state,
1642 : : ParallelWorkerContext *pwcxt)
1643 : : {
1644 : : HashState *hashNode;
1645 : 153 : int plan_node_id = state->js.ps.plan->plan_node_id;
1646 : : ParallelHashJoinState *pstate =
331 tgl@sss.pgh.pa.us 1647 : 153 : shm_toc_lookup(pwcxt->toc, plan_node_id, false);
1648 : :
1649 : : /* Attach to the space for shared temporary files. */
2307 andres@anarazel.de 1650 : 153 : SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
1651 : :
1652 : : /* Attach to the shared state in the hash node. */
1653 : 153 : hashNode = (HashState *) innerPlanState(state);
1654 : 153 : hashNode->parallel_state = pstate;
1655 : :
1656 : 153 : ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
1657 : 153 : }
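/*
 * Illustrative sketch, not part of nodeHashjoin.c: ExecHashJoinEstimate(),
 * ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker() follow an
 * estimate / allocate / insert / lookup sequence keyed by plan_node_id. The
 * toy table of contents below mimics that sequence in ordinary memory. It
 * is a stand-in under simplifying assumptions (fixed key capacity, 8-byte
 * rounding, no locking), not PostgreSQL's shm_toc implementation; every
 * Toy and toy_ name is hypothetical.
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_MAX_KEYS 8

/* Running totals collected before the segment is created. */
typedef struct ToyEstimator
{
	size_t		bytes;
	size_t		keys;
} ToyEstimator;

/* A bump allocator over one segment plus a small key -> address map. */
typedef struct ToyToc
{
	unsigned char *base;
	size_t		size;
	size_t		used;
	size_t		nkeys;
	uint64_t	keys[TOY_MAX_KEYS];
	void	   *addrs[TOY_MAX_KEYS];
} ToyToc;

/* Round a size up to a multiple of 8 so chunks stay aligned. */
static size_t
toy_align(size_t sz)
{
	return (sz + 7) & ~(size_t) 7;
}

static void
toy_estimate_chunk(ToyEstimator *e, size_t sz)
{
	e->bytes += toy_align(sz);
}

static void
toy_estimate_keys(ToyEstimator *e, size_t n)
{
	e->keys += n;
}

static void
toy_toc_init(ToyToc *toc, void *segment, size_t size)
{
	toc->base = (unsigned char *) segment;
	toc->size = size;
	toc->used = 0;
	toc->nkeys = 0;
}

/* Carve the next chunk out of the segment; NULL if it does not fit. */
static void *
toy_toc_allocate(ToyToc *toc, size_t sz)
{
	size_t		need = toy_align(sz);
	void	   *result;

	if (need > toc->size - toc->used)
		return NULL;
	result = toc->base + toc->used;
	toc->used += need;
	return result;
}

/* Publish an address under a key; returns 1 on success, 0 if full. */
static int
toy_toc_insert(ToyToc *toc, uint64_t key, void *addr)
{
	if (toc->nkeys >= TOY_MAX_KEYS)
		return 0;
	toc->keys[toc->nkeys] = key;
	toc->addrs[toc->nkeys] = addr;
	toc->nkeys++;
	return 1;
}

/* Find the address published under a key; NULL if the key is absent. */
static void *
toy_toc_lookup(const ToyToc *toc, uint64_t key)
{
	for (size_t i = 0; i < toc->nkeys; i++)
	{
		if (toc->keys[i] == key)
			return toc->addrs[i];
	}
	return NULL;
}
```

/*
 * In this model the leader would estimate, allocate and insert once, and
 * each worker would only call the lookup with the same key, which is the
 * division of labour between the three functions above.
 */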