LCOV - differential code coverage report
Current view:   top level - src/backend/executor - nodeGather.c (source / functions)
Current:        Differential Code Coverage HEAD vs 15
Current Date:   2023-04-08 15:15:32
Baseline:       15
Baseline Date:  2023-04-08 15:09:40

              Coverage   Total   Hit   UBC   GNC   CBC   DCB
Lines:          97.9 %     141   138     3     2   136     2
Functions:     100.0 %       8     8           1     7
Legend: Lines: hit / not hit
        TLA:   CBC = Covered Baseline Code, GNC = Gained New Coverage,
               UBC = Un-exercised Baseline Code, DCB = Deleted Covered Baseline

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * nodeGather.c
       4                 :  *    Support routines for scanning a plan via multiple workers.
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  * A Gather executor launches parallel workers to run multiple copies of a
      10                 :  * plan.  It can also run the plan itself, if the workers are not available
      11                 :  * or have not started up yet.  It then merges all of the results it produces
      12                 :  * and the results from the workers into a single output stream.  Therefore,
      13                 :  * it will normally be used with a plan where running multiple copies of the
      14                 :  * same plan does not produce duplicate output, such as parallel-aware
      15                 :  * SeqScan.
      16                 :  *
      17                 :  * Alternatively, a Gather node can be configured to use just one worker
      18                 :  * and the single-copy flag can be set.  In this case, the Gather node will
      19                 :  * run the plan in one worker and will not execute the plan itself.  In
      20                 :  * this case, it simply returns whatever tuples were returned by the worker.
      21                 :  * If a worker cannot be obtained, then it will run the plan itself and
      22                 :  * return the results.  Therefore, a plan used with a single-copy Gather
      23                 :  * node need not be parallel-aware.
      24                 :  *
      25                 :  * IDENTIFICATION
      26                 :  *    src/backend/executor/nodeGather.c
      27                 :  *
      28                 :  *-------------------------------------------------------------------------
      29                 :  */
      30                 : 
      31                 : #include "postgres.h"
      32                 : 
      33                 : #include "access/relscan.h"
      34                 : #include "access/xact.h"
      35                 : #include "executor/execdebug.h"
      36                 : #include "executor/execParallel.h"
      37                 : #include "executor/nodeGather.h"
      38                 : #include "executor/nodeSubplan.h"
      39                 : #include "executor/tqueue.h"
      40                 : #include "miscadmin.h"
      41                 : #include "optimizer/optimizer.h"
      42                 : #include "pgstat.h"
      43                 : #include "utils/memutils.h"
      44                 : #include "utils/rel.h"
      45                 : 
      46                 : 
      47                 : static TupleTableSlot *ExecGather(PlanState *pstate);
      48                 : static TupleTableSlot *gather_getnext(GatherState *gatherstate);
      49                 : static MinimalTuple gather_readnext(GatherState *gatherstate);
      50                 : static void ExecShutdownGatherWorkers(GatherState *node);
      51                 : 
      52                 : 
      53                 : /* ----------------------------------------------------------------
      54                 :  *      ExecInitGather
      55                 :  * ----------------------------------------------------------------
      56                 :  */
      57                 : GatherState *
      58 CBC         479 : ExecInitGather(Gather *node, EState *estate, int eflags)
      59                 : {
      60                 :     GatherState *gatherstate;
      61                 :     Plan       *outerNode;
      62                 :     TupleDesc   tupDesc;
      63                 : 
      64                 :     /* Gather node doesn't have innerPlan node. */
      65             479 :     Assert(innerPlan(node) == NULL);
      66                 : 
      67                 :     /*
      68                 :      * create state structure
      69                 :      */
      70             479 :     gatherstate = makeNode(GatherState);
      71             479 :     gatherstate->ps.plan = (Plan *) node;
      72             479 :     gatherstate->ps.state = estate;
      73             479 :     gatherstate->ps.ExecProcNode = ExecGather;
      74                 : 
      75             479 :     gatherstate->initialized = false;
      76             479 :     gatherstate->need_to_scan_locally =
      77             479 :         !node->single_copy && parallel_leader_participation;
      78             479 :     gatherstate->tuples_needed = -1;
      79                 : 
      80                 :     /*
      81                 :      * Miscellaneous initialization
      82                 :      *
      83                 :      * create expression context for node
      84                 :      */
      85             479 :     ExecAssignExprContext(estate, &gatherstate->ps);
      86                 : 
      87                 :     /*
      88                 :      * now initialize outer plan
      89                 :      */
      90             479 :     outerNode = outerPlan(node);
      91             479 :     outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
      92             479 :     tupDesc = ExecGetResultType(outerPlanState(gatherstate));
      93                 : 
      94                 :     /*
      95                 :      * Leader may access ExecProcNode result directly (if
      96                 :      * need_to_scan_locally), or from workers via tuple queue.  So we can't
      97                 :      * trivially rely on the slot type being fixed for expressions evaluated
      98                 :      * within this node.
      99                 :      */
     100             479 :     gatherstate->ps.outeropsset = true;
     101             479 :     gatherstate->ps.outeropsfixed = false;
     102                 : 
     103                 :     /*
     104                 :      * Initialize result type and projection.
     105                 :      */
     106             479 :     ExecInitResultTypeTL(&gatherstate->ps);
     107             479 :     ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
     108                 : 
     109                 :     /*
     110                 :      * Without projections result slot type is not trivially known, see
     111                 :      * comment above.
     112                 :      */
     113             479 :     if (gatherstate->ps.ps_ProjInfo == NULL)
     114                 :     {
     115             458 :         gatherstate->ps.resultopsset = true;
     116             458 :         gatherstate->ps.resultopsfixed = false;
     117                 :     }
     118                 : 
     119                 :     /*
     120                 :      * Initialize funnel slot to same tuple descriptor as outer plan.
     121                 :      */
     122             479 :     gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
     123                 :                                                       &TTSOpsMinimalTuple);
     124                 : 
     125                 :     /*
     126                 :      * Gather doesn't support checking a qual (it's always more efficient to
     127                 :      * do it in the child node).
     128                 :      */
     129             479 :     Assert(!node->plan.qual);
     130                 : 
     131             479 :     return gatherstate;
     132                 : }
     133                 : 
     134                 : /* ----------------------------------------------------------------
     135                 :  *      ExecGather(node)
     136                 :  *
     137                 :  *      Scans the relation via multiple workers and returns
     138                 :  *      the next qualifying tuple.
     139                 :  * ----------------------------------------------------------------
     140                 :  */
     141                 : static TupleTableSlot *
     142         1459186 : ExecGather(PlanState *pstate)
     143                 : {
     144         1459186 :     GatherState *node = castNode(GatherState, pstate);
     145                 :     TupleTableSlot *slot;
     146                 :     ExprContext *econtext;
     147                 : 
     148         1459186 :     CHECK_FOR_INTERRUPTS();
     149                 : 
     150                 :     /*
     151                 :      * Initialize the parallel context and workers on first execution. We do
     152                 :      * this on first execution rather than during node initialization, as it
     153                 :      * needs to allocate a large dynamic segment, so it is better to do it
     154                 :      * only if it is really needed.
     155                 :      */
     156         1459186 :     if (!node->initialized)
     157                 :     {
     158             383 :         EState     *estate = node->ps.state;
     159             383 :         Gather     *gather = (Gather *) node->ps.plan;
     160                 : 
     161                 :         /*
     162                 :          * Sometimes we might have to run without parallelism; but if parallel
     163                 :          * mode is active then we can try to fire up some workers.
     164                 :          */
     165             383 :         if (gather->num_workers > 0 && estate->es_use_parallel_mode)
     166                 :         {
     167                 :             ParallelContext *pcxt;
     168                 : 
     169                 :             /* Initialize, or re-initialize, shared state needed by workers. */
     170             380 :             if (!node->pei)
     171 GNC         266 :                 node->pei = ExecInitParallelPlan(outerPlanState(node),
     172                 :                                                  estate,
     173                 :                                                  gather->initParam,
     174                 :                                                  gather->num_workers,
     175                 :                                                  node->tuples_needed);
     176                 :             else
     177             114 :                 ExecParallelReinitialize(outerPlanState(node),
     178 CBC         114 :                                          node->pei,
     179                 :                                          gather->initParam);
     180                 : 
     181                 :             /*
     182                 :              * Register backend workers. We might not get as many as we
     183                 :              * requested, or indeed any at all.
     184                 :              */
     185             380 :             pcxt = node->pei->pcxt;
     186             380 :             LaunchParallelWorkers(pcxt);
     187                 :             /* We save # workers launched for the benefit of EXPLAIN */
     188             380 :             node->nworkers_launched = pcxt->nworkers_launched;
     189                 : 
     190                 :             /* Set up tuple queue readers to read the results. */
     191             380 :             if (pcxt->nworkers_launched > 0)
     192                 :             {
     193             377 :                 ExecParallelCreateReaders(node->pei);
     194                 :                 /* Make a working array showing the active readers */
     195             377 :                 node->nreaders = pcxt->nworkers_launched;
     196             377 :                 node->reader = (TupleQueueReader **)
     197             377 :                     palloc(node->nreaders * sizeof(TupleQueueReader *));
     198             377 :                 memcpy(node->reader, node->pei->reader,
     199             377 :                        node->nreaders * sizeof(TupleQueueReader *));
     200                 :             }
     201                 :             else
     202                 :             {
     203                 :                 /* No workers?  Then never mind. */
     204               3 :                 node->nreaders = 0;
     205               3 :                 node->reader = NULL;
     206                 :             }
     207             380 :             node->nextreader = 0;
     208                 :         }
     209                 : 
     210                 :         /* Run plan locally if no workers or enabled and not single-copy. */
     211             766 :         node->need_to_scan_locally = (node->nreaders == 0)
     212             383 :             || (!gather->single_copy && parallel_leader_participation);
     213             383 :         node->initialized = true;
     214                 :     }
     215                 : 
     216                 :     /*
     217                 :      * Reset per-tuple memory context to free any expression evaluation
     218                 :      * storage allocated in the previous tuple cycle.
     219                 :      */
     220         1459186 :     econtext = node->ps.ps_ExprContext;
     221         1459186 :     ResetExprContext(econtext);
     222                 : 
     223                 :     /*
     224                 :      * Get next tuple, either from one of our workers, or by running the plan
     225                 :      * ourselves.
     226                 :      */
     227         1459186 :     slot = gather_getnext(node);
     228         1459183 :     if (TupIsNull(slot))
     229             380 :         return NULL;
     230                 : 
     231                 :     /* If no projection is required, we're done. */
     232         1458803 :     if (node->ps.ps_ProjInfo == NULL)
     233         1458803 :         return slot;
     234                 : 
     235                 :     /*
     236                 :      * Form the result tuple using ExecProject(), and return it.
     237                 :      */
     238 UBC           0 :     econtext->ecxt_outertuple = slot;
     239               0 :     return ExecProject(node->ps.ps_ProjInfo);
     240                 : }
     241                 : 
     242                 : /* ----------------------------------------------------------------
     243                 :  *      ExecEndGather
     244                 :  *
     245                 :  *      frees any storage allocated through C routines.
     246                 :  * ----------------------------------------------------------------
     247                 :  */
     248                 : void
     249 CBC         476 : ExecEndGather(GatherState *node)
     250                 : {
     251             476 :     ExecEndNode(outerPlanState(node));  /* let children clean up first */
     252             476 :     ExecShutdownGather(node);
     253             476 :     ExecFreeExprContext(&node->ps);
     254             476 :     if (node->ps.ps_ResultTupleSlot)
     255              21 :         ExecClearTuple(node->ps.ps_ResultTupleSlot);
     256             476 : }
     257                 : 
     258                 : /*
     259                 :  * Read the next tuple.  We might fetch a tuple from one of the tuple queues
     260                 :  * using gather_readnext, or if no tuple queue contains a tuple and the
     261                 :  * single_copy flag is not set, we might generate one locally instead.
     262                 :  */
     263                 : static TupleTableSlot *
     264         1459186 : gather_getnext(GatherState *gatherstate)
     265                 : {
     266         1459186 :     PlanState  *outerPlan = outerPlanState(gatherstate);
     267                 :     TupleTableSlot *outerTupleSlot;
     268         1459186 :     TupleTableSlot *fslot = gatherstate->funnel_slot;
     269                 :     MinimalTuple tup;
     270                 : 
     271         2918995 :     while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
     272                 :     {
     273         1459429 :         CHECK_FOR_INTERRUPTS();
     274                 : 
     275         1459429 :         if (gatherstate->nreaders > 0)
     276                 :         {
     277         1422416 :             tup = gather_readnext(gatherstate);
     278                 : 
     279         1422413 :             if (HeapTupleIsValid(tup))
     280                 :             {
     281          720547 :                 ExecStoreMinimalTuple(tup,  /* tuple to store */
     282                 :                                       fslot,    /* slot to store the tuple */
     283                 :                                       false);   /* don't pfree tuple  */
     284          720547 :                 return fslot;
     285                 :             }
     286                 :         }
     287                 : 
     288          738879 :         if (gatherstate->need_to_scan_locally)
     289                 :         {
     290          738539 :             EState     *estate = gatherstate->ps.state;
     291                 : 
     292                 :             /* Install our DSA area while executing the plan. */
     293          738539 :             estate->es_query_dsa =
     294          738539 :                 gatherstate->pei ? gatherstate->pei->area : NULL;
     295          738539 :             outerTupleSlot = ExecProcNode(outerPlan);
     296          738539 :             estate->es_query_dsa = NULL;
     297                 : 
     298          738539 :             if (!TupIsNull(outerTupleSlot))
     299          738256 :                 return outerTupleSlot;
     300                 : 
     301             283 :             gatherstate->need_to_scan_locally = false;
     302                 :         }
     303                 :     }
     304                 : 
     305             380 :     return ExecClearTuple(fslot);
     306                 : }
     307                 : 
     308                 : /*
     309                 :  * Attempt to read a tuple from one of our parallel workers.
     310                 :  */
     311                 : static MinimalTuple
     312         1422416 : gather_readnext(GatherState *gatherstate)
     313                 : {
     314         1422416 :     int         nvisited = 0;
     315                 : 
     316                 :     for (;;)
     317         1915896 :     {
     318                 :         TupleQueueReader *reader;
     319                 :         MinimalTuple tup;
     320                 :         bool        readerdone;
     321                 : 
     322                 :         /* Check for async events, particularly messages from workers. */
     323         3338312 :         CHECK_FOR_INTERRUPTS();
     324                 : 
     325                 :         /*
     326                 :          * Attempt to read a tuple, but don't block if none is available.
     327                 :          *
     328                 :          * Note that TupleQueueReaderNext will just return NULL for a worker
     329                 :          * which fails to initialize.  We'll treat that worker as having
     330                 :          * produced no tuples; WaitForParallelWorkersToFinish will error out
     331                 :          * when we get there.
     332                 :          */
     333         3338309 :         Assert(gatherstate->nextreader < gatherstate->nreaders);
     334         3338309 :         reader = gatherstate->reader[gatherstate->nextreader];
     335         3338309 :         tup = TupleQueueReaderNext(reader, true, &readerdone);
     336                 : 
     337                 :         /*
     338                 :          * If this reader is done, remove it from our working array of active
     339                 :          * readers.  If all readers are done, we're outta here.
     340                 :          */
     341         3338309 :         if (readerdone)
     342                 :         {
     343            1015 :             Assert(!tup);
     344            1015 :             --gatherstate->nreaders;
     345            1015 :             if (gatherstate->nreaders == 0)
     346                 :             {
     347             374 :                 ExecShutdownGatherWorkers(gatherstate);
     348         1422413 :                 return NULL;
     349                 :             }
     350             641 :             memmove(&gatherstate->reader[gatherstate->nextreader],
     351             641 :                     &gatherstate->reader[gatherstate->nextreader + 1],
     352                 :                     sizeof(TupleQueueReader *)
     353             641 :                     * (gatherstate->nreaders - gatherstate->nextreader));
     354             641 :             if (gatherstate->nextreader >= gatherstate->nreaders)
     355             249 :                 gatherstate->nextreader = 0;
     356             641 :             continue;
     357                 :         }
     358                 : 
     359                 :         /* If we got a tuple, return it. */
     360         3337294 :         if (tup)
     361          720547 :             return tup;
     362                 : 
     363                 :         /*
     364                 :          * Advance nextreader pointer in round-robin fashion.  Note that we
     365                 :          * only reach this code if we weren't able to get a tuple from the
     366                 :          * current worker.  We used to advance the nextreader pointer after
     367                 :          * every tuple, but it turns out to be much more efficient to keep
     368                 :          * reading from the same queue until that would require blocking.
     369                 :          */
     370         2616747 :         gatherstate->nextreader++;
     371         2616747 :         if (gatherstate->nextreader >= gatherstate->nreaders)
     372          706498 :             gatherstate->nextreader = 0;
     373                 : 
     374                 :         /* Have we visited every (surviving) TupleQueueReader? */
     375         2616747 :         nvisited++;
     376         2616747 :         if (nvisited >= gatherstate->nreaders)
     377                 :         {
     378                 :             /*
     379                 :              * If (still) running plan locally, return NULL so caller can
     380                 :              * generate another tuple from the local copy of the plan.
     381                 :              */
     382          706233 :             if (gatherstate->need_to_scan_locally)
     383          701492 :                 return NULL;
     384                 : 
     385                 :             /* Nothing to do except wait for developments. */
     386            4741 :             (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
     387                 :                              WAIT_EVENT_EXECUTE_GATHER);
     388            4741 :             ResetLatch(MyLatch);
     389            4741 :             nvisited = 0;
     390                 :         }
     391                 :     }
     392                 : }
     393                 : 
     394                 : /* ----------------------------------------------------------------
     395                 :  *      ExecShutdownGatherWorkers
     396                 :  *
     397                 :  *      Stop all the parallel workers.
     398                 :  * ----------------------------------------------------------------
     399                 :  */
     400                 : static void
     401            1272 : ExecShutdownGatherWorkers(GatherState *node)
     402                 : {
     403            1272 :     if (node->pei != NULL)
     404             751 :         ExecParallelFinish(node->pei);
     405                 : 
     406                 :     /* Flush local copy of reader array */
     407            1272 :     if (node->reader)
     408             374 :         pfree(node->reader);
     409            1272 :     node->reader = NULL;
     410            1272 : }
     411                 : 
     412                 : /* ----------------------------------------------------------------
     413                 :  *      ExecShutdownGather
     414                 :  *
     415                 :  *      Destroy the setup for parallel workers including parallel context.
     416                 :  * ----------------------------------------------------------------
     417                 :  */
     418                 : void
     419             748 : ExecShutdownGather(GatherState *node)
     420                 : {
     421             748 :     ExecShutdownGatherWorkers(node);
     422                 : 
     423                 :     /* Now destroy the parallel context. */
     424             748 :     if (node->pei != NULL)
     425                 :     {
     426             263 :         ExecParallelCleanup(node->pei);
     427             263 :         node->pei = NULL;
     428                 :     }
     429             748 : }
     430                 : 
     431                 : /* ----------------------------------------------------------------
     432                 :  *                      Join Support
     433                 :  * ----------------------------------------------------------------
     434                 :  */
     435                 : 
     436                 : /* ----------------------------------------------------------------
     437                 :  *      ExecReScanGather
     438                 :  *
     439                 :  *      Prepare to re-scan the result of a Gather.
     440                 :  * ----------------------------------------------------------------
     441                 :  */
     442                 : void
     443             150 : ExecReScanGather(GatherState *node)
     444                 : {
     445             150 :     Gather     *gather = (Gather *) node->ps.plan;
     446             150 :     PlanState  *outerPlan = outerPlanState(node);
     447                 : 
     448                 :     /* Make sure any existing workers are gracefully shut down */
     449             150 :     ExecShutdownGatherWorkers(node);
     450                 : 
     451                 :     /* Mark node so that shared state will be rebuilt at next call */
     452             150 :     node->initialized = false;
     453                 : 
     454                 :     /*
     455                 :      * Set child node's chgParam to tell it that the next scan might deliver a
     456                 :      * different set of rows within the leader process.  (The overall rowset
     457                 :      * shouldn't change, but the leader process's subset might; hence nodes
     458                 :      * between here and the parallel table scan node mustn't optimize on the
     459                 :      * assumption of an unchanging rowset.)
     460                 :      */
     461             150 :     if (gather->rescan_param >= 0)
     462             150 :         outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
     463                 :                                              gather->rescan_param);
     464                 : 
     465                 :     /*
     466                 :      * If chgParam of subnode is not null then plan will be re-scanned by
     467                 :      * first ExecProcNode.  Note: because this does nothing if we have a
     468                 :      * rescan_param, it's currently guaranteed that parallel-aware child nodes
     469                 :      * will not see a ReScan call until after they get a ReInitializeDSM call.
     470                 :      * That ordering might not be something to rely on, though.  A good rule
     471                 :      * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     472                 :      * should reset only local state, and anything that depends on both of
     473                 :      * those steps being finished must wait until the first ExecProcNode call.
     474                 :      */
     475             150 :     if (outerPlan->chgParam == NULL)
     476 UBC           0 :         ExecReScan(outerPlan);
     477 CBC         150 : }

Generated by: LCOV version v1.16-55-g56c0a2a