LCOV - differential code coverage report
Current view: top level - src/test/modules/test_shm_mq - setup.c (source / functions) Coverage Total Hit UBC GNC CBC
Current: Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128 Lines: 87.2 % 94 82 12 2 80
Current Date: 2024-04-14 14:21:10 Functions: 83.3 % 6 5 1 1 4
Baseline: 16@8cea358b128 Branches: 59.5 % 42 25 17 2 23
Baseline Date: 2024-04-14 14:21:09 Line coverage date bins:
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed (180,240] days: 100.0 % 2 2 2
(240..) days: 87.0 % 92 80 12 80
Function coverage date bins:
(240..) days: 83.3 % 6 5 1 1 4
Branch coverage date bins:
(180,240] days: 100.0 % 2 2 2
(240..) days: 57.5 % 40 23 17 23

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*--------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * setup.c
                                  4                 :                :  *      Code to set up a dynamic shared memory segment and a specified
                                  5                 :                :  *      number of background workers for shared memory message queue
                                  6                 :                :  *      testing.
                                  7                 :                :  *
                                  8                 :                :  * Copyright (c) 2013-2024, PostgreSQL Global Development Group
                                  9                 :                :  *
                                 10                 :                :  * IDENTIFICATION
                                 11                 :                :  *      src/test/modules/test_shm_mq/setup.c
                                 12                 :                :  *
                                 13                 :                :  * -------------------------------------------------------------------------
                                 14                 :                :  */
                                 15                 :                : 
                                 16                 :                : #include "postgres.h"
                                 17                 :                : 
                                 18                 :                : #include "miscadmin.h"
                                 19                 :                : #include "pgstat.h"
                                 20                 :                : #include "postmaster/bgworker.h"
                                 21                 :                : #include "storage/procsignal.h"
                                 22                 :                : #include "storage/shm_toc.h"
                                 23                 :                : #include "test_shm_mq.h"
                                 24                 :                : #include "utils/memutils.h"
                                 25                 :                : 
                                 26                 :                : typedef struct
                                 27                 :                : {
                                 28                 :                :     int         nworkers;
                                 29                 :                :     BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER];
                                 30                 :                : } worker_state;
                                 31                 :                : 
                                 32                 :                : static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
                                 33                 :                :                                         dsm_segment **segp,
                                 34                 :                :                                         test_shm_mq_header **hdrp,
                                 35                 :                :                                         shm_mq **outp, shm_mq **inp);
                                 36                 :                : static worker_state *setup_background_workers(int nworkers,
                                 37                 :                :                                               dsm_segment *seg);
                                 38                 :                : static void cleanup_background_workers(dsm_segment *seg, Datum arg);
                                 39                 :                : static void wait_for_workers_to_become_ready(worker_state *wstate,
                                 40                 :                :                                              volatile test_shm_mq_header *hdr);
                                 41                 :                : static bool check_worker_status(worker_state *wstate);
                                 42                 :                : 
                                 43                 :                : /* value cached, fetched from shared memory */
                                 44                 :                : static uint32 we_bgworker_startup = 0;
                                 45                 :                : 
                                 46                 :                : /*
                                 47                 :                :  * Set up a dynamic shared memory segment and zero or more background workers
                                 48                 :                :  * for a test run.
                                 49                 :                :  */
                                 50                 :                : void
 3742 rhaas@postgresql.org       51                 :CBC           5 : test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
                                 52                 :                :                   shm_mq_handle **output, shm_mq_handle **input)
                                 53                 :                : {
                                 54                 :                :     dsm_segment *seg;
                                 55                 :                :     test_shm_mq_header *hdr;
 3631 bruce@momjian.us           56                 :              5 :     shm_mq     *outq = NULL;    /* placate compiler */
                                 57                 :              5 :     shm_mq     *inq = NULL;     /* placate compiler */
                                 58                 :                :     worker_state *wstate;
                                 59                 :                : 
                                 60                 :                :     /* Set up a dynamic shared memory segment. */
 3743 rhaas@postgresql.org       61                 :              5 :     setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
                                 62                 :              5 :     *segp = seg;
                                 63                 :                : 
                                 64                 :                :     /* Register background workers. */
                                 65                 :              5 :     wstate = setup_background_workers(nworkers, seg);
                                 66                 :                : 
                                 67                 :                :     /* Attach the queues. */
                                 68                 :              5 :     *output = shm_mq_attach(outq, seg, wstate->handle[0]);
                                 69                 :              5 :     *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
                                 70                 :                : 
                                 71                 :                :     /* Wait for workers to become ready. */
                                 72                 :              5 :     wait_for_workers_to_become_ready(wstate, hdr);
                                 73                 :                : 
                                 74                 :                :     /*
                                 75                 :                :      * Once we reach this point, all workers are ready.  We no longer need to
                                 76                 :                :      * kill them if we die; they'll die on their own as the message queues
                                 77                 :                :      * shut down.
                                 78                 :                :      */
                                 79                 :              5 :     cancel_on_dsm_detach(seg, cleanup_background_workers,
                                 80                 :                :                          PointerGetDatum(wstate));
                                 81                 :              5 :     pfree(wstate);
                                 82                 :              5 : }
                                 83                 :                : 
                                 84                 :                : /*
                                 85                 :                :  * Set up a dynamic shared memory segment.
                                 86                 :                :  *
                                 87                 :                :  * We set up a small control region that contains only a test_shm_mq_header,
                                 88                 :                :  * plus one region per message queue.  There are as many message queues as
                                 89                 :                :  * the number of workers, plus one.
                                 90                 :                :  */
                                 91                 :                : static void
 3742                            92                 :              5 : setup_dynamic_shared_memory(int64 queue_size, int nworkers,
                                 93                 :                :                             dsm_segment **segp, test_shm_mq_header **hdrp,
                                 94                 :                :                             shm_mq **outp, shm_mq **inp)
                                 95                 :                : {
                                 96                 :                :     shm_toc_estimator e;
                                 97                 :                :     int         i;
                                 98                 :                :     Size        segsize;
                                 99                 :                :     dsm_segment *seg;
                                100                 :                :     shm_toc    *toc;
                                101                 :                :     test_shm_mq_header *hdr;
                                102                 :                : 
                                103                 :                :     /* Ensure a valid queue size. */
 3743                           104   [ +  -  -  + ]:              5 :     if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
 3743 rhaas@postgresql.org      105         [ #  # ]:UBC           0 :         ereport(ERROR,
                                106                 :                :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                107                 :                :                  errmsg("queue size must be at least %zu bytes",
                                108                 :                :                         shm_mq_minimum_size)));
                                109                 :                :     if (queue_size != ((Size) queue_size))
                                110                 :                :         ereport(ERROR,
                                111                 :                :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                112                 :                :                  errmsg("queue size overflows size_t")));
                                113                 :                : 
                                114                 :                :     /*
                                115                 :                :      * Estimate how much shared memory we need.
                                116                 :                :      *
                                117                 :                :      * Because the TOC machinery may choose to insert padding for oddly-sized
                                118                 :                :      * requests, we must estimate each chunk separately.
                                119                 :                :      *
                                120                 :                :      * We need one key to register the location of the header, and we need
                                121                 :                :      * nworkers + 1 keys to track the locations of the message queues.
                                122                 :                :      */
 3743 rhaas@postgresql.org      123                 :CBC           5 :     shm_toc_initialize_estimator(&e);
                                124                 :              5 :     shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
                                125         [ +  + ]:             17 :     for (i = 0; i <= nworkers; ++i)
 3680 tgl@sss.pgh.pa.us         126                 :             12 :         shm_toc_estimate_chunk(&e, (Size) queue_size);
 3743 rhaas@postgresql.org      127                 :              5 :     shm_toc_estimate_keys(&e, 2 + nworkers);
                                128                 :              5 :     segsize = shm_toc_estimate(&e);
                                129                 :                : 
                                130                 :                :     /* Create the shared memory segment and establish a table of contents. */
 3314                           131                 :              5 :     seg = dsm_create(shm_toc_estimate(&e), 0);
 3743                           132                 :              5 :     toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
                                133                 :                :                          segsize);
                                134                 :                : 
                                135                 :                :     /* Set up the header region. */
                                136                 :              5 :     hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
                                137                 :              5 :     SpinLockInit(&hdr->mutex);
                                138                 :              5 :     hdr->workers_total = nworkers;
                                139                 :              5 :     hdr->workers_attached = 0;
                                140                 :              5 :     hdr->workers_ready = 0;
                                141                 :              5 :     shm_toc_insert(toc, 0, hdr);
                                142                 :                : 
                                143                 :                :     /* Set up one message queue per worker, plus one. */
                                144         [ +  + ]:             17 :     for (i = 0; i <= nworkers; ++i)
                                145                 :                :     {
                                146                 :                :         shm_mq     *mq;
                                147                 :                : 
 3680 tgl@sss.pgh.pa.us         148                 :             12 :         mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
                                149                 :                :                            (Size) queue_size);
 3743 rhaas@postgresql.org      150                 :             12 :         shm_toc_insert(toc, i + 1, mq);
                                151                 :                : 
                                152         [ +  + ]:             12 :         if (i == 0)
                                153                 :                :         {
                                154                 :                :             /* We send messages to the first queue. */
                                155                 :              5 :             shm_mq_set_sender(mq, MyProc);
                                156                 :              5 :             *outp = mq;
                                157                 :                :         }
                                158         [ +  + ]:             12 :         if (i == nworkers)
                                159                 :                :         {
                                160                 :                :             /* We receive messages from the last queue. */
                                161                 :              5 :             shm_mq_set_receiver(mq, MyProc);
                                162                 :              5 :             *inp = mq;
                                163                 :                :         }
                                164                 :                :     }
                                165                 :                : 
                                166                 :                :     /* Return results to caller. */
                                167                 :              5 :     *segp = seg;
                                168                 :              5 :     *hdrp = hdr;
                                169                 :              5 : }
                                170                 :                : 
                                171                 :                : /*
                                172                 :                :  * Register background workers.
                                173                 :                :  */
                                174                 :                : static worker_state *
                                175                 :              5 : setup_background_workers(int nworkers, dsm_segment *seg)
                                176                 :                : {
                                177                 :                :     MemoryContext oldcontext;
                                178                 :                :     BackgroundWorker worker;
                                179                 :                :     worker_state *wstate;
                                180                 :                :     int         i;
                                181                 :                : 
                                182                 :                :     /*
                                183                 :                :      * We need the worker_state object and the background worker handles to
                                184                 :                :      * which it points to be allocated in CurTransactionContext rather than
                                185                 :                :      * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
                                186                 :                :      * hooks run.  (The worker_state itself is allocated from TopTransactionContext.)
                                187                 :                :      */
                                188                 :              5 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
                                189                 :                : 
                                190                 :                :     /* Create worker state object. */
                                191                 :              5 :     wstate = MemoryContextAlloc(TopTransactionContext,
                                192                 :              5 :                                 offsetof(worker_state, handle) +
                                193                 :                :                                 sizeof(BackgroundWorkerHandle *) * nworkers);
                                194                 :              5 :     wstate->nworkers = 0;
                                195                 :                : 
                                196                 :                :     /*
                                197                 :                :      * Arrange to kill all the workers if we abort before all workers are
                                198                 :                :      * finished hooking themselves up to the dynamic shared memory segment.
                                199                 :                :      *
                                200                 :                :      * If we die after all the workers have finished hooking themselves up to
                                201                 :                :      * the dynamic shared memory segment, we'll mark the two queues to which
                                202                 :                :      * we're directly connected as detached, and the worker(s) connected to
                                203                 :                :      * those queues will exit, marking any other queues to which they are
                                204                 :                :      * connected as detached.  This will cause any as-yet-unaware workers
                                205                 :                :      * connected to those queues to exit in their turn, and so on, until
                                206                 :                :      * everybody exits.
                                207                 :                :      *
                                208                 :                :      * But suppose the workers which are supposed to connect to the queues to
                                209                 :                :      * which we're directly attached exit due to some error before they
                                210                 :                :      * actually attach the queues.  The remaining workers will have no way of
                                211                 :                :      * knowing this.  From their perspective, they're still waiting for those
                                212                 :                :      * workers to start, when in fact they've already died.
                                213                 :                :      */
                                214                 :              5 :     on_dsm_detach(seg, cleanup_background_workers,
                                215                 :                :                   PointerGetDatum(wstate));
                                216                 :                : 
                                217                 :                :     /* Configure a worker. */
 2555 tgl@sss.pgh.pa.us         218                 :              5 :     memset(&worker, 0, sizeof(worker));
 3743 rhaas@postgresql.org      219                 :              5 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
                                220                 :              5 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
                                221                 :              5 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
                                222                 :              5 :     sprintf(worker.bgw_library_name, "test_shm_mq");
                                223                 :              5 :     sprintf(worker.bgw_function_name, "test_shm_mq_main");
 2418 peter_e@gmx.net           224                 :              5 :     snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");
 3743 rhaas@postgresql.org      225                 :              5 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
                                226                 :                :     /* set bgw_notify_pid, so we can detect if the worker stops */
                                227                 :              5 :     worker.bgw_notify_pid = MyProcPid;
                                228                 :                : 
                                229                 :                :     /* Register the workers. */
                                230         [ +  + ]:             12 :     for (i = 0; i < nworkers; ++i)
                                231                 :                :     {
                                232         [ -  + ]:              7 :         if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
 3743 rhaas@postgresql.org      233         [ #  # ]:UBC           0 :             ereport(ERROR,
                                234                 :                :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                                235                 :                :                      errmsg("could not register background process"),
                                236                 :                :                      errhint("You may need to increase max_worker_processes.")));
 3743 rhaas@postgresql.org      237                 :CBC           7 :         ++wstate->nworkers;
                                238                 :                :     }
                                239                 :                : 
                                240                 :                :     /* All done. */
                                241                 :              5 :     MemoryContextSwitchTo(oldcontext);
                                242                 :              5 :     return wstate;
                                243                 :                : }
                                244                 :                : 
                                245                 :                : static void
 3743 rhaas@postgresql.org      246                 :UBC           0 : cleanup_background_workers(dsm_segment *seg, Datum arg)
                                247                 :                : {
                                248                 :              0 :     worker_state *wstate = (worker_state *) DatumGetPointer(arg);
                                249                 :                : 
                                250         [ #  # ]:              0 :     while (wstate->nworkers > 0)
                                251                 :                :     {
                                252                 :              0 :         --wstate->nworkers;
                                253                 :              0 :         TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
                                254                 :                :     }
                                255                 :              0 : }
                                256                 :                : 
                                257                 :                : static void
 3743 rhaas@postgresql.org      258                 :CBC           5 : wait_for_workers_to_become_ready(worker_state *wstate,
                                259                 :                :                                  volatile test_shm_mq_header *hdr)
                                260                 :                : {
 3631 bruce@momjian.us          261                 :              5 :     bool        result = false;
                                262                 :                : 
                                263                 :                :     for (;;)
 3743 rhaas@postgresql.org      264                 :             15 :     {
                                265                 :                :         int         workers_ready;
                                266                 :                : 
                                267                 :                :         /* If all the workers are ready, we have succeeded. */
 3110                           268         [ -  + ]:             20 :         SpinLockAcquire(&hdr->mutex);
                                269                 :             20 :         workers_ready = hdr->workers_ready;
                                270                 :             20 :         SpinLockRelease(&hdr->mutex);
                                271         [ +  + ]:             20 :         if (workers_ready >= wstate->nworkers)
                                272                 :                :         {
                                273                 :              5 :             result = true;
                                274                 :              5 :             break;
                                275                 :                :         }
                                276                 :                : 
                                277                 :                :         /* If any workers (or the postmaster) have died, we have failed. */
                                278         [ -  + ]:             15 :         if (!check_worker_status(wstate))
                                279                 :                :         {
 3110 rhaas@postgresql.org      280                 :UBC           0 :             result = false;
                                281                 :              0 :             break;
                                282                 :                :         }
                                283                 :                : 
                                284                 :                :         /* first time, allocate or get the custom wait event */
  193 michael@paquier.xyz       285         [ +  + ]:GNC          15 :         if (we_bgworker_startup == 0)
                                286                 :              1 :             we_bgworker_startup = WaitEventExtensionNew("TestShmMqBgWorkerStartup");
                                287                 :                : 
                                288                 :                :         /* Wait to be signaled. */
 1969 tmunro@postgresql.or      289                 :CBC          15 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                                290                 :                :                          we_bgworker_startup);
                                291                 :                : 
                                292                 :                :         /* Reset the latch so we don't spin. */
 3110 rhaas@postgresql.org      293                 :             15 :         ResetLatch(MyLatch);
                                294                 :                : 
                                295                 :                :         /* An interrupt may have occurred while we were waiting. */
 2813 tgl@sss.pgh.pa.us         296         [ -  + ]:             15 :         CHECK_FOR_INTERRUPTS();
                                297                 :                :     }
                                298                 :                : 
 3743 rhaas@postgresql.org      299         [ -  + ]:              5 :     if (!result)
 3743 rhaas@postgresql.org      300         [ #  # ]:UBC           0 :         ereport(ERROR,
                                301                 :                :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                                302                 :                :                  errmsg("one or more background workers failed to start")));
 3743 rhaas@postgresql.org      303                 :CBC           5 : }
                                304                 :                : 
                                305                 :                : static bool
                                306                 :             15 : check_worker_status(worker_state *wstate)
                                307                 :                : {
                                308                 :                :     int         n;
                                309                 :                : 
                                310                 :                :     /* If any workers (or the postmaster) have died, we have failed. */
                                311         [ +  + ]:             44 :     for (n = 0; n < wstate->nworkers; ++n)
                                312                 :                :     {
                                313                 :                :         BgwHandleStatus status;
                                314                 :                :         pid_t       pid;
                                315                 :                : 
                                316                 :             29 :         status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
                                317   [ +  -  -  + ]:             29 :         if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
 3743 rhaas@postgresql.org      318                 :UBC           0 :             return false;
                                319                 :                :     }
                                320                 :                : 
                                321                 :                :     /* Otherwise, things still look OK. */
 3743 rhaas@postgresql.org      322                 :CBC          15 :     return true;
                                323                 :                : }
        

Generated by: LCOV version 2.1-beta2-3-g6141622