LCOV - differential code coverage report
Current view: top level - src/test/modules/test_shm_mq - test.c (source / functions) Coverage Total Hit UIC GBC GIC CBC ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 84.6 % 78 66 12 12 29 25 41
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 6 6 6 6
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (240..) days: 84.6 % 78 66 12 12 29 25 41
Legend: Lines: hit not hit Function coverage date bins:
(240..) days: 50.0 % 12 6 6 6

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*--------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * test.c
                                  4                 :  *      Test harness code for shared memory message queues.
                                  5                 :  *
                                  6                 :  * Copyright (c) 2013-2023, PostgreSQL Global Development Group
                                  7                 :  *
                                  8                 :  * IDENTIFICATION
                                  9                 :  *      src/test/modules/test_shm_mq/test.c
                                 10                 :  *
                                 11                 :  * -------------------------------------------------------------------------
                                 12                 :  */
                                 13                 : 
                                 14                 : #include "postgres.h"
                                 15                 : 
                                 16                 : #include "fmgr.h"
                                 17                 : #include "miscadmin.h"
                                 18                 : #include "pgstat.h"
                                 19                 : #include "varatt.h"
                                 20                 : 
                                 21                 : #include "test_shm_mq.h"
                                 22                 : 
 3195 bruce                      23 GIC           8 : PG_MODULE_MAGIC;
 3195 bruce                      24 ECB             : 
 3195 bruce                      25 GIC           2 : PG_FUNCTION_INFO_V1(test_shm_mq);
 3372 rhaas                      26 CBC           2 : PG_FUNCTION_INFO_V1(test_shm_mq_pipelined);
 3372 rhaas                      27 ECB             : 
                                 28                 : static void verify_message(Size origlen, char *origdata, Size newlen,
                                 29                 :                            char *newdata);
                                 30                 : 
                                 31                 : /*
                                 32                 :  * Simple test of the shared memory message queue infrastructure.
                                 33                 :  *
                                 34                 :  * We set up a ring of message queues passing through 1 or more background
                                 35                 :  * processes and eventually looping back to ourselves.  We then send a message
                                 36                 :  * through the ring a number of times indicated by the loop count.  At the end,
                                 37                 :  * we check whether the final message matches the one we started with.
                                 38                 :  */
                                 39                 : Datum
 3372 rhaas                      40 GIC           4 : test_shm_mq(PG_FUNCTION_ARGS)
 3372 rhaas                      41 ECB             : {
 3372 rhaas                      42 CBC           4 :     int64       queue_size = PG_GETARG_INT64(0);
                                 43               4 :     text       *message = PG_GETARG_TEXT_PP(1);
                                 44               4 :     char       *message_contents = VARDATA_ANY(message);
                                 45               4 :     int         message_size = VARSIZE_ANY_EXHDR(message);
                                 46               4 :     int32       loop_count = PG_GETARG_INT32(2);
 3372 rhaas                      47 GIC           4 :     int32       nworkers = PG_GETARG_INT32(3);
                                 48                 :     dsm_segment *seg;
                                 49                 :     shm_mq_handle *outqh;
                                 50                 :     shm_mq_handle *inqh;
                                 51                 :     shm_mq_result res;
                                 52                 :     Size        len;
                                 53                 :     void       *data;
                                 54                 : 
 3372 rhaas                      55 ECB             :     /* A negative loopcount is nonsensical. */
 3372 rhaas                      56 GBC           4 :     if (loop_count < 0)
 3372 rhaas                      57 UIC           0 :         ereport(ERROR,
                                 58                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                 59                 :                  errmsg("repeat count size must be an integer value greater than or equal to zero")));
                                 60                 : 
                                 61                 :     /*
                                 62                 :      * Since this test sends data using the blocking interfaces, it cannot
                                 63                 :      * send data to itself.  Therefore, a minimum of 1 worker is required. Of
                                 64                 :      * course, a negative worker count is nonsensical.
 3372 rhaas                      65 ECB             :      */
  620 fujii                      66 GBC           4 :     if (nworkers <= 0)
 3372 rhaas                      67 UIC           0 :         ereport(ERROR,
                                 68                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                 69                 :                  errmsg("number of workers must be an integer value greater than zero")));
                                 70                 : 
 3372 rhaas                      71 ECB             :     /* Set up dynamic shared memory segment and background workers. */
 3372 rhaas                      72 GIC           4 :     test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
                                 73                 : 
 3372 rhaas                      74 ECB             :     /* Send the initial message. */
  542 rhaas                      75 CBC           4 :     res = shm_mq_send(outqh, message_size, message_contents, false, true);
 3372 rhaas                      76 GBC           4 :     if (res != SHM_MQ_SUCCESS)
 3372 rhaas                      77 UIC           0 :         ereport(ERROR,
                                 78                 :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 79                 :                  errmsg("could not send message")));
                                 80                 : 
                                 81                 :     /*
                                 82                 :      * Receive a message and send it back out again.  Do this a number of
                                 83                 :      * times equal to the loop count.
                                 84                 :      */
                                 85                 :     for (;;)
                                 86                 :     {
 3372 rhaas                      87 ECB             :         /* Receive a message. */
 3372 rhaas                      88 CBC       24001 :         res = shm_mq_receive(inqh, &len, &data, false);
 3372 rhaas                      89 GBC       24001 :         if (res != SHM_MQ_SUCCESS)
 3372 rhaas                      90 UIC           0 :             ereport(ERROR,
                                 91                 :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 92                 :                      errmsg("could not receive message")));
                                 93                 : 
 3372 rhaas                      94 ECB             :         /* If this is supposed to be the last iteration, stop here. */
 3372 rhaas                      95 CBC       24001 :         if (--loop_count <= 0)
 3372 rhaas                      96 GIC           4 :             break;
                                 97                 : 
 3372 rhaas                      98 ECB             :         /* Send it back out. */
  542 rhaas                      99 CBC       23997 :         res = shm_mq_send(outqh, len, data, false, true);
 3372 rhaas                     100 GBC       23997 :         if (res != SHM_MQ_SUCCESS)
 3372 rhaas                     101 UIC           0 :             ereport(ERROR,
                                102                 :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                103                 :                      errmsg("could not send message")));
                                104                 :     }
                                105                 : 
                                106                 :     /*
                                107                 :      * Finally, check that we got back the same message from the last
                                108                 :      * iteration that we originally sent.
 3372 rhaas                     109 ECB             :      */
 3372 rhaas                     110 GIC           4 :     verify_message(message_size, message_contents, len, data);
                                111                 : 
 3372 rhaas                     112 ECB             :     /* Clean up. */
 3372 rhaas                     113 GIC           4 :     dsm_detach(seg);
 3372 rhaas                     114 ECB             : 
 3372 rhaas                     115 GIC           4 :     PG_RETURN_VOID();
                                116                 : }
                                117                 : 
                                118                 : /*
                                119                 :  * Pipelined test of the shared memory message queue infrastructure.
                                120                 :  *
                                121                 :  * As in the basic test, we set up a ring of message queues passing through
                                122                 :  * 1 or more background processes and eventually looping back to ourselves.
                                123                 :  * Then, we send N copies of the user-specified message through the ring and
                                124                 :  * receive them all back.  Since this might fill up all message queues in the
                                125                 :  * ring and then stall, we must be prepared to begin receiving the messages
                                126                 :  * back before we've finished sending them.
                                127                 :  */
 3372 rhaas                     128 ECB             : Datum
 3372 rhaas                     129 GIC           1 : test_shm_mq_pipelined(PG_FUNCTION_ARGS)
 3372 rhaas                     130 ECB             : {
 3372 rhaas                     131 CBC           1 :     int64       queue_size = PG_GETARG_INT64(0);
                                132               1 :     text       *message = PG_GETARG_TEXT_PP(1);
                                133               1 :     char       *message_contents = VARDATA_ANY(message);
                                134               1 :     int         message_size = VARSIZE_ANY_EXHDR(message);
                                135               1 :     int32       loop_count = PG_GETARG_INT32(2);
                                136               1 :     int32       nworkers = PG_GETARG_INT32(3);
                                137               1 :     bool        verify = PG_GETARG_BOOL(4);
                                138               1 :     int32       send_count = 0;
 3372 rhaas                     139 GIC           1 :     int32       receive_count = 0;
                                140                 :     dsm_segment *seg;
                                141                 :     shm_mq_handle *outqh;
                                142                 :     shm_mq_handle *inqh;
                                143                 :     shm_mq_result res;
                                144                 :     Size        len;
                                145                 :     void       *data;
                                146                 : 
 3372 rhaas                     147 ECB             :     /* A negative loopcount is nonsensical. */
 3372 rhaas                     148 GBC           1 :     if (loop_count < 0)
 3372 rhaas                     149 UIC           0 :         ereport(ERROR,
                                150                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                151                 :                  errmsg("repeat count size must be an integer value greater than or equal to zero")));
                                152                 : 
                                153                 :     /*
                                154                 :      * Using the nonblocking interfaces, we can even send data to ourselves,
                                155                 :      * so the minimum number of workers for this test is zero.
 3372 rhaas                     156 ECB             :      */
 3372 rhaas                     157 GBC           1 :     if (nworkers < 0)
 3372 rhaas                     158 UIC           0 :         ereport(ERROR,
                                159                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                160                 :                  errmsg("number of workers must be an integer value greater than or equal to zero")));
                                161                 : 
 3372 rhaas                     162 ECB             :     /* Set up dynamic shared memory segment and background workers. */
 3372 rhaas                     163 GIC           1 :     test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
                                164                 : 
                                165                 :     /* Main loop. */
 3372 rhaas                     166 ECB             :     for (;;)
 3372 rhaas                     167 CBC        4589 :     {
 3372 rhaas                     168 GIC        4590 :         bool        wait = true;
                                169                 : 
                                170                 :         /*
                                171                 :          * If we haven't yet sent the message the requisite number of times,
                                172                 :          * try again to send it now.  Note that when shm_mq_send() returns
                                173                 :          * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the
                                174                 :          * same message size and contents; that's not an issue here because
                                175                 :          * we're sending the same message every time.
 3372 rhaas                     176 ECB             :          */
 3372 rhaas                     177 GIC        4590 :         if (send_count < loop_count)
 3372 rhaas                     178 ECB             :         {
  542 rhaas                     179 GIC        4569 :             res = shm_mq_send(outqh, message_size, message_contents, true,
  542 rhaas                     180 ECB             :                               true);
 3372 rhaas                     181 GIC        4569 :             if (res == SHM_MQ_SUCCESS)
 3372 rhaas                     182 ECB             :             {
 3372 rhaas                     183 CBC         200 :                 ++send_count;
 3372 rhaas                     184 GIC         200 :                 wait = false;
 3372 rhaas                     185 ECB             :             }
 3372 rhaas                     186 GBC        4369 :             else if (res == SHM_MQ_DETACHED)
 3372 rhaas                     187 UIC           0 :                 ereport(ERROR,
                                188                 :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                189                 :                          errmsg("could not send message")));
                                190                 :         }
                                191                 : 
                                192                 :         /*
                                193                 :          * If we haven't yet received the message the requisite number of
                                194                 :          * times, try to receive it again now.
 3372 rhaas                     195 ECB             :          */
 3372 rhaas                     196 GIC        4590 :         if (receive_count < loop_count)
 3372 rhaas                     197 ECB             :         {
 3372 rhaas                     198 CBC        4589 :             res = shm_mq_receive(inqh, &len, &data, true);
 3372 rhaas                     199 GIC        4589 :             if (res == SHM_MQ_SUCCESS)
 3372 rhaas                     200 ECB             :             {
 3372 rhaas                     201 GIC         200 :                 ++receive_count;
 3372 rhaas                     202 ECB             :                 /* Verifying every time is slow, so it's optional. */
 3372 rhaas                     203 CBC         200 :                 if (verify)
                                204             200 :                     verify_message(message_size, message_contents, len, data);
 3372 rhaas                     205 GIC         200 :                 wait = false;
 3372 rhaas                     206 ECB             :             }
 3372 rhaas                     207 GBC        4389 :             else if (res == SHM_MQ_DETACHED)
 3372 rhaas                     208 UIC           0 :                 ereport(ERROR,
                                209                 :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                210                 :                          errmsg("could not receive message")));
                                211                 :         }
                                212                 :         else
                                213                 :         {
                                214                 :             /*
                                215                 :              * Otherwise, we've received the message enough times.  This
                                216                 :              * shouldn't happen unless we've also sent it enough times.
 3372 rhaas                     217 ECB             :              */
 3372 rhaas                     218 GBC           1 :             if (send_count != receive_count)
 3372 rhaas                     219 UIC           0 :                 ereport(ERROR,
                                220                 :                         (errcode(ERRCODE_INTERNAL_ERROR),
                                221                 :                          errmsg("message sent %d times, but received %d times",
 2118 tgl                       222 ECB             :                                 send_count, receive_count)));
 3372 rhaas                     223 GIC           1 :             break;
                                224                 :         }
 3372 rhaas                     225 ECB             : 
 3372 rhaas                     226 GIC        4589 :         if (wait)
                                227                 :         {
                                228                 :             /*
                                229                 :              * If we made no progress, wait for one of the other processes to
                                230                 :              * which we are connected to set our latch, indicating that they
                                231                 :              * have read or written data and therefore there may now be work
                                232                 :              * for us to do.
 3372 rhaas                     233 ECB             :              */
 1598 tmunro                    234 GIC        4212 :             (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
 1598 tmunro                    235 ECB             :                              PG_WAIT_EXTENSION);
 3007 andres                    236 CBC        4212 :             ResetLatch(MyLatch);
 2442 tgl                       237 GIC        4212 :             CHECK_FOR_INTERRUPTS();
                                238                 :         }
                                239                 :     }
                                240                 : 
 3372 rhaas                     241 ECB             :     /* Clean up. */
 3372 rhaas                     242 GIC           1 :     dsm_detach(seg);
 3372 rhaas                     243 ECB             : 
 3372 rhaas                     244 GIC           1 :     PG_RETURN_VOID();
                                245                 : }
                                246                 : 
                                247                 : /*
                                248                 :  * Verify that two messages are the same.
                                249                 :  */
 3372 rhaas                     250 ECB             : static void
 3309 rhaas                     251 GIC         204 : verify_message(Size origlen, char *origdata, Size newlen, char *newdata)
                                252                 : {
                                253                 :     Size        i;
 3372 rhaas                     254 ECB             : 
 3372 rhaas                     255 GBC         204 :     if (origlen != newlen)
 3372 rhaas                     256 UIC           0 :         ereport(ERROR,
                                257                 :                 (errmsg("message corrupted"),
                                258                 :                  errdetail("The original message was %zu bytes but the final message is %zu bytes.",
                                259                 :                            origlen, newlen)));
 3372 rhaas                     260 ECB             : 
 3372 rhaas                     261 CBC    54001336 :     for (i = 0; i < origlen; ++i)
 3372 rhaas                     262 GBC    54001132 :         if (origdata[i] != newdata[i])
 3372 rhaas                     263 UIC           0 :             ereport(ERROR,
                                264                 :                     (errmsg("message corrupted"),
 3309 tgl                       265 ECB             :                      errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen)));
 3372 rhaas                     266 GIC         204 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a