Age Owner TLA Line data Source code
1 : /*--------------------------------------------------------------------------
2 : *
3 : * test.c
4 : * Test harness code for shared memory message queues.
5 : *
6 : * Copyright (c) 2013-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/test/modules/test_shm_mq/test.c
10 : *
11 : * -------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres.h"
15 :
16 : #include "fmgr.h"
17 : #include "miscadmin.h"
18 : #include "pgstat.h"
19 : #include "varatt.h"
20 :
21 : #include "test_shm_mq.h"
22 :
/*
 * NOTE(review): PG_MODULE_MAGIC is presumably the module magic block that
 * the server checks when loading this library -- confirm against fmgr.h.
 */
3195 bruce 23 GIC 8 : PG_MODULE_MAGIC;
3195 bruce 24 ECB :
/*
 * Function-info declarations for the two PG_FUNCTION_ARGS entry points
 * defined in this file: test_shm_mq and test_shm_mq_pipelined.
 */
3195 bruce 25 GIC 2 : PG_FUNCTION_INFO_V1(test_shm_mq);
3372 rhaas 26 CBC 2 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
3372 rhaas 27 ECB :
/*
 * Forward declaration of the file-local comparison helper; both entry
 * points call it before its definition at the end of the file.
 */
28 : static void verify_message(Size origlen, char *origdata, Size newlen,
29 : char *newdata);
30 :
31 : /*
32 : * Simple test of the shared memory message queue infrastructure.
33 : *
34 : * We set up a ring of message queues passing through 1 or more background
35 : * processes and eventually looping back to ourselves. We then send a message
36 : * through the ring a number of times indicated by the loop count. At the end,
37 : * we check whether the final message matches the one we started with.
38 : */
39 : Datum
3372 rhaas 40 GIC 4 : test_shm_mq(PG_FUNCTION_ARGS)
3372 rhaas 41 ECB : {
3372 rhaas 42 CBC 4 : int64 queue_size = PG_GETARG_INT64(0);
43 4 : text *message = PG_GETARG_TEXT_PP(1);
44 4 : char *message_contents = VARDATA_ANY(message);
45 4 : int message_size = VARSIZE_ANY_EXHDR(message);
46 4 : int32 loop_count = PG_GETARG_INT32(2);
3372 rhaas 47 GIC 4 : int32 nworkers = PG_GETARG_INT32(3);
48 : dsm_segment *seg;
49 : shm_mq_handle *outqh;
50 : shm_mq_handle *inqh;
51 : shm_mq_result res;
52 : Size len;
53 : void *data;
54 :
3372 rhaas 55 ECB : /* A negative loopcount is nonsensical. */
3372 rhaas 56 GBC 4 : if (loop_count < 0)
3372 rhaas 57 UIC 0 : ereport(ERROR,
58 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
59 : errmsg("repeat count size must be an integer value greater than or equal to zero")));
60 :
61 : /*
62 : * Since this test sends data using the blocking interfaces, it cannot
63 : * send data to itself. Therefore, a minimum of 1 worker is required. Of
64 : * course, a negative worker count is nonsensical.
3372 rhaas 65 ECB : */
620 fujii 66 GBC 4 : if (nworkers <= 0)
3372 rhaas 67 UIC 0 : ereport(ERROR,
68 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
69 : errmsg("number of workers must be an integer value greater than zero")));
70 :
3372 rhaas 71 ECB : /* Set up dynamic shared memory segment and background workers. */
3372 rhaas 72 GIC 4 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
73 :
3372 rhaas 74 ECB : /* Send the initial message. */
542 rhaas 75 CBC 4 : res = shm_mq_send(outqh, message_size, message_contents, false, true);
3372 rhaas 76 GBC 4 : if (res != SHM_MQ_SUCCESS)
3372 rhaas 77 UIC 0 : ereport(ERROR,
78 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
79 : errmsg("could not send message")));
80 :
81 : /*
82 : * Receive a message and send it back out again. Do this a number of
83 : * times equal to the loop count.
84 : */
85 : for (;;)
86 : {
3372 rhaas 87 ECB : /* Receive a message. */
3372 rhaas 88 CBC 24001 : res = shm_mq_receive(inqh, &len, &data, false);
3372 rhaas 89 GBC 24001 : if (res != SHM_MQ_SUCCESS)
3372 rhaas 90 UIC 0 : ereport(ERROR,
91 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
92 : errmsg("could not receive message")));
93 :
3372 rhaas 94 ECB : /* If this is supposed to be the last iteration, stop here. */
3372 rhaas 95 CBC 24001 : if (--loop_count <= 0)
3372 rhaas 96 GIC 4 : break;
97 :
3372 rhaas 98 ECB : /* Send it back out. */
542 rhaas 99 CBC 23997 : res = shm_mq_send(outqh, len, data, false, true);
3372 rhaas 100 GBC 23997 : if (res != SHM_MQ_SUCCESS)
3372 rhaas 101 UIC 0 : ereport(ERROR,
102 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
103 : errmsg("could not send message")));
104 : }
105 :
106 : /*
107 : * Finally, check that we got back the same message from the last
108 : * iteration that we originally sent.
3372 rhaas 109 ECB : */
3372 rhaas 110 GIC 4 : verify_message(message_size, message_contents, len, data);
111 :
3372 rhaas 112 ECB : /* Clean up. */
3372 rhaas 113 GIC 4 : dsm_detach(seg);
3372 rhaas 114 ECB :
3372 rhaas 115 GIC 4 : PG_RETURN_VOID();
116 : }
117 :
118 : /*
119 : * Pipelined test of the shared memory message queue infrastructure.
120 : *
121 : * As in the basic test, we set up a ring of message queues passing through
122 : * 1 or more background processes and eventually looping back to ourselves.
123 : * Then, we send N copies of the user-specified message through the ring and
124 : * receive them all back. Since this might fill up all message queues in the
125 : * ring and then stall, we must be prepared to begin receiving the messages
126 : * back before we've finished sending them.
127 : */
3372 rhaas 128 ECB : Datum
3372 rhaas 129 GIC 1 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
3372 rhaas 130 ECB : {
3372 rhaas 131 CBC 1 : int64 queue_size = PG_GETARG_INT64(0);
132 1 : text *message = PG_GETARG_TEXT_PP(1);
133 1 : char *message_contents = VARDATA_ANY(message);
134 1 : int message_size = VARSIZE_ANY_EXHDR(message);
135 1 : int32 loop_count = PG_GETARG_INT32(2);
136 1 : int32 nworkers = PG_GETARG_INT32(3);
137 1 : bool verify = PG_GETARG_BOOL(4);
138 1 : int32 send_count = 0;
3372 rhaas 139 GIC 1 : int32 receive_count = 0;
140 : dsm_segment *seg;
141 : shm_mq_handle *outqh;
142 : shm_mq_handle *inqh;
143 : shm_mq_result res;
144 : Size len;
145 : void *data;
146 :
3372 rhaas 147 ECB : /* A negative loopcount is nonsensical. */
3372 rhaas 148 GBC 1 : if (loop_count < 0)
3372 rhaas 149 UIC 0 : ereport(ERROR,
150 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
151 : errmsg("repeat count size must be an integer value greater than or equal to zero")));
152 :
153 : /*
154 : * Using the nonblocking interfaces, we can even send data to ourselves,
155 : * so the minimum number of workers for this test is zero.
3372 rhaas 156 ECB : */
3372 rhaas 157 GBC 1 : if (nworkers < 0)
3372 rhaas 158 UIC 0 : ereport(ERROR,
159 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
160 : errmsg("number of workers must be an integer value greater than or equal to zero")));
161 :
3372 rhaas 162 ECB : /* Set up dynamic shared memory segment and background workers. */
3372 rhaas 163 GIC 1 : test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
164 :
165 : /* Main loop. */
3372 rhaas 166 ECB : for (;;)
3372 rhaas 167 CBC 4589 : {
3372 rhaas 168 GIC 4590 : bool wait = true;
169 :
170 : /*
171 : * If we haven't yet sent the message the requisite number of times,
172 : * try again to send it now. Note that when shm_mq_send() returns
173 : * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
174 : * same message size and contents; that's not an issue here because
175 : * we're sending the same message every time.
3372 rhaas 176 ECB : */
3372 rhaas 177 GIC 4590 : if (send_count < loop_count)
3372 rhaas 178 ECB : {
542 rhaas 179 GIC 4569 : res = shm_mq_send(outqh, message_size, message_contents, true,
542 rhaas 180 ECB : true);
3372 rhaas 181 GIC 4569 : if (res == SHM_MQ_SUCCESS)
3372 rhaas 182 ECB : {
3372 rhaas 183 CBC 200 : ++send_count;
3372 rhaas 184 GIC 200 : wait = false;
3372 rhaas 185 ECB : }
3372 rhaas 186 GBC 4369 : else if (res == SHM_MQ_DETACHED)
3372 rhaas 187 UIC 0 : ereport(ERROR,
188 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
189 : errmsg("could not send message")));
190 : }
191 :
192 : /*
193 : * If we haven't yet received the message the requisite number of
194 : * times, try to receive it again now.
3372 rhaas 195 ECB : */
3372 rhaas 196 GIC 4590 : if (receive_count < loop_count)
3372 rhaas 197 ECB : {
3372 rhaas 198 CBC 4589 : res = shm_mq_receive(inqh, &len, &data, true);
3372 rhaas 199 GIC 4589 : if (res == SHM_MQ_SUCCESS)
3372 rhaas 200 ECB : {
3372 rhaas 201 GIC 200 : ++receive_count;
3372 rhaas 202 ECB : /* Verifying every time is slow, so it's optional. */
3372 rhaas 203 CBC 200 : if (verify)
204 200 : verify_message(message_size, message_contents, len, data);
3372 rhaas 205 GIC 200 : wait = false;
3372 rhaas 206 ECB : }
3372 rhaas 207 GBC 4389 : else if (res == SHM_MQ_DETACHED)
3372 rhaas 208 UIC 0 : ereport(ERROR,
209 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
210 : errmsg("could not receive message")));
211 : }
212 : else
213 : {
214 : /*
215 : * Otherwise, we've received the message enough times. This
216 : * shouldn't happen unless we've also sent it enough times.
3372 rhaas 217 ECB : */
3372 rhaas 218 GBC 1 : if (send_count != receive_count)
3372 rhaas 219 UIC 0 : ereport(ERROR,
220 : (errcode(ERRCODE_INTERNAL_ERROR),
221 : errmsg("message sent %d times, but received %d times",
2118 tgl 222 ECB : send_count, receive_count)));
3372 rhaas 223 GIC 1 : break;
224 : }
3372 rhaas 225 ECB :
3372 rhaas 226 GIC 4589 : if (wait)
227 : {
228 : /*
229 : * If we made no progress, wait for one of the other processes to
230 : * which we are connected to set our latch, indicating that they
231 : * have read or written data and therefore there may now be work
232 : * for us to do.
3372 rhaas 233 ECB : */
1598 tmunro 234 GIC 4212 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1598 tmunro 235 ECB : PG_WAIT_EXTENSION);
3007 andres 236 CBC 4212 : ResetLatch(MyLatch);
2442 tgl 237 GIC 4212 : CHECK_FOR_INTERRUPTS();
238 : }
239 : }
240 :
3372 rhaas 241 ECB : /* Clean up. */
3372 rhaas 242 GIC 1 : dsm_detach(seg);
3372 rhaas 243 ECB :
3372 rhaas 244 GIC 1 : PG_RETURN_VOID();
245 : }
246 :
247 : /*
248 : * Verify that two messages are the same.
249 : */
3372 rhaas 250 ECB : static void
3309 rhaas 251 GIC 204 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
252 : {
253 : Size i;
3372 rhaas 254 ECB :
3372 rhaas 255 GBC 204 : if (origlen != newlen)
3372 rhaas 256 UIC 0 : ereport(ERROR,
257 : (errmsg("message corrupted"),
258 : errdetail("The original message was %zu bytes but the final message is %zu bytes.",
259 : origlen, newlen)));
3372 rhaas 260 ECB :
3372 rhaas 261 CBC 54001336 : for (i = 0; i < origlen; ++i)
3372 rhaas 262 GBC 54001132 : if (origdata[i] != newdata[i])
3372 rhaas 263 UIC 0 : ereport(ERROR,
264 : (errmsg("message corrupted"),
3309 tgl 265 ECB : errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
3372 rhaas 266 GIC 204 : }
|