TLA Line data Source code
1 : /*--------------------------------------------------------------------------
2 : *
3 : * test.c
4 : * Test harness code for shared memory message queues.
5 : *
6 : * Copyright (c) 2013-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/test/modules/test_shm_mq/test.c
10 : *
11 : * -------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres.h"
15 :
16 : #include "fmgr.h"
17 : #include "miscadmin.h"
18 : #include "pgstat.h"
19 : #include "varatt.h"
20 :
21 : #include "test_shm_mq.h"
22 :
23 GIC 8 : PG_MODULE_MAGIC;
24 ECB :
25 GIC 2 : PG_FUNCTION_INFO_V1(test_shm_mq);
26 CBC 2 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
27 ECB :
28 : static void verify_message(Size origlen, char *origdata, Size newlen,
29 : char *newdata);
30 :
31 : /*
32 : * Simple test of the shared memory message queue infrastructure.
33 : *
34 : * We set up a ring of message queues passing through 1 or more background
35 : * processes and eventually looping back to ourselves. We then send a message
36 : * through the ring a number of times indicated by the loop count. At the end,
37 : * we check whether the final message matches the one we started with.
38 : */
39 : Datum
40 GIC 4 : test_shm_mq(PG_FUNCTION_ARGS)
41 ECB : {
42 CBC 4 : int64 queue_size = PG_GETARG_INT64(0);
43 4 : text *message = PG_GETARG_TEXT_PP(1);
44 4 : char *message_contents = VARDATA_ANY(message);
45 4 : int message_size = VARSIZE_ANY_EXHDR(message);
46 4 : int32 loop_count = PG_GETARG_INT32(2);
47 GIC 4 : int32 nworkers = PG_GETARG_INT32(3);
48 : dsm_segment *seg;
49 : shm_mq_handle *outqh;
50 : shm_mq_handle *inqh;
51 : shm_mq_result res;
52 : Size len;
53 : void *data;
54 :
55 ECB : /* A negative loopcount is nonsensical. */
56 GBC 4 : if (loop_count < 0)
57 UIC 0 : ereport(ERROR,
58 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
59 : errmsg("repeat count size must be an integer value greater than or equal to zero")));
60 :
61 : /*
62 : * Since this test sends data using the blocking interfaces, it cannot
63 : * send data to itself. Therefore, a minimum of 1 worker is required. Of
64 : * course, a negative worker count is nonsensical.
65 ECB : */
66 GBC 4 : if (nworkers <= 0)
67 UIC 0 : ereport(ERROR,
68 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
69 : errmsg("number of workers must be an integer value greater than zero")));
70 :
71 ECB : /* Set up dynamic shared memory segment and background workers. */
72 GIC 4 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
73 :
74 ECB : /* Send the initial message. */
75 CBC 4 : res = shm_mq_send(outqh, message_size, message_contents, false, true);
76 GBC 4 : if (res != SHM_MQ_SUCCESS)
77 UIC 0 : ereport(ERROR,
78 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
79 : errmsg("could not send message")));
80 :
81 : /*
82 : * Receive a message and send it back out again. Do this a number of
83 : * times equal to the loop count.
84 : */
85 : for (;;)
86 : {
87 ECB : /* Receive a message. */
88 CBC 24001 : res = shm_mq_receive(inqh, &len, &data, false);
89 GBC 24001 : if (res != SHM_MQ_SUCCESS)
90 UIC 0 : ereport(ERROR,
91 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
92 : errmsg("could not receive message")));
93 :
94 ECB : /* If this is supposed to be the last iteration, stop here. */
95 CBC 24001 : if (--loop_count <= 0)
96 GIC 4 : break;
97 :
98 ECB : /* Send it back out. */
99 CBC 23997 : res = shm_mq_send(outqh, len, data, false, true);
100 GBC 23997 : if (res != SHM_MQ_SUCCESS)
101 UIC 0 : ereport(ERROR,
102 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
103 : errmsg("could not send message")));
104 : }
105 :
106 : /*
107 : * Finally, check that we got back the same message from the last
108 : * iteration that we originally sent.
109 ECB : */
110 GIC 4 : verify_message(message_size, message_contents, len, data);
111 :
112 ECB : /* Clean up. */
113 GIC 4 : dsm_detach(seg);
114 ECB :
115 GIC 4 : PG_RETURN_VOID();
116 : }
117 :
118 : /*
119 : * Pipelined test of the shared memory message queue infrastructure.
120 : *
121 : * As in the basic test, we set up a ring of message queues passing through
122 : * 1 or more background processes and eventually looping back to ourselves.
123 : * Then, we send N copies of the user-specified message through the ring and
124 : * receive them all back. Since this might fill up all message queues in the
125 : * ring and then stall, we must be prepared to begin receiving the messages
126 : * back before we've finished sending them.
127 : */
128 ECB : Datum
129 GIC 1 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
130 ECB : {
131 CBC 1 : int64 queue_size = PG_GETARG_INT64(0);
132 1 : text *message = PG_GETARG_TEXT_PP(1);
133 1 : char *message_contents = VARDATA_ANY(message);
134 1 : int message_size = VARSIZE_ANY_EXHDR(message);
135 1 : int32 loop_count = PG_GETARG_INT32(2);
136 1 : int32 nworkers = PG_GETARG_INT32(3);
137 1 : bool verify = PG_GETARG_BOOL(4);
138 1 : int32 send_count = 0;
139 GIC 1 : int32 receive_count = 0;
140 : dsm_segment *seg;
141 : shm_mq_handle *outqh;
142 : shm_mq_handle *inqh;
143 : shm_mq_result res;
144 : Size len;
145 : void *data;
146 :
147 ECB : /* A negative loopcount is nonsensical. */
148 GBC 1 : if (loop_count < 0)
149 UIC 0 : ereport(ERROR,
150 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
151 : errmsg("repeat count size must be an integer value greater than or equal to zero")));
152 :
153 : /*
154 : * Using the nonblocking interfaces, we can even send data to ourselves,
155 : * so the minimum number of workers for this test is zero.
156 ECB : */
157 GBC 1 : if (nworkers < 0)
158 UIC 0 : ereport(ERROR,
159 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
160 : errmsg("number of workers must be an integer value greater than or equal to zero")));
161 :
162 ECB : /* Set up dynamic shared memory segment and background workers. */
163 GIC 1 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
164 :
165 : /* Main loop. */
166 ECB : for (;;)
167 CBC 4589 : {
168 GIC 4590 : bool wait = true;
169 :
170 : /*
171 : * If we haven't yet sent the message the requisite number of times,
172 : * try again to send it now. Note that when shm_mq_send() returns
173 : * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
174 : * same message size and contents; that's not an issue here because
175 : * we're sending the same message every time.
176 ECB : */
177 GIC 4590 : if (send_count < loop_count)
178 ECB : {
179 GIC 4569 : res = shm_mq_send(outqh, message_size, message_contents, true,
180 ECB : true);
181 GIC 4569 : if (res == SHM_MQ_SUCCESS)
182 ECB : {
183 CBC 200 : ++send_count;
184 GIC 200 : wait = false;
185 ECB : }
186 GBC 4369 : else if (res == SHM_MQ_DETACHED)
187 UIC 0 : ereport(ERROR,
188 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
189 : errmsg("could not send message")));
190 : }
191 :
192 : /*
193 : * If we haven't yet received the message the requisite number of
194 : * times, try to receive it again now.
195 ECB : */
196 GIC 4590 : if (receive_count < loop_count)
197 ECB : {
198 CBC 4589 : res = shm_mq_receive(inqh, &len, &data, true);
199 GIC 4589 : if (res == SHM_MQ_SUCCESS)
200 ECB : {
201 GIC 200 : ++receive_count;
202 ECB : /* Verifying every time is slow, so it's optional. */
203 CBC 200 : if (verify)
204 200 : verify_message(message_size, message_contents, len, data);
205 GIC 200 : wait = false;
206 ECB : }
207 GBC 4389 : else if (res == SHM_MQ_DETACHED)
208 UIC 0 : ereport(ERROR,
209 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
210 : errmsg("could not receive message")));
211 : }
212 : else
213 : {
214 : /*
215 : * Otherwise, we've received the message enough times. This
216 : * shouldn't happen unless we've also sent it enough times.
217 ECB : */
218 GBC 1 : if (send_count != receive_count)
219 UIC 0 : ereport(ERROR,
220 : (errcode(ERRCODE_INTERNAL_ERROR),
221 : errmsg("message sent %d times, but received %d times",
222 ECB : send_count, receive_count)));
223 GIC 1 : break;
224 : }
225 ECB :
226 GIC 4589 : if (wait)
227 : {
228 : /*
229 : * If we made no progress, wait for one of the other processes to
230 : * which we are connected to set our latch, indicating that they
231 : * have read or written data and therefore there may now be work
232 : * for us to do.
233 ECB : */
234 GIC 4212 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
235 ECB : PG_WAIT_EXTENSION);
236 CBC 4212 : ResetLatch(MyLatch);
237 GIC 4212 : CHECK_FOR_INTERRUPTS();
238 : }
239 : }
240 :
241 ECB : /* Clean up. */
242 GIC 1 : dsm_detach(seg);
243 ECB :
244 GIC 1 : PG_RETURN_VOID();
245 : }
246 :
247 : /*
248 : * Verify that two messages are the same.
249 : */
250 ECB : static void
251 GIC 204 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
252 : {
253 : Size i;
254 ECB :
255 GBC 204 : if (origlen != newlen)
256 UIC 0 : ereport(ERROR,
257 : (errmsg("message corrupted"),
258 : errdetail("The original message was %zu bytes but the final message is %zu bytes.",
259 : origlen, newlen)));
260 ECB :
261 CBC 54001336 : for (i = 0; i < origlen; ++i)
262 GBC 54001132 : if (origdata[i] != newdata[i])
263 UIC 0 : ereport(ERROR,
264 : (errmsg("message corrupted"),
265 ECB : errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
266 GIC 204 : }
|