Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * tqueue.c
4 : : * Use shm_mq to send & receive tuples between parallel backends
5 : : *
6 : : * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
7 : : * under the hood, writes tuples from the executor to a shm_mq.
8 : : *
9 : : * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
10 : : *
11 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
12 : : * Portions Copyright (c) 1994, Regents of the University of California
13 : : *
14 : : * IDENTIFICATION
15 : : * src/backend/executor/tqueue.c
16 : : *
17 : : *-------------------------------------------------------------------------
18 : : */
19 : :
20 : : #include "postgres.h"
21 : :
22 : : #include "access/htup_details.h"
23 : : #include "executor/tqueue.h"
24 : :
25 : : /*
26 : : * DestReceiver object's private contents
27 : : *
28 : : * queue is a pointer to data supplied by DestReceiver's caller.
29 : : */
30 : : typedef struct TQueueDestReceiver
31 : : {
32 : : DestReceiver pub; /* public fields */
33 : : shm_mq_handle *queue; /* shm_mq to send to */
34 : : } TQueueDestReceiver;
35 : :
36 : : /*
37 : : * TupleQueueReader object's private contents
38 : : *
39 : : * queue is a pointer to data supplied by reader's caller.
40 : : *
41 : : * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
42 : : */
43 : : struct TupleQueueReader
44 : : {
45 : : shm_mq_handle *queue; /* shm_mq to receive from */
46 : : };
47 : :
48 : : /*
49 : : * Receive a tuple from a query, and send it to the designated shm_mq.
50 : : *
51 : : * Returns true if successful, false if shm_mq has been detached.
52 : : */
53 : : static bool
3131 rhaas@postgresql.org 54 :CBC 1187652 : tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
55 : : {
56 : 1187652 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
57 : : MinimalTuple tuple;
58 : : shm_mq_result result;
59 : : bool should_free;
60 : :
61 : : /* Send the tuple itself. */
1367 tmunro@postgresql.or 62 : 1187652 : tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
913 rhaas@postgresql.org 63 : 1187652 : result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
64 : :
1977 andres@anarazel.de 65 [ + + ]: 1187652 : if (should_free)
1367 tmunro@postgresql.or 66 : 1152500 : pfree(tuple);
67 : :
68 : : /* Check for failure. */
2869 rhaas@postgresql.org 69 [ - + ]: 1187652 : if (result == SHM_MQ_DETACHED)
2869 rhaas@postgresql.org 70 :UBC 0 : return false;
2869 rhaas@postgresql.org 71 [ - + ]:CBC 1187652 : else if (result != SHM_MQ_SUCCESS)
2869 rhaas@postgresql.org 72 [ # # ]:UBC 0 : ereport(ERROR,
73 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
74 : : errmsg("could not send tuple to shared-memory queue")));
75 : :
2869 rhaas@postgresql.org 76 :CBC 1187652 : return true;
77 : : }
78 : :
79 : : /*
80 : : * Prepare to receive tuples from executor.
81 : : */
82 : : static void
3131 83 : 1232 : tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
84 : : {
85 : : /* do nothing */
86 : 1232 : }
87 : :
88 : : /*
89 : : * Clean up at end of an executor run
90 : : */
91 : : static void
92 : 1229 : tqueueShutdownReceiver(DestReceiver *self)
93 : : {
3121 94 : 1229 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
95 : :
2418 tgl@sss.pgh.pa.us 96 [ + - ]: 1229 : if (tqueue->queue != NULL)
97 : 1229 : shm_mq_detach(tqueue->queue);
98 : 1229 : tqueue->queue = NULL;
3131 rhaas@postgresql.org 99 : 1229 : }
100 : :
101 : : /*
102 : : * Destroy receiver when done with it
103 : : */
104 : : static void
105 : 1229 : tqueueDestroyReceiver(DestReceiver *self)
106 : : {
3082 107 : 1229 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
108 : :
109 : : /* We probably already detached from queue, but let's be sure */
2418 tgl@sss.pgh.pa.us 110 [ - + ]: 1229 : if (tqueue->queue != NULL)
2418 tgl@sss.pgh.pa.us 111 :UBC 0 : shm_mq_detach(tqueue->queue);
3131 rhaas@postgresql.org 112 :CBC 1229 : pfree(self);
113 : 1229 : }
114 : :
115 : : /*
116 : : * Create a DestReceiver that writes tuples to a tuple queue.
117 : : */
118 : : DestReceiver *
119 : 1232 : CreateTupleQueueDestReceiver(shm_mq_handle *handle)
120 : : {
121 : : TQueueDestReceiver *self;
122 : :
123 : 1232 : self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
124 : :
125 : 1232 : self->pub.receiveSlot = tqueueReceiveSlot;
126 : 1232 : self->pub.rStartup = tqueueStartupReceiver;
127 : 1232 : self->pub.rShutdown = tqueueShutdownReceiver;
128 : 1232 : self->pub.rDestroy = tqueueDestroyReceiver;
129 : 1232 : self->pub.mydest = DestTupleQueue;
2814 tgl@sss.pgh.pa.us 130 : 1232 : self->queue = handle;
131 : :
3131 rhaas@postgresql.org 132 : 1232 : return (DestReceiver *) self;
133 : : }
134 : :
135 : : /*
136 : : * Create a tuple queue reader.
137 : : */
138 : : TupleQueueReader *
2404 andres@anarazel.de 139 : 1232 : CreateTupleQueueReader(shm_mq_handle *handle)
140 : : {
3082 rhaas@postgresql.org 141 : 1232 : TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
142 : :
143 : 1232 : reader->queue = handle;
144 : :
145 : 1232 : return reader;
146 : : }
147 : :
148 : : /*
149 : : * Destroy a tuple queue reader.
150 : : *
151 : : * Note: cleaning up the underlying shm_mq is the caller's responsibility.
152 : : * We won't access it here, as it may be detached already.
153 : : */
154 : : void
155 : 1229 : DestroyTupleQueueReader(TupleQueueReader *reader)
156 : : {
157 : 1229 : pfree(reader);
158 : 1229 : }
159 : :
160 : : /*
161 : : * Fetch a tuple from a tuple queue reader.
162 : : *
163 : : * The return value is NULL if there are no remaining tuples or if
164 : : * nowait = true and no tuple is ready to return. *done, if not NULL,
165 : : * is set to true when there are no remaining tuples and otherwise to false.
166 : : *
167 : : * The returned tuple, if any, is either in shared memory or a private buffer
168 : : * and should not be freed. The pointer is invalid after the next call to
169 : : * TupleQueueReaderNext().
170 : : *
171 : : * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
172 : : * accumulate bytes from a partially-read message, so it's useful to call
173 : : * this with nowait = true even if nothing is returned.
174 : : */
175 : : MinimalTuple
176 : 1968799 : TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
177 : : {
178 : : MinimalTuple tuple;
179 : : shm_mq_result result;
180 : : Size nbytes;
181 : : void *data;
182 : :
183 [ + - ]: 1968799 : if (done != NULL)
184 : 1968799 : *done = false;
185 : :
186 : : /* Attempt to read a message. */
2404 andres@anarazel.de 187 : 1968799 : result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
188 : :
189 : : /* If queue is detached, set *done and return NULL. */
190 [ + + ]: 1968799 : if (result == SHM_MQ_DETACHED)
191 : : {
192 [ + - ]: 1223 : if (done != NULL)
193 : 1223 : *done = true;
194 : 1223 : return NULL;
195 : : }
196 : :
197 : : /* In non-blocking mode, bail out if no message ready yet. */
198 [ + + ]: 1967576 : if (result == SHM_MQ_WOULD_BLOCK)
199 : 784813 : return NULL;
200 [ - + ]: 1182763 : Assert(result == SHM_MQ_SUCCESS);
201 : :
202 : : /*
203 : : * Return a pointer to the queue memory directly (which had better be
204 : : * sufficiently aligned).
205 : : */
1367 tmunro@postgresql.or 206 : 1182763 : tuple = (MinimalTuple) data;
207 [ - + ]: 1182763 : Assert(tuple->t_len == nbytes);
208 : :
209 : 1182763 : return tuple;
210 : : }
|