LCOV - differential code coverage report
Current view: top level - src/backend/executor - nodeGather.c (source / functions) Coverage Total Hit UBC GNC CBC DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 97.9 % 141 138 3 2 136 2
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 8 8 1 7
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (240..) days: 97.9 % 141 138 3 2 136
Legend: Lines: hit not hit Function coverage date bins:
(240..) days: 100.0 % 8 8 1 7

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * nodeGather.c
                                  4                 :  *    Support routines for scanning a plan via multiple workers.
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  7                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                  8                 :  *
                                  9                 :  * A Gather executor launches parallel workers to run multiple copies of a
                                 10                 :  * plan.  It can also run the plan itself, if the workers are not available
                                 11                 :  * or have not started up yet.  It then merges all of the results it produces
                                 12                 :  * and the results from the workers into a single output stream.  Therefore,
                                 13                 :  * it will normally be used with a plan where running multiple copies of the
                                 14                 :  * same plan does not produce duplicate output, such as parallel-aware
                                 15                 :  * SeqScan.
                                 16                 :  *
                                 17                 :  * Alternatively, a Gather node can be configured to use just one worker
                                 18                 :  * and the single-copy flag can be set.  In this case, the Gather node will
                                 19                 :  * run the plan in one worker and will not execute the plan itself.  In
                                 20                 :  * this case, it simply returns whatever tuples were returned by the worker.
                                 21                 :  * If a worker cannot be obtained, then it will run the plan itself and
                                 22                 :  * return the results.  Therefore, a plan used with a single-copy Gather
                                 23                 :  * node need not be parallel-aware.
                                 24                 :  *
                                 25                 :  * IDENTIFICATION
                                 26                 :  *    src/backend/executor/nodeGather.c
                                 27                 :  *
                                 28                 :  *-------------------------------------------------------------------------
                                 29                 :  */
                                 30                 : 
                                 31                 : #include "postgres.h"
                                 32                 : 
                                 33                 : #include "access/relscan.h"
                                 34                 : #include "access/xact.h"
                                 35                 : #include "executor/execdebug.h"
                                 36                 : #include "executor/execParallel.h"
                                 37                 : #include "executor/nodeGather.h"
                                 38                 : #include "executor/nodeSubplan.h"
                                 39                 : #include "executor/tqueue.h"
                                 40                 : #include "miscadmin.h"
                                 41                 : #include "optimizer/optimizer.h"
                                 42                 : #include "pgstat.h"
                                 43                 : #include "utils/memutils.h"
                                 44                 : #include "utils/rel.h"
                                 45                 : 
                                 46                 : 
                                 47                 : static TupleTableSlot *ExecGather(PlanState *pstate);	/* installed as the node's ExecProcNode callback */
                                 48                 : static TupleTableSlot *gather_getnext(GatherState *gatherstate);	/* next tuple from a worker queue or the local plan */
                                 49                 : static MinimalTuple gather_readnext(GatherState *gatherstate);	/* non-blocking read from the worker tuple queues */
                                 50                 : static void ExecShutdownGatherWorkers(GatherState *node);	/* called by gather_readnext once the last reader is done */
                                 51                 : 
                                 52                 : 
                                 53                 : /* ----------------------------------------------------------------
                                 54                 :  *      ExecInitGather - build and return the GatherState for a Gather plan node; no workers are launched here
                                 55                 :  * ----------------------------------------------------------------
                                 56                 :  */
                                 57                 : GatherState *
 2748 rhaas                      58 CBC         479 : ExecInitGather(Gather *node, EState *estate, int eflags)
                                 59                 : {
                                 60                 :     GatherState *gatherstate;
                                 61                 :     Plan       *outerNode;
                                 62                 :     TupleDesc   tupDesc;
                                 63                 : 
                                 64                 :     /* A Gather node has only an outer (child) plan, never an inner one. */
                                 65             479 :     Assert(innerPlan(node) == NULL);
                                 66                 : 
                                 67                 :     /*
                                 68                 :      * create state structure and register ExecGather as its ExecProcNode
                                 69                 :      */
                                 70             479 :     gatherstate = makeNode(GatherState);
                                 71             479 :     gatherstate->ps.plan = (Plan *) node;
                                 72             479 :     gatherstate->ps.state = estate;
 2092 andres                     73             479 :     gatherstate->ps.ExecProcNode = ExecGather;
                                 74                 : 
 2048 tgl                        75             479 :     gatherstate->initialized = false;	/* parallel setup is deferred to the first ExecGather call */
 1971 rhaas                      76             479 :     gatherstate->need_to_scan_locally =
                                 77             479 :         !node->single_copy && parallel_leader_participation;
 2049                            78             479 :     gatherstate->tuples_needed = -1;
                                 79                 : 
                                 80                 :     /*
                                 81                 :      * Miscellaneous initialization
                                 82                 :      *
                                 83                 :      * create expression context for node
                                 84                 :      */
 2748                            85             479 :     ExecAssignExprContext(estate, &gatherstate->ps);
                                 86                 : 
                                 87                 :     /*
                                 88                 :      * now initialize outer plan and fetch its result tuple descriptor
                                 89                 :      */
 2720                            90             479 :     outerNode = outerPlan(node);
                                 91             479 :     outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
 1878 andres                     92             479 :     tupDesc = ExecGetResultType(outerPlanState(gatherstate));
                                 93                 : 
                                 94                 :     /*
                                 95                 :      * Leader may access ExecProcNode result directly (if
                                 96                 :      * need_to_scan_locally), or from workers via tuple queue.  So we can't
                                 97                 :      * trivially rely on the slot type being fixed for expressions evaluated
                                 98                 :      * within this node.
                                 99                 :      */
 1606                           100             479 :     gatherstate->ps.outeropsset = true;
                                101             479 :     gatherstate->ps.outeropsfixed = false;
                                102                 : 
                                103                 :     /*
                                104                 :      * Initialize result type and projection.
                                105                 :      */
 1612                           106             479 :     ExecInitResultTypeTL(&gatherstate->ps);
 1878                           107             479 :     ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);
                                108                 : 
                                109                 :     /*
                                110                 :      * Without projections result slot type is not trivially known, see
                                111                 :      * comment above.
                                112                 :      */
 1606                           113             479 :     if (gatherstate->ps.ps_ProjInfo == NULL)
                                114                 :     {
                                115             458 :         gatherstate->ps.resultopsset = true;
                                116             458 :         gatherstate->ps.resultopsfixed = false;
                                117                 :     }
                                118                 : 
                                119                 :     /*
                                120                 :      * Initialize funnel slot (TTSOpsMinimalTuple) to same tuple descriptor as outer plan.
                                121                 :      */
                                122             479 :     gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
                                123                 :                                                       &TTSOpsMinimalTuple);
                                124                 : 
                                125                 :     /*
                                126                 :      * Gather doesn't support checking a qual (it's always more efficient to
                                127                 :      * do it in the child node).
                                128                 :      */
 1878                           129             479 :     Assert(!node->plan.qual);
                                130                 : 
 2748 rhaas                     131             479 :     return gatherstate;
                                132                 : }
                                133                 : 
                                134                 : /* ----------------------------------------------------------------
                                135                 :  *      ExecGather(node)
                                136                 :  *
                                137                 :  *      On the first call, sets up parallel state and tries to launch workers.
                                138                 :  *      Returns the next tuple from gather_getnext (projected if a projection is set), or NULL when that slot is empty.
                                139                 :  * ----------------------------------------------------------------
                                140                 :  */
                                141                 : static TupleTableSlot *
 2092 andres                    142         1459186 : ExecGather(PlanState *pstate)
                                143                 : {
                                144         1459186 :     GatherState *node = castNode(GatherState, pstate);
                                145                 :     TupleTableSlot *slot;
                                146                 :     ExprContext *econtext;
                                147                 : 
 2084                           148         1459186 :     CHECK_FOR_INTERRUPTS();
                                149                 : 
                                150                 :     /*
                                151                 :      * Initialize the parallel context and workers on first execution. We do
                                152                 :      * this on first execution rather than during node initialization, as it
                                153                 :      * needs to allocate a large dynamic segment, so it is better to do it
                                154                 :      * only if it is really needed.
                                155                 :      */
 2732 rhaas                     156         1459186 :     if (!node->initialized)
                                157                 :     {
 2748                           158             383 :         EState     *estate = node->ps.state;
 2732                           159             383 :         Gather     *gather = (Gather *) node->ps.plan;
                                160                 : 
                                161                 :         /*
                                162                 :          * Sometimes we might have to run without parallelism; but if parallel
                                163                 :          * mode is active then we can try to fire up some workers.
                                164                 :          */
 1990                           165             383 :         if (gather->num_workers > 0 && estate->es_use_parallel_mode)
                                166                 :         {
                                167                 :             ParallelContext *pcxt;
                                168                 : 
                                169                 :             /* Initialize, or re-initialize, shared state needed by workers. */
 2718                           170             380 :             if (!node->pei)
  276 tgl                       171 GNC         266 :                 node->pei = ExecInitParallelPlan(outerPlanState(node),
                                172                 :                                                  estate,
                                173                 :                                                  gather->initParam,
                                174                 :                                                  gather->num_workers,
                                175                 :                                                  node->tuples_needed);
                                176                 :             else
                                177             114 :                 ExecParallelReinitialize(outerPlanState(node),
 1970 rhaas                     178 CBC         114 :                                          node->pei,
                                179                 :                                          gather->initParam);
                                180                 : 
                                181                 :             /*
                                182                 :              * Register backend workers. We might not get as many as we
                                183                 :              * requested, or indeed any at all.
                                184                 :              */
 2711                           185             380 :             pcxt = node->pei->pcxt;
                                186             380 :             LaunchParallelWorkers(pcxt);
                                187                 :             /* We save # workers launched for the benefit of EXPLAIN */
 2550                           188             380 :             node->nworkers_launched = pcxt->nworkers_launched;
                                189                 : 
                                190                 :             /* Set up tuple queue readers to read the results. */
 2592                           191             380 :             if (pcxt->nworkers_launched > 0)
                                192                 :             {
 2033 andres                    193             377 :                 ExecParallelCreateReaders(node->pei);
                                194                 :                 /* Make a private working copy of the reader array; gather_readnext removes finished readers from it */
 2046 tgl                       195             377 :                 node->nreaders = pcxt->nworkers_launched;
                                196             377 :                 node->reader = (TupleQueueReader **)
                                197             377 :                     palloc(node->nreaders * sizeof(TupleQueueReader *));
                                198             377 :                 memcpy(node->reader, node->pei->reader,
                                199             377 :                        node->nreaders * sizeof(TupleQueueReader *));
                                200                 :             }
                                201                 :             else
                                202                 :             {
                                203                 :                 /* No workers were launched, so there are no tuple queues to read. */
                                204               3 :                 node->nreaders = 0;
                                205               3 :                 node->reader = NULL;
                                206                 :             }
                                207             380 :             node->nextreader = 0;
                                208                 :         }
                                209                 : 
                                210                 :         /* Run plan locally if there are no readers, or if leader participation is enabled and this is not single-copy. */
                                211             766 :         node->need_to_scan_locally = (node->nreaders == 0)
 1971 rhaas                     212             383 :             || (!gather->single_copy && parallel_leader_participation);
 2732                           213             383 :         node->initialized = true;
                                214                 :     }
                                215                 : 
                                216                 :     /*
                                217                 :      * Reset per-tuple memory context to free any expression evaluation
                                218                 :      * storage allocated in the previous tuple cycle.
                                219                 :      */
 2720                           220         1459186 :     econtext = node->ps.ps_ExprContext;
                                221         1459186 :     ResetExprContext(econtext);
                                222                 : 
                                223                 :     /*
                                224                 :      * Get next tuple, either from one of our workers, or by running the plan
                                225                 :      * ourselves.
                                226                 :      */
 2268 tgl                       227         1459186 :     slot = gather_getnext(node);
                                228         1459183 :     if (TupIsNull(slot))
                                229             380 :         return NULL;
                                230                 : 
                                231                 :     /* If no projection is required, return the fetched slot as-is. */
 1961 rhaas                     232         1458803 :     if (node->ps.ps_ProjInfo == NULL)
                                233         1458803 :         return slot;
                                234                 : 
                                235                 :     /*
                                236                 :      * Form the result tuple using ExecProject(), and return it.
                                237                 :      */
 2268 tgl                       238 UBC           0 :     econtext->ecxt_outertuple = slot;
                                239               0 :     return ExecProject(node->ps.ps_ProjInfo);
                                240                 : }
                                241                 : 
                                242                 : /* ----------------------------------------------------------------
                                243                 :  *      ExecEndGather
                                244                 :  *
                                245                 :  *      ends the outer plan, calls ExecShutdownGather, frees the expression context, and clears the result slot if one exists.
                                246                 :  * ----------------------------------------------------------------
                                247                 :  */
                                248                 : void
 2748 rhaas                     249 CBC         476 : ExecEndGather(GatherState *node)
                                250                 : {
 2153 bruce                     251             476 :     ExecEndNode(outerPlanState(node));  /* let children clean up first */
 2748 rhaas                     252             476 :     ExecShutdownGather(node);
                                253             476 :     ExecFreeExprContext(&node->ps);
 1612 andres                    254             476 :     if (node->ps.ps_ResultTupleSlot)
                                255              21 :         ExecClearTuple(node->ps.ps_ResultTupleSlot);
 2748 rhaas                     256             476 : }
                                257                 : 
                                258                 : /*
                                259                 :  * Read the next tuple.  While readers remain we first try gather_readnext; a tuple obtained that way is
                                260                 :  * returned in the funnel slot.  Otherwise, if need_to_scan_locally is set, we run the outer plan ourselves and
                                261                 :  * return its slot.  Once there are no readers and local scanning is finished, the cleared funnel slot is returned.
                                262                 :  */
                                263                 : static TupleTableSlot *
                                264         1459186 : gather_getnext(GatherState *gatherstate)
                                265                 : {
 2720                           266         1459186 :     PlanState  *outerPlan = outerPlanState(gatherstate);
                                267                 :     TupleTableSlot *outerTupleSlot;
                                268         1459186 :     TupleTableSlot *fslot = gatherstate->funnel_slot;
                                269                 :     MinimalTuple tup;
                                270                 : 
 2046 tgl                       271         2918995 :     while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
                                272                 :     {
 2084 andres                    273         1459429 :         CHECK_FOR_INTERRUPTS();
                                274                 : 
 2046 tgl                       275         1459429 :         if (gatherstate->nreaders > 0)
                                276                 :         {
 2711 rhaas                     277         1422416 :             tup = gather_readnext(gatherstate);
                                278                 : 
 2748                           279         1422413 :             if (HeapTupleIsValid(tup))
                                280                 :             {
  697 tgl                       281          720547 :                 ExecStoreMinimalTuple(tup,  /* tuple to store */
                                282                 :                                       fslot,    /* slot to store the tuple */
                                283                 :                                       false);   /* don't pfree tuple */
 2720 rhaas                     284          720547 :                 return fslot;
                                285                 :             }
                                286                 :         }
                                287                 : 
 2748                           288          738879 :         if (gatherstate->need_to_scan_locally)
                                289                 :         {
 1809 tgl                       290          738539 :             EState     *estate = gatherstate->ps.state;
                                291                 : 
                                292                 :             /* Install our DSA area (if we have parallel state) only while executing the plan. */
 1938 rhaas                     293          738539 :             estate->es_query_dsa =
                                294          738539 :                 gatherstate->pei ? gatherstate->pei->area : NULL;
 2748                           295          738539 :             outerTupleSlot = ExecProcNode(outerPlan);
 1938                           296          738539 :             estate->es_query_dsa = NULL;
                                297                 : 
 2748                           298          738539 :             if (!TupIsNull(outerTupleSlot))
                                299          738256 :                 return outerTupleSlot;
                                300                 : 
                                301             283 :             gatherstate->need_to_scan_locally = false;	/* local plan returned no tuple; stop scanning locally */
                                302                 :         }
                                303                 :     }
                                304                 : 
 2720                           305             380 :     return ExecClearTuple(fslot);
                                306                 : }
                                307                 : 
                                308                 : /*
                                309                 :  * Attempt to read a tuple from one of our parallel workers.
                                310                 :  */
                                311                 : static MinimalTuple
 2711                           312         1422416 : gather_readnext(GatherState *gatherstate)
                                313                 : {
 2445 tgl                       314         1422416 :     int         nvisited = 0;
                                315                 : 
                                316                 :     for (;;)
 2711 rhaas                     317         1915896 :     {
                                318                 :         TupleQueueReader *reader;
                                319                 :         MinimalTuple tup;
                                320                 :         bool        readerdone;
                                321                 : 
                                322                 :         /* Check for async events, particularly messages from workers. */
 2442 tgl                       323         3338312 :         CHECK_FOR_INTERRUPTS();
                                324                 : 
                                325                 :         /*
                                326                 :          * Attempt to read a tuple, but don't block if none is available.
                                327                 :          *
                                328                 :          * Note that TupleQueueReaderNext will just return NULL for a worker
                                329                 :          * which fails to initialize.  We'll treat that worker as having
                                330                 :          * produced no tuples; WaitForParallelWorkersToFinish will error out
                                331                 :          * when we get there.
                                332                 :          */
 2316 rhaas                     333         3338309 :         Assert(gatherstate->nextreader < gatherstate->nreaders);
 2711                           334         3338309 :         reader = gatherstate->reader[gatherstate->nextreader];
                                335         3338309 :         tup = TupleQueueReaderNext(reader, true, &readerdone);
                                336                 : 
                                337                 :         /*
                                338                 :          * If this reader is done, remove it from our working array of active
                                339                 :          * readers.  If all readers are done, we're outta here.
                                340                 :          */
                                341         3338309 :         if (readerdone)
                                342                 :         {
 2445 tgl                       343            1015 :             Assert(!tup);
 2711 rhaas                     344            1015 :             --gatherstate->nreaders;
                                345            1015 :             if (gatherstate->nreaders == 0)
                                346                 :             {
 1710 akapila                   347             374 :                 ExecShutdownGatherWorkers(gatherstate);
 2711 rhaas                     348         1422413 :                 return NULL;
                                349                 :             }
 2445 tgl                       350             641 :             memmove(&gatherstate->reader[gatherstate->nextreader],
                                351             641 :                     &gatherstate->reader[gatherstate->nextreader + 1],
                                352                 :                     sizeof(TupleQueueReader *)
                                353             641 :                     * (gatherstate->nreaders - gatherstate->nextreader));
                                354             641 :             if (gatherstate->nextreader >= gatherstate->nreaders)
                                355             249 :                 gatherstate->nextreader = 0;
 2711 rhaas                     356             641 :             continue;
                                357                 :         }
                                358                 : 
                                359                 :         /* If we got a tuple, return it. */
                                360         3337294 :         if (tup)
                                361          720547 :             return tup;
                                362                 : 
                                363                 :         /*
                                364                 :          * Advance nextreader pointer in round-robin fashion.  Note that we
                                365                 :          * only reach this code if we weren't able to get a tuple from the
                                366                 :          * current worker.  We used to advance the nextreader pointer after
                                367                 :          * every tuple, but it turns out to be much more efficient to keep
                                368                 :          * reading from the same queue until that would require blocking.
                                369                 :          */
 2445 tgl                       370         2616747 :         gatherstate->nextreader++;
                                371         2616747 :         if (gatherstate->nextreader >= gatherstate->nreaders)
                                372          706498 :             gatherstate->nextreader = 0;
                                373                 : 
                                374                 :         /* Have we visited every (surviving) TupleQueueReader? */
                                375         2616747 :         nvisited++;
                                376         2616747 :         if (nvisited >= gatherstate->nreaders)
                                377                 :         {
                                378                 :             /*
                                379                 :              * If (still) running plan locally, return NULL so caller can
                                380                 :              * generate another tuple from the local copy of the plan.
                                381                 :              */
 2711 rhaas                     382          706233 :             if (gatherstate->need_to_scan_locally)
                                383          701492 :                 return NULL;
                                384                 : 
                                385                 :             /* Nothing to do except wait for developments. */
 1598 tmunro                    386            4741 :             (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                                387                 :                              WAIT_EVENT_EXECUTE_GATHER);
 2711 rhaas                     388            4741 :             ResetLatch(MyLatch);
 2445 tgl                       389            4741 :             nvisited = 0;
                                390                 :         }
                                391                 :     }
                                392                 : }
                                393                 : 
                                394                 : /* ----------------------------------------------------------------
                                395                 :  *      ExecShutdownGatherWorkers
                                396                 :  *
                                397                 :  *      Stop all the parallel workers.
                                398                 :  * ----------------------------------------------------------------
                                399                 :  */
                                400                 : static void
 2718 rhaas                     401            1272 : ExecShutdownGatherWorkers(GatherState *node)
                                402                 : {
 2732                           403            1272 :     if (node->pei != NULL)
                                404             751 :         ExecParallelFinish(node->pei);
                                405                 : 
                                406                 :     /* Flush local copy of reader array */
 2046 tgl                       407            1272 :     if (node->reader)
                                408             374 :         pfree(node->reader);
                                409            1272 :     node->reader = NULL;
 2718 rhaas                     410            1272 : }
                                411                 : 
                                412                 : /* ----------------------------------------------------------------
                                413                 :  *      ExecShutdownGather
                                414                 :  *
                                415                 :  *      Destroy the setup for parallel workers including parallel context.
                                416                 :  * ----------------------------------------------------------------
                                417                 :  */
                                418                 : void
                                419             748 : ExecShutdownGather(GatherState *node)
                                420                 : {
                                421             748 :     ExecShutdownGatherWorkers(node);
                                422                 : 
                                423                 :     /* Now destroy the parallel context. */
                                424             748 :     if (node->pei != NULL)
                                425                 :     {
 2732                           426             263 :         ExecParallelCleanup(node->pei);
                                427             263 :         node->pei = NULL;
                                428                 :     }
 2748                           429             748 : }
                                430                 : 
                                431                 : /* ----------------------------------------------------------------
                                432                 :  *                      Join Support
                                433                 :  * ----------------------------------------------------------------
                                434                 :  */
                                435                 : 
                                436                 : /* ----------------------------------------------------------------
                                437                 :  *      ExecReScanGather
                                438                 :  *
                                439                 :  *      Prepare to re-scan the result of a Gather.
                                440                 :  * ----------------------------------------------------------------
                                441                 :  */
                                442                 : void
                                443             150 : ExecReScanGather(GatherState *node)
                                444                 : {
 2048 tgl                       445             150 :     Gather     *gather = (Gather *) node->ps.plan;
                                446             150 :     PlanState  *outerPlan = outerPlanState(node);
                                447                 : 
                                448                 :     /* Make sure any existing workers are gracefully shut down */
 2718 rhaas                     449             150 :     ExecShutdownGatherWorkers(node);
                                450                 : 
                                451                 :     /* Mark node so that shared state will be rebuilt at next call */
 2732                           452             150 :     node->initialized = false;
                                453                 : 
                                454                 :     /*
                                455                 :      * Set child node's chgParam to tell it that the next scan might deliver a
                                456                 :      * different set of rows within the leader process.  (The overall rowset
                                457                 :      * shouldn't change, but the leader process's subset might; hence nodes
                                458                 :      * between here and the parallel table scan node mustn't optimize on the
                                459                 :      * assumption of an unchanging rowset.)
                                460                 :      */
 2048 tgl                       461             150 :     if (gather->rescan_param >= 0)
                                462             150 :         outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                                463                 :                                              gather->rescan_param);
                                464                 : 
                                465                 :     /*
                                466                 :      * If chgParam of subnode is not null then plan will be re-scanned by
                                467                 :      * first ExecProcNode.  Note: because this does nothing if we have a
                                468                 :      * rescan_param, it's currently guaranteed that parallel-aware child nodes
                                469                 :      * will not see a ReScan call until after they get a ReInitializeDSM call.
                                470                 :      * That ordering might not be something to rely on, though.  A good rule
                                471                 :      * of thumb is that ReInitializeDSM should reset only shared state, ReScan
                                472                 :      * should reset only local state, and anything that depends on both of
                                473                 :      * those steps being finished must wait until the first ExecProcNode call.
                                474                 :      */
                                475             150 :     if (outerPlan->chgParam == NULL)
 2048 tgl                       476 UBC           0 :         ExecReScan(outerPlan);
 2748 rhaas                     477 CBC         150 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a