Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeForeignscan.c
4 : * Routines to support scans of foreign tables
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeForeignscan.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /*
16 : * INTERFACE ROUTINES
17 : *
18 : * ExecForeignScan scans a foreign table.
19 : * ExecInitForeignScan creates and initializes state info.
20 : * ExecReScanForeignScan rescans the foreign relation.
21 : * ExecEndForeignScan releases any resources allocated.
22 : */
23 : #include "postgres.h"
24 :
25 : #include "executor/executor.h"
26 : #include "executor/nodeForeignscan.h"
27 : #include "foreign/fdwapi.h"
28 : #include "utils/memutils.h"
29 : #include "utils/rel.h"
30 :
31 : static TupleTableSlot *ForeignNext(ForeignScanState *node);
32 : static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot);
33 :
34 :
35 : /* ----------------------------------------------------------------
36 : * ForeignNext
37 : *
38 : * This is a workhorse for ExecForeignScan
39 : * ----------------------------------------------------------------
40 : */
41 : static TupleTableSlot *
4431 tgl 42 CBC 70118 : ForeignNext(ForeignScanState *node)
43 : {
44 : TupleTableSlot *slot;
4382 bruce 45 70118 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
4431 tgl 46 70118 : ExprContext *econtext = node->ss.ps.ps_ExprContext;
47 : MemoryContext oldcontext;
48 :
49 : /* Call the Iterate function in short-lived context */
50 70118 : oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
2578 rhaas 51 70118 : if (plan->operation != CMD_SELECT)
52 : {
53 : /*
54 : * direct modifications cannot be re-evaluated, so shouldn't get here
55 : * during EvalPlanQual processing
56 : */
430 andres 57 418 : Assert(node->ss.ps.state->es_epq_active == NULL);
58 :
2578 rhaas 59 418 : slot = node->fdwroutine->IterateDirectModify(node);
60 : }
61 : else
62 69700 : slot = node->fdwroutine->IterateForeignScan(node);
4431 tgl 63 70105 : MemoryContextSwitchTo(oldcontext);
64 :
65 : /*
66 : * Insert valid value into tableoid, the only actually-useful system
67 : * column.
68 : */
69 70105 : if (plan->fsSystemCol && !TupIsNull(slot))
1503 andres 70 2519 : slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation);
71 :
4431 tgl 72 70105 : return slot;
73 : }
74 :
75 : /*
76 : * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual
77 : */
78 : static bool
4431 tgl 79 UBC 0 : ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot)
80 : {
2679 rhaas 81 0 : FdwRoutine *fdwroutine = node->fdwroutine;
82 : ExprContext *econtext;
83 :
84 : /*
85 : * extract necessary information from foreign scan node
86 : */
2733 87 0 : econtext = node->ss.ps.ps_ExprContext;
88 :
89 : /* Does the tuple meet the remote qual condition? */
90 0 : econtext->ecxt_scantuple = slot;
91 :
92 0 : ResetExprContext(econtext);
93 :
94 : /*
95 : * If an outer join is pushed down, RecheckForeignScan may need to store a
96 : * different tuple in the slot, because a different set of columns may go
97 : * to NULL upon recheck. Otherwise, it shouldn't need to change the slot
98 : * contents, just return true or false to indicate whether the quals still
99 : * pass. For simple cases, setting fdw_recheck_quals may be easier than
100 : * providing this callback.
101 : */
2679 102 0 : if (fdwroutine->RecheckForeignScan &&
103 0 : !fdwroutine->RecheckForeignScan(node, slot))
104 0 : return false;
105 :
2217 andres 106 0 : return ExecQual(node->fdw_recheck_quals, econtext);
107 : }
108 :
109 : /* ----------------------------------------------------------------
110 : * ExecForeignScan(node)
111 : *
112 : * Fetches the next tuple from the FDW, checks local quals, and
113 : * returns it.
114 : * We call the ExecScan() routine and pass it the appropriate
115 : * access method functions.
116 : * ----------------------------------------------------------------
117 : */
118 : static TupleTableSlot *
2092 andres 119 CBC 61032 : ExecForeignScan(PlanState *pstate)
120 : {
121 61032 : ForeignScanState *node = castNode(ForeignScanState, pstate);
430 efujita 122 61032 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
123 61032 : EState *estate = node->ss.ps.state;
124 :
125 : /*
126 : * Ignore direct modifications when EvalPlanQual is active --- they are
127 : * irrelevant for EvalPlanQual rechecking
128 : */
129 61032 : if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT)
430 efujita 130 UBC 0 : return NULL;
131 :
2092 andres 132 CBC 61032 : return ExecScan(&node->ss,
133 : (ExecScanAccessMtd) ForeignNext,
134 : (ExecScanRecheckMtd) ForeignRecheck);
135 : }
136 :
137 :
138 : /* ----------------------------------------------------------------
139 : * ExecInitForeignScan
140 : * ----------------------------------------------------------------
141 : */
142 : ForeignScanState *
4431 tgl 143 951 : ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
144 : {
145 : ForeignScanState *scanstate;
2891 146 951 : Relation currentRelation = NULL;
2900 rhaas 147 951 : Index scanrelid = node->scan.scanrelid;
148 : int tlistvarno;
149 : FdwRoutine *fdwroutine;
150 :
151 : /* check for unsupported flags */
4431 tgl 152 951 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
153 :
154 : /*
155 : * create state structure
156 : */
157 951 : scanstate = makeNode(ForeignScanState);
158 951 : scanstate->ss.ps.plan = (Plan *) node;
159 951 : scanstate->ss.ps.state = estate;
2092 andres 160 951 : scanstate->ss.ps.ExecProcNode = ExecForeignScan;
161 :
162 : /*
163 : * Miscellaneous initialization
164 : *
165 : * create expression context for node
166 : */
4431 tgl 167 951 : ExecAssignExprContext(estate, &scanstate->ss.ps);
168 :
169 : /*
170 : * open the scan relation, if any; also acquire function pointers from the
171 : * FDW's handler
172 : */
2900 rhaas 173 951 : if (scanrelid > 0)
174 : {
175 697 : currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags);
176 697 : scanstate->ss.ss_currentRelation = currentRelation;
2891 tgl 177 697 : fdwroutine = GetFdwRoutineForRelation(currentRelation, true);
178 : }
179 : else
180 : {
181 : /* We can't use the relcache, so get fdwroutine the hard way */
182 254 : fdwroutine = GetFdwRoutineByServerId(node->fs_server);
183 : }
184 :
185 : /*
186 : * Determine the scan tuple type. If the FDW provided a targetlist
187 : * describing the scan tuples, use that; else use base relation's rowtype.
188 : */
189 951 : if (node->fdw_scan_tlist != NIL || currentRelation == NULL)
190 254 : {
191 : TupleDesc scan_tupdesc;
192 :
1601 andres 193 254 : scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist);
1606 194 254 : ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
195 : &TTSOpsHeapTuple);
196 : /* Node's targetlist will contain Vars with varno = INDEX_VAR */
2891 tgl 197 254 : tlistvarno = INDEX_VAR;
198 : }
199 : else
200 : {
201 : TupleDesc scan_tupdesc;
202 :
203 : /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */
1840 andres 204 697 : scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation));
1606 205 697 : ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc,
206 : &TTSOpsHeapTuple);
207 : /* Node's targetlist will contain Vars with varno = scanrelid */
2891 tgl 208 697 : tlistvarno = scanrelid;
209 : }
210 :
211 : /* Don't know what an FDW might return */
1606 andres 212 951 : scanstate->ss.ps.scanopsfixed = false;
213 951 : scanstate->ss.ps.scanopsset = true;
214 :
215 : /*
216 : * Initialize result slot, type and projection.
217 : */
1612 218 951 : ExecInitResultTypeTL(&scanstate->ss.ps);
2891 tgl 219 951 : ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno);
220 :
221 : /*
222 : * initialize child expressions
223 : */
1878 andres 224 951 : scanstate->ss.ps.qual =
225 951 : ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate);
226 951 : scanstate->fdw_recheck_quals =
227 951 : ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate);
228 :
229 : /*
230 : * Determine whether to scan the foreign relation asynchronously or not;
231 : * this has to be kept in sync with the code in ExecInitAppend().
232 : */
697 efujita 233 1041 : scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable &&
234 90 : estate->es_epq_active == NULL);
235 :
236 : /*
237 : * Initialize FDW-related state.
238 : */
4431 tgl 239 951 : scanstate->fdwroutine = fdwroutine;
240 951 : scanstate->fdw_state = NULL;
241 :
242 : /*
243 : * For the FDW's convenience, look up the modification target relation's
244 : * ResultRelInfo. The ModifyTable node should have initialized it for us,
245 : * see ExecInitModifyTable.
246 : *
247 : * Don't try to look up the ResultRelInfo when EvalPlanQual is active,
248 : * though. Direct modifications cannot be re-evaluated as part of
249 : * EvalPlanQual. The lookup wouldn't work anyway because during
250 : * EvalPlanQual processing, EvalPlanQual only initializes the subtree
251 : * under the ModifyTable, and doesn't run ExecInitModifyTable.
252 : */
605 heikki.linnakangas 253 951 : if (node->resultRelation > 0 && estate->es_epq_active == NULL)
254 : {
255 104 : if (estate->es_result_relations == NULL ||
256 104 : estate->es_result_relations[node->resultRelation - 1] == NULL)
257 : {
605 heikki.linnakangas 258 UBC 0 : elog(ERROR, "result relation not initialized");
259 : }
907 heikki.linnakangas 260 CBC 104 : scanstate->resultRelInfo = estate->es_result_relations[node->resultRelation - 1];
261 : }
262 :
263 : /* Initialize any outer plan. */
2679 rhaas 264 951 : if (outerPlan(node))
265 12 : outerPlanState(scanstate) =
266 12 : ExecInitNode(outerPlan(node), estate, eflags);
267 :
268 : /*
269 : * Tell the FDW to initialize the scan.
270 : */
2578 271 951 : if (node->operation != CMD_SELECT)
272 : {
273 : /*
274 : * Direct modifications cannot be re-evaluated by EvalPlanQual, so
275 : * don't bother preparing the FDW.
276 : *
277 : * In case of an inherited UPDATE/DELETE with foreign targets there
278 : * can be direct-modify ForeignScan nodes in the EvalPlanQual subtree,
279 : * so we need to ignore such ForeignScan nodes during EvalPlanQual
280 : * processing. See also ExecForeignScan/ExecReScanForeignScan.
281 : */
605 heikki.linnakangas 282 104 : if (estate->es_epq_active == NULL)
283 104 : fdwroutine->BeginDirectModify(scanstate, eflags);
284 : }
285 : else
2578 rhaas 286 847 : fdwroutine->BeginForeignScan(scanstate, eflags);
287 :
4431 tgl 288 943 : return scanstate;
289 : }
290 :
291 : /* ----------------------------------------------------------------
292 : * ExecEndForeignScan
293 : *
294 : * frees any storage allocated through C routines.
295 : * ----------------------------------------------------------------
296 : */
297 : void
298 920 : ExecEndForeignScan(ForeignScanState *node)
299 : {
2578 rhaas 300 920 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
605 heikki.linnakangas 301 920 : EState *estate = node->ss.ps.state;
302 :
303 : /* Let the FDW shut down */
2578 rhaas 304 920 : if (plan->operation != CMD_SELECT)
305 : {
605 heikki.linnakangas 306 96 : if (estate->es_epq_active == NULL)
307 96 : node->fdwroutine->EndDirectModify(node);
308 : }
309 : else
2578 rhaas 310 824 : node->fdwroutine->EndForeignScan(node);
311 :
312 : /* Shut down any outer plan. */
2679 313 920 : if (outerPlanState(node))
314 12 : ExecEndNode(outerPlanState(node));
315 :
316 : /* Free the exprcontext */
4431 tgl 317 920 : ExecFreeExprContext(&node->ss.ps);
318 :
319 : /* clean out the tuple table */
1612 andres 320 920 : if (node->ss.ps.ps_ResultTupleSlot)
321 560 : ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
4431 tgl 322 920 : ExecClearTuple(node->ss.ss_ScanTupleSlot);
323 920 : }
324 :
325 : /* ----------------------------------------------------------------
326 : * ExecReScanForeignScan
327 : *
328 : * Rescans the relation.
329 : * ----------------------------------------------------------------
330 : */
331 : void
332 401 : ExecReScanForeignScan(ForeignScanState *node)
333 : {
430 efujita 334 401 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
335 401 : EState *estate = node->ss.ps.state;
2679 rhaas 336 401 : PlanState *outerPlan = outerPlanState(node);
337 :
338 : /*
339 : * Ignore direct modifications when EvalPlanQual is active --- they are
340 : * irrelevant for EvalPlanQual rechecking
341 : */
430 efujita 342 401 : if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT)
430 efujita 343 UBC 0 : return;
344 :
4431 tgl 345 CBC 401 : node->fdwroutine->ReScanForeignScan(node);
346 :
347 : /*
348 : * If chgParam of subnode is not null then plan will be re-scanned by
349 : * first ExecProcNode. outerPlan may also be NULL, in which case there is
350 : * nothing to rescan at all.
351 : */
2679 rhaas 352 401 : if (outerPlan != NULL && outerPlan->chgParam == NULL)
353 10 : ExecReScan(outerPlan);
354 :
4431 tgl 355 401 : ExecScanReScan(&node->ss);
356 : }
357 :
358 : /* ----------------------------------------------------------------
359 : * ExecForeignScanEstimate
360 : *
361 : * Informs size of the parallel coordination information, if any
362 : * ----------------------------------------------------------------
363 : */
364 : void
2622 rhaas 365 UBC 0 : ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt)
366 : {
367 0 : FdwRoutine *fdwroutine = node->fdwroutine;
368 :
369 0 : if (fdwroutine->EstimateDSMForeignScan)
370 : {
371 0 : node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt);
372 0 : shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
373 0 : shm_toc_estimate_keys(&pcxt->estimator, 1);
374 : }
375 0 : }
376 :
377 : /* ----------------------------------------------------------------
378 : * ExecForeignScanInitializeDSM
379 : *
380 : * Initialize the parallel coordination information
381 : * ----------------------------------------------------------------
382 : */
383 : void
384 0 : ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
385 : {
386 0 : FdwRoutine *fdwroutine = node->fdwroutine;
387 :
388 0 : if (fdwroutine->InitializeDSMForeignScan)
389 : {
390 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
391 : void *coordinate;
392 :
393 0 : coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len);
394 0 : fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate);
395 0 : shm_toc_insert(pcxt->toc, plan_node_id, coordinate);
396 : }
397 0 : }
398 :
399 : /* ----------------------------------------------------------------
400 : * ExecForeignScanReInitializeDSM
401 : *
402 : * Reset shared state before beginning a fresh scan.
403 : * ----------------------------------------------------------------
404 : */
405 : void
2048 tgl 406 0 : ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt)
407 : {
408 0 : FdwRoutine *fdwroutine = node->fdwroutine;
409 :
410 0 : if (fdwroutine->ReInitializeDSMForeignScan)
411 : {
412 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
413 : void *coordinate;
414 :
415 0 : coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false);
416 0 : fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate);
417 : }
418 0 : }
419 :
420 : /* ----------------------------------------------------------------
421 : * ExecForeignScanInitializeWorker
422 : *
423 : * Initialization according to the parallel coordination information
424 : * ----------------------------------------------------------------
425 : */
426 : void
1970 andres 427 0 : ExecForeignScanInitializeWorker(ForeignScanState *node,
428 : ParallelWorkerContext *pwcxt)
429 : {
2622 rhaas 430 0 : FdwRoutine *fdwroutine = node->fdwroutine;
431 :
432 0 : if (fdwroutine->InitializeWorkerForeignScan)
433 : {
434 0 : int plan_node_id = node->ss.ps.plan->plan_node_id;
435 : void *coordinate;
436 :
1970 andres 437 0 : coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false);
438 0 : fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate);
439 : }
2622 rhaas 440 0 : }
441 :
442 : /* ----------------------------------------------------------------
443 : * ExecShutdownForeignScan
444 : *
445 : * Gives FDW chance to stop asynchronous resource consumption
446 : * and release any resources still held.
447 : * ----------------------------------------------------------------
448 : */
449 : void
2233 rhaas 450 CBC 542 : ExecShutdownForeignScan(ForeignScanState *node)
451 : {
452 542 : FdwRoutine *fdwroutine = node->fdwroutine;
453 :
454 542 : if (fdwroutine->ShutdownForeignScan)
2233 rhaas 455 UBC 0 : fdwroutine->ShutdownForeignScan(node);
2233 rhaas 456 CBC 542 : }
457 :
458 : /* ----------------------------------------------------------------
459 : * ExecAsyncForeignScanRequest
460 : *
461 : * Asynchronously request a tuple from a designed async-capable node
462 : * ----------------------------------------------------------------
463 : */
464 : void
739 efujita 465 5772 : ExecAsyncForeignScanRequest(AsyncRequest *areq)
466 : {
467 5772 : ForeignScanState *node = (ForeignScanState *) areq->requestee;
468 5772 : FdwRoutine *fdwroutine = node->fdwroutine;
469 :
470 5772 : Assert(fdwroutine->ForeignAsyncRequest != NULL);
471 5772 : fdwroutine->ForeignAsyncRequest(areq);
472 5772 : }
473 :
474 : /* ----------------------------------------------------------------
475 : * ExecAsyncForeignScanConfigureWait
476 : *
477 : * In async mode, configure for a wait
478 : * ----------------------------------------------------------------
479 : */
480 : void
481 163 : ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
482 : {
483 163 : ForeignScanState *node = (ForeignScanState *) areq->requestee;
484 163 : FdwRoutine *fdwroutine = node->fdwroutine;
485 :
486 163 : Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
487 163 : fdwroutine->ForeignAsyncConfigureWait(areq);
488 163 : }
489 :
490 : /* ----------------------------------------------------------------
491 : * ExecAsyncForeignScanNotify
492 : *
493 : * Callback invoked when a relevant event has occurred
494 : * ----------------------------------------------------------------
495 : */
496 : void
497 145 : ExecAsyncForeignScanNotify(AsyncRequest *areq)
498 : {
499 145 : ForeignScanState *node = (ForeignScanState *) areq->requestee;
500 145 : FdwRoutine *fdwroutine = node->fdwroutine;
501 :
502 145 : Assert(fdwroutine->ForeignAsyncNotify != NULL);
503 145 : fdwroutine->ForeignAsyncNotify(areq);
504 145 : }
|