LCOV - differential code coverage report
Current view: top level - src/backend/executor - execAsync.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 86.4 % 59 51 8 51
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 6 6 6
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * execAsync.c
       4                 :  *    Support routines for asynchronous execution
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  * IDENTIFICATION
      10                 :  *    src/backend/executor/execAsync.c
      11                 :  *
      12                 :  *-------------------------------------------------------------------------
      13                 :  */
      14                 : 
      15                 : #include "postgres.h"
      16                 : 
      17                 : #include "executor/execAsync.h"
      18                 : #include "executor/executor.h"
      19                 : #include "executor/nodeAppend.h"
      20                 : #include "executor/nodeForeignscan.h"
      21                 : 
      22                 : /*
      23                 :  * Asynchronously request a tuple from a designed async-capable node.
      24                 :  */
      25                 : void
      26 CBC        5772 : ExecAsyncRequest(AsyncRequest *areq)
      27                 : {
      28            5772 :     if (areq->requestee->chgParam != NULL)    /* something changed? */
      29               2 :         ExecReScan(areq->requestee); /* let ReScan handle this */
      30                 : 
      31                 :     /* must provide our own instrumentation support */
      32            5772 :     if (areq->requestee->instrument)
      33             810 :         InstrStartNode(areq->requestee->instrument);
      34                 : 
      35            5772 :     switch (nodeTag(areq->requestee))
      36                 :     {
      37            5772 :         case T_ForeignScanState:
      38            5772 :             ExecAsyncForeignScanRequest(areq);
      39            5772 :             break;
      40 UBC           0 :         default:
      41                 :             /* If the node doesn't support async, caller messed up. */
      42               0 :             elog(ERROR, "unrecognized node type: %d",
      43                 :                  (int) nodeTag(areq->requestee));
      44                 :     }
      45                 : 
      46 CBC        5772 :     ExecAsyncResponse(areq);
      47                 : 
      48                 :     /* must provide our own instrumentation support */
      49            5772 :     if (areq->requestee->instrument)
      50             810 :         InstrStopNode(areq->requestee->instrument,
      51             810 :                       TupIsNull(areq->result) ? 0.0 : 1.0);
      52            5772 : }
      53                 : 
      54                 : /*
      55                 :  * Give the asynchronous node a chance to configure the file descriptor event
      56                 :  * for which it wishes to wait.  We expect the node-type specific callback to
      57                 :  * make a single call of the following form:
      58                 :  *
      59                 :  * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
      60                 :  */
      61                 : void
      62             163 : ExecAsyncConfigureWait(AsyncRequest *areq)
      63                 : {
      64                 :     /* must provide our own instrumentation support */
      65             163 :     if (areq->requestee->instrument)
      66              18 :         InstrStartNode(areq->requestee->instrument);
      67                 : 
      68             163 :     switch (nodeTag(areq->requestee))
      69                 :     {
      70             163 :         case T_ForeignScanState:
      71             163 :             ExecAsyncForeignScanConfigureWait(areq);
      72             163 :             break;
      73 UBC           0 :         default:
      74                 :             /* If the node doesn't support async, caller messed up. */
      75               0 :             elog(ERROR, "unrecognized node type: %d",
      76                 :                  (int) nodeTag(areq->requestee));
      77                 :     }
      78                 : 
      79                 :     /* must provide our own instrumentation support */
      80 CBC         163 :     if (areq->requestee->instrument)
      81              18 :         InstrStopNode(areq->requestee->instrument, 0.0);
      82             163 : }
      83                 : 
      84                 : /*
      85                 :  * Call the asynchronous node back when a relevant event has occurred.
      86                 :  */
      87                 : void
      88             145 : ExecAsyncNotify(AsyncRequest *areq)
      89                 : {
      90                 :     /* must provide our own instrumentation support */
      91             145 :     if (areq->requestee->instrument)
      92              15 :         InstrStartNode(areq->requestee->instrument);
      93                 : 
      94             145 :     switch (nodeTag(areq->requestee))
      95                 :     {
      96             145 :         case T_ForeignScanState:
      97             145 :             ExecAsyncForeignScanNotify(areq);
      98             145 :             break;
      99 UBC           0 :         default:
     100                 :             /* If the node doesn't support async, caller messed up. */
     101               0 :             elog(ERROR, "unrecognized node type: %d",
     102                 :                  (int) nodeTag(areq->requestee));
     103                 :     }
     104                 : 
     105 CBC         145 :     ExecAsyncResponse(areq);
     106                 : 
     107                 :     /* must provide our own instrumentation support */
     108             145 :     if (areq->requestee->instrument)
     109              15 :         InstrStopNode(areq->requestee->instrument,
     110              15 :                       TupIsNull(areq->result) ? 0.0 : 1.0);
     111             145 : }
     112                 : 
     113                 : /*
     114                 :  * Call the requestor back when an asynchronous node has produced a result.
     115                 :  */
     116                 : void
     117            5920 : ExecAsyncResponse(AsyncRequest *areq)
     118                 : {
     119            5920 :     switch (nodeTag(areq->requestor))
     120                 :     {
     121            5920 :         case T_AppendState:
     122            5920 :             ExecAsyncAppendResponse(areq);
     123            5920 :             break;
     124 UBC           0 :         default:
     125                 :             /* If the node doesn't support async, caller messed up. */
     126               0 :             elog(ERROR, "unrecognized node type: %d",
     127                 :                  (int) nodeTag(areq->requestor));
     128                 :     }
     129 CBC        5920 : }
     130                 : 
     131                 : /*
     132                 :  * A requestee node should call this function to deliver the tuple to its
     133                 :  * requestor node.  The requestee node can call this from its ExecAsyncRequest
     134                 :  * or ExecAsyncNotify callback.
     135                 :  */
     136                 : void
     137            5766 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
     138                 : {
     139            5766 :     areq->request_complete = true;
     140            5766 :     areq->result = result;
     141            5766 : }
     142                 : 
     143                 : /*
     144                 :  * A requestee node should call this function to indicate that it is pending
     145                 :  * for a callback.  The requestee node can call this from its ExecAsyncRequest
     146                 :  * or ExecAsyncNotify callback.
     147                 :  */
     148                 : void
     149             154 : ExecAsyncRequestPending(AsyncRequest *areq)
     150                 : {
     151             154 :     areq->callback_pending = true;
     152             154 :     areq->request_complete = false;
     153             154 :     areq->result = NULL;
     154             154 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a