LCOV - differential code coverage report
Current view: top level - src/backend/executor - nodeHashjoin.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 91.7 % 480 440 2 8 29 1 10 270 42 118 26 292 3 22
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 18 18 17 1 17 1
Baseline: 15
Baseline Date: 2023-04-08 15:09:40

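The header comment in the listing below says that a participant joining a parallel hash join must first work out how much progress has already been made, and that the code uses switch statements to jump to the right part of the algorithm. A minimal sketch of that catch-up pattern follows. The `build_phase` enum, the `STEP_*` flags and `catch_up()` are hypothetical names invented for illustration and are not PostgreSQL identifiers. The sketch also ignores two things the real code must handle: starred phases are performed by a single elected process, and participants wait at the barrier between phases.

```c
#include <assert.h>

/*
 * Hypothetical stand-ins for the PHJ_BUILD_* phases, in the order the file
 * header lists them.  The real constants are defined by PostgreSQL itself.
 */
enum build_phase
{
    BUILD_ELECT,
    BUILD_ALLOCATE,
    BUILD_HASH_INNER,
    BUILD_HASH_OUTER,
    BUILD_RUN,
    BUILD_FREE
};

/* One bit per step a participant takes part in (illustration only). */
#define STEP_ELECT      (1u << 0)
#define STEP_ALLOCATE   (1u << 1)
#define STEP_HASH_INNER (1u << 2)
#define STEP_HASH_OUTER (1u << 3)
#define STEP_PROBE      (1u << 4)

/*
 * Given the phase a newly attached participant observes, jump to the
 * matching step and fall through the remaining ones.  Returns a bitmask of
 * the steps this participant took part in; 0 means it arrived too late.
 */
static unsigned
catch_up(enum build_phase observed)
{
    unsigned    steps = 0;

    assert(observed <= BUILD_FREE);

    switch (observed)
    {
        case BUILD_ELECT:
            steps |= STEP_ELECT;
            /* fall through */
        case BUILD_ALLOCATE:
            steps |= STEP_ALLOCATE;
            /* fall through */
        case BUILD_HASH_INNER:
            steps |= STEP_HASH_INNER;
            /* fall through */
        case BUILD_HASH_OUTER:
            steps |= STEP_HASH_OUTER;
            /* fall through */
        case BUILD_RUN:
            steps |= STEP_PROBE;
            break;
        case BUILD_FREE:
            /* all work is complete; nothing is left to join */
            break;
    }
    return steps;
}
```

In this model, a participant that attaches while the outer relation is being hashed gets `catch_up(BUILD_HASH_OUTER) == (STEP_HASH_OUTER | STEP_PROBE)`, and one that attaches after everything has been freed gets 0 and can return immediately.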
           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * nodeHashjoin.c
       4                 :  *    Routines to handle hash join nodes
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *    src/backend/executor/nodeHashjoin.c
      12                 :  *
      13                 :  * PARALLELISM
      14                 :  *
      15                 :  * Hash joins can participate in parallel query execution in several ways.  A
      16                 :  * parallel-oblivious hash join is one where the node is unaware that it is
      17                 :  * part of a parallel plan.  In this case, a copy of the inner plan is used to
      18                 :  * build a copy of the hash table in every backend, and the outer plan could
      19                 :  * either be built from a partial or complete path, so that the results of the
      20                 :  * hash join are correspondingly either partial or complete.  A parallel-aware
      21                 :  * hash join is one that behaves differently, coordinating work between
      22                 :  * backends, and appears as Parallel Hash Join in EXPLAIN output.  A Parallel
      23                 :  * Hash Join always appears with a Parallel Hash node.
      24                 :  *
      25                 :  * Parallel-aware hash joins use the same per-backend state machine to track
      26                 :  * progress through the hash join algorithm as parallel-oblivious hash joins.
      27                 :  * In a parallel-aware hash join, there is also a shared state machine that
      28                 :  * co-operating backends use to synchronize their local state machines and
      29                 :  * program counters.  The shared state machine is managed with a Barrier IPC
      30                 :  * primitive.  When all attached participants arrive at a barrier, the phase
      31                 :  * advances and all waiting participants are released.
      32                 :  *
      33                 :  * When a participant begins working on a parallel hash join, it must first
      34                 :  * figure out how much progress has already been made, because participants
      35                 :  * don't wait for each other to begin.  For this reason there are switch
      36                 :  * statements at key points in the code where we have to synchronize our local
      37                 :  * state machine with the phase, and then jump to the correct part of the
      38                 :  * algorithm so that we can get started.
      39                 :  *
      40                 :  * One barrier called build_barrier is used to coordinate the hashing phases.
      41                 :  * The phase is represented by an integer which begins at zero and increments
      42                 :  * one by one, but in the code it is referred to by symbolic names as follows.
      43                 :  * An asterisk indicates a phase that is performed by a single arbitrarily
      44                 :  * chosen process.
      45                 :  *
      46                 :  *   PHJ_BUILD_ELECT                 -- initial state
      47                 :  *   PHJ_BUILD_ALLOCATE*             -- one sets up the batches and table 0
      48                 :  *   PHJ_BUILD_HASH_INNER            -- all hash the inner rel
      49                 :  *   PHJ_BUILD_HASH_OUTER            -- (multi-batch only) all hash the outer
      50                 :  *   PHJ_BUILD_RUN                   -- building done, probing can begin
      51                 :  *   PHJ_BUILD_FREE*                 -- all work complete, one frees batches
      52                 :  *
      53                 :  * While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may
      54                 :  * be used repeatedly as required to coordinate expansions in the number of
      55                 :  * batches or buckets.  Their phases are as follows:
      56                 :  *
      57                 :  *   PHJ_GROW_BATCHES_ELECT          -- initial state
      58                 :  *   PHJ_GROW_BATCHES_REALLOCATE*    -- one allocates new batches
      59                 :  *   PHJ_GROW_BATCHES_REPARTITION    -- all repartition
      60                 :  *   PHJ_GROW_BATCHES_DECIDE*        -- one detects skew and cleans up
      61                 :  *   PHJ_GROW_BATCHES_FINISH         -- finished one growth cycle
      62                 :  *
      63                 :  *   PHJ_GROW_BUCKETS_ELECT          -- initial state
      64                 :  *   PHJ_GROW_BUCKETS_REALLOCATE*    -- one allocates new buckets
      65                 :  *   PHJ_GROW_BUCKETS_REINSERT       -- all insert tuples
      66                 :  *
      67                 :  * If the planner got the number of batches and buckets right, those won't be
      68                 :  * necessary, but on the other hand we might finish up needing to expand the
      69                 :  * buckets or batches multiple times while hashing the inner relation to stay
      70                 :  * within our memory budget and load factor target.  For that reason it's a
      71                 :  * separate pair of barriers using circular phases.
      72                 :  *
      73                 :  * The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins,
      74                 :  * because we need to divide the outer relation into batches up front in order
      75                 :  * to be able to process batches entirely independently.  In contrast, the
      76                 :  * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
      77                 :  * batches whenever it encounters them while scanning and probing, which it
      78                 :  * can do because it processes batches in serial order.
      79                 :  *
      80                 :  * Once PHJ_BUILD_RUN is reached, backends then split up and process
      81                 :  * different batches, or gang up and work together on probing batches if there
      82                 :  * aren't enough to go around.  For each batch there is a separate barrier
      83                 :  * with the following phases:
      84                 :  *
      85                 :  *  PHJ_BATCH_ELECT          -- initial state
      86                 :  *  PHJ_BATCH_ALLOCATE*      -- one allocates buckets
      87                 :  *  PHJ_BATCH_LOAD           -- all load the hash table from disk
      88                 :  *  PHJ_BATCH_PROBE          -- all probe
      89                 :  *  PHJ_BATCH_SCAN*          -- one does right/right-anti/full unmatched scan
      90                 :  *  PHJ_BATCH_FREE*          -- one frees memory
      91                 :  *
      92                 :  * Batch 0 is a special case, because it starts out in phase
      93                 :  * PHJ_BATCH_PROBE; populating batch 0's hash table is done during
      94                 :  * PHJ_BUILD_HASH_INNER so we can skip loading.
      95                 :  *
      96                 :  * Initially we try to plan for a single-batch hash join using the combined
      97                 :  * hash_mem of all participants to create a large shared hash table.  If that
      98                 :  * turns out either at planning or execution time to be impossible then we
      99                 :  * fall back to regular hash_mem sized hash tables.
     100                 :  *
     101                 :  * To avoid deadlocks, we never wait for any barrier unless it is known that
     102                 :  * all other backends attached to it are actively executing the node or have
     103                 :  * finished.  Practically, that means that we never emit a tuple while attached
     104                 :  * to a barrier, unless the barrier has reached a phase that means that no
     105                 :  * process will wait on it again.  We emit tuples while attached to the build
     106                 :  * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase
     107                 :  * PHJ_BATCH_PROBE.  These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN
     108                 :  * respectively without waiting, using BarrierArriveAndDetach() and
     109                 :  * BarrierArriveAndDetachExceptLast() respectively.  The last to detach
     110                 :  * receives a different return value so that it knows that it's safe to
     111                 :  * clean up.  Any straggler process that attaches after that phase is reached
     112                 :  * will see that it's too late to participate or access the relevant shared
     113                 :  * memory objects.
     114                 :  *
     115                 :  *-------------------------------------------------------------------------
     116                 :  */
     117                 : 
     118                 : #include "postgres.h"
     119                 : 
     120                 : #include "access/htup_details.h"
     121                 : #include "access/parallel.h"
     122                 : #include "executor/executor.h"
     123                 : #include "executor/hashjoin.h"
     124                 : #include "executor/nodeHash.h"
     125                 : #include "executor/nodeHashjoin.h"
     126                 : #include "miscadmin.h"
     127                 : #include "pgstat.h"
     128                 : #include "utils/memutils.h"
     129                 : #include "utils/sharedtuplestore.h"
     130                 : 
     131                 : 
     132                 : /*
     133                 :  * States of the ExecHashJoin state machine
     134                 :  */
     135                 : #define HJ_BUILD_HASHTABLE      1
     136                 : #define HJ_NEED_NEW_OUTER       2
     137                 : #define HJ_SCAN_BUCKET          3
     138                 : #define HJ_FILL_OUTER_TUPLE     4
     139                 : #define HJ_FILL_INNER_TUPLES    5
     140                 : #define HJ_NEED_NEW_BATCH       6
     141                 : 
     142                 : /* Returns true if doing null-fill on outer relation */
     143                 : #define HJ_FILL_OUTER(hjstate)  ((hjstate)->hj_NullInnerTupleSlot != NULL)
     144                 : /* Returns true if doing null-fill on inner relation */
     145                 : #define HJ_FILL_INNER(hjstate)  ((hjstate)->hj_NullOuterTupleSlot != NULL)
     146                 : 
     147                 : static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
     148                 :                                                  HashJoinState *hjstate,
     149                 :                                                  uint32 *hashvalue);
     150                 : static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
     151                 :                                                          HashJoinState *hjstate,
     152                 :                                                          uint32 *hashvalue);
     153                 : static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
     154                 :                                                  BufFile *file,
     155                 :                                                  uint32 *hashvalue,
     156                 :                                                  TupleTableSlot *tupleSlot);
     157                 : static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
     158                 : static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate);
     159                 : static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate);
     160                 : 
     161                 : 
     162                 : /* ----------------------------------------------------------------
     163                 :  *      ExecHashJoinImpl
     164                 :  *
     165                 :  *      This function implements the Hybrid Hashjoin algorithm.  It is marked
     166                 :  *      with an always-inline attribute so that ExecHashJoin() and
     167                 :  *      ExecParallelHashJoin() can inline it.  Compilers that respect the
     168                 :  *      attribute should create versions specialized for parallel == true and
     169                 :  *      parallel == false with unnecessary branches removed.
     170                 :  *
     171                 :  *      Note: the relation we build hash table on is the "inner"
     172                 :  *            the other one is "outer".
     173                 :  * ----------------------------------------------------------------
     174                 :  */
     175                 : static pg_attribute_always_inline TupleTableSlot *
     176 GIC     5030235 : ExecHashJoinImpl(PlanState *pstate, bool parallel)
     177                 : {
     178         5030235 :     HashJoinState *node = castNode(HashJoinState, pstate);
     179                 :     PlanState  *outerNode;
     180                 :     HashState  *hashNode;
     181 ECB             :     ExprState  *joinqual;
     182                 :     ExprState  *otherqual;
     183                 :     ExprContext *econtext;
     184                 :     HashJoinTable hashtable;
     185                 :     TupleTableSlot *outerTupleSlot;
     186                 :     uint32      hashvalue;
     187                 :     int         batchno;
     188                 :     ParallelHashJoinState *parallel_state;
     189                 : 
     190                 :     /*
     191                 :      * get information from HashJoin node
     192                 :      */
     193 GIC     5030235 :     joinqual = node->js.joinqual;
     194         5030235 :     otherqual = node->js.ps.qual;
     195         5030235 :     hashNode = (HashState *) innerPlanState(node);
     196         5030235 :     outerNode = outerPlanState(node);
     197         5030235 :     hashtable = node->hj_HashTable;
     198 CBC     5030235 :     econtext = node->js.ps.ps_ExprContext;
     199         5030235 :     parallel_state = hashNode->parallel_state;
     200 ECB             : 
     201                 :     /*
     202                 :      * Reset per-tuple memory context to free any expression evaluation
     203                 :      * storage allocated in the previous tuple cycle.
     204                 :      */
     205 GIC     5030235 :     ResetExprContext(econtext);
     206                 : 
     207                 :     /*
     208                 :      * run the hash join state machine
     209                 :      */
     210 ECB             :     for (;;)
     211                 :     {
     212                 :         /*
     213                 :          * It's possible to iterate this loop many times before returning a
     214                 :          * tuple, in some pathological cases such as needing to move much of
     215                 :          * the current batch to a later batch.  So let's check for interrupts
     216                 :          * each time through.
     217                 :          */
     218 GIC    18182291 :         CHECK_FOR_INTERRUPTS();
     219                 : 
     220        18182291 :         switch (node->hj_JoinState)
     221                 :         {
     222           11658 :             case HJ_BUILD_HASHTABLE:
     223 ECB             : 
     224                 :                 /*
     225                 :                  * First time through: build hash table for inner relation.
     226                 :                  */
     227 CBC       11658 :                 Assert(hashtable == NULL);
     228                 : 
     229                 :                 /*
     230                 :                  * If the outer relation is completely empty, and it's not
     231                 :                  * right/right-anti/full join, we can quit without building
     232                 :                  * the hash table.  However, for an inner join it is only a
     233                 :                  * win to check this when the outer relation's startup cost is
     234                 :                  * less than the projected cost of building the hash table.
     235                 :                  * Otherwise it's best to build the hash table first and see
     236                 :                  * if the inner relation is empty.  (When it's a left join, we
     237                 :                  * should always make this check, since we aren't going to be
     238                 :                  * able to skip the join on the strength of an empty inner
     239                 :                  * relation anyway.)
     240                 :                  *
     241                 :                  * If we are rescanning the join, we make use of information
     242                 :                  * gained on the previous scan: don't bother to try the
     243                 :                  * prefetch if the previous scan found the outer relation
     244                 :                  * nonempty. This is not 100% reliable since with new
     245                 :                  * parameters the outer relation might yield different
     246                 :                  * results, but it's a good heuristic.
     247                 :                  *
     248                 :                  * The only way to make the check is to try to fetch a tuple
     249                 :                  * from the outer plan node.  If we succeed, we have to stash
     250                 :                  * it away for later consumption by ExecHashJoinOuterGetTuple.
     251                 :                  */
     252 GIC       11658 :                 if (HJ_FILL_INNER(node))
     253                 :                 {
     254                 :                     /* no chance to not build the hash table */
     255            2505 :                     node->hj_FirstOuterTupleSlot = NULL;
     256                 :                 }
     257 CBC        9153 :                 else if (parallel)
     258                 :                 {
     259                 :                     /*
     260 ECB             :                      * The empty-outer optimization is not implemented for
     261                 :                      * shared hash tables, because no one participant can
     262                 :                      * determine that there are no outer tuples, and it's not
     263                 :                      * yet clear that it's worth the synchronization overhead
     264                 :                      * of reaching consensus to figure that out.  So we have
     265                 :                      * to build the hash table.
     266                 :                      */
     267 GIC         162 :                     node->hj_FirstOuterTupleSlot = NULL;
     268                 :                 }
     269            8991 :                 else if (HJ_FILL_OUTER(node) ||
     270            6998 :                          (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
     271            6599 :                           !node->hj_OuterNotEmpty))
     272 ECB             :                 {
     273 GIC        8225 :                     node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
     274 CBC        8225 :                     if (TupIsNull(node->hj_FirstOuterTupleSlot))
     275 ECB             :                     {
     276 CBC        1772 :                         node->hj_OuterNotEmpty = false;
     277 GIC        1772 :                         return NULL;
     278 ECB             :                     }
     279                 :                     else
     280 GIC        6453 :                         node->hj_OuterNotEmpty = true;
     281 ECB             :                 }
     282                 :                 else
     283 GIC         766 :                     node->hj_FirstOuterTupleSlot = NULL;
     284                 : 
     285 ECB             :                 /*
     286                 :                  * Create the hash table.  If using Parallel Hash, then
     287                 :                  * whoever gets here first will create the hash table and any
     288                 :                  * later arrivals will merely attach to it.
     289                 :                  */
     290 GIC        9886 :                 hashtable = ExecHashTableCreate(hashNode,
     291                 :                                                 node->hj_HashOperators,
     292                 :                                                 node->hj_Collations,
     293            9886 :                                                 HJ_FILL_INNER(node));
     294            9886 :                 node->hj_HashTable = hashtable;
     295 ECB             : 
     296                 :                 /*
     297                 :                  * Execute the Hash node, to build the hash table.  If using
     298                 :                  * Parallel Hash, then we'll try to help hashing unless we
     299                 :                  * arrived too late.
     300                 :                  */
     301 GIC        9886 :                 hashNode->hashtable = hashtable;
     302            9886 :                 (void) MultiExecProcNode((PlanState *) hashNode);
     303                 : 
     304                 :                 /*
     305                 :                  * If the inner relation is completely empty, and we're not
     306 ECB             :                  * doing a left outer join, we can quit without scanning the
     307                 :                  * outer relation.
     308                 :                  */
     309 GIC        9886 :                 if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
     310                 :                 {
     311             521 :                     if (parallel)
     312                 :                     {
     313                 :                         /*
     314                 :                          * Advance the build barrier to PHJ_BUILD_RUN before
     315                 :                          * proceeding so we can negotiate resource cleanup.
     316                 :                          */
     317 UIC           0 :                         Barrier    *build_barrier = &parallel_state->build_barrier;
     318                 : 
     319 UNC           0 :                         while (BarrierPhase(build_barrier) < PHJ_BUILD_RUN)
     320 UIC           0 :                             BarrierArriveAndWait(build_barrier, 0);
     321 EUB             :                     }
     322 GIC         521 :                     return NULL;
     323 EUB             :                 }
     324                 : 
     325                 :                 /*
     326 ECB             :                  * need to remember whether nbatch has increased since we
     327                 :                  * began scanning the outer relation
     328                 :                  */
     329 GIC        9365 :                 hashtable->nbatch_outstart = hashtable->nbatch;
     330                 : 
     331                 :                 /*
     332                 :                  * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
     333 ECB             :                  * tuple above, because ExecHashJoinOuterGetTuple will
     334                 :                  * immediately set it again.)
     335                 :                  */
     336 GIC        9365 :                 node->hj_OuterNotEmpty = false;
     337                 : 
     338            9365 :                 if (parallel)
     339             189 :                 {
     340 ECB             :                     Barrier    *build_barrier;
     341                 : 
     342 CBC         189 :                     build_barrier = &parallel_state->build_barrier;
     343 GNC         189 :                     Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER ||
     344                 :                            BarrierPhase(build_barrier) == PHJ_BUILD_RUN ||
     345                 :                            BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
     346             189 :                     if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
     347 ECB             :                     {
     348                 :                         /*
     349                 :                          * If multi-batch, we need to hash the outer relation
     350                 :                          * up front.
     351                 :                          */
     352 GIC         151 :                         if (hashtable->nbatch > 1)
     353              87 :                             ExecParallelHashJoinPartitionOuter(node);
     354             151 :                         BarrierArriveAndWait(build_barrier,
     355                 :                                              WAIT_EVENT_HASH_BUILD_HASH_OUTER);
     356 ECB             :                     }
     357 GNC          38 :                     else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE)
     358 ECB             :                     {
     359                 :                         /*
     360                 :                          * If we attached so late that the job is finished and
     361                 :                          * the batch state has been freed, we can return
     362                 :                          * immediately.
     363                 :                          */
     364 UIC           0 :                         return NULL;
     365                 :                     }
     366                 : 
     367                 :                     /* Each backend should now select a batch to work on. */
     368 GNC         189 :                     Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
     369 GIC         189 :                     hashtable->curbatch = -1;
     370             189 :                     node->hj_JoinState = HJ_NEED_NEW_BATCH;
     371                 : 
     372 CBC         189 :                     continue;
     373 ECB             :                 }
     374                 :                 else
     375 GIC        9176 :                     node->hj_JoinState = HJ_NEED_NEW_OUTER;
     376 ECB             : 
     377                 :                 /* FALL THRU */
     378                 : 
     379 CBC     8723214 :             case HJ_NEED_NEW_OUTER:
     380                 : 
     381                 :                 /*
     382                 :                  * We don't have an outer tuple, try to get the next one
     383 ECB             :                  */
     384 GIC     8723214 :                 if (parallel)
     385                 :                     outerTupleSlot =
     386         1080511 :                         ExecParallelHashJoinOuterGetTuple(outerNode, node,
     387                 :                                                           &hashvalue);
     388 ECB             :                 else
     389                 :                     outerTupleSlot =
     390 CBC     7642703 :                         ExecHashJoinOuterGetTuple(outerNode, node, &hashvalue);
     391                 : 
     392 GIC     8723214 :                 if (TupIsNull(outerTupleSlot))
     393                 :                 {
     394 ECB             :                     /* end of batch, or maybe whole join */
     395 GIC       10589 :                     if (HJ_FILL_INNER(node))
     396 ECB             :                     {
     397                 :                         /* set up to scan for unmatched inner tuples */
     398 GNC        2427 :                         if (parallel)
     399                 :                         {
     400                 :                             /*
     401                 :                              * Only one process is currently allowed to handle
     402                 :                              * each batch's unmatched tuples, in a parallel
     403                 :                              * join.
     404                 :                              */
     405              43 :                             if (ExecParallelPrepHashTableForUnmatched(node))
     406              30 :                                 node->hj_JoinState = HJ_FILL_INNER_TUPLES;
     407                 :                             else
     408              13 :                                 node->hj_JoinState = HJ_NEED_NEW_BATCH;
     409                 :                         }
     410                 :                         else
     411                 :                         {
     412            2384 :                             ExecPrepHashTableForUnmatched(node);
     413            2384 :                             node->hj_JoinState = HJ_FILL_INNER_TUPLES;
     414                 :                         }
     415                 :                     }
     416                 :                     else
     417 CBC        8162 :                         node->hj_JoinState = HJ_NEED_NEW_BATCH;
     418 GIC       10589 :                     continue;
     419                 :                 }
     420                 : 
     421         8712625 :                 econtext->ecxt_outertuple = outerTupleSlot;
     422         8712625 :                 node->hj_MatchedOuter = false;
     423                 : 
     424 ECB             :                 /*
     425                 :                  * Find the corresponding bucket for this tuple in the main
     426                 :                  * hash table or skew hash table.
     427                 :                  */
     428 GIC     8712625 :                 node->hj_CurHashValue = hashvalue;
     429         8712625 :                 ExecHashGetBucketAndBatch(hashtable, hashvalue,
     430                 :                                           &node->hj_CurBucketNo, &batchno);
     431 CBC     8712625 :                 node->hj_CurSkewBucketNo = ExecHashGetSkewBucket(hashtable,
     432 ECB             :                                                                  hashvalue);
     433 GIC     8712625 :                 node->hj_CurTuple = NULL;
     434                 : 
     435                 :                 /*
     436 ECB             :                  * The tuple might not belong to the current batch (where
     437                 :                  * "current batch" includes the skew buckets if any).
     438                 :                  */
     439 GIC     8712625 :                 if (batchno != hashtable->curbatch &&
     440 CBC      735696 :                     node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
     441          735096 :                 {
     442                 :                     bool        shouldFree;
     443 GIC      735096 :                     MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
     444                 :                                                                       &shouldFree);
     445                 : 
     446                 :                     /*
     447 ECB             :                      * Need to postpone this outer tuple to a later batch.
     448                 :                      * Save it in the corresponding outer-batch file.
     449                 :                      */
     450 CBC      735096 :                     Assert(parallel_state == NULL);
     451 GIC      735096 :                     Assert(batchno > hashtable->curbatch);
     452 CBC      735096 :                     ExecHashJoinSaveTuple(mintuple, hashvalue,
     453 GIC      735096 :                                           &hashtable->outerBatchFile[batchno]);
     454                 : 
     455          735096 :                     if (shouldFree)
     456          735096 :                         heap_free_minimal_tuple(mintuple);
     457                 : 
     458 ECB             :                     /* Loop around, staying in HJ_NEED_NEW_OUTER state */
     459 CBC      735096 :                     continue;
     460 ECB             :                 }
     461                 : 
     462                 :                 /* OK, let's scan the bucket for matches */
     463 GIC     7977529 :                 node->hj_JoinState = HJ_SCAN_BUCKET;
     464                 : 
     465                 :                 /* FALL THRU */
     466                 : 
     467        11062550 :             case HJ_SCAN_BUCKET:
     468                 : 
     469 ECB             :                 /*
     470                 :                  * Scan the selected hash bucket for matches to current outer
     471                 :                  */
     472 CBC    11062550 :                 if (parallel)
     473                 :                 {
     474         2100024 :                     if (!ExecParallelScanHashBucket(node, econtext))
     475 ECB             :                     {
     476                 :                         /* out of matches; check for possible outer-join fill */
     477 GIC     1080012 :                         node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
     478 CBC     1080012 :                         continue;
     479                 :                     }
     480                 :                 }
     481                 :                 else
     482 ECB             :                 {
     483 GIC     8962526 :                     if (!ExecScanHashBucket(node, econtext))
     484                 :                     {
     485                 :                         /* out of matches; check for possible outer-join fill */
     486 CBC     4844799 :                         node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
     487 GIC     4844799 :                         continue;
     488                 :                     }
     489                 :                 }
     490                 : 
     491 ECB             :                 /*
     492                 :                  * We've got a match, but still need to test non-hashed quals.
     493                 :                  * ExecScanHashBucket already set up all the state needed to
     494                 :                  * call ExecQual.
     495                 :                  *
     496                 :                  * If we pass the qual, then save state for next call and have
     497                 :                  * ExecProject form the projection, store it in the tuple
     498                 :                  * table, and return the slot.
     499                 :                  *
     500                 :                  * Only the joinquals determine tuple match status, but all
     501                 :                  * quals must pass to actually return the tuple.
     502                 :                  */
     503 GIC     5137739 :                 if (joinqual == NULL || ExecQual(joinqual, econtext))
     504                 :                 {
     505 CBC     5062757 :                     node->hj_MatchedOuter = true;
     506 ECB             : 
     507                 : 
     508                 :                     /*
     509                 :                      * This is really only needed if HJ_FILL_INNER(node), but
     510                 :                      * we'll avoid the branch and just set it always.
     511                 :                      */
     512 GNC     5062757 :                     if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)))
     513 CBC     3070831 :                         HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
     514                 : 
     515                 :                     /* In an antijoin, we never return a matched tuple */
     516 GIC     5062757 :                     if (node->js.jointype == JOIN_ANTI)
     517                 :                     {
     518          772331 :                         node->hj_JoinState = HJ_NEED_NEW_OUTER;
     519 CBC      772331 :                         continue;
     520 ECB             :                     }
     521                 : 
     522                 :                     /*
     523                 :                      * In a right-antijoin, we never return a matched tuple.
     524                 :                      * And we need to stay on the current outer tuple to
     525                 :                      * continue scanning the inner side for matches.
     526                 :                      */
     527 GNC     4290426 :                     if (node->js.jointype == JOIN_RIGHT_ANTI)
     528           14426 :                         continue;
     529                 : 
     530                 :                     /*
     531 ECB             :                      * If we only need to join to the first matching inner
     532                 :                      * tuple, then consider returning this one, but after that
     533                 :                      * continue with next outer tuple.
     534                 :                      */
     535 GIC     4276000 :                     if (node->js.single_match)
     536         1280353 :                         node->hj_JoinState = HJ_NEED_NEW_OUTER;
     537                 : 
     538         4276000 :                     if (otherqual == NULL || ExecQual(otherqual, econtext))
     539         4186260 :                         return ExecProject(node->js.ps.ps_ProjInfo);
     540                 :                     else
     541           89740 :                         InstrCountFiltered2(node, 1);
     542 ECB             :                 }
     543                 :                 else
     544 GIC       74982 :                     InstrCountFiltered1(node, 1);
     545          164722 :                 break;
     546                 : 
     547         5924811 :             case HJ_FILL_OUTER_TUPLE:
     548                 : 
     549                 :                 /*
     550 ECB             :                  * The current outer tuple has run out of matches, so check
     551                 :                  * whether to emit a dummy outer-join tuple.  Whether we emit
     552                 :                  * one or not, the next state is NEED_NEW_OUTER.
     553                 :                  */
     554 CBC     5924811 :                 node->hj_JoinState = HJ_NEED_NEW_OUTER;
     555                 : 
     556         5924811 :                 if (!node->hj_MatchedOuter &&
     557 GIC     3573372 :                     HJ_FILL_OUTER(node))
     558                 :                 {
     559 ECB             :                     /*
     560                 :                      * Generate a fake join tuple with nulls for the inner
     561                 :                      * tuple, and return it if it passes the non-join quals.
     562                 :                      */
     563 GIC      818897 :                     econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
     564                 : 
     565          818897 :                     if (otherqual == NULL || ExecQual(otherqual, econtext))
     566          402058 :                         return ExecProject(node->js.ps.ps_ProjInfo);
     567                 :                     else
     568          416839 :                         InstrCountFiltered2(node, 1);
     569 ECB             :                 }
     570 GIC     5522753 :                 break;
     571 ECB             : 
     572 CBC      435988 :             case HJ_FILL_INNER_TUPLES:
     573                 : 
     574                 :                 /*
     575                 :                  * We have finished a batch, but we are doing
     576                 :                  * right/right-anti/full join, so any unmatched inner tuples
     577                 :                  * in the hashtable have to be emitted before we continue to
     578                 :                  * the next batch.
     579 ECB             :                  */
     580 GNC      811946 :                 if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext)
     581          375958 :                       : ExecScanHashTableForUnmatched(node, econtext)))
     582 ECB             :                 {
     583                 :                     /* no more unmatched tuples */
     584 GIC        2411 :                     node->hj_JoinState = HJ_NEED_NEW_BATCH;
     585 CBC        2411 :                     continue;
     586                 :                 }
     587 ECB             : 
     588                 :                 /*
     589                 :                  * Generate a fake join tuple with nulls for the outer tuple,
     590                 :                  * and return it if it passes the non-join quals.
     591                 :                  */
     592 GIC      433577 :                 econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
     593                 : 
     594          433577 :                 if (otherqual == NULL || ExecQual(otherqual, econtext))
     595          430100 :                     return ExecProject(node->js.ps.ps_ProjInfo);
     596                 :                 else
     597 CBC        3477 :                     InstrCountFiltered2(node, 1);
     598            3477 :                 break;
     599                 : 
     600 GIC       10775 :             case HJ_NEED_NEW_BATCH:
     601 ECB             : 
     602                 :                 /*
     603                 :                  * Try to advance to next batch.  Done if there are no more.
     604                 :                  */
     605 GIC       10775 :                 if (parallel)
     606                 :                 {
     607             688 :                     if (!ExecParallelHashJoinNewBatch(node))
     608             189 :                         return NULL;    /* end of parallel-aware join */
     609 ECB             :                 }
     610                 :                 else
     611                 :                 {
     612 CBC       10087 :                     if (!ExecHashJoinNewBatch(node))
     613 GIC        9335 :                         return NULL;    /* end of parallel-oblivious join */
     614 ECB             :                 }
     615 CBC        1251 :                 node->hj_JoinState = HJ_NEED_NEW_OUTER;
     616 GIC        1251 :                 break;
     617 ECB             : 
     618 UIC           0 :             default:
     619               0 :                 elog(ERROR, "unrecognized hashjoin state: %d",
     620                 :                      (int) node->hj_JoinState);
     621                 :         }
     622 ECB             :     }
     623                 : }
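
The state machine that ends above can be hard to follow inside a coverage listing, so here is a minimal, self-contained sketch of the same control-flow idea. It is not the PostgreSQL implementation: every `Toy*` and `toy_*` name is hypothetical, the "hash bucket" is a plain array scanned linearly, only a left join over integer keys is modelled, and batching, skew buckets, and extra quals are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical states, loosely modelled on the HJ_* states in the listing. */
typedef enum
{
    TOY_NEED_NEW_OUTER,
    TOY_SCAN_BUCKET,
    TOY_FILL_OUTER_TUPLE,
    TOY_DONE
} ToyJoinState;

typedef struct
{
    const int  *outer;          /* outer keys */
    size_t      nouter;
    const int  *inner;          /* inner keys, standing in for one bucket */
    size_t      ninner;
    size_t      outer_pos;      /* next outer key to fetch */
    size_t      inner_pos;      /* resume position within the "bucket" */
    int         cur_outer;      /* current outer key */
    bool        matched_outer;  /* has the current outer key matched yet? */
    ToyJoinState state;
} ToyJoin;

typedef struct
{
    int         outer_key;
    bool        inner_is_null;  /* true for a null-extended row */
} ToyRow;

void
toy_join_init(ToyJoin *j, const int *outer, size_t nouter,
              const int *inner, size_t ninner)
{
    j->outer = outer;
    j->nouter = nouter;
    j->inner = inner;
    j->ninner = ninner;
    j->outer_pos = 0;
    j->inner_pos = 0;
    j->cur_outer = 0;
    j->matched_outer = false;
    j->state = TOY_NEED_NEW_OUTER;
}

/* Returns true and fills *row for each result row; false when exhausted. */
bool
toy_join_next(ToyJoin *j, ToyRow *row)
{
    assert(j != NULL && row != NULL);

    for (;;)
    {
        switch (j->state)
        {
            case TOY_NEED_NEW_OUTER:
                if (j->outer_pos >= j->nouter)
                {
                    j->state = TOY_DONE;
                    continue;
                }
                j->cur_outer = j->outer[j->outer_pos++];
                j->matched_outer = false;
                j->inner_pos = 0;
                j->state = TOY_SCAN_BUCKET;
                /* fall through */

            case TOY_SCAN_BUCKET:
                while (j->inner_pos < j->ninner &&
                       j->inner[j->inner_pos] != j->cur_outer)
                    j->inner_pos++;
                if (j->inner_pos >= j->ninner)
                {
                    /* out of matches; check for outer-join fill */
                    j->state = TOY_FILL_OUTER_TUPLE;
                    continue;
                }
                j->inner_pos++; /* resume after this match on the next call */
                j->matched_outer = true;
                row->outer_key = j->cur_outer;
                row->inner_is_null = false;
                return true;

            case TOY_FILL_OUTER_TUPLE:
                /* whether or not a row is emitted, move to the next outer */
                j->state = TOY_NEED_NEW_OUTER;
                if (!j->matched_outer)
                {
                    row->outer_key = j->cur_outer;
                    row->inner_is_null = true;
                    return true;
                }
                break;

            case TOY_DONE:
                return false;
        }
    }
}
```

As in the listing, the state is saved before each return, so the next call resumes scanning the same bucket rather than restarting. Calling `toy_join_next` in a loop until it returns false yields one row per match plus one null-extended row per unmatched outer key.
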
     624                 : 
     625                 : /* ----------------------------------------------------------------
     626                 :  *      ExecHashJoin
     627                 :  *
     628                 :  *      Parallel-oblivious version.
     629                 :  * ----------------------------------------------------------------
     630                 :  */
     631                 : static TupleTableSlot *         /* return: a tuple or NULL */
     632 CBC     3890034 : ExecHashJoin(PlanState *pstate)
     633 ECB             : {
     634                 :     /*
     635 EUB             :      * On sufficiently smart compilers this should be inlined with the
     636                 :      * parallel-aware branches removed.
     637                 :      */
     638 GIC     3890034 :     return ExecHashJoinImpl(pstate, false);
     639                 : }
     640                 : 
     641                 : /* ----------------------------------------------------------------
     642                 :  *      ExecParallelHashJoin
     643                 :  *
     644                 :  *      Parallel-aware version.
     645                 :  * ----------------------------------------------------------------
     646                 :  */
     647                 : static TupleTableSlot *         /* return: a tuple or NULL */
     648         1140201 : ExecParallelHashJoin(PlanState *pstate)
     649 ECB             : {
     650                 :     /*
     651                 :      * On sufficiently smart compilers this should be inlined with the
     652                 :      * parallel-oblivious branches removed.
     653                 :      */
     654 GIC     1140201 :     return ExecHashJoinImpl(pstate, true);
     655 ECB             : }
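
The two thin wrappers above pass a constant flag to one shared implementation, and their comments expect the compiler to inline it and drop the unused branches. A minimal sketch of that pattern follows. The names `toy_sum_impl`, `toy_sum_all`, `toy_sum_nonnegative`, and the `TOY_ALWAYS_INLINE` macro are hypothetical, and the macro is only an assumed stand-in for whatever inlining hint a real code base uses.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed force-inline hint for this sketch; falls back to plain inline. */
#if defined(__GNUC__)
#define TOY_ALWAYS_INLINE static inline __attribute__((always_inline))
#else
#define TOY_ALWAYS_INLINE static inline
#endif

/*
 * Shared implementation.  When 'skip_negative' is a compile-time constant at
 * the call site, an inlining compiler can remove the branch that tests it.
 */
TOY_ALWAYS_INLINE int
toy_sum_impl(const int *values, int n, bool skip_negative)
{
    int         sum = 0;

    assert(n >= 0);
    for (int i = 0; i < n; i++)
    {
        if (skip_negative && values[i] < 0)
            continue;
        sum += values[i];
    }
    return sum;
}

/* Specialization with the flag fixed to false. */
int
toy_sum_all(const int *values, int n)
{
    return toy_sum_impl(values, n, false);
}

/* Specialization with the flag fixed to true. */
int
toy_sum_nonnegative(const int *values, int n)
{
    return toy_sum_impl(values, n, true);
}
```

The design keeps a single copy of the logic in source while still giving each entry point a branch-free body, provided the compiler actually inlines the call; that outcome depends on the compiler and optimization level and is not guaranteed by the language.
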
     656                 : 
     657                 : /* ----------------------------------------------------------------
     658                 :  *      ExecInitHashJoin
     659                 :  *
     660                 :  *      Init routine for HashJoin node.
     661                 :  * ----------------------------------------------------------------
     662                 :  */
     663                 : HashJoinState *
     664 GIC       14214 : ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
     665 ECB             : {
     666                 :     HashJoinState *hjstate;
     667                 :     Plan       *outerNode;
     668                 :     Hash       *hashNode;
     669                 :     TupleDesc   outerDesc,
     670                 :                 innerDesc;
     671                 :     const TupleTableSlotOps *ops;
     672                 : 
     673                 :     /* check for unsupported flags */
     674 GIC       14214 :     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
     675                 : 
     676                 :     /*
     677                 :      * create state structure
     678                 :      */
     679           14214 :     hjstate = makeNode(HashJoinState);
     680           14214 :     hjstate->js.ps.plan = (Plan *) node;
     681 CBC       14214 :     hjstate->js.ps.state = estate;
     682                 : 
     683                 :     /*
     684                 :      * See ExecHashJoinInitializeDSM() and ExecHashJoinInitializeWorker()
     685                 :      * where this function may be replaced with a parallel version, if we
     686                 :      * managed to launch a parallel query.
     687                 :      */
     688 GIC       14214 :     hjstate->js.ps.ExecProcNode = ExecHashJoin;
     689           14214 :     hjstate->js.jointype = node->join.jointype;
     690                 : 
     691 ECB             :     /*
     692                 :      * Miscellaneous initialization
     693                 :      *
     694                 :      * create expression context for node
     695                 :      */
     696 CBC       14214 :     ExecAssignExprContext(estate, &hjstate->js.ps);
     697 ECB             : 
     698                 :     /*
     699                 :      * initialize child nodes
     700                 :      *
     701                 :      * Note: we could suppress the REWIND flag for the inner input, which
     702                 :      * would amount to betting that the hash will be a single batch.  Not
     703                 :      * clear if this would be a win or not.
     704                 :      */
     705 CBC       14214 :     outerNode = outerPlan(node);
     706           14214 :     hashNode = (Hash *) innerPlan(node);
     707                 : 
     708 GIC       14214 :     outerPlanState(hjstate) = ExecInitNode(outerNode, estate, eflags);
     709           14214 :     outerDesc = ExecGetResultType(outerPlanState(hjstate));
     710           14214 :     innerPlanState(hjstate) = ExecInitNode((Plan *) hashNode, estate, eflags);
     711           14214 :     innerDesc = ExecGetResultType(innerPlanState(hjstate));
     712                 : 
     713 ECB             :     /*
     714                 :      * Initialize result slot, type and projection.
     715                 :      */
     716 GIC       14214 :     ExecInitResultTupleSlotTL(&hjstate->js.ps, &TTSOpsVirtual);
     717           14214 :     ExecAssignProjectionInfo(&hjstate->js.ps, NULL);
     718                 : 
     719                 :     /*
     720                 :      * tuple table initialization
     721                 :      */
     722 CBC       14214 :     ops = ExecGetResultSlotOps(outerPlanState(hjstate), NULL);
     723           14214 :     hjstate->hj_OuterTupleSlot = ExecInitExtraTupleSlot(estate, outerDesc,
     724                 :                                                         ops);
     725 ECB             : 
     726                 :     /*
     727                 :      * detect whether we need only consider the first matching inner tuple
     728                 :      */
     729 GIC       21208 :     hjstate->js.single_match = (node->join.inner_unique ||
     730            6994 :                                 node->join.jointype == JOIN_SEMI);
     731                 : 
     732                 :     /* set up null tuples for outer joins, if needed */
     733 CBC       14214 :     switch (node->join.jointype)
     734 ECB             :     {
     735 GIC        9014 :         case JOIN_INNER:
     736                 :         case JOIN_SEMI:
     737            9014 :             break;
     738            2167 :         case JOIN_LEFT:
     739 ECB             :         case JOIN_ANTI:
     740 CBC        2167 :             hjstate->hj_NullInnerTupleSlot =
     741 GIC        2167 :                 ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
     742            2167 :             break;
     743            2524 :         case JOIN_RIGHT:
     744                 :         case JOIN_RIGHT_ANTI:
     745            2524 :             hjstate->hj_NullOuterTupleSlot =
     746            2524 :                 ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
     747 CBC        2524 :             break;
     748             509 :         case JOIN_FULL:
     749 GIC         509 :             hjstate->hj_NullOuterTupleSlot =
     750             509 :                 ExecInitNullTupleSlot(estate, outerDesc, &TTSOpsVirtual);
     751 CBC         509 :             hjstate->hj_NullInnerTupleSlot =
     752 GIC         509 :                 ExecInitNullTupleSlot(estate, innerDesc, &TTSOpsVirtual);
     753 CBC         509 :             break;
     754 UIC           0 :         default:
     755 LBC           0 :             elog(ERROR, "unrecognized join type: %d",
     756 ECB             :                  (int) node->join.jointype);
     757                 :     }
     758                 : 
     759                 :     /*
     760                 :      * now for some voodoo.  our temporary tuple slot is actually the result
     761                 :      * tuple slot of the Hash node (which is our inner plan).  we can do this
     762                 :      * because Hash nodes don't return tuples via ExecProcNode() -- instead
     763                 :      * the hash join node uses ExecScanHashBucket() to get at the contents of
     764                 :      * the hash table.  -cim 6/9/91
     765                 :      */
     766                 :     {
     767 CBC       14214 :         HashState  *hashstate = (HashState *) innerPlanState(hjstate);
     768           14214 :         TupleTableSlot *slot = hashstate->ps.ps_ResultTupleSlot;
     769 ECB             : 
     770 CBC       14214 :         hjstate->hj_HashTupleSlot = slot;
     771 ECB             :     }
     772 EUB             : 
     773                 :     /*
     774                 :      * initialize child expressions
     775                 :      */
     776 GIC       14214 :     hjstate->js.ps.qual =
     777           14214 :         ExecInitQual(node->join.plan.qual, (PlanState *) hjstate);
     778           14214 :     hjstate->js.joinqual =
     779           14214 :         ExecInitQual(node->join.joinqual, (PlanState *) hjstate);
     780           14214 :     hjstate->hashclauses =
     781           14214 :         ExecInitQual(node->hashclauses, (PlanState *) hjstate);
     782                 : 
     783                 :     /*
     784                 :      * initialize hash-specific info
     785 ECB             :      */
     786 CBC       14214 :     hjstate->hj_HashTable = NULL;
     787 GIC       14214 :     hjstate->hj_FirstOuterTupleSlot = NULL;
     788 ECB             : 
     789 GIC       14214 :     hjstate->hj_CurHashValue = 0;
     790           14214 :     hjstate->hj_CurBucketNo = 0;
     791           14214 :     hjstate->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
     792           14214 :     hjstate->hj_CurTuple = NULL;
     793                 : 
     794 CBC       14214 :     hjstate->hj_OuterHashKeys = ExecInitExprList(node->hashkeys,
     795 ECB             :                                                  (PlanState *) hjstate);
     796 CBC       14214 :     hjstate->hj_HashOperators = node->hashoperators;
     797           14214 :     hjstate->hj_Collations = node->hashcollations;
     798 ECB             : 
     799 CBC       14214 :     hjstate->hj_JoinState = HJ_BUILD_HASHTABLE;
     800 GIC       14214 :     hjstate->hj_MatchedOuter = false;
     801           14214 :     hjstate->hj_OuterNotEmpty = false;
     802                 : 
     803           14214 :     return hjstate;
     804 ECB             : }
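
The join-type switch and the single-match test in the init routine above amount to a small decision table. The sketch below restates it as a pure function so the mapping can be checked in isolation. The `ToyJoinType` enum, the `ToyJoinSetup` struct, and `toy_join_setup` are hypothetical names. The mapping follows the cases visible in the listing and says nothing about how the slots themselves are allocated.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the join types handled in the listing's switch. */
typedef enum
{
    TOY_JOIN_INNER,
    TOY_JOIN_SEMI,
    TOY_JOIN_LEFT,
    TOY_JOIN_ANTI,
    TOY_JOIN_RIGHT,
    TOY_JOIN_RIGHT_ANTI,
    TOY_JOIN_FULL
} ToyJoinType;

typedef struct
{
    bool        need_null_inner;    /* null-extend unmatched outer tuples */
    bool        need_null_outer;    /* null-extend unmatched inner tuples */
    bool        single_match;       /* stop after the first inner match */
} ToyJoinSetup;

ToyJoinSetup
toy_join_setup(ToyJoinType type, bool inner_unique)
{
    ToyJoinSetup s = {false, false, false};

    assert((int) type <= (int) TOY_JOIN_FULL);

    /* Only the first matching inner tuple matters in these cases. */
    s.single_match = (inner_unique || type == TOY_JOIN_SEMI);

    switch (type)
    {
        case TOY_JOIN_INNER:
        case TOY_JOIN_SEMI:
            break;
        case TOY_JOIN_LEFT:
        case TOY_JOIN_ANTI:
            s.need_null_inner = true;
            break;
        case TOY_JOIN_RIGHT:
        case TOY_JOIN_RIGHT_ANTI:
            s.need_null_outer = true;
            break;
        case TOY_JOIN_FULL:
            s.need_null_outer = true;
            s.need_null_inner = true;
            break;
    }
    return s;
}
```

For example, `toy_join_setup(TOY_JOIN_FULL, false)` sets both null flags, while `toy_join_setup(TOY_JOIN_INNER, true)` sets only `single_match`.
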
     805                 : 
     806                 : /* ----------------------------------------------------------------
     807                 :  *      ExecEndHashJoin
     808                 :  *
     809                 :  *      clean up routine for HashJoin node
     810                 :  * ----------------------------------------------------------------
     811                 :  */
     812                 : void
     813 GIC       14172 : ExecEndHashJoin(HashJoinState *node)
     814 ECB             : {
     815                 :     /*
     816                 :      * Free hash table
     817                 :      */
     818 CBC       14172 :     if (node->hj_HashTable)
     819 ECB             :     {
     820 GIC        9433 :         ExecHashTableDestroy(node->hj_HashTable);
     821 CBC        9433 :         node->hj_HashTable = NULL;
     822                 :     }
     823                 : 
     824                 :     /*
     825                 :      * Free the exprcontext
     826                 :      */
     827 GIC       14172 :     ExecFreeExprContext(&node->js.ps);
     828                 : 
     829                 :     /*
     830                 :      * clean out the tuple table
     831 ECB             :      */
     832 GIC       14172 :     ExecClearTuple(node->js.ps.ps_ResultTupleSlot);
     833           14172 :     ExecClearTuple(node->hj_OuterTupleSlot);
     834           14172 :     ExecClearTuple(node->hj_HashTupleSlot);
     835                 : 
     836 ECB             :     /*
     837                 :      * clean up subtrees
     838                 :      */
     839 CBC       14172 :     ExecEndNode(outerPlanState(node));
     840 GIC       14172 :     ExecEndNode(innerPlanState(node));
     841           14172 : }
     842                 : 
     843                 : /*
     844                 :  * ExecHashJoinOuterGetTuple
     845 ECB             :  *
     846                 :  *      get the next outer tuple for a parallel-oblivious hash join: either by
     847                 :  *      executing the outer plan node in the first pass, or from the temp
     848                 :  *      files for the hashjoin batches.
     849                 :  *
     850                 :  * Returns a null slot if no more outer tuples (within the current batch).
     851                 :  *
     852                 :  * On success, the tuple's hash value is stored at *hashvalue --- this is
     853                 :  * either originally computed, or re-read from the temp file.
     854                 :  */
     855                 : static TupleTableSlot *
     856 GIC     7642703 : ExecHashJoinOuterGetTuple(PlanState *outerNode,
     857 ECB             :                           HashJoinState *hjstate,
     858                 :                           uint32 *hashvalue)
     859                 : {
     860 GIC     7642703 :     HashJoinTable hashtable = hjstate->hj_HashTable;
     861         7642703 :     int         curbatch = hashtable->curbatch;
     862                 :     TupleTableSlot *slot;
     863                 : 
     864         7642703 :     if (curbatch == 0)          /* if it is the first pass */
     865                 :     {
     866                 :         /*
     867                 :          * Check to see if first outer tuple was already fetched by
     868                 :          * ExecHashJoin() and not used yet.
     869                 :          */
     870         6906855 :         slot = hjstate->hj_FirstOuterTupleSlot;
     871         6906855 :         if (!TupIsNull(slot))
     872            6152 :             hjstate->hj_FirstOuterTupleSlot = NULL;
     873                 :         else
     874 CBC     6900703 :             slot = ExecProcNode(outerNode);
     875                 : 
     876 GIC     6907262 :         while (!TupIsNull(slot))
     877                 :         {
     878 ECB             :             /*
     879                 :              * We have to compute the tuple's hash value.
     880                 :              */
     881 GIC     6897924 :             ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
     882 ECB             : 
     883 GIC     6897924 :             econtext->ecxt_outertuple = slot;
     884         6897924 :             if (ExecHashGetHashValue(hashtable, econtext,
     885                 :                                      hjstate->hj_OuterHashKeys,
     886                 :                                      true,  /* outer tuple */
     887         6897924 :                                      HJ_FILL_OUTER(hjstate),
     888 ECB             :                                      hashvalue))
     889                 :             {
     890                 :                 /* remember outer relation is not empty for possible rescan */
     891 GIC     6897517 :                 hjstate->hj_OuterNotEmpty = true;
     892 ECB             : 
     893 GIC     6897517 :                 return slot;
     894 ECB             :             }
     895                 : 
     896                 :             /*
     897                 :              * That tuple couldn't match because of a NULL, so discard it and
     898                 :              * continue with the next one.
     899                 :              */
     900 GIC         407 :             slot = ExecProcNode(outerNode);
     901 ECB             :         }
     902                 :     }
     903 GIC      735848 :     else if (curbatch < hashtable->nbatch)
     904                 :     {
     905 CBC      735848 :         BufFile    *file = hashtable->outerBatchFile[curbatch];
     906                 : 
     907                 :         /*
     908                 :          * In outer-join cases, we could get here even though the batch file
     909 ECB             :          * is empty.
     910                 :          */
     911 CBC      735848 :         if (file == NULL)
     912 UIC           0 :             return NULL;
     913                 : 
     914 GIC      735848 :         slot = ExecHashJoinGetSavedTuple(hjstate,
     915                 :                                          file,
     916                 :                                          hashvalue,
     917                 :                                          hjstate->hj_OuterTupleSlot);
     918 CBC      735848 :         if (!TupIsNull(slot))
     919 GIC      735096 :             return slot;
     920                 :     }
     921 ECB             : 
     922                 :     /* End of this batch */
     923 CBC       10090 :     return NULL;
     924                 : }
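
The batch-file branch above reads back outer tuples that the main loop postponed because their batch number differed from the current batch. The sketch below shows one plausible way a hash value could be split into a bucket number and a batch number, and how the deferral test follows from it. This is an assumption for illustration, not the routine the listing calls: `ToyPlacement`, `toy_bucket_and_batch`, and `toy_must_defer` are hypothetical, both counts are assumed to be powers of two, and the skew-bucket exception in the listing is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct
{
    uint32_t    bucketno;
    uint32_t    batchno;
} ToyPlacement;

/*
 * Assumed scheme: the low log2_nbuckets bits select the bucket, and the
 * bits just above them select the batch.  nbatch must be a power of two.
 */
ToyPlacement
toy_bucket_and_batch(uint32_t hashvalue, uint32_t log2_nbuckets,
                     uint32_t nbatch)
{
    ToyPlacement p;
    uint32_t    nbuckets;

    assert(log2_nbuckets < 32);
    assert(nbatch > 0 && (nbatch & (nbatch - 1)) == 0);

    nbuckets = (uint32_t) 1 << log2_nbuckets;
    p.bucketno = hashvalue & (nbuckets - 1);
    p.batchno = (hashvalue >> log2_nbuckets) & (nbatch - 1);
    return p;
}

/* A tuple whose batch is not the current one is saved for a later pass. */
bool
toy_must_defer(ToyPlacement p, uint32_t curbatch)
{
    return p.batchno != curbatch;
}
```

Because the bucket and batch come from disjoint bits of the same hash, a tuple keeps its bucket number whichever batch it lands in. That is why the hash value is stored alongside each saved tuple and re-read later instead of being recomputed.
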
     925                 : 
     926                 : /*
     927                 :  * ExecHashJoinOuterGetTuple variant for the parallel case.
     928                 :  */
     929 ECB             : static TupleTableSlot *
     930 GBC     1080511 : ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
     931                 :                                   HashJoinState *hjstate,
     932 ECB             :                                   uint32 *hashvalue)
     933                 : {
     934 GIC     1080511 :     HashJoinTable hashtable = hjstate->hj_HashTable;
     935         1080511 :     int         curbatch = hashtable->curbatch;
     936 ECB             :     TupleTableSlot *slot;
     937                 : 
     938                 :     /*
     939                 :      * In the Parallel Hash case we only run the outer plan directly for
     940                 :      * single-batch hash joins.  Otherwise we have to go to batch files, even
     941                 :      * for batch 0.
     942                 :      */
     943 GIC     1080511 :     if (curbatch == 0 && hashtable->nbatch == 1)
     944                 :     {
     945          480071 :         slot = ExecProcNode(outerNode);
     946                 : 
     947          480071 :         while (!TupIsNull(slot))
     948 ECB             :         {
     949 GIC      480000 :             ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
     950                 : 
     951          480000 :             econtext->ecxt_outertuple = slot;
     952 CBC      480000 :             if (ExecHashGetHashValue(hashtable, econtext,
     953 ECB             :                                      hjstate->hj_OuterHashKeys,
     954                 :                                      true,  /* outer tuple */
     955 GIC      480000 :                                      HJ_FILL_OUTER(hjstate),
     956                 :                                      hashvalue))
     957          480000 :                 return slot;
     958                 : 
     959                 :             /*
     960                 :              * That tuple couldn't match because of a NULL, so discard it and
     961 ECB             :              * continue with the next one.
     962                 :              */
     963 LBC           0 :             slot = ExecProcNode(outerNode);
     964                 :         }
     965 ECB             :     }
     966 GIC      600440 :     else if (curbatch < hashtable->nbatch)
     967 ECB             :     {
     968                 :         MinimalTuple tuple;
     969                 : 
     970 CBC      600440 :         tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
     971                 :                                        hashvalue);
     972 GIC      600440 :         if (tuple != NULL)
     973 ECB             :         {
     974 GIC      600012 :             ExecForceStoreMinimalTuple(tuple,
     975 ECB             :                                        hjstate->hj_OuterTupleSlot,
     976                 :                                        false);
     977 GIC      600012 :             slot = hjstate->hj_OuterTupleSlot;
     978          600012 :             return slot;
     979                 :         }
     980                 :         else
     981 GBC         428 :             ExecClearTuple(hjstate->hj_OuterTupleSlot);
     982                 :     }
     983                 : 
     984 ECB             :     /* End of this batch */
     985 GNC         499 :     hashtable->batches[curbatch].outer_eof = true;
     986                 : 
     987 GIC         499 :     return NULL;
     988                 : }
     989                 : 
     990 ECB             : /*
     991                 :  * ExecHashJoinNewBatch
     992                 :  *      switch to a new hashjoin batch
     993                 :  *
     994                 :  * Returns true if successful, false if there are no more batches.
     995                 :  */
     996                 : static bool
     997 CBC       10087 : ExecHashJoinNewBatch(HashJoinState *hjstate)
     998 ECB             : {
     999 GIC       10087 :     HashJoinTable hashtable = hjstate->hj_HashTable;
    1000                 :     int         nbatch;
    1001 ECB             :     int         curbatch;
    1002                 :     BufFile    *innerFile;
    1003                 :     TupleTableSlot *slot;
    1004                 :     uint32      hashvalue;
    1005                 : 
    1006 GIC       10087 :     nbatch = hashtable->nbatch;
    1007 CBC       10087 :     curbatch = hashtable->curbatch;
    1008                 : 
    1009 GIC       10087 :     if (curbatch > 0)
    1010                 :     {
    1011                 :         /*
    1012                 :          * We no longer need the previous outer batch file; close it right
    1013                 :          * away to free disk space.
    1014                 :          */
    1015             752 :         if (hashtable->outerBatchFile[curbatch])
    1016             752 :             BufFileClose(hashtable->outerBatchFile[curbatch]);
    1017 CBC         752 :         hashtable->outerBatchFile[curbatch] = NULL;
    1018                 :     }
    1019 ECB             :     else                        /* we just finished the first batch */
    1020                 :     {
    1021                 :         /*
    1022                 :          * Reset some of the skew optimization state variables, since we no
    1023                 :          * longer need to consider skew tuples after the first batch. The
    1024                 :          * memory context reset we are about to do will release the skew
    1025                 :          * hashtable itself.
    1026                 :          */
    1027 CBC        9335 :         hashtable->skewEnabled = false;
    1028 GIC        9335 :         hashtable->skewBucket = NULL;
    1029 CBC        9335 :         hashtable->skewBucketNums = NULL;
    1030 GIC        9335 :         hashtable->nSkewBuckets = 0;
    1031            9335 :         hashtable->spaceUsedSkew = 0;
    1032                 :     }
    1033                 : 
    1034                 :     /*
    1035 ECB             :      * We can always skip over any batches that are completely empty on both
    1036                 :      * sides.  We can sometimes skip over batches that are empty on only one
    1037                 :      * side, but there are exceptions:
    1038                 :      *
    1039                 :      * 1. In a left/full outer join, we have to process outer batches even if
    1040                 :      * the inner batch is empty.  Similarly, in a right/right-anti/full outer
    1041                 :      * join, we have to process inner batches even if the outer batch is
    1042                 :      * empty.
    1043                 :      *
    1044                 :      * 2. If we have increased nbatch since the initial estimate, we have to
    1045                 :      * scan inner batches since they might contain tuples that need to be
    1046                 :      * reassigned to later inner batches.
    1047                 :      *
    1048                 :      * 3. Similarly, if we have increased nbatch since starting the outer
    1049                 :      * scan, we have to rescan outer batches in case they contain tuples that
    1050                 :      * need to be reassigned.
    1051                 :      */
    1052 CBC       10087 :     curbatch++;
    1053 GIC       10087 :     while (curbatch < nbatch &&
    1054             752 :            (hashtable->outerBatchFile[curbatch] == NULL ||
    1055             752 :             hashtable->innerBatchFile[curbatch] == NULL))
    1056                 :     {
    1057 UIC           0 :         if (hashtable->outerBatchFile[curbatch] &&
    1058               0 :             HJ_FILL_OUTER(hjstate))
    1059               0 :             break;              /* must process due to rule 1 */
    1060               0 :         if (hashtable->innerBatchFile[curbatch] &&
    1061               0 :             HJ_FILL_INNER(hjstate))
    1062               0 :             break;              /* must process due to rule 1 */
    1063               0 :         if (hashtable->innerBatchFile[curbatch] &&
    1064               0 :             nbatch != hashtable->nbatch_original)
    1065               0 :             break;              /* must process due to rule 2 */
    1066               0 :         if (hashtable->outerBatchFile[curbatch] &&
    1067               0 :             nbatch != hashtable->nbatch_outstart)
    1068               0 :             break;              /* must process due to rule 3 */
    1069                 :         /* We can ignore this batch. */
    1070                 :         /* Release associated temp files right away. */
    1071               0 :         if (hashtable->innerBatchFile[curbatch])
    1072               0 :             BufFileClose(hashtable->innerBatchFile[curbatch]);
    1073 LBC           0 :         hashtable->innerBatchFile[curbatch] = NULL;
    1074               0 :         if (hashtable->outerBatchFile[curbatch])
    1075               0 :             BufFileClose(hashtable->outerBatchFile[curbatch]);
    1076               0 :         hashtable->outerBatchFile[curbatch] = NULL;
    1077 UIC           0 :         curbatch++;
    1078 EUB             :     }
    1079                 : 
    1080 GBC       10087 :     if (curbatch >= nbatch)
    1081            9335 :         return false;           /* no more batches */
    1082 EUB             : 
    1083 GBC         752 :     hashtable->curbatch = curbatch;
    1084 EUB             : 
    1085                 :     /*
    1086                 :      * Reload the hash table with the new inner batch (which could be empty)
    1087                 :      */
    1088 GBC         752 :     ExecHashTableReset(hashtable);
    1089 EUB             : 
    1090 GIC         752 :     innerFile = hashtable->innerBatchFile[curbatch];
    1091                 : 
    1092 GBC         752 :     if (innerFile != NULL)
    1093 EUB             :     {
    1094 GNC         752 :         if (BufFileSeek(innerFile, 0, 0, SEEK_SET))
    1095 UBC           0 :             ereport(ERROR,
    1096 EUB             :                     (errcode_for_file_access(),
    1097                 :                      errmsg("could not rewind hash-join temporary file")));
    1098                 : 
    1099 GIC     1669353 :         while ((slot = ExecHashJoinGetSavedTuple(hjstate,
    1100                 :                                                  innerFile,
    1101 ECB             :                                                  &hashvalue,
    1102                 :                                                  hjstate->hj_HashTupleSlot)))
    1103                 :         {
    1104                 :             /*
    1105                 :              * NOTE: some tuples may be sent to future batches.  Also, it is
    1106                 :              * possible for hashtable->nbatch to be increased here!
    1107                 :              */
    1108 GIC     1668601 :             ExecHashTableInsert(hashtable, slot, hashvalue);
    1109 ECB             :         }
    1110                 : 
    1111                 :         /*
    1112                 :          * after we build the hash table, the inner batch file is no longer
    1113                 :          * needed
    1114                 :          */
    1115 CBC         752 :         BufFileClose(innerFile);
    1116 GBC         752 :         hashtable->innerBatchFile[curbatch] = NULL;
    1117                 :     }
    1118                 : 
    1119                 :     /*
    1120 ECB             :      * Rewind outer batch file (if present), so that we can start reading it.
    1121                 :      */
    1122 GIC         752 :     if (hashtable->outerBatchFile[curbatch] != NULL)
    1123                 :     {
    1124 GNC         752 :         if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0, SEEK_SET))
    1125 UIC           0 :             ereport(ERROR,
    1126                 :                     (errcode_for_file_access(),
    1127                 :                      errmsg("could not rewind hash-join temporary file")));
    1128                 :     }
    1129 ECB             : 
    1130 GIC         752 :     return true;
    1131                 : }
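
The three skip rules in the comment above ExecHashJoinNewBatch's batch-advance loop can be read as a single predicate. The following is a minimal sketch of that reading, not the executor's code: `JoinView`, `BatchView`, and `batch_is_skippable` are hypothetical names, and the boolean fields stand in for the `outerBatchFile[]`/`innerBatchFile[]` NULL checks and the `HJ_FILL_OUTER()`/`HJ_FILL_INNER()` macros.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified view of one batch (not the real HashJoinTable). */
typedef struct BatchView
{
    bool        has_outer_file; /* outer batch file is non-NULL */
    bool        has_inner_file; /* inner batch file is non-NULL */
} BatchView;

/* Hypothetical, simplified view of the join-wide state. */
typedef struct JoinView
{
    bool        fill_outer;     /* stands in for HJ_FILL_OUTER() */
    bool        fill_inner;     /* stands in for HJ_FILL_INNER() */
    int         nbatch;         /* current number of batches */
    int         nbatch_original;    /* nbatch at the initial estimate */
    int         nbatch_outstart;    /* nbatch when the outer scan started */
} JoinView;

/* Returns true if the batch can be skipped without being processed. */
bool
batch_is_skippable(const JoinView *j, const BatchView *b)
{
    /* Both sides have data: this is an ordinary batch, never skipped. */
    if (b->has_outer_file && b->has_inner_file)
        return false;
    /* Rule 1: outer-filling joins must emit unmatched outer tuples. */
    if (b->has_outer_file && j->fill_outer)
        return false;
    /* Rule 1: inner-filling joins must emit unmatched inner tuples. */
    if (b->has_inner_file && j->fill_inner)
        return false;
    /* Rule 2: inner tuples may need reassignment to later batches. */
    if (b->has_inner_file && j->nbatch != j->nbatch_original)
        return false;
    /* Rule 3: outer tuples may need reassignment to later batches. */
    if (b->has_outer_file && j->nbatch != j->nbatch_outstart)
        return false;
    /* Empty on both sides, or empty on one side with no exception. */
    return true;
}
```

Under these assumptions, a plain inner join with an unchanged `nbatch` skips a batch that has only an outer file, while a left join does not.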
    1132                 : 
    1133                 : /*
    1134                 :  * Choose a batch to work on, and attach to it.  Returns true if successful,
    1135                 :  * false if there are no more batches.
    1136 ECB             :  */
    1137                 : static bool
    1138 GIC         688 : ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
    1139                 : {
    1140             688 :     HashJoinTable hashtable = hjstate->hj_HashTable;
    1141                 :     int         start_batchno;
    1142                 :     int         batchno;
    1143 ECB             : 
    1144                 :     /*
    1145                 :      * If we were already attached to a batch, remember not to bother checking
    1146 EUB             :      * it again, and detach from it (possibly freeing the hash table if we are
    1147                 :      * last to detach).
    1148                 :      */
    1149 GIC         688 :     if (hashtable->curbatch >= 0)
    1150                 :     {
    1151 CBC         486 :         hashtable->batches[hashtable->curbatch].done = true;
    1152 GIC         486 :         ExecHashTableDetachBatch(hashtable);
    1153                 :     }
    1154                 : 
    1155                 :     /*
    1156                 :      * Search for a batch that isn't done.  We use an atomic counter to start
    1157                 :      * our search at a different batch in every participant when there are
    1158                 :      * more batches than participants.
    1159 ECB             :      */
    1160 GIC         688 :     batchno = start_batchno =
    1161 CBC         688 :         pg_atomic_fetch_add_u32(&hashtable->parallel_state->distributor, 1) %
    1162 GIC         688 :         hashtable->nbatch;
    1163                 :     do
    1164                 :     {
    1165                 :         uint32      hashvalue;
    1166                 :         MinimalTuple tuple;
    1167                 :         TupleTableSlot *slot;
    1168                 : 
    1169            1799 :         if (!hashtable->batches[batchno].done)
    1170 ECB             :         {
    1171                 :             SharedTuplestoreAccessor *inner_tuples;
    1172 CBC         948 :             Barrier    *batch_barrier =
    1173             948 :             &hashtable->batches[batchno].shared->batch_barrier;
    1174                 : 
    1175 GIC         948 :             switch (BarrierAttach(batch_barrier))
    1176                 :             {
    1177 GNC         327 :                 case PHJ_BATCH_ELECT:
    1178                 : 
    1179                 :                     /* One backend allocates the hash table. */
    1180 GIC         327 :                     if (BarrierArriveAndWait(batch_barrier,
    1181 ECB             :                                              WAIT_EVENT_HASH_BATCH_ELECT))
    1182 CBC         327 :                         ExecParallelHashTableAlloc(hashtable, batchno);
    1183 ECB             :                     /* Fall through. */
    1184                 : 
    1185                 :                 case PHJ_BATCH_ALLOCATE:
    1186                 :                     /* Wait for allocation to complete. */
    1187 GIC         328 :                     BarrierArriveAndWait(batch_barrier,
    1188                 :                                          WAIT_EVENT_HASH_BATCH_ALLOCATE);
    1189                 :                     /* Fall through. */
    1190 ECB             : 
    1191 GNC         342 :                 case PHJ_BATCH_LOAD:
    1192                 :                     /* Start (or join in) loading tuples. */
    1193 CBC         342 :                     ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
    1194             342 :                     inner_tuples = hashtable->batches[batchno].inner_tuples;
    1195 GIC         342 :                     sts_begin_parallel_scan(inner_tuples);
    1196 CBC      542379 :                     while ((tuple = sts_parallel_scan_next(inner_tuples,
    1197                 :                                                            &hashvalue)))
    1198 ECB             :                     {
    1199 GIC      542037 :                         ExecForceStoreMinimalTuple(tuple,
    1200                 :                                                    hjstate->hj_HashTupleSlot,
    1201 ECB             :                                                    false);
    1202 GIC      542037 :                         slot = hjstate->hj_HashTupleSlot;
    1203 CBC      542037 :                         ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
    1204                 :                                                                 hashvalue);
    1205                 :                     }
    1206 GIC         342 :                     sts_end_parallel_scan(inner_tuples);
    1207             342 :                     BarrierArriveAndWait(batch_barrier,
    1208 ECB             :                                          WAIT_EVENT_HASH_BATCH_LOAD);
    1209                 :                     /* Fall through. */
    1210                 : 
    1211 GNC         499 :                 case PHJ_BATCH_PROBE:
    1212 ECB             : 
    1213                 :                     /*
    1214                 :                      * This batch is ready to probe.  Return control to
    1215                 :                      * caller. We stay attached to batch_barrier so that the
    1216                 :                      * hash table stays alive until everyone's finished
    1217                 :                      * probing it, but no participant is allowed to wait at
    1218                 :                      * this barrier again (or else a deadlock could occur).
    1219                 :                      * All attached participants must eventually detach from
    1220                 :                      * the barrier and one worker must advance the phase so
    1221                 :                      * that the final phase is reached.
    1222                 :                      */
    1223 CBC         499 :                     ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
    1224             499 :                     sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
    1225                 : 
    1226 GNC         499 :                     return true;
    1227               2 :                 case PHJ_BATCH_SCAN:
    1228                 : 
    1229                 :                     /*
    1230                 :                      * In principle, we could help scan for unmatched tuples,
    1231                 :                      * since that phase is already underway (the thing we
    1232                 :                      * can't do under current deadlock-avoidance rules is wait
    1233                 :                      * for others to arrive at PHJ_BATCH_SCAN, because
    1234                 :                      * PHJ_BATCH_PROBE emits tuples, but in this case we just
    1235                 :                      * got here without waiting).  That is not yet done.  For
    1236                 :                      * now, we just detach and go around again.  We have to
    1237                 :                      * use ExecHashTableDetachBatch() because there's a small
    1238                 :                      * chance we'll be the last to detach, and then we're
    1239                 :                      * responsible for freeing memory.
    1240                 :                      */
    1241               2 :                     ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
    1242               2 :                     hashtable->batches[batchno].done = true;
    1243               2 :                     ExecHashTableDetachBatch(hashtable);
    1244               2 :                     break;
    1245                 : 
    1246             447 :                 case PHJ_BATCH_FREE:
    1247 ECB             : 
    1248                 :                     /*
    1249                 :                      * Already done.  Detach and go around again (if any
    1250                 :                      * remain).
    1251                 :                      */
    1252 GIC         447 :                     BarrierDetach(batch_barrier);
    1253             447 :                     hashtable->batches[batchno].done = true;
    1254             447 :                     hashtable->curbatch = -1;
    1255             447 :                     break;
    1256                 : 
    1257 UIC           0 :                 default:
    1258               0 :                     elog(ERROR, "unexpected batch phase %d",
    1259                 :                          BarrierPhase(batch_barrier));
    1260                 :             }
    1261                 :         }
    1262 GIC        1300 :         batchno = (batchno + 1) % hashtable->nbatch;
    1263 CBC        1300 :     } while (batchno != start_batchno);
    1264 ECB             : 
    1265 GIC         189 :     return false;
    1266 ECB             : }
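
The search in ExecParallelHashJoinNewBatch starts each participant at a different batch by taking a value from a shared atomic counter, then walks the batches in a ring until it returns to its starting point. The sketch below illustrates only that selection pattern, using C11 `<stdatomic.h>` in place of `pg_atomic_fetch_add_u32()`. `choose_batch` and the `done` array are hypothetical, and the barrier phases are left out entirely.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Pick a batch that is not yet done, starting the search at a position
 * drawn from a shared counter so that concurrent callers tend to start at
 * different batches.  Returns the batch number, or -1 if all are done.
 *
 * Assumes nbatch > 0.
 */
int
choose_batch(atomic_uint *distributor, const bool *done, int nbatch)
{
    /* Each call advances the shared counter by one. */
    int         start = (int) (atomic_fetch_add(distributor, 1) %
                               (unsigned int) nbatch);
    int         batchno = start;

    do
    {
        if (!done[batchno])
            return batchno;
        /* Wrap around to batch 0 after the last batch. */
        batchno = (batchno + 1) % nbatch;
    } while (batchno != start);

    return -1;
}
```

Because the counter is only used to pick a starting offset, a participant that finds its first candidate already done still visits every other batch once before giving up.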
    1267                 : 
    1268                 : /*
    1269                 :  * ExecHashJoinSaveTuple
    1270                 :  *      save a tuple to a batch file.
    1271                 :  *
    1272                 :  * The data recorded in the file for each tuple is its hash value,
    1273                 :  * then the tuple in MinimalTuple format.
    1274                 :  *
    1275                 :  * Note: it is important always to call this in the regular executor
    1276                 :  * context, not in a shorter-lived context; else the temp file buffers
    1277                 :  * will get messed up.
    1278                 :  */
    1279                 : void
    1280 GIC     2403697 : ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
    1281 ECB             :                       BufFile **fileptr)
    1282                 : {
    1283 CBC     2403697 :     BufFile    *file = *fileptr;
    1284 ECB             : 
    1285 GIC     2403697 :     if (file == NULL)
    1286 ECB             :     {
    1287                 :         /* First write to this batch file, so open it. */
    1288 GIC        1504 :         file = BufFileCreateTemp(false);
    1289            1504 :         *fileptr = file;
    1290                 :     }
    1291                 : 
    1292 GNC     2403697 :     BufFileWrite(file, &hashvalue, sizeof(uint32));
    1293         2403697 :     BufFileWrite(file, tuple, tuple->t_len);
    1294 CBC     2403697 : }
    1295 ECB             : 
    1296                 : /*
    1297 EUB             :  * ExecHashJoinGetSavedTuple
    1298                 :  *      read the next tuple from a batch file.  Return NULL if no more.
    1299                 :  *
    1300                 :  * On success, *hashvalue is set to the tuple's hash value, and the tuple
    1301                 :  * itself is stored in the given slot.
    1302 ECB             :  */
    1303                 : static TupleTableSlot *
    1304 GIC     2405201 : ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
    1305 ECB             :                           BufFile *file,
    1306                 :                           uint32 *hashvalue,
    1307                 :                           TupleTableSlot *tupleSlot)
    1308                 : {
    1309                 :     uint32      header[2];
    1310                 :     size_t      nread;
    1311                 :     MinimalTuple tuple;
    1312                 : 
    1313                 :     /*
    1314                 :      * We check for interrupts here because this is typically taken as an
    1315                 :      * alternative code path to an ExecProcNode() call, which would include
    1316                 :      * such a check.
    1317                 :      */
    1318 GIC     2405201 :     CHECK_FOR_INTERRUPTS();
    1319                 : 
    1320 ECB             :     /*
    1321                 :      * Since both the hash value and the MinimalTuple length word are uint32,
    1322                 :      * we can read them both in one BufFileRead() call without any type
    1323                 :      * cheating.
    1324                 :      */
    1325 GNC     2405201 :     nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
    1326 GIC     2405201 :     if (nread == 0)             /* end of file */
    1327                 :     {
    1328 CBC        1504 :         ExecClearTuple(tupleSlot);
    1329            1504 :         return NULL;
    1330                 :     }
    1331 GIC     2403697 :     *hashvalue = header[0];
    1332         2403697 :     tuple = (MinimalTuple) palloc(header[1]);
    1333         2403697 :     tuple->t_len = header[1];
    1334 GNC     2403697 :     BufFileReadExact(file,
    1335                 :                      (char *) tuple + sizeof(uint32),
    1336         2403697 :                      header[1] - sizeof(uint32));
    1337 GIC     2403697 :     ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
    1338         2403697 :     return tupleSlot;
    1339                 : }
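
ExecHashJoinSaveTuple and ExecHashJoinGetSavedTuple agree on a record layout: a uint32 hash value followed by the tuple, whose first word is its own length. That is why the reader can fetch both words as one two-element header. The round trip below is a minimal sketch of that layout over an in-memory buffer. `DemoFile`, `DemoTuple`, and the `demo_*` functions are hypothetical stand-ins for `BufFile`, `MinimalTuple`, and the executor routines, and `malloc` replaces `palloc`.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a BufFile: a fixed in-memory byte buffer. */
typedef struct DemoFile
{
    unsigned char buf[256];
    size_t      len;            /* bytes written so far */
    size_t      pos;            /* current read position */
} DemoFile;

/* Hypothetical stand-in for MinimalTuple: length word first, then data. */
typedef struct DemoTuple
{
    uint32_t    t_len;          /* total size in bytes, including t_len */
    char        data[];
} DemoTuple;

/* Append one record: the hash value, then the whole tuple. */
int
demo_save_tuple(DemoFile *f, uint32_t hashvalue, const DemoTuple *tuple)
{
    if (f->len + sizeof(uint32_t) + tuple->t_len > sizeof(f->buf))
        return -1;              /* would not fit */
    memcpy(f->buf + f->len, &hashvalue, sizeof(uint32_t));
    f->len += sizeof(uint32_t);
    memcpy(f->buf + f->len, tuple, tuple->t_len);
    f->len += tuple->t_len;
    return 0;
}

/* Read the next record; returns a malloc'd tuple, or NULL at end of data. */
DemoTuple *
demo_get_saved_tuple(DemoFile *f, uint32_t *hashvalue)
{
    uint32_t    header[2];      /* [0] = hash value, [1] = tuple length */
    DemoTuple  *tuple;

    if (f->len - f->pos < sizeof(header))
        return NULL;            /* end of data */
    memcpy(header, f->buf + f->pos, sizeof(header));
    if (header[1] < sizeof(uint32_t) ||
        header[1] - sizeof(uint32_t) > f->len - f->pos - sizeof(header))
        return NULL;            /* truncated or corrupt record */
    f->pos += sizeof(header);

    *hashvalue = header[0];
    tuple = malloc(header[1]);
    if (tuple == NULL)
        return NULL;
    tuple->t_len = header[1];
    /* The length word is already consumed; copy only the remainder. */
    memcpy((char *) tuple + sizeof(uint32_t), f->buf + f->pos,
           header[1] - sizeof(uint32_t));
    f->pos += header[1] - sizeof(uint32_t);
    return tuple;
}
```

Unlike the executor routine, this sketch reports a short or corrupt record by returning NULL rather than raising an error, which keeps it free of any error-reporting machinery.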
    1340                 : 
    1341                 : 
    1342                 : void
    1343            1143 : ExecReScanHashJoin(HashJoinState *node)
    1344                 : {
    1345 GNC        1143 :     PlanState  *outerPlan = outerPlanState(node);
    1346            1143 :     PlanState  *innerPlan = innerPlanState(node);
    1347                 : 
    1348                 :     /*
    1349                 :      * In a multi-batch join, we currently have to do rescans the hard way,
    1350                 :      * primarily because batch temp files may have already been released. But
    1351 ECB             :      * if it's a single-batch join, and there is no parameter change for the
    1352                 :      * inner subnode, then we can just re-use the existing hash table without
    1353                 :      * rebuilding it.
    1354                 :      */
    1355 GIC        1143 :     if (node->hj_HashTable != NULL)
    1356                 :     {
    1357             662 :         if (node->hj_HashTable->nbatch == 1 &&
    1358 GNC         662 :             innerPlan->chgParam == NULL)
    1359 ECB             :         {
    1360                 :             /*
    1361                 :              * Okay to reuse the hash table; needn't rescan inner, either.
    1362                 :              *
    1363                 :              * However, if it's a right/right-anti/full join, we'd better
    1364                 :              * reset the inner-tuple match flags contained in the table.
    1365                 :              */
    1366 CBC         251 :             if (HJ_FILL_INNER(node))
    1367               3 :                 ExecHashTableResetMatchFlags(node->hj_HashTable);
    1368                 : 
    1369 ECB             :             /*
    1370                 :              * Also, we need to reset our state about the emptiness of the
    1371                 :              * outer relation, so that the new scan of the outer will update
    1372                 :              * it correctly if it turns out to be empty this time. (There's no
    1373                 :              * harm in clearing it now because ExecHashJoin won't need the
    1374                 :              * info.  In the other cases, where the hash table doesn't exist
    1375                 :              * or we are destroying it, we leave this state alone because
    1376                 :              * ExecHashJoin will need it the first time through.)
    1377                 :              */
    1378 CBC         251 :             node->hj_OuterNotEmpty = false;
    1379 ECB             : 
    1380                 :             /* ExecHashJoin can skip the BUILD_HASHTABLE step */
    1381 GIC         251 :             node->hj_JoinState = HJ_NEED_NEW_OUTER;
    1382                 :         }
    1383                 :         else
    1384                 :         {
    1385                 :             /* must destroy and rebuild hash table */
    1386 GNC         411 :             HashState  *hashNode = castNode(HashState, innerPlan);
    1387                 : 
    1388 CBC         411 :             Assert(hashNode->hashtable == node->hj_HashTable);
    1389                 :             /* accumulate stats from old hash table, if wanted */
    1390 ECB             :             /* (this should match ExecShutdownHash) */
    1391 CBC         411 :             if (hashNode->ps.instrument && !hashNode->hinstrument)
    1392 UIC           0 :                 hashNode->hinstrument = (HashInstrumentation *)
    1393               0 :                     palloc0(sizeof(HashInstrumentation));
    1394 GIC         411 :             if (hashNode->hinstrument)
    1395 UIC           0 :                 ExecHashAccumInstrumentation(hashNode->hinstrument,
    1396                 :                                              hashNode->hashtable);
    1397                 :             /* for safety, be sure to clear child plan node's pointer too */
    1398 GIC         411 :             hashNode->hashtable = NULL;
    1399 ECB             : 
    1400 CBC         411 :             ExecHashTableDestroy(node->hj_HashTable);
    1401 GIC         411 :             node->hj_HashTable = NULL;
    1402             411 :             node->hj_JoinState = HJ_BUILD_HASHTABLE;
    1403                 : 
    1404                 :             /*
    1405                 :              * if chgParam of subnode is not null then plan will be re-scanned
    1406                 :              * by first ExecProcNode.
    1407                 :              */
    1408 GNC         411 :             if (innerPlan->chgParam == NULL)
    1409 UNC           0 :                 ExecReScan(innerPlan);
    1410                 :         }
    1411 ECB             :     }
    1412                 : 
    1413                 :     /* Always reset intra-tuple state */
    1414 CBC        1143 :     node->hj_CurHashValue = 0;
    1415 GIC        1143 :     node->hj_CurBucketNo = 0;
    1416            1143 :     node->hj_CurSkewBucketNo = INVALID_SKEW_BUCKET_NO;
    1417            1143 :     node->hj_CurTuple = NULL;
    1418                 : 
    1419 CBC        1143 :     node->hj_MatchedOuter = false;
    1420 GIC        1143 :     node->hj_FirstOuterTupleSlot = NULL;
    1421 ECB             : 
    1422                 :     /*
    1423                 :      * if chgParam of subnode is not null then plan will be re-scanned by
    1424                 :      * first ExecProcNode.
    1425 EUB             :      */
    1426 GNC        1143 :     if (outerPlan->chgParam == NULL)
    1427             826 :         ExecReScan(outerPlan);
    1428 GBC        1143 : }
    1429                 : 
    1430                 : void
    1431 CBC       12754 : ExecShutdownHashJoin(HashJoinState *node)
    1432                 : {
    1433           12754 :     if (node->hj_HashTable)
    1434 ECB             :     {
    1435                 :         /*
    1436                 :          * Detach from shared state before DSM memory goes away.  This makes
    1437                 :          * sure that we don't have any pointers into DSM memory by the time
    1438                 :          * ExecEndHashJoin runs.
    1439                 :          */
    1440 GIC        9424 :         ExecHashTableDetachBatch(node->hj_HashTable);
    1441 CBC        9424 :         ExecHashTableDetach(node->hj_HashTable);
    1442 EUB             :     }
    1443 GIC       12754 : }
    1444                 : 
    1445                 : static void
    1446              87 : ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
    1447 ECB             : {
    1448 CBC          87 :     PlanState  *outerState = outerPlanState(hjstate);
    1449              87 :     ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
    1450              87 :     HashJoinTable hashtable = hjstate->hj_HashTable;
    1451                 :     TupleTableSlot *slot;
    1452 ECB             :     uint32      hashvalue;
    1453                 :     int         i;
    1454                 : 
    1455 GIC          87 :     Assert(hjstate->hj_FirstOuterTupleSlot == NULL);
    1456                 : 
    1457                 :     /* Execute outer plan, writing all tuples to shared tuplestores. */
    1458                 :     for (;;)
    1459 ECB             :     {
    1460 CBC      600099 :         slot = ExecProcNode(outerState);
    1461          600099 :         if (TupIsNull(slot))
    1462                 :             break;
    1463 GIC      600012 :         econtext->ecxt_outertuple = slot;
    1464 CBC      600012 :         if (ExecHashGetHashValue(hashtable, econtext,
    1465                 :                                  hjstate->hj_OuterHashKeys,
    1466 ECB             :                                  true,  /* outer tuple */
    1467 GIC      600012 :                                  HJ_FILL_OUTER(hjstate),
    1468                 :                                  &hashvalue))
    1469                 :         {
    1470                 :             int         batchno;
    1471                 :             int         bucketno;
    1472                 :             bool        shouldFree;
    1473 CBC      600012 :             MinimalTuple mintup = ExecFetchSlotMinimalTuple(slot, &shouldFree);
    1474 ECB             : 
    1475 GIC      600012 :             ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
    1476 ECB             :                                       &batchno);
    1477 GIC      600012 :             sts_puttuple(hashtable->batches[batchno].outer_tuples,
    1478                 :                          &hashvalue, mintup);
    1479 ECB             : 
    1480 GIC      600012 :             if (shouldFree)
    1481 CBC      600012 :                 heap_free_minimal_tuple(mintup);
    1482 ECB             :         }
    1483 CBC      600012 :         CHECK_FOR_INTERRUPTS();
    1484                 :     }
    1485                 : 
    1486                 :     /* Make sure all outer partitions are readable by any backend. */
    1487 GIC         811 :     for (i = 0; i < hashtable->nbatch; ++i)
    1488 CBC         724 :         sts_end_write(hashtable->batches[i].outer_tuples);
    1489 GIC          87 : }
    1490                 : 
    1491                 : void
    1492              57 : ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt)
    1493 ECB             : {
    1494 CBC          57 :     shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelHashJoinState));
    1495 GIC          57 :     shm_toc_estimate_keys(&pcxt->estimator, 1);
    1496 CBC          57 : }
    1497 ECB             : 
    1498                 : void
    1499 GIC          57 : ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
    1500 ECB             : {
    1501 GIC          57 :     int         plan_node_id = state->js.ps.plan->plan_node_id;
    1502                 :     HashState  *hashNode;
    1503                 :     ParallelHashJoinState *pstate;
    1504                 : 
    1505                 :     /*
    1506 ECB             :      * Disable shared hash table mode if we failed to create a real DSM
    1507                 :      * segment, because that means that we don't have a DSA area to work with.
    1508                 :      */
    1509 GIC          57 :     if (pcxt->seg == NULL)
    1510 LBC           0 :         return;
    1511                 : 
    1512 GIC          57 :     ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
    1513 ECB             : 
    1514                 :     /*
    1515                 :      * Set up the state needed to coordinate access to the shared hash
    1516                 :      * table(s), using the plan node ID as the toc key.
    1517                 :      */
    1518 GIC          57 :     pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelHashJoinState));
    1519              57 :     shm_toc_insert(pcxt->toc, plan_node_id, pstate);
    1520 ECB             : 
    1521                 :     /*
    1522                 :      * Set up the shared hash join state with no batches initially.
    1523                 :      * ExecHashTableCreate() will prepare at least one later and set nbatch
    1524                 :      * and space_allowed.
    1525                 :      */
    1526 GIC          57 :     pstate->nbatch = 0;
    1527 CBC          57 :     pstate->space_allowed = 0;
    1528              57 :     pstate->batches = InvalidDsaPointer;
    1529              57 :     pstate->old_batches = InvalidDsaPointer;
    1530 GIC          57 :     pstate->nbuckets = 0;
    1531              57 :     pstate->growth = PHJ_GROWTH_OK;
    1532 CBC          57 :     pstate->chunk_work_queue = InvalidDsaPointer;
    1533 GIC          57 :     pg_atomic_init_u32(&pstate->distributor, 0);
    1534 CBC          57 :     pstate->nparticipants = pcxt->nworkers + 1;
    1535 GIC          57 :     pstate->total_tuples = 0;
    1536              57 :     LWLockInitialize(&pstate->lock,
    1537                 :                      LWTRANCHE_PARALLEL_HASH_JOIN);
    1538              57 :     BarrierInit(&pstate->build_barrier, 0);
    1539              57 :     BarrierInit(&pstate->grow_batches_barrier, 0);
    1540              57 :     BarrierInit(&pstate->grow_buckets_barrier, 0);
    1541                 : 
    1542 ECB             :     /* Set up the space we'll use for shared temporary files. */
    1543 GBC          57 :     SharedFileSetInit(&pstate->fileset, pcxt->seg);
    1544                 : 
    1545 ECB             :     /* Initialize the shared state in the hash node. */
    1546 GIC          57 :     hashNode = (HashState *) innerPlanState(state);
    1547              57 :     hashNode->parallel_state = pstate;
    1548                 : }
    1549                 : 
    1550                 : /* ----------------------------------------------------------------
    1551 ECB             :  *      ExecHashJoinReInitializeDSM
    1552                 :  *
    1553                 :  *      Reset shared state before beginning a fresh scan.
    1554                 :  * ----------------------------------------------------------------
    1555                 :  */
    1556                 : void
    1557 GNC          24 : ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
    1558                 : {
    1559 CBC          24 :     int         plan_node_id = state->js.ps.plan->plan_node_id;
    1560 ECB             :     ParallelHashJoinState *pstate =
    1561 GNC          24 :     shm_toc_lookup(pcxt->toc, plan_node_id, false);
    1562 ECB             : 
    1563                 :     /*
    1564                 :      * It would be possible to reuse the shared hash table in single-batch
    1565                 :      * cases by resetting and then fast-forwarding build_barrier to
    1566                 :      * PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but
    1567                 :      * currently shared hash tables are already freed by now (by the last
    1568                 :      * participant to detach from the batch).  We could consider keeping it
    1569                 :      * around for single-batch joins.  We'd also need to adjust
    1570                 :      * finalize_plan() so that it doesn't record a dummy dependency for
    1571                 :      * Parallel Hash nodes, preventing the rescan optimization.  For now we
    1572                 :      * don't try.
    1573                 :      */
    1574                 : 
    1575                 :     /* Detach, freeing any remaining shared memory. */
    1576 CBC          24 :     if (state->hj_HashTable != NULL)
    1577                 :     {
    1578 UIC           0 :         ExecHashTableDetachBatch(state->hj_HashTable);
    1579 LBC           0 :         ExecHashTableDetach(state->hj_HashTable);
    1580 ECB             :     }
    1581                 : 
    1582                 :     /* Clear any shared batch files. */
    1583 GIC          24 :     SharedFileSetDeleteAll(&pstate->fileset);
    1584                 : 
    1585                 :     /* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
    1586              24 :     BarrierInit(&pstate->build_barrier, 0);
    1587              24 : }
    1588                 : 
    1589                 : void
    1590 CBC         147 : ExecHashJoinInitializeWorker(HashJoinState *state,
    1591                 :                              ParallelWorkerContext *pwcxt)
    1592 ECB             : {
    1593                 :     HashState  *hashNode;
    1594 CBC         147 :     int         plan_node_id = state->js.ps.plan->plan_node_id;
    1595                 :     ParallelHashJoinState *pstate =
    1596 GIC         147 :     shm_toc_lookup(pwcxt->toc, plan_node_id, false);
    1597                 : 
    1598                 :     /* Attach to the space for shared temporary files. */
    1599             147 :     SharedFileSetAttach(&pstate->fileset, pwcxt->seg);
    1600                 : 
    1601                 :     /* Attach to the shared state in the hash node. */
    1602             147 :     hashNode = (HashState *) innerPlanState(state);
    1603             147 :     hashNode->parallel_state = pstate;
    1604                 : 
    1605             147 :     ExecSetExecProcNode(&state->js.ps, ExecParallelHashJoin);
    1606             147 : }
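
Note on ExecParallelHashJoinPartitionOuter (source lines 1445-1489): each outer tuple's hash value is passed to ExecHashGetBucketAndBatch(), and the tuple is then written to the shared tuplestore for the resulting batch. The listing does not show how that mapping is computed. The standalone sketch below illustrates one common scheme, assuming that both counts are powers of two, that the low bits of the hash select the bucket, and that the next bits select the batch. The names rotate_right32 and sketch_bucket_and_batch are hypothetical and are not PostgreSQL functions; the real implementation lives in nodeHash.c and may differ in detail.

```c
#include <assert.h>
#include <stdint.h>

/* Rotate a 32-bit value right by n bits (0 <= n < 32). */
static inline uint32_t
rotate_right32(uint32_t word, int n)
{
    return (n == 0) ? word : ((word >> n) | (word << (32 - n)));
}

/*
 * Hypothetical stand-in for the bucket/batch mapping (an assumption for
 * illustration, not the PostgreSQL code).  The low log2_nbuckets bits of
 * the hash pick the bucket; the bits just above them pick the batch.
 * Both nbuckets and nbatch must be powers of two.
 */
static void
sketch_bucket_and_batch(uint32_t hashvalue,
                        uint32_t nbuckets, int log2_nbuckets,
                        uint32_t nbatch,
                        int *bucketno, int *batchno)
{
    assert(nbuckets > 0 && (nbuckets & (nbuckets - 1)) == 0);
    assert(nbatch > 0 && (nbatch & (nbatch - 1)) == 0);

    *bucketno = (int) (hashvalue & (nbuckets - 1));

    if (nbatch > 1)
        *batchno = (int) (rotate_right32(hashvalue, log2_nbuckets) &
                          (nbatch - 1));
    else
        *batchno = 0;           /* single-batch join: everything in batch 0 */
}
```

Because the bucket and batch numbers come from disjoint bit ranges of the same hash value, tuples that land in the same batch still spread across all buckets. That is why the partitioning loop can route every outer tuple to batches[batchno].outer_tuples using the hash value alone.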
        
