|            TLA  Line data    Source code 
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * nodeForeignscan.c
       4                 :  *    Routines to support scans of foreign tables
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *    src/backend/executor/nodeForeignscan.c
      12                 :  *
      13                 :  *-------------------------------------------------------------------------
      14                 :  */
      15                 : /*
      16                 :  * INTERFACE ROUTINES
      17                 :  *
      18                 :  *      ExecForeignScan         scans a foreign table.
      19                 :  *      ExecInitForeignScan     creates and initializes state info.
      20                 :  *      ExecReScanForeignScan   rescans the foreign relation.
      21                 :  *      ExecEndForeignScan      releases any resources allocated.
      22                 :  */
      23                 : #include "postgres.h"
      24                 : 
      25                 : #include "executor/executor.h"
      26                 : #include "executor/nodeForeignscan.h"
      27                 : #include "foreign/fdwapi.h"
      28                 : #include "utils/memutils.h"
      29                 : #include "utils/rel.h"
      30                 : 
      31                 : static TupleTableSlot *ForeignNext(ForeignScanState *node);
      32                 : static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
      33                 : 
      34                 : 
      35                 : /* ----------------------------------------------------------------
      36                 :  *      ForeignNext
      37                 :  *
      38                 :  *      This is a workhorse for ExecForeignScan
      39                 :  * ----------------------------------------------------------------
      40                 :  */
      41                 : static TupleTableSlot *
      42 CBC       70118 : ForeignNext(ForeignScanState *node)
      43                 : {
      44                 :     TupleTableSlot *slot;
      45           70118 :     ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
      46           70118 :     ExprContext *econtext = node->ss.ps.ps_ExprContext;
      47                 :     MemoryContext oldcontext;
      48                 : 
      49                 :     /* Call the Iterate function in short-lived context */
      50           70118 :     oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
      51           70118 :     if (plan->operation != CMD_SELECT)
      52                 :     {
      53                 :         /*
      54                 :          * direct modifications cannot be re-evaluated, so shouldn't get here
      55                 :          * during EvalPlanQual processing
      56                 :          */
      57             418 :         Assert(node->ss.ps.state->es_epq_active == NULL);
      58                 : 
      59             418 :         slot = node->fdwroutine->IterateDirectModify(node);
      60                 :     }
      61                 :     else
      62           69700 :         slot = node->fdwroutine->IterateForeignScan(node);
      63           70105 :     MemoryContextSwitchTo(oldcontext);
      64                 : 
      65                 :     /*
      66                 :      * Insert valid value into tableoid, the only actually-useful system
      67                 :      * column.
      68                 :      */
      69           70105 :     if (plan->fsSystemCol && !TupIsNull(slot))
      70            2519 :         slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
      71                 : 
      72           70105 :     return slot;
      73                 : }
      74                 : 
      75                 : /*
      76                 :  * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
      77                 :  */
      78                 : static bool
      79 UBC           0 : ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
      80                 : {
      81               0 :     FdwRoutine *fdwroutine = node->fdwroutine;
      82                 :     ExprContext *econtext;
      83                 : 
      84                 :     /*
      85                 :      * extract necessary information from foreign scan node
      86                 :      */
      87               0 :     econtext = node->ss.ps.ps_ExprContext;
      88                 : 
      89                 :     /* Does the tuple meet the remote qual condition? */
      90               0 :     econtext->ecxt_scantuple = slot;
      91                 : 
      92               0 :     ResetExprContext(econtext);
      93                 : 
      94                 :     /*
      95                 :      * If an outer join is pushed down, RecheckForeignScan may need to store a
      96                 :      * different tuple in the slot, because a different set of columns may go
      97                 :      * to NULL upon recheck.  Otherwise, it shouldn't need to change the slot
      98                 :      * contents, just return true or false to indicate whether the quals still
      99                 :      * pass.  For simple cases, setting fdw_recheck_quals may be easier than
     100                 :      * providing this callback.
     101                 :      */
     102               0 :     if (fdwroutine->RecheckForeignScan &&
     103               0 :         !fdwroutine->RecheckForeignScan(node, slot))
     104               0 :         return false;
     105                 : 
     106               0 :     return ExecQual(node->fdw_recheck_quals, econtext);
     107                 : }
     108                 : 
     109                 : /* ----------------------------------------------------------------
     110                 :  *      ExecForeignScan(node)
     111                 :  *
     112                 :  *      Fetches the next tuple from the FDW, checks local quals, and
     113                 :  *      returns it.
     114                 :  *      We call the ExecScan() routine and pass it the appropriate
     115                 :  *      access method functions.
     116                 :  * ----------------------------------------------------------------
     117                 :  */
     118                 : static TupleTableSlot *
     119 CBC       61032 : ExecForeignScan(PlanState *pstate)
     120                 : {
     121           61032 :     ForeignScanState *node = castNode(ForeignScanState, pstate);
     122           61032 :     ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
     123           61032 :     EState     *estate = node->ss.ps.state;
     124                 : 
     125                 :     /*
     126                 :      * Ignore direct modifications when EvalPlanQual is active --- they are
     127                 :      * irrelevant for EvalPlanQual rechecking
     128                 :      */
     129           61032 :     if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT)
     130 UBC           0 :         return NULL;
     131                 : 
     132 CBC       61032 :     return ExecScan(&node->ss,
     133                 :                     (ExecScanAccessMtd) ForeignNext,
     134                 :                     (ExecScanRecheckMtd) ForeignRecheck);
     135                 : }
     136                 : 
     137                 : 
     138                 : /* ----------------------------------------------------------------
     139                 :  *      ExecInitForeignScan
     140                 :  * ----------------------------------------------------------------
     141                 :  */
     142                 : ForeignScanState *
     143             951 : ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
     144                 : {
     145                 :     ForeignScanState *scanstate;
     146             951 :     Relation    currentRelation = NULL;
     147             951 :     Index       scanrelid = node->scan.scanrelid;
     148                 :     int         tlistvarno;
     149                 :     FdwRoutine *fdwroutine;
     150                 : 
     151                 :     /* check for unsupported flags */
     152             951 :     Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
     153                 : 
     154                 :     /*
     155                 :      * create state structure
     156                 :      */
     157             951 :     scanstate = makeNode(ForeignScanState);
     158             951 :     scanstate->ss.ps.plan = (Plan *) node;
     159             951 :     scanstate->ss.ps.state = estate;
     160             951 :     scanstate->ss.ps.ExecProcNode = ExecForeignScan;
     161                 : 
     162                 :     /*
     163                 :      * Miscellaneous initialization
     164                 :      *
     165                 :      * create expression context for node
     166                 :      */
     167             951 :     ExecAssignExprContext(estate, &scanstate->ss.ps);
     168                 : 
     169                 :     /*
     170                 :      * open the scan relation, if any; also acquire function pointers from the
     171                 :      * FDW's handler
     172                 :      */
     173             951 :     if (scanrelid > 0)
     174                 :     {
     175             697 :         currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
     176             697 :         scanstate->ss.ss_currentRelation = currentRelation;
     177             697 :         fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
     178                 :     }
     179                 :     else
     180                 :     {
     181                 :         /* We can't use the relcache, so get fdwroutine the hard way */
     182             254 :         fdwroutine = GetFdwRoutineByServerId(node->fs_server);
     183                 :     }
     184                 : 
     185                 :     /*
     186                 :      * Determine the scan tuple type.  If the FDW provided a targetlist
     187                 :      * describing the scan tuples, use that; else use base relation's rowtype.
     188                 :      */
     189             951 :     if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
     190             254 :     {
     191                 :         TupleDesc   scan_tupdesc;
     192                 : 
     193             254 :         scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist);
     194             254 :         ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
     195                 :                               &TTSOpsHeapTuple);
     196                 :         /* Node's targetlist will contain Vars with varno = INDEX_VAR */
     197             254 :         tlistvarno = INDEX_VAR;
     198                 :     }
     199                 :     else
     200                 :     {
     201                 :         TupleDesc   scan_tupdesc;
     202                 : 
     203                 :         /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
     204             697 :         scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
     205             697 :         ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
     206                 :                               &TTSOpsHeapTuple);
     207                 :         /* Node's targetlist will contain Vars with varno = scanrelid */
     208             697 :         tlistvarno = scanrelid;
     209                 :     }
     210                 : 
     211                 :     /* Don't know what an FDW might return */
     212             951 :     scanstate->ss.ps.scanopsfixed = false;
     213             951 :     scanstate->ss.ps.scanopsset = true;
     214                 : 
     215                 :     /*
     216                 :      * Initialize result slot, type and projection.
     217                 :      */
     218             951 :     ExecInitResultTypeTL(&scanstate->ss.ps);
     219             951 :     ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
     220                 : 
     221                 :     /*
     222                 :      * initialize child expressions
     223                 :      */
     224             951 :     scanstate->ss.ps.qual =
     225             951 :         ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
     226             951 :     scanstate->fdw_recheck_quals =
     227             951 :         ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
     228                 : 
     229                 :     /*
     230                 :      * Determine whether to scan the foreign relation asynchronously or not;
     231                 :      * this has to be kept in sync with the code in ExecInitAppend().
     232                 :      */
     233            1041 :     scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
     234              90 :                                       estate->es_epq_active == NULL);
     235                 : 
     236                 :     /*
     237                 :      * Initialize FDW-related state.
     238                 :      */
     239             951 :     scanstate->fdwroutine = fdwroutine;
     240             951 :     scanstate->fdw_state = NULL;
     241                 : 
     242                 :     /*
     243                 :      * For the FDW's convenience, look up the modification target relation's
     244                 :      * ResultRelInfo.  The ModifyTable node should have initialized it for us,
     245                 :      * see ExecInitModifyTable.
     246                 :      *
     247                 :      * Don't try to look up the ResultRelInfo when EvalPlanQual is active,
     248                 :      * though.  Direct modifications cannot be re-evaluated as part of
     249                 :      * EvalPlanQual.  The lookup wouldn't work anyway because during
     250                 :      * EvalPlanQual processing, EvalPlanQual only initializes the subtree
     251                 :      * under the ModifyTable, and doesn't run ExecInitModifyTable.
     252                 :      */
     253             951 :     if (node->resultRelation > 0 && estate->es_epq_active == NULL)
     254                 :     {
     255             104 :         if (estate->es_result_relations == NULL ||
     256             104 :             estate->es_result_relations[node->resultRelation - 1] == NULL)
     257                 :         {
     258 UBC           0 :             elog(ERROR, "result relation not initialized");
     259                 :         }
     260 CBC         104 :         scanstate->resultRelInfo = estate->es_result_relations[node->resultRelation - 1];
     261                 :     }
     262                 : 
     263                 :     /* Initialize any outer plan. */
     264             951 :     if (outerPlan(node))
     265              12 :         outerPlanState(scanstate) =
     266              12 :             ExecInitNode(outerPlan(node), estate, eflags);
     267                 : 
     268                 :     /*
     269                 :      * Tell the FDW to initialize the scan.
     270                 :      */
     271             951 :     if (node->operation != CMD_SELECT)
     272                 :     {
     273                 :         /*
     274                 :          * Direct modifications cannot be re-evaluated by EvalPlanQual, so
     275                 :          * don't bother preparing the FDW.
     276                 :          *
     277                 :          * In case of an inherited UPDATE/DELETE with foreign targets there
     278                 :          * can be direct-modify ForeignScan nodes in the EvalPlanQual subtree,
     279                 :          * so we need to ignore such ForeignScan nodes during EvalPlanQual
     280                 :          * processing.  See also ExecForeignScan/ExecReScanForeignScan.
     281                 :          */
     282             104 :         if (estate->es_epq_active == NULL)
     283             104 :             fdwroutine->BeginDirectModify(scanstate, eflags);
     284                 :     }
     285                 :     else
     286             847 :         fdwroutine->BeginForeignScan(scanstate, eflags);
     287                 : 
     288             943 :     return scanstate;
     289                 : }
     290                 : 
     291                 : /* ----------------------------------------------------------------
     292                 :  *      ExecEndForeignScan
     293                 :  *
     294                 :  *      frees any storage allocated through C routines.
     295                 :  * ----------------------------------------------------------------
     296                 :  */
     297                 : void
     298             920 : ExecEndForeignScan(ForeignScanState *node)
     299                 : {
     300             920 :     ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
     301             920 :     EState     *estate = node->ss.ps.state;
     302                 : 
     303                 :     /* Let the FDW shut down */
     304             920 :     if (plan->operation != CMD_SELECT)
     305                 :     {
     306              96 :         if (estate->es_epq_active == NULL)
     307              96 :             node->fdwroutine->EndDirectModify(node);
     308                 :     }
     309                 :     else
     310             824 :         node->fdwroutine->EndForeignScan(node);
     311                 : 
     312                 :     /* Shut down any outer plan. */
     313             920 :     if (outerPlanState(node))
     314              12 :         ExecEndNode(outerPlanState(node));
     315                 : 
     316                 :     /* Free the exprcontext */
     317             920 :     ExecFreeExprContext(&node->ss.ps);
     318                 : 
     319                 :     /* clean out the tuple table */
     320             920 :     if (node->ss.ps.ps_ResultTupleSlot)
     321             560 :         ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
     322             920 :     ExecClearTuple(node->ss.ss_ScanTupleSlot);
     323             920 : }
     324                 : 
     325                 : /* ----------------------------------------------------------------
     326                 :  *      ExecReScanForeignScan
     327                 :  *
     328                 :  *      Rescans the relation.
     329                 :  * ----------------------------------------------------------------
     330                 :  */
     331                 : void
     332             401 : ExecReScanForeignScan(ForeignScanState *node)
     333                 : {
     334             401 :     ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
     335             401 :     EState     *estate = node->ss.ps.state;
     336             401 :     PlanState  *outerPlan = outerPlanState(node);
     337                 : 
     338                 :     /*
     339                 :      * Ignore direct modifications when EvalPlanQual is active --- they are
     340                 :      * irrelevant for EvalPlanQual rechecking
     341                 :      */
     342             401 :     if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT)
     343 UBC           0 :         return;
     344                 : 
     345 CBC         401 :     node->fdwroutine->ReScanForeignScan(node);
     346                 : 
     347                 :     /*
     348                 :      * If chgParam of subnode is not null then plan will be re-scanned by
     349                 :      * first ExecProcNode.  outerPlan may also be NULL, in which case there is
     350                 :      * nothing to rescan at all.
     351                 :      */
     352             401 :     if (outerPlan != NULL && outerPlan->chgParam == NULL)
     353              10 :         ExecReScan(outerPlan);
     354                 : 
     355             401 :     ExecScanReScan(&node->ss);
     356                 : }
     357                 : 
     358                 : /* ----------------------------------------------------------------
     359                 :  *      ExecForeignScanEstimate
     360                 :  *
     361                 :  *      Informs size of the parallel coordination information, if any
     362                 :  * ----------------------------------------------------------------
     363                 :  */
     364                 : void
     365 UBC           0 : ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
     366                 : {
     367               0 :     FdwRoutine *fdwroutine = node->fdwroutine;
     368                 : 
     369               0 :     if (fdwroutine->EstimateDSMForeignScan)
     370                 :     {
     371               0 :         node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
     372               0 :         shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
     373               0 :         shm_toc_estimate_keys(&pcxt->estimator, 1);
     374                 :     }
     375               0 : }
     376                 : 
     377                 : /* ----------------------------------------------------------------
     378                 :  *      ExecForeignScanInitializeDSM
     379                 :  *
     380                 :  *      Initialize the parallel coordination information
     381                 :  * ----------------------------------------------------------------
     382                 :  */
     383                 : void
     384               0 : ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
     385                 : {
     386               0 :     FdwRoutine *fdwroutine = node->fdwroutine;
     387                 : 
     388               0 :     if (fdwroutine->InitializeDSMForeignScan)
     389                 :     {
     390               0 :         int         plan_node_id = node->ss.ps.plan->plan_node_id;
     391                 :         void       *coordinate;
     392                 : 
     393               0 :         coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
     394               0 :         fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
     395               0 :         shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
     396                 :     }
     397               0 : }
     398                 : 
     399                 : /* ----------------------------------------------------------------
     400                 :  *      ExecForeignScanReInitializeDSM
     401                 :  *
     402                 :  *      Reset shared state before beginning a fresh scan.
     403                 :  * ----------------------------------------------------------------
     404                 :  */
     405                 : void
     406               0 : ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
     407                 : {
     408               0 :     FdwRoutine *fdwroutine = node->fdwroutine;
     409                 : 
     410               0 :     if (fdwroutine->ReInitializeDSMForeignScan)
     411                 :     {
     412               0 :         int         plan_node_id = node->ss.ps.plan->plan_node_id;
     413                 :         void       *coordinate;
     414                 : 
     415               0 :         coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
     416               0 :         fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
     417                 :     }
     418               0 : }
     419                 : 
     420                 : /* ----------------------------------------------------------------
     421                 :  *      ExecForeignScanInitializeWorker
     422                 :  *
     423                 :  *      Initialization according to the parallel coordination information
     424                 :  * ----------------------------------------------------------------
     425                 :  */
     426                 : void
     427               0 : ExecForeignScanInitializeWorker(ForeignScanState *node,
     428                 :                                 ParallelWorkerContext *pwcxt)
     429                 : {
     430               0 :     FdwRoutine *fdwroutine = node->fdwroutine;
     431                 : 
     432               0 :     if (fdwroutine->InitializeWorkerForeignScan)
     433                 :     {
     434               0 :         int         plan_node_id = node->ss.ps.plan->plan_node_id;
     435                 :         void       *coordinate;
     436                 : 
     437               0 :         coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
     438               0 :         fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
     439                 :     }
     440               0 : }
     441                 : 
     442                 : /* ----------------------------------------------------------------
     443                 :  *      ExecShutdownForeignScan
     444                 :  *
     445                 :  *      Gives FDW chance to stop asynchronous resource consumption
     446                 :  *      and release any resources still held.
     447                 :  * ----------------------------------------------------------------
     448                 :  */
     449                 : void
     450 CBC         542 : ExecShutdownForeignScan(ForeignScanState *node)
     451                 : {
     452             542 :     FdwRoutine *fdwroutine = node->fdwroutine;
     453                 : 
     454             542 :     if (fdwroutine->ShutdownForeignScan)
     455 UBC           0 :         fdwroutine->ShutdownForeignScan(node);
     456 CBC         542 : }
     457                 : 
     458                 : /* ----------------------------------------------------------------
     459                 :  *      ExecAsyncForeignScanRequest
     460                 :  *
     461                 :  *      Asynchronously request a tuple from a designed async-capable node
     462                 :  * ----------------------------------------------------------------
     463                 :  */
     464                 : void
     465            5772 : ExecAsyncForeignScanRequest(AsyncRequest *areq)
     466                 : {
     467            5772 :     ForeignScanState *node = (ForeignScanState *) areq->requestee;
     468            5772 :     FdwRoutine *fdwroutine = node->fdwroutine;
     469                 : 
     470            5772 :     Assert(fdwroutine->ForeignAsyncRequest != NULL);
     471            5772 :     fdwroutine->ForeignAsyncRequest(areq);
     472            5772 : }
     473                 : 
     474                 : /* ----------------------------------------------------------------
     475                 :  *      ExecAsyncForeignScanConfigureWait
     476                 :  *
     477                 :  *      In async mode, configure for a wait
     478                 :  * ----------------------------------------------------------------
     479                 :  */
     480                 : void
     481             163 : ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
     482                 : {
     483             163 :     ForeignScanState *node = (ForeignScanState *) areq->requestee;
     484             163 :     FdwRoutine *fdwroutine = node->fdwroutine;
     485                 : 
     486             163 :     Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
     487             163 :     fdwroutine->ForeignAsyncConfigureWait(areq);
     488             163 : }
     489                 : 
     490                 : /* ----------------------------------------------------------------
     491                 :  *      ExecAsyncForeignScanNotify
     492                 :  *
     493                 :  *      Callback invoked when a relevant event has occurred
     494                 :  * ----------------------------------------------------------------
     495                 :  */
     496                 : void
     497             145 : ExecAsyncForeignScanNotify(AsyncRequest *areq)
     498                 : {
     499             145 :     ForeignScanState *node = (ForeignScanState *) areq->requestee;
     500             145 :     FdwRoutine *fdwroutine = node->fdwroutine;
     501                 : 
     502             145 :     Assert(fdwroutine->ForeignAsyncNotify != NULL);
     503             145 :     fdwroutine->ForeignAsyncNotify(areq);
     504             145 : }
         |