Age Owner Branch data TLA Line data Source code
1 : : /*--------------------------------------------------------------------------
2 : : *
3 : : * test.c
4 : : * Test harness code for shared memory message queues.
5 : : *
6 : : * Copyright (c) 2013-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/test/modules/test_shm_mq/test.c
10 : : *
11 : : * -------------------------------------------------------------------------
12 : : */
13 : :
14 : : #include "postgres.h"
15 : :
16 : : #include "fmgr.h"
17 : : #include "miscadmin.h"
18 : : #include "pgstat.h"
19 : : #include "varatt.h"
20 : :
21 : : #include "test_shm_mq.h"
22 : :
3566 bruce@momjian.us 23 :CBC 8 : PG_MODULE_MAGIC;
24 : :
25 : 2 : PG_FUNCTION_INFO_V1(test_shm_mq);
3743 rhaas@postgresql.org 26 : 2 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
27 : :
28 : : static void verify_message(Size origlen, char *origdata, Size newlen,
29 : : char *newdata);
30 : :
31 : : /* value cached, fetched from shared memory */
32 : : static uint32 we_message_queue = 0;
33 : :
34 : : /*
35 : : * Simple test of the shared memory message queue infrastructure.
36 : : *
37 : : * We set up a ring of message queues passing through 1 or more background
38 : : * processes and eventually looping back to ourselves. We then send a message
39 : : * through the ring a number of times indicated by the loop count. At the end,
40 : : * we check whether the final message matches the one we started with.
41 : : */
42 : : Datum
43 : 4 : test_shm_mq(PG_FUNCTION_ARGS)
44 : : {
45 : 4 : int64 queue_size = PG_GETARG_INT64(0);
46 : 4 : text *message = PG_GETARG_TEXT_PP(1);
47 [ - + ]: 4 : char *message_contents = VARDATA_ANY(message);
48 [ - + - - : 4 : int message_size = VARSIZE_ANY_EXHDR(message);
- - - - -
+ ]
49 : 4 : int32 loop_count = PG_GETARG_INT32(2);
50 : 4 : int32 nworkers = PG_GETARG_INT32(3);
51 : : dsm_segment *seg;
52 : : shm_mq_handle *outqh;
53 : : shm_mq_handle *inqh;
54 : : shm_mq_result res;
55 : : Size len;
56 : : void *data;
57 : :
58 : : /* A negative loopcount is nonsensical. */
59 [ - + ]: 4 : if (loop_count < 0)
3743 rhaas@postgresql.org 60 [ # # ]:UBC 0 : ereport(ERROR,
61 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
62 : : errmsg("repeat count size must be an integer value greater than or equal to zero")));
63 : :
64 : : /*
65 : : * Since this test sends data using the blocking interfaces, it cannot
66 : : * send data to itself. Therefore, a minimum of 1 worker is required. Of
67 : : * course, a negative worker count is nonsensical.
68 : : */
991 fujii@postgresql.org 69 [ - + ]:CBC 4 : if (nworkers <= 0)
3743 rhaas@postgresql.org 70 [ # # ]:UBC 0 : ereport(ERROR,
71 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
72 : : errmsg("number of workers must be an integer value greater than zero")));
73 : :
74 : : /* Set up dynamic shared memory segment and background workers. */
3743 rhaas@postgresql.org 75 :CBC 4 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
76 : :
77 : : /* Send the initial message. */
913 78 : 4 : res = shm_mq_send(outqh, message_size, message_contents, false, true);
3743 79 [ + - ]: 4 : if (res != SHM_MQ_SUCCESS)
3743 rhaas@postgresql.org 80 [ # # ]:UBC 0 : ereport(ERROR,
81 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
82 : : errmsg("could not send message")));
83 : :
84 : : /*
85 : : * Receive a message and send it back out again. Do this a number of
86 : : * times equal to the loop count.
87 : : */
88 : : for (;;)
89 : : {
90 : : /* Receive a message. */
3743 rhaas@postgresql.org 91 :CBC 24001 : res = shm_mq_receive(inqh, &len, &data, false);
92 [ - + ]: 24001 : if (res != SHM_MQ_SUCCESS)
3743 rhaas@postgresql.org 93 [ # # ]:UBC 0 : ereport(ERROR,
94 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
95 : : errmsg("could not receive message")));
96 : :
97 : : /* If this is supposed to be the last iteration, stop here. */
3743 rhaas@postgresql.org 98 [ + + ]:CBC 24001 : if (--loop_count <= 0)
99 : 4 : break;
100 : :
101 : : /* Send it back out. */
913 102 : 23997 : res = shm_mq_send(outqh, len, data, false, true);
3743 103 [ - + ]: 23997 : if (res != SHM_MQ_SUCCESS)
3743 rhaas@postgresql.org 104 [ # # ]:UBC 0 : ereport(ERROR,
105 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
106 : : errmsg("could not send message")));
107 : : }
108 : :
109 : : /*
110 : : * Finally, check that we got back the same message from the last
111 : : * iteration that we originally sent.
112 : : */
3743 rhaas@postgresql.org 113 :CBC 4 : verify_message(message_size, message_contents, len, data);
114 : :
115 : : /* Clean up. */
116 : 4 : dsm_detach(seg);
117 : :
118 : 4 : PG_RETURN_VOID();
119 : : }
120 : :
121 : : /*
122 : : * Pipelined test of the shared memory message queue infrastructure.
123 : : *
124 : : * As in the basic test, we set up a ring of message queues passing through
125 : : * 1 or more background processes and eventually looping back to ourselves.
126 : : * Then, we send N copies of the user-specified message through the ring and
127 : : * receive them all back. Since this might fill up all message queues in the
128 : : * ring and then stall, we must be prepared to begin receiving the messages
129 : : * back before we've finished sending them.
130 : : */
131 : : Datum
132 : 1 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
133 : : {
134 : 1 : int64 queue_size = PG_GETARG_INT64(0);
135 : 1 : text *message = PG_GETARG_TEXT_PP(1);
136 [ - + ]: 1 : char *message_contents = VARDATA_ANY(message);
137 [ - + - - : 1 : int message_size = VARSIZE_ANY_EXHDR(message);
- - - - -
+ ]
138 : 1 : int32 loop_count = PG_GETARG_INT32(2);
139 : 1 : int32 nworkers = PG_GETARG_INT32(3);
140 : 1 : bool verify = PG_GETARG_BOOL(4);
141 : 1 : int32 send_count = 0;
142 : 1 : int32 receive_count = 0;
143 : : dsm_segment *seg;
144 : : shm_mq_handle *outqh;
145 : : shm_mq_handle *inqh;
146 : : shm_mq_result res;
147 : : Size len;
148 : : void *data;
149 : :
150 : : /* A negative loopcount is nonsensical. */
151 [ - + ]: 1 : if (loop_count < 0)
3743 rhaas@postgresql.org 152 [ # # ]:UBC 0 : ereport(ERROR,
153 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
154 : : errmsg("repeat count size must be an integer value greater than or equal to zero")));
155 : :
156 : : /*
157 : : * Using the nonblocking interfaces, we can even send data to ourselves,
158 : : * so the minimum number of workers for this test is zero.
159 : : */
3743 rhaas@postgresql.org 160 [ - + ]:CBC 1 : if (nworkers < 0)
3743 rhaas@postgresql.org 161 [ # # ]:UBC 0 : ereport(ERROR,
162 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
163 : : errmsg("number of workers must be an integer value greater than or equal to zero")));
164 : :
165 : : /* Set up dynamic shared memory segment and background workers. */
3743 rhaas@postgresql.org 166 :CBC 1 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
167 : :
168 : : /* Main loop. */
169 : : for (;;)
170 : 4738 : {
171 : 4739 : bool wait = true;
172 : :
173 : : /*
174 : : * If we haven't yet sent the message the requisite number of times,
175 : : * try again to send it now. Note that when shm_mq_send() returns
176 : : * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
177 : : * same message size and contents; that's not an issue here because
178 : : * we're sending the same message every time.
179 : : */
180 [ + + ]: 4739 : if (send_count < loop_count)
181 : : {
913 182 : 4717 : res = shm_mq_send(outqh, message_size, message_contents, true,
183 : : true);
3743 184 [ + + ]: 4717 : if (res == SHM_MQ_SUCCESS)
185 : : {
186 : 200 : ++send_count;
187 : 200 : wait = false;
188 : : }
189 [ - + ]: 4517 : else if (res == SHM_MQ_DETACHED)
3743 rhaas@postgresql.org 190 [ # # ]:UBC 0 : ereport(ERROR,
191 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
192 : : errmsg("could not send message")));
193 : : }
194 : :
195 : : /*
196 : : * If we haven't yet received the message the requisite number of
197 : : * times, try to receive it again now.
198 : : */
3743 rhaas@postgresql.org 199 [ + + ]:CBC 4739 : if (receive_count < loop_count)
200 : : {
201 : 4738 : res = shm_mq_receive(inqh, &len, &data, true);
202 [ + + ]: 4738 : if (res == SHM_MQ_SUCCESS)
203 : : {
204 : 200 : ++receive_count;
205 : : /* Verifying every time is slow, so it's optional. */
206 [ + - ]: 200 : if (verify)
207 : 200 : verify_message(message_size, message_contents, len, data);
208 : 200 : wait = false;
209 : : }
210 [ - + ]: 4538 : else if (res == SHM_MQ_DETACHED)
3743 rhaas@postgresql.org 211 [ # # ]:UBC 0 : ereport(ERROR,
212 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
213 : : errmsg("could not receive message")));
214 : : }
215 : : else
216 : : {
217 : : /*
218 : : * Otherwise, we've received the message enough times. This
219 : : * shouldn't happen unless we've also sent it enough times.
220 : : */
3743 rhaas@postgresql.org 221 [ - + ]:CBC 1 : if (send_count != receive_count)
3743 rhaas@postgresql.org 222 [ # # ]:UBC 0 : ereport(ERROR,
223 : : (errcode(ERRCODE_INTERNAL_ERROR),
224 : : errmsg("message sent %d times, but received %d times",
225 : : send_count, receive_count)));
3743 rhaas@postgresql.org 226 :CBC 1 : break;
227 : : }
228 : :
229 [ + + ]: 4738 : if (wait)
230 : : {
231 : : /* first time, allocate or get the custom wait event */
193 michael@paquier.xyz 232 [ + + ]:GNC 4363 : if (we_message_queue == 0)
233 : 1 : we_message_queue = WaitEventExtensionNew("TestShmMqMessageQueue");
234 : :
235 : : /*
236 : : * If we made no progress, wait for one of the other processes to
237 : : * which we are connected to set our latch, indicating that they
238 : : * have read or written data and therefore there may now be work
239 : : * for us to do.
240 : : */
1969 tmunro@postgresql.or 241 :CBC 4363 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
242 : : we_message_queue);
3378 andres@anarazel.de 243 : 4363 : ResetLatch(MyLatch);
2813 tgl@sss.pgh.pa.us 244 [ - + ]: 4363 : CHECK_FOR_INTERRUPTS();
245 : : }
246 : : }
247 : :
248 : : /* Clean up. */
3743 rhaas@postgresql.org 249 : 1 : dsm_detach(seg);
250 : :
251 : 1 : PG_RETURN_VOID();
252 : : }
253 : :
254 : : /*
255 : : * Verify that two messages are the same.
256 : : */
257 : : static void
3680 258 : 204 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
259 : : {
260 : : Size i;
261 : :
3743 262 [ - + ]: 204 : if (origlen != newlen)
3743 rhaas@postgresql.org 263 [ # # ]:UBC 0 : ereport(ERROR,
264 : : (errmsg("message corrupted"),
265 : : errdetail("The original message was %zu bytes but the final message is %zu bytes.",
266 : : origlen, newlen)));
267 : :
3743 rhaas@postgresql.org 268 [ + + ]:CBC 54001068 : for (i = 0; i < origlen; ++i)
269 [ - + ]: 54000864 : if (origdata[i] != newdata[i])
3743 rhaas@postgresql.org 270 [ # # ]:UBC 0 : ereport(ERROR,
271 : : (errmsg("message corrupted"),
272 : : errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
3743 rhaas@postgresql.org 273 :CBC 204 : }
|