Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * execAsync.c
4 : * Support routines for asynchronous execution
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/executor/execAsync.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres.h"
16 :
17 : #include "executor/execAsync.h"
18 : #include "executor/executor.h"
19 : #include "executor/nodeAppend.h"
20 : #include "executor/nodeForeignscan.h"
21 :
22 : /*
23 : * Asynchronously request a tuple from a designed async-capable node.
24 : */
25 : void
739 efujita 26 CBC 5772 : ExecAsyncRequest(AsyncRequest *areq)
27 : {
697 28 5772 : if (areq->requestee->chgParam != NULL) /* something changed? */
tgl 29 2 : ExecReScan(areq->requestee); /* let ReScan handle this */
30 :
31 : /* must provide our own instrumentation support */
efujita 32 5772 : if (areq->requestee->instrument)
33 810 : InstrStartNode(areq->requestee->instrument);
34 :
739 35 5772 : switch (nodeTag(areq->requestee))
36 : {
37 5772 : case T_ForeignScanState:
38 5772 : ExecAsyncForeignScanRequest(areq);
39 5772 : break;
739 efujita 40 UBC 0 : default:
41 : /* If the node doesn't support async, caller messed up. */
42 0 : elog(ERROR, "unrecognized node type: %d",
43 : (int) nodeTag(areq->requestee));
44 : }
45 :
739 efujita 46 CBC 5772 : ExecAsyncResponse(areq);
47 :
48 : /* must provide our own instrumentation support */
697 49 5772 : if (areq->requestee->instrument)
50 810 : InstrStopNode(areq->requestee->instrument,
51 810 : TupIsNull(areq->result) ? 0.0 : 1.0);
739 52 5772 : }
53 :
54 : /*
55 : * Give the asynchronous node a chance to configure the file descriptor event
56 : * for which it wishes to wait. We expect the node-type specific callback to
57 : * make a single call of the following form:
58 : *
59 : * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
60 : */
61 : void
62 163 : ExecAsyncConfigureWait(AsyncRequest *areq)
63 : {
64 : /* must provide our own instrumentation support */
697 65 163 : if (areq->requestee->instrument)
66 18 : InstrStartNode(areq->requestee->instrument);
67 :
739 68 163 : switch (nodeTag(areq->requestee))
69 : {
70 163 : case T_ForeignScanState:
71 163 : ExecAsyncForeignScanConfigureWait(areq);
72 163 : break;
739 efujita 73 UBC 0 : default:
74 : /* If the node doesn't support async, caller messed up. */
75 0 : elog(ERROR, "unrecognized node type: %d",
76 : (int) nodeTag(areq->requestee));
77 : }
78 :
79 : /* must provide our own instrumentation support */
697 efujita 80 CBC 163 : if (areq->requestee->instrument)
81 18 : InstrStopNode(areq->requestee->instrument, 0.0);
739 82 163 : }
83 :
84 : /*
85 : * Call the asynchronous node back when a relevant event has occurred.
86 : */
87 : void
88 145 : ExecAsyncNotify(AsyncRequest *areq)
89 : {
90 : /* must provide our own instrumentation support */
697 91 145 : if (areq->requestee->instrument)
92 15 : InstrStartNode(areq->requestee->instrument);
93 :
739 94 145 : switch (nodeTag(areq->requestee))
95 : {
96 145 : case T_ForeignScanState:
97 145 : ExecAsyncForeignScanNotify(areq);
98 145 : break;
739 efujita 99 UBC 0 : default:
100 : /* If the node doesn't support async, caller messed up. */
101 0 : elog(ERROR, "unrecognized node type: %d",
102 : (int) nodeTag(areq->requestee));
103 : }
104 :
739 efujita 105 CBC 145 : ExecAsyncResponse(areq);
106 :
107 : /* must provide our own instrumentation support */
697 108 145 : if (areq->requestee->instrument)
109 15 : InstrStopNode(areq->requestee->instrument,
110 15 : TupIsNull(areq->result) ? 0.0 : 1.0);
739 111 145 : }
112 :
113 : /*
114 : * Call the requestor back when an asynchronous node has produced a result.
115 : */
116 : void
117 5920 : ExecAsyncResponse(AsyncRequest *areq)
118 : {
119 5920 : switch (nodeTag(areq->requestor))
120 : {
121 5920 : case T_AppendState:
122 5920 : ExecAsyncAppendResponse(areq);
123 5920 : break;
739 efujita 124 UBC 0 : default:
125 : /* If the node doesn't support async, caller messed up. */
126 0 : elog(ERROR, "unrecognized node type: %d",
127 : (int) nodeTag(areq->requestor));
128 : }
739 efujita 129 CBC 5920 : }
130 :
131 : /*
132 : * A requestee node should call this function to deliver the tuple to its
133 : * requestor node. The requestee node can call this from its ExecAsyncRequest
134 : * or ExecAsyncNotify callback.
135 : */
136 : void
137 5766 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
138 : {
139 5766 : areq->request_complete = true;
140 5766 : areq->result = result;
141 5766 : }
142 :
143 : /*
144 : * A requestee node should call this function to indicate that it is pending
145 : * for a callback. The requestee node can call this from its ExecAsyncRequest
146 : * or ExecAsyncNotify callback.
147 : */
148 : void
149 154 : ExecAsyncRequestPending(AsyncRequest *areq)
150 : {
151 154 : areq->callback_pending = true;
152 154 : areq->request_complete = false;
153 154 : areq->result = NULL;
154 154 : }
|