LCOV - differential code coverage report
Current view: top level - src/backend/executor - nodeMergeAppend.c (source / functions) Coverage Total Hit LBC UIC UBC GIC GNC CBC EUB ECB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 95.4 % 109 104 2 1 2 56 1 47 3 54 1
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 5 5 4 1 4
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * nodeMergeAppend.c
       4                 :  *    routines to handle MergeAppend nodes.
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *    src/backend/executor/nodeMergeAppend.c
      12                 :  *
      13                 :  *-------------------------------------------------------------------------
      14                 :  */
      15                 : /* INTERFACE ROUTINES
      16                 :  *      ExecInitMergeAppend     - initialize the MergeAppend node
      17                 :  *      ExecMergeAppend         - retrieve the next tuple from the node
      18                 :  *      ExecEndMergeAppend      - shut down the MergeAppend node
      19                 :  *      ExecReScanMergeAppend   - rescan the MergeAppend node
      20                 :  *
      21                 :  *   NOTES
      22                 :  *      A MergeAppend node contains a list of one or more subplans.
      23                 :  *      These are each expected to deliver tuples that are sorted according
      24                 :  *      to a common sort key.  The MergeAppend node merges these streams
      25                 :  *      to produce output sorted the same way.
      26                 :  *
      27                 :  *      MergeAppend nodes don't make use of their left and right
      28                 :  *      subtrees, rather they maintain a list of subplans so
      29                 :  *      a typical MergeAppend node looks like this in the plan tree:
      30                 :  *
      31                 :  *                 ...
      32                 :  *                 /
      33                 :  *              MergeAppend---+------+------+--- nil
      34                 :  *              /   \         |      |      |
      35                 :  *            nil   nil      ...    ...    ...
      36                 :  *                               subplans
      37                 :  */
      38                 : 
      39                 : #include "postgres.h"
      40                 : 
      41                 : #include "executor/execdebug.h"
      42                 : #include "executor/execPartition.h"
      43                 : #include "executor/nodeMergeAppend.h"
      44                 : #include "lib/binaryheap.h"
      45                 : #include "miscadmin.h"
      46                 : 
      47                 : /*
      48                 :  * We have one slot for each item in the heap array.  We use SlotNumber
      49                 :  * to store slot indexes.  This doesn't actually provide any formal
      50                 :  * type-safety, but it makes the code more self-documenting.
                         :  *
                         :  * Slot numbers are the values the binary heap stores: ExecMergeAppend
                         :  * pushes them with Int32GetDatum(), and heap_compare_slots unpacks them
                         :  * with DatumGetInt32() and uses them to index ms_slots.
      51                 :  */
      52                 : typedef int32 SlotNumber;
      53                 : 
      54                 : static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
      55                 : static int  heap_compare_slots(Datum a, Datum b, void *arg);
      56                 : 
      57                 : 
      58                 : /* ----------------------------------------------------------------
      59                 :  *      ExecInitMergeAppend
      60                 :  *
      61                 :  *      Begin all of the subscans of the MergeAppend node.
                         :  *
                         :  *      Builds and returns the MergeAppendState for 'node'.  eflags must
                         :  *      not include EXEC_FLAG_BACKWARD or EXEC_FLAG_MARK (asserted below).
                         :  *
                         :  *      When node->part_prune_index >= 0, partition pruning state is set
                         :  *      up and only the subplans reported in validsubplans are
                         :  *      initialized; otherwise every entry of node->mergeplans is.  The
                         :  *      per-subplan arrays and the binary heap are sized to that count
                         :  *      (ms_nplans), and one SortSupportData is prepared per sort column.
                         :  *      ms_initialized is left false; no tuples are pulled from the
                         :  *      subplans in this function.
      62                 :  * ----------------------------------------------------------------
      63                 :  */
      64                 : MergeAppendState *
      65 CBC         207 : ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
      66                 : {
      67             207 :     MergeAppendState *mergestate = makeNode(MergeAppendState);
      68                 :     PlanState **mergeplanstates;
      69                 :     Bitmapset  *validsubplans;
      70                 :     int         nplans;
      71                 :     int         i,
      72                 :                 j;
      73                 : 
      74                 :     /* check for unsupported flags */
      75             207 :     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
      76                 : 
      77                 :     /*
      78                 :      * create new MergeAppendState for our node
      79                 :      */
      80             207 :     mergestate->ps.plan = (Plan *) node;
      81             207 :     mergestate->ps.state = estate;
      82             207 :     mergestate->ps.ExecProcNode = ExecMergeAppend;
      83                 : 
      84                 :     /* If run-time partition pruning is enabled, then set that up now */
      85 GNC         207 :     if (node->part_prune_index >= 0)
      86                 :     {
      87                 :         PartitionPruneState *prunestate;
      88                 : 
      89                 :         /*
      90                 :          * Set up pruning data structure.  This also initializes the set of
      91                 :          * subplans to initialize (validsubplans) by taking into account the
      92                 :          * result of performing initial pruning if any.
      93                 :          */
      94 CBC          27 :         prunestate = ExecInitPartitionPruning(&mergestate->ps,
      95              27 :                                               list_length(node->mergeplans),
      96                 :                                               node->part_prune_index,
      97                 :                                               node->apprelids,
      98                 :                                               &validsubplans);
      99 GIC          27 :         mergestate->ms_prune_state = prunestate;
     100 CBC          27 :         nplans = bms_num_members(validsubplans);
     101 ECB             : 
     102                 :         /*
     103                 :          * When no run-time pruning is required and there's at least one
     104                 :          * subplan, we can fill ms_valid_subplans immediately, preventing
     105                 :          * later calls to ExecFindMatchingSubPlans.
                         :          *
                         :          * The set is expressed as 0 .. nplans - 1, i.e. as positions in the
                         :          * mergeplanstates array filled below, not as indexes into
                         :          * node->mergeplans.
     106                 :          */
     107 GIC          27 :         if (!prunestate->do_exec_prune && nplans > 0)
     108 CBC          12 :             mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
     109 ECB             :     }
     110                 :     else
     111                 :     {
     112 GIC         180 :         nplans = list_length(node->mergeplans);
     113 ECB             : 
     114                 :         /*
     115                 :          * When run-time partition pruning is not enabled we can just mark all
     116                 :          * subplans as valid; they must also all be initialized.
     117                 :          */
     118 GIC         180 :         Assert(nplans > 0);
     119 CBC         180 :         mergestate->ms_valid_subplans = validsubplans =
     120             180 :             bms_add_range(NULL, 0, nplans - 1);
     121             180 :         mergestate->ms_prune_state = NULL;
     122 ECB             :     }
     123                 : 
     124 GIC         207 :     mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
     125 CBC         207 :     mergestate->mergeplans = mergeplanstates;
     126             207 :     mergestate->ms_nplans = nplans;
     127 ECB             : 
     128 GIC         207 :     mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
     129 CBC         207 :     mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
     130 ECB             :                                               mergestate);
     131                 : 
     132                 :     /*
     133                 :      * Miscellaneous initialization
     134                 :      *
     135                 :      * MergeAppend nodes do have Result slots, which hold pointers to tuples,
     136                 :      * so we have to initialize them.  FIXME
     137                 :      */
     138 GIC         207 :     ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
     139 ECB             : 
     140                 :     /* node returns slots from each of its subnodes, therefore not fixed */
     141 GIC         207 :     mergestate->ps.resultopsset = true;
     142 CBC         207 :     mergestate->ps.resultopsfixed = false;
     143 ECB             : 
     144                 :     /*
     145                 :      * call ExecInitNode on each of the valid plans to be executed and save
     146                 :      * the results into the mergeplanstates array.
                         :      *
                         :      * The array is filled densely: position j receives the j'th member of
                         :      * validsubplans, while i is that member's index in node->mergeplans.
     147                 :      */
     148 GIC         207 :     j = 0;
     149 CBC         207 :     i = -1;
     150             796 :     while ((i = bms_next_member(validsubplans, i)) >= 0)
     151 ECB             :     {
     152 GIC         589 :         Plan       *initNode = (Plan *) list_nth(node->mergeplans, i);
     153 ECB             : 
     154 GIC         589 :         mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
     155 ECB             :     }
     156                 : 
     157 GIC         207 :     mergestate->ps.ps_ProjInfo = NULL;
     158 ECB             : 
     159                 :     /*
     160                 :      * initialize sort-key information
     161                 :      */
     162 GIC         207 :     mergestate->ms_nkeys = node->numCols;
     163 CBC         207 :     mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
     164 ECB             : 
     165 GIC         444 :     for (i = 0; i < node->numCols; i++)
     166 ECB             :     {
     167 GIC         237 :         SortSupport sortKey = mergestate->ms_sortkeys + i;
     168 ECB             : 
     169 GIC         237 :         sortKey->ssup_cxt = CurrentMemoryContext;
     170 CBC         237 :         sortKey->ssup_collation = node->collations[i];
     171             237 :         sortKey->ssup_nulls_first = node->nullsFirst[i];
     172             237 :         sortKey->ssup_attno = node->sortColIdx[i];
     173 ECB             : 
     174                 :         /*
     175                 :          * It isn't feasible to perform abbreviated key conversion, since
     176                 :          * tuples are pulled into mergestate's binary heap as needed.  It
     177                 :          * would likely be counter-productive to convert tuples into an
     178                 :          * abbreviated representation as they're pulled up, so opt out of that
     179                 :          * additional optimization entirely.
     180                 :          */
     181 GIC         237 :         sortKey->abbreviate = false;
     182 ECB             : 
     183 GIC         237 :         PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
     184 ECB             :     }
     185                 : 
     186                 :     /*
     187                 :      * initialize to show we have not run the subplans yet
     188                 :      */
     189 GIC         207 :     mergestate->ms_initialized = false;
     190 ECB             : 
     191 GIC         207 :     return mergestate;
     192 ECB             : }
     193                 : 
     194                 : /* ----------------------------------------------------------------
     195                 :  *      ExecMergeAppend
     196                 :  *
     197                 :  *      Handles iteration over multiple subplans.
                         :  *
                         :  *      Returns the slot holding the next tuple in sort order, or the
                         :  *      node's cleared result slot once every subplan is exhausted (or
                         :  *      when there are no subplans at all).
                         :  *
                         :  *      The first call fetches one tuple from each valid subplan and
                         :  *      builds the heap.  Each later call first advances the subplan
                         :  *      whose tuple was handed out by the previous call, then returns
                         :  *      the slot of whichever subplan is now first in the heap.  The
                         :  *      returned slot is the subplan's own slot (ms_slots[i]); no copy
                         :  *      is made here.
     198                 :  * ----------------------------------------------------------------
     199                 :  */
     200                 : static TupleTableSlot *
     201 GIC       21781 : ExecMergeAppend(PlanState *pstate)
     202 ECB             : {
     203 GIC       21781 :     MergeAppendState *node = castNode(MergeAppendState, pstate);
     204 ECB             :     TupleTableSlot *result;
     205                 :     SlotNumber  i;
     206                 : 
     207 GIC       21781 :     CHECK_FOR_INTERRUPTS();
     208 ECB             : 
     209 GIC       21781 :     if (!node->ms_initialized)
     210 ECB             :     {
     211                 :         /* Nothing to do if all subplans were pruned */
     212 GIC         100 :         if (node->ms_nplans == 0)
     213 CBC           9 :             return ExecClearTuple(node->ps.ps_ResultTupleSlot);
     214 ECB             : 
     215                 :         /*
     216                 :          * If we've yet to determine the valid subplans then do so now.  If
     217                 :          * run-time pruning is disabled then the valid subplans will always be
     218                 :          * set to all subplans.
     219                 :          */
     220 GIC          91 :         if (node->ms_valid_subplans == NULL)
     221 CBC           6 :             node->ms_valid_subplans =
     222               6 :                 ExecFindMatchingSubPlans(node->ms_prune_state, false);
     223 ECB             : 
     224                 :         /*
     225                 :          * First time through: pull the first tuple from each valid subplan,
     226                 :          * and set up the heap.
                         :          *
                         :          * Subplans that return no tuple at all are simply never added to
                         :          * the heap.
                         :          *
                         :          * NOTE(review): members of ms_valid_subplans are used directly as
                         :          * indexes into mergeplans[] and ms_slots[], which are sized by
                         :          * ms_nplans.  Presumably ExecFindMatchingSubPlans reports indexes in
                         :          * terms of the initialized subplans -- TODO confirm.
     227                 :          */
     228 GIC          91 :         i = -1;
     229 CBC         349 :         while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
     230 ECB             :         {
     231 GIC         258 :             node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
     232 CBC         258 :             if (!TupIsNull(node->ms_slots[i]))
     233             220 :                 binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
     234 ECB             :         }
     235 GIC          91 :         binaryheap_build(node->ms_heap);
     236 CBC          91 :         node->ms_initialized = true;
     237 ECB             :     }
     238                 :     else
     239                 :     {
     240                 :         /*
     241                 :          * Otherwise, pull the next tuple from whichever subplan we returned
     242                 :          * from last time, and reinsert the subplan index into the heap,
     243                 :          * because it might now compare differently against the existing
     244                 :          * elements of the heap.  (We could perhaps simplify the logic a bit
     245                 :          * by doing this before returning from the prior call, but it's better
     246                 :          * to not pull tuples until necessary.)
                         :          *
                         :          * If that subplan has run dry, its entry is removed from the heap
                         :          * instead of being replaced.
     247                 :          */
     248 GIC       21681 :         i = DatumGetInt32(binaryheap_first(node->ms_heap));
     249 CBC       21681 :         node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
     250           21681 :         if (!TupIsNull(node->ms_slots[i]))
     251           21548 :             binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
     252 ECB             :         else
     253 GIC         133 :             (void) binaryheap_remove_first(node->ms_heap);
     254 ECB             :     }
     255                 : 
     256 GIC       21772 :     if (binaryheap_empty(node->ms_heap))
     257 ECB             :     {
     258                 :         /* All the subplans are exhausted, and so is the heap */
     259 GIC          64 :         result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
     260 ECB             :     }
     261                 :     else
     262                 :     {
     263 GIC       21708 :         i = DatumGetInt32(binaryheap_first(node->ms_heap));
     264 CBC       21708 :         result = node->ms_slots[i];
     265 ECB             :     }
     266                 : 
     267 GIC       21772 :     return result;
     268 ECB             : }
     269                 : 
     270                 : /*
     271                 :  * Compare the tuples in the two given slots.
                         :  *
                         :  * Comparator callback handed to binaryheap_allocate().  'a' and 'b' are
                         :  * slot numbers packed as Datums and 'arg' is the MergeAppendState that
                         :  * owns ms_slots and ms_sortkeys.  Sort keys are examined in order; the
                         :  * first key that differs decides, and its ApplySortComparator() result is
                         :  * passed through INVERT_COMPARE_RESULT before being returned.  Returns 0
                         :  * when the tuples are equal on every key.  Neither slot may be empty
                         :  * (asserted).
                         :  *
                         :  * NOTE(review): the inversion presumably exists because the heap keeps
                         :  * the element that compares greatest first, so inverting lets the
                         :  * smallest tuple be returned first -- TODO confirm against binaryheap.
                         :  *
                         :  * NOTE(review): the forward declaration near the top of this listing
                         :  * gives the return type as int, while the definition below says int32.
     272                 :  */
     273                 : static int32
     274 GIC       29704 : heap_compare_slots(Datum a, Datum b, void *arg)
     275 ECB             : {
     276 GIC       29704 :     MergeAppendState *node = (MergeAppendState *) arg;
     277 CBC       29704 :     SlotNumber  slot1 = DatumGetInt32(a);
     278           29704 :     SlotNumber  slot2 = DatumGetInt32(b);
     279 ECB             : 
     280 GIC       29704 :     TupleTableSlot *s1 = node->ms_slots[slot1];
     281 CBC       29704 :     TupleTableSlot *s2 = node->ms_slots[slot2];
     282 ECB             :     int         nkey;
     283                 : 
     284 GIC       29704 :     Assert(!TupIsNull(s1));
     285 CBC       29704 :     Assert(!TupIsNull(s2));
     286 ECB             : 
     287 GIC       31627 :     for (nkey = 0; nkey < node->ms_nkeys; nkey++)
     288 ECB             :     {
     289 GIC       29704 :         SortSupport sortKey = node->ms_sortkeys + nkey;
     290 CBC       29704 :         AttrNumber  attno = sortKey->ssup_attno;
     291 ECB             :         Datum       datum1,
     292                 :                     datum2;
     293                 :         bool        isNull1,
     294                 :                     isNull2;
     295                 :         int         compare;
     296                 : 
     297 GIC       29704 :         datum1 = slot_getattr(s1, attno, &isNull1);
     298 CBC       29704 :         datum2 = slot_getattr(s2, attno, &isNull2);
     299 ECB             : 
     300 GIC       29704 :         compare = ApplySortComparator(datum1, isNull1,
     301 ECB             :                                       datum2, isNull2,
     302                 :                                       sortKey);
     303 GIC       29704 :         if (compare != 0)
     304 ECB             :         {
     305 GIC       27781 :             INVERT_COMPARE_RESULT(compare);
     306 CBC       27781 :             return compare;
     307 ECB             :         }
     308                 :     }
     309 GIC        1923 :     return 0;
     310 ECB             : }
     311                 : 
     312                 : /* ----------------------------------------------------------------
     313                 :  *      ExecEndMergeAppend
     314                 :  *
     315                 :  *      Shuts down the subscans of the MergeAppend node.
     316                 :  *
                         :  *      Calls ExecEndNode on each of the ms_nplans entries of
                         :  *      node->mergeplans, i.e. only on the subplans that
                         :  *      ExecInitMergeAppend initialized.  This function itself frees
                         :  *      nothing else.
                         :  *
     317                 :  *      Returns nothing of interest.
     318                 :  * ----------------------------------------------------------------
     319                 :  */
     320                 : void
     321 GIC         207 : ExecEndMergeAppend(MergeAppendState *node)
     322 ECB             : {
     323                 :     PlanState **mergeplans;
     324                 :     int         nplans;
     325                 :     int         i;
     326                 : 
     327                 :     /*
     328                 :      * get information from the node
     329                 :      */
     330 GIC         207 :     mergeplans = node->mergeplans;
     331 CBC         207 :     nplans = node->ms_nplans;
     332 ECB             : 
     333                 :     /*
     334                 :      * shut down each of the subscans
     335                 :      */
     336 GIC         796 :     for (i = 0; i < nplans; i++)
     337 CBC         589 :         ExecEndNode(mergeplans[i]);
     338             207 : }
     339 ECB             : 
     340                 : void
     341 GIC           9 : ExecReScanMergeAppend(MergeAppendState *node)
     342 ECB             : {
     343                 :     int         i;
     344                 : 
     345                 :     /*
                         :      * Overview: prepare the node to be scanned again.  Optionally forget
                         :      * the selected subplans, then rescan (or leave flagged for rescan)
                         :      * every initialized subplan, and finally reset the heap and clear
                         :      * ms_initialized so that the next ExecMergeAppend call takes its
                         :      * first-time path again.
                         :      *
     346                 :      * If any PARAM_EXEC Params used in pruning expressions have changed, then
     347                 :      * we'd better unset the valid subplans so that they are reselected for
     348                 :      * the new parameter values.
     349                 :      */
     350 GIC           9 :     if (node->ms_prune_state &&
     351 LBC           0 :         bms_overlap(node->ps.chgParam,
     352 UBC           0 :                     node->ms_prune_state->execparamids))
     353 EUB             :     {
     354 UIC           0 :         bms_free(node->ms_valid_subplans);
     355 UBC           0 :         node->ms_valid_subplans = NULL;
     356 EUB             :     }
     357                 : 
     358 GIC          27 :     for (i = 0; i < node->ms_nplans; i++)
     359 ECB             :     {
     360 GIC          18 :         PlanState  *subnode = node->mergeplans[i];
     361 ECB             : 
     362                 :         /*
     363                 :          * ExecReScan doesn't know about my subplans, so I have to do
     364                 :          * changed-parameter signaling myself.
     365                 :          */
     366 GIC          18 :         if (node->ps.chgParam != NULL)
     367 CBC          18 :             UpdateChangedParamSet(subnode, node->ps.chgParam);
     368 ECB             : 
     369                 :         /*
     370                 :          * If chgParam of subnode is not null then plan will be re-scanned by
     371                 :          * first ExecProcNode.
                         :          *
                         :          * So an explicit ExecReScan is issued only for subplans that have no
                         :          * changed parameters pending.
     372                 :          */
     373 GIC          18 :         if (subnode->chgParam == NULL)
     374 LBC           0 :             ExecReScan(subnode);
     375 EUB             :     }
     376 GIC           9 :     binaryheap_reset(node->ms_heap);
     377 CBC           9 :     node->ms_initialized = false;
     378               9 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a