Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * tqueue.c
4 : * Use shm_mq to send & receive tuples between parallel backends
5 : *
6 : * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
7 : * under the hood, writes tuples from the executor to a shm_mq.
8 : *
9 : * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
10 : *
11 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
12 : * Portions Copyright (c) 1994, Regents of the University of California
13 : *
14 : * IDENTIFICATION
15 : * src/backend/executor/tqueue.c
16 : *
17 : *-------------------------------------------------------------------------
18 : */
19 :
20 : #include "postgres.h"
21 :
22 : #include "access/htup_details.h"
23 : #include "executor/tqueue.h"
24 :
25 : /*
26 : * DestReceiver object's private contents
27 : *
28 : * queue is a pointer to data supplied by DestReceiver's caller.
29 : */
30 : typedef struct TQueueDestReceiver
31 : {
32 : DestReceiver pub; /* public fields */
33 : shm_mq_handle *queue; /* shm_mq to send to */
34 : } TQueueDestReceiver;
35 :
36 : /*
37 : * TupleQueueReader object's private contents
38 : *
39 : * queue is a pointer to data supplied by reader's caller.
40 : *
41 : * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
42 : */
43 : struct TupleQueueReader
44 : {
45 : shm_mq_handle *queue; /* shm_mq to receive from */
46 : };
47 :
48 : /*
49 : * Receive a tuple from a query, and send it to the designated shm_mq.
50 : *
51 : * Returns true if successful, false if shm_mq has been detached.
52 : */
53 : static bool
2760 rhaas 54 CBC 721031 : tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
55 : {
56 721031 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
57 : MinimalTuple tuple;
58 : shm_mq_result result;
59 : bool should_free;
60 :
61 : /* Send the tuple itself. */
996 tmunro 62 721031 : tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
542 rhaas 63 721031 : result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
64 :
1606 andres 65 721031 : if (should_free)
996 tmunro 66 720725 : pfree(tuple);
67 :
68 : /* Check for failure. */
2498 rhaas 69 721031 : if (result == SHM_MQ_DETACHED)
2498 rhaas 70 UBC 0 : return false;
2498 rhaas 71 CBC 721031 : else if (result != SHM_MQ_SUCCESS)
2498 rhaas 72 UBC 0 : ereport(ERROR,
73 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
74 : errmsg("could not send tuple to shared-memory queue")));
75 :
2498 rhaas 76 CBC 721031 : return true;
77 : }
78 :
79 : /*
80 : * Prepare to receive tuples from executor.
81 : */
82 : static void
2760 83 1210 : tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
84 : {
85 : /* do nothing */
86 1210 : }
87 :
88 : /*
89 : * Clean up at end of an executor run
90 : */
91 : static void
92 1207 : tqueueShutdownReceiver(DestReceiver *self)
93 : {
2750 94 1207 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
95 :
2047 tgl 96 1207 : if (tqueue->queue != NULL)
97 1207 : shm_mq_detach(tqueue->queue);
98 1207 : tqueue->queue = NULL;
2760 rhaas 99 1207 : }
100 :
101 : /*
102 : * Destroy receiver when done with it
103 : */
104 : static void
105 1207 : tqueueDestroyReceiver(DestReceiver *self)
106 : {
2711 107 1207 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
108 :
109 : /* We probably already detached from queue, but let's be sure */
2047 tgl 110 1207 : if (tqueue->queue != NULL)
2047 tgl 111 UBC 0 : shm_mq_detach(tqueue->queue);
2760 rhaas 112 CBC 1207 : pfree(self);
113 1207 : }
114 :
115 : /*
116 : * Create a DestReceiver that writes tuples to a tuple queue.
117 : */
118 : DestReceiver *
119 1210 : CreateTupleQueueDestReceiver(shm_mq_handle *handle)
120 : {
121 : TQueueDestReceiver *self;
122 :
123 1210 : self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
124 :
125 1210 : self->pub.receiveSlot = tqueueReceiveSlot;
126 1210 : self->pub.rStartup = tqueueStartupReceiver;
127 1210 : self->pub.rShutdown = tqueueShutdownReceiver;
128 1210 : self->pub.rDestroy = tqueueDestroyReceiver;
129 1210 : self->pub.mydest = DestTupleQueue;
2443 tgl 130 1210 : self->queue = handle;
131 :
2760 rhaas 132 1210 : return (DestReceiver *) self;
133 : }
134 :
135 : /*
136 : * Create a tuple queue reader.
137 : */
138 : TupleQueueReader *
2033 andres 139 1210 : CreateTupleQueueReader(shm_mq_handle *handle)
140 : {
2711 rhaas 141 1210 : TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
142 :
143 1210 : reader->queue = handle;
144 :
145 1210 : return reader;
146 : }
147 :
148 : /*
149 : * Destroy a tuple queue reader.
150 : *
151 : * Note: cleaning up the underlying shm_mq is the caller's responsibility.
152 : * We won't access it here, as it may be detached already.
153 : */
154 : void
155 1207 : DestroyTupleQueueReader(TupleQueueReader *reader)
156 : {
157 1207 : pfree(reader);
158 1207 : }
159 :
160 : /*
161 : * Fetch a tuple from a tuple queue reader.
162 : *
163 : * The return value is NULL if there are no remaining tuples or if
164 : * nowait = true and no tuple is ready to return. *done, if not NULL,
165 : * is set to true when there are no remaining tuples and otherwise to false.
166 : *
167 : * The returned tuple, if any, is either in shared memory or a private buffer
168 : * and should not be freed. The pointer is invalid after the next call to
169 : * TupleQueueReaderNext().
170 : *
171 : * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
172 : * accumulate bytes from a partially-read message, so it's useful to call
173 : * this with nowait = true even if nothing is returned.
174 : */
175 : MinimalTuple
176 3339153 : TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
177 : {
178 : MinimalTuple tuple;
179 : shm_mq_result result;
180 : Size nbytes;
181 : void *data;
182 :
183 3339153 : if (done != NULL)
184 3339153 : *done = false;
185 :
186 : /* Attempt to read a message. */
2033 andres 187 3339153 : result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
188 :
189 : /* If queue is detached, set *done and return NULL. */
190 3339153 : if (result == SHM_MQ_DETACHED)
191 : {
192 1207 : if (done != NULL)
193 1207 : *done = true;
194 1207 : return NULL;
195 : }
196 :
197 : /* In non-blocking mode, bail out if no message ready yet. */
198 3337946 : if (result == SHM_MQ_WOULD_BLOCK)
199 2616915 : return NULL;
200 721031 : Assert(result == SHM_MQ_SUCCESS);
201 :
202 : /*
203 : * Return a pointer to the queue memory directly (which had better be
204 : * sufficiently aligned).
205 : */
996 tmunro 206 721031 : tuple = (MinimalTuple) data;
207 721031 : Assert(tuple->t_len == nbytes);
208 :
209 721031 : return tuple;
210 : }
|