Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execAsync.c
4 : : * Support routines for asynchronous execution
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/executor/execAsync.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "executor/execAsync.h"
18 : : #include "executor/executor.h"
19 : : #include "executor/nodeAppend.h"
20 : : #include "executor/nodeForeignscan.h"
21 : :
22 : : /*
23 : : * Asynchronously request a tuple from a designed async-capable node.
24 : : */
25 : : void
1110 efujita@postgresql.o 26 :CBC 6175 : ExecAsyncRequest(AsyncRequest *areq)
27 : : {
1068 28 [ + + ]: 6175 : if (areq->requestee->chgParam != NULL) /* something changed? */
tgl@sss.pgh.pa.us 29 : 2 : ExecReScan(areq->requestee); /* let ReScan handle this */
30 : :
31 : : /* must provide our own instrumentation support */
efujita@postgresql.o 32 [ + + ]: 6175 : if (areq->requestee->instrument)
33 : 810 : InstrStartNode(areq->requestee->instrument);
34 : :
1110 35 [ + - ]: 6175 : switch (nodeTag(areq->requestee))
36 : : {
37 : 6175 : case T_ForeignScanState:
38 : 6175 : ExecAsyncForeignScanRequest(areq);
39 : 6175 : break;
1110 efujita@postgresql.o 40 :UBC 0 : default:
41 : : /* If the node doesn't support async, caller messed up. */
42 [ # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
43 : : (int) nodeTag(areq->requestee));
44 : : }
45 : :
1110 efujita@postgresql.o 46 :CBC 6175 : ExecAsyncResponse(areq);
47 : :
48 : : /* must provide our own instrumentation support */
1068 49 [ + + ]: 6175 : if (areq->requestee->instrument)
50 : 810 : InstrStopNode(areq->requestee->instrument,
51 [ + + - + ]: 810 : TupIsNull(areq->result) ? 0.0 : 1.0);
1110 52 : 6175 : }
53 : :
54 : : /*
55 : : * Give the asynchronous node a chance to configure the file descriptor event
56 : : * for which it wishes to wait. We expect the node-type specific callback to
57 : : * make a single call of the following form:
58 : : *
59 : : * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
60 : : */
61 : : void
62 : 283 : ExecAsyncConfigureWait(AsyncRequest *areq)
63 : : {
64 : : /* must provide our own instrumentation support */
1068 65 [ + + ]: 283 : if (areq->requestee->instrument)
66 : 21 : InstrStartNode(areq->requestee->instrument);
67 : :
1110 68 [ + - ]: 283 : switch (nodeTag(areq->requestee))
69 : : {
70 : 283 : case T_ForeignScanState:
71 : 283 : ExecAsyncForeignScanConfigureWait(areq);
72 : 282 : break;
1110 efujita@postgresql.o 73 :UBC 0 : default:
74 : : /* If the node doesn't support async, caller messed up. */
75 [ # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
76 : : (int) nodeTag(areq->requestee));
77 : : }
78 : :
79 : : /* must provide our own instrumentation support */
1068 efujita@postgresql.o 80 [ + + ]:CBC 282 : if (areq->requestee->instrument)
81 : 21 : InstrStopNode(areq->requestee->instrument, 0.0);
1110 82 : 282 : }
83 : :
84 : : /*
85 : : * Call the asynchronous node back when a relevant event has occurred.
86 : : */
87 : : void
88 : 148 : ExecAsyncNotify(AsyncRequest *areq)
89 : : {
90 : : /* must provide our own instrumentation support */
1068 91 [ + + ]: 148 : if (areq->requestee->instrument)
92 : 14 : InstrStartNode(areq->requestee->instrument);
93 : :
1110 94 [ + - ]: 148 : switch (nodeTag(areq->requestee))
95 : : {
96 : 148 : case T_ForeignScanState:
97 : 148 : ExecAsyncForeignScanNotify(areq);
98 : 148 : break;
1110 efujita@postgresql.o 99 :UBC 0 : default:
100 : : /* If the node doesn't support async, caller messed up. */
101 [ # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
102 : : (int) nodeTag(areq->requestee));
103 : : }
104 : :
1110 efujita@postgresql.o 105 :CBC 148 : ExecAsyncResponse(areq);
106 : :
107 : : /* must provide our own instrumentation support */
1068 108 [ + + ]: 148 : if (areq->requestee->instrument)
109 : 14 : InstrStopNode(areq->requestee->instrument,
110 [ + + - + ]: 14 : TupIsNull(areq->result) ? 0.0 : 1.0);
1110 111 : 148 : }
112 : :
113 : : /*
114 : : * Call the requestor back when an asynchronous node has produced a result.
115 : : */
116 : : void
117 : 6328 : ExecAsyncResponse(AsyncRequest *areq)
118 : : {
119 [ + - ]: 6328 : switch (nodeTag(areq->requestor))
120 : : {
121 : 6328 : case T_AppendState:
122 : 6328 : ExecAsyncAppendResponse(areq);
123 : 6328 : break;
1110 efujita@postgresql.o 124 :UBC 0 : default:
125 : : /* If the node doesn't support async, caller messed up. */
126 [ # # ]: 0 : elog(ERROR, "unrecognized node type: %d",
127 : : (int) nodeTag(areq->requestor));
128 : : }
1110 efujita@postgresql.o 129 :CBC 6328 : }
130 : :
131 : : /*
132 : : * A requestee node should call this function to deliver the tuple to its
133 : : * requestor node. The requestee node can call this from its ExecAsyncRequest
134 : : * or ExecAsyncNotify callback.
135 : : */
136 : : void
137 : 6167 : ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
138 : : {
139 : 6167 : areq->request_complete = true;
140 : 6167 : areq->result = result;
141 : 6167 : }
142 : :
143 : : /*
144 : : * A requestee node should call this function to indicate that it is pending
145 : : * for a callback. The requestee node can call this from its ExecAsyncRequest
146 : : * or ExecAsyncNotify callback.
147 : : */
148 : : void
149 : 161 : ExecAsyncRequestPending(AsyncRequest *areq)
150 : : {
151 : 161 : areq->callback_pending = true;
152 : 161 : areq->request_complete = false;
153 : 161 : areq->result = NULL;
154 : 161 : }
|