LCOV - differential code coverage report
Current view: top level - src/test/modules/test_shm_mq - setup.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 87.0 % 92 80 12 80
Current Date: 2023-04-08 15:15:32 Functions: 83.3 % 6 5 1 5
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*--------------------------------------------------------------------------
       2                 :  *
       3                 :  * setup.c
       4                 :  *      Code to set up a dynamic shared memory segment and a specified
       5                 :  *      number of background workers for shared memory message queue
       6                 :  *      testing.
       7                 :  *
       8                 :  * Copyright (c) 2013-2023, PostgreSQL Global Development Group
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *      src/test/modules/test_shm_mq/setup.c
      12                 :  *
      13                 :  * -------------------------------------------------------------------------
      14                 :  */
      15                 : 
      16                 : #include "postgres.h"
      17                 : 
      18                 : #include "miscadmin.h"
      19                 : #include "pgstat.h"
      20                 : #include "postmaster/bgworker.h"
      21                 : #include "storage/procsignal.h"
      22                 : #include "storage/shm_toc.h"
      23                 : #include "test_shm_mq.h"
      24                 : #include "utils/memutils.h"
      25                 : 
      26                 : typedef struct
      27                 : {
      28                 :     int         nworkers;
      29                 :     BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER];
      30                 : } worker_state;
      31                 : 
      32                 : static void setup_dynamic_shared_memory(int64 queue_size, int nworkers,
      33                 :                                         dsm_segment **segp,
      34                 :                                         test_shm_mq_header **hdrp,
      35                 :                                         shm_mq **outp, shm_mq **inp);
      36                 : static worker_state *setup_background_workers(int nworkers,
      37                 :                                               dsm_segment *seg);
      38                 : static void cleanup_background_workers(dsm_segment *seg, Datum arg);
      39                 : static void wait_for_workers_to_become_ready(worker_state *wstate,
      40                 :                                              volatile test_shm_mq_header *hdr);
      41                 : static bool check_worker_status(worker_state *wstate);
      42                 : 
      43                 : /*
      44                 :  * Set up a dynamic shared memory segment and zero or more background workers
      45                 :  * for a test run.
      46                 :  */
      47                 : void
      48 CBC           5 : test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp,
      49                 :                   shm_mq_handle **output, shm_mq_handle **input)
      50                 : {
      51                 :     dsm_segment *seg;
      52                 :     test_shm_mq_header *hdr;
      53               5 :     shm_mq     *outq = NULL;    /* placate compiler */
      54               5 :     shm_mq     *inq = NULL;     /* placate compiler */
      55                 :     worker_state *wstate;
      56                 : 
      57                 :     /* Set up a dynamic shared memory segment. */
      58               5 :     setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq);
      59               5 :     *segp = seg;
      60                 : 
      61                 :     /* Register background workers. */
      62               5 :     wstate = setup_background_workers(nworkers, seg);
      63                 : 
      64                 :     /* Attach the queues. */
      65               5 :     *output = shm_mq_attach(outq, seg, wstate->handle[0]);
      66               5 :     *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]);
      67                 : 
      68                 :     /* Wait for workers to become ready. */
      69               5 :     wait_for_workers_to_become_ready(wstate, hdr);
      70                 : 
      71                 :     /*
      72                 :      * Once we reach this point, all workers are ready.  We no longer need to
      73                 :      * kill them if we die; they'll die on their own as the message queues
      74                 :      * shut down.
      75                 :      */
      76               5 :     cancel_on_dsm_detach(seg, cleanup_background_workers,
      77                 :                          PointerGetDatum(wstate));
      78               5 :     pfree(wstate);
      79               5 : }
      80                 : 
      81                 : /*
      82                 :  * Set up a dynamic shared memory segment.
      83                 :  *
      84                 :  * We set up a small control region that contains only a test_shm_mq_header,
      85                 :  * plus one region per message queue.  There are as many message queues as
      86                 :  * the number of workers, plus one.
      87                 :  */
      88                 : static void
      89               5 : setup_dynamic_shared_memory(int64 queue_size, int nworkers,
      90                 :                             dsm_segment **segp, test_shm_mq_header **hdrp,
      91                 :                             shm_mq **outp, shm_mq **inp)
      92                 : {
      93                 :     shm_toc_estimator e;
      94                 :     int         i;
      95                 :     Size        segsize;
      96                 :     dsm_segment *seg;
      97                 :     shm_toc    *toc;
      98                 :     test_shm_mq_header *hdr;
      99                 : 
     100                 :     /* Ensure a valid queue size. */
     101               5 :     if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
     102 UBC           0 :         ereport(ERROR,
     103                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     104                 :                  errmsg("queue size must be at least %zu bytes",
     105                 :                         shm_mq_minimum_size)));
     106                 :     if (queue_size != ((Size) queue_size))
     107                 :         ereport(ERROR,
     108                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     109                 :                  errmsg("queue size overflows size_t")));
     110                 : 
     111                 :     /*
     112                 :      * Estimate how much shared memory we need.
     113                 :      *
     114                 :      * Because the TOC machinery may choose to insert padding of oddly-sized
     115                 :      * requests, we must estimate each chunk separately.
     116                 :      *
     117                 :      * We need one key to register the location of the header, and we need
     118                 :      * nworkers + 1 keys to track the locations of the message queues.
     119                 :      */
     120 CBC           5 :     shm_toc_initialize_estimator(&e);
     121               5 :     shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header));
     122              17 :     for (i = 0; i <= nworkers; ++i)
     123              12 :         shm_toc_estimate_chunk(&e, (Size) queue_size);
     124               5 :     shm_toc_estimate_keys(&e, 2 + nworkers);
     125               5 :     segsize = shm_toc_estimate(&e);
     126                 : 
     127                 :     /* Create the shared memory segment and establish a table of contents. */
     128               5 :     seg = dsm_create(shm_toc_estimate(&e), 0);
     129               5 :     toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg),
     130                 :                          segsize);
     131                 : 
     132                 :     /* Set up the header region. */
     133               5 :     hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header));
     134               5 :     SpinLockInit(&hdr->mutex);
     135               5 :     hdr->workers_total = nworkers;
     136               5 :     hdr->workers_attached = 0;
     137               5 :     hdr->workers_ready = 0;
     138               5 :     shm_toc_insert(toc, 0, hdr);
     139                 : 
     140                 :     /* Set up one message queue per worker, plus one. */
     141              17 :     for (i = 0; i <= nworkers; ++i)
     142                 :     {
     143                 :         shm_mq     *mq;
     144                 : 
     145              12 :         mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
     146                 :                            (Size) queue_size);
     147              12 :         shm_toc_insert(toc, i + 1, mq);
     148                 : 
     149              12 :         if (i == 0)
     150                 :         {
     151                 :             /* We send messages to the first queue. */
     152               5 :             shm_mq_set_sender(mq, MyProc);
     153               5 :             *outp = mq;
     154                 :         }
     155              12 :         if (i == nworkers)
     156                 :         {
     157                 :             /* We receive messages from the last queue. */
     158               5 :             shm_mq_set_receiver(mq, MyProc);
     159               5 :             *inp = mq;
     160                 :         }
     161                 :     }
     162                 : 
     163                 :     /* Return results to caller. */
     164               5 :     *segp = seg;
     165               5 :     *hdrp = hdr;
     166               5 : }
     167                 : 
     168                 : /*
     169                 :  * Register background workers.
     170                 :  */
     171                 : static worker_state *
     172               5 : setup_background_workers(int nworkers, dsm_segment *seg)
     173                 : {
     174                 :     MemoryContext oldcontext;
     175                 :     BackgroundWorker worker;
     176                 :     worker_state *wstate;
     177                 :     int         i;
     178                 : 
     179                 :     /*
     180                 :      * We need the worker_state object and the background worker handles to
     181                 :      * which it points to be allocated in CurTransactionContext rather than
     182                 :      * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach
     183                 :      * hooks run.
     184                 :      */
     185               5 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
     186                 : 
     187                 :     /* Create worker state object. */
     188               5 :     wstate = MemoryContextAlloc(TopTransactionContext,
     189               5 :                                 offsetof(worker_state, handle) +
     190                 :                                 sizeof(BackgroundWorkerHandle *) * nworkers);
     191               5 :     wstate->nworkers = 0;
     192                 : 
     193                 :     /*
     194                 :      * Arrange to kill all the workers if we abort before all workers are
     195                 :      * finished hooking themselves up to the dynamic shared memory segment.
     196                 :      *
     197                 :      * If we die after all the workers have finished hooking themselves up to
     198                 :      * the dynamic shared memory segment, we'll mark the two queues to which
     199                 :      * we're directly connected as detached, and the worker(s) connected to
     200                 :      * those queues will exit, marking any other queues to which they are
     201                 :      * connected as detached.  This will cause any as-yet-unaware workers
     202                 :      * connected to those queues to exit in their turn, and so on, until
     203                 :      * everybody exits.
     204                 :      *
     205                 :      * But suppose the workers which are supposed to connect to the queues to
     206                 :      * which we're directly attached exit due to some error before they
     207                 :      * actually attach the queues.  The remaining workers will have no way of
     208                 :      * knowing this.  From their perspective, they're still waiting for those
     209                 :      * workers to start, when in fact they've already died.
     210                 :      */
     211               5 :     on_dsm_detach(seg, cleanup_background_workers,
     212                 :                   PointerGetDatum(wstate));
     213                 : 
     214                 :     /* Configure a worker. */
     215               5 :     memset(&worker, 0, sizeof(worker));
     216               5 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
     217               5 :     worker.bgw_start_time = BgWorkerStart_ConsistentState;
     218               5 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     219               5 :     sprintf(worker.bgw_library_name, "test_shm_mq");
     220               5 :     sprintf(worker.bgw_function_name, "test_shm_mq_main");
     221               5 :     snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq");
     222               5 :     worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
     223                 :     /* set bgw_notify_pid, so we can detect if the worker stops */
     224               5 :     worker.bgw_notify_pid = MyProcPid;
     225                 : 
     226                 :     /* Register the workers. */
     227              12 :     for (i = 0; i < nworkers; ++i)
     228                 :     {
     229               7 :         if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i]))
     230 UBC           0 :             ereport(ERROR,
     231                 :                     (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     232                 :                      errmsg("could not register background process"),
     233                 :                      errhint("You may need to increase max_worker_processes.")));
     234 CBC           7 :         ++wstate->nworkers;
     235                 :     }
     236                 : 
     237                 :     /* All done. */
     238               5 :     MemoryContextSwitchTo(oldcontext);
     239               5 :     return wstate;
     240                 : }
     241                 : 
     242                 : static void
     243 UBC           0 : cleanup_background_workers(dsm_segment *seg, Datum arg)
     244                 : {
     245               0 :     worker_state *wstate = (worker_state *) DatumGetPointer(arg);
     246                 : 
     247               0 :     while (wstate->nworkers > 0)
     248                 :     {
     249               0 :         --wstate->nworkers;
     250               0 :         TerminateBackgroundWorker(wstate->handle[wstate->nworkers]);
     251                 :     }
     252               0 : }
     253                 : 
     254                 : static void
     255 CBC           5 : wait_for_workers_to_become_ready(worker_state *wstate,
     256                 :                                  volatile test_shm_mq_header *hdr)
     257                 : {
     258               5 :     bool        result = false;
     259                 : 
     260                 :     for (;;)
     261              18 :     {
     262                 :         int         workers_ready;
     263                 : 
     264                 :         /* If all the workers are ready, we have succeeded. */
     265              23 :         SpinLockAcquire(&hdr->mutex);
     266              23 :         workers_ready = hdr->workers_ready;
     267              23 :         SpinLockRelease(&hdr->mutex);
     268              23 :         if (workers_ready >= wstate->nworkers)
     269                 :         {
     270               5 :             result = true;
     271               5 :             break;
     272                 :         }
     273                 : 
     274                 :         /* If any workers (or the postmaster) have died, we have failed. */
     275              18 :         if (!check_worker_status(wstate))
     276                 :         {
     277 UBC           0 :             result = false;
     278               0 :             break;
     279                 :         }
     280                 : 
     281                 :         /* Wait to be signaled. */
     282 CBC          18 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
     283                 :                          PG_WAIT_EXTENSION);
     284                 : 
     285                 :         /* Reset the latch so we don't spin. */
     286              18 :         ResetLatch(MyLatch);
     287                 : 
     288                 :         /* An interrupt may have occurred while we were waiting. */
     289              18 :         CHECK_FOR_INTERRUPTS();
     290                 :     }
     291                 : 
     292               5 :     if (!result)
     293 UBC           0 :         ereport(ERROR,
     294                 :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     295                 :                  errmsg("one or more background workers failed to start")));
     296 CBC           5 : }
     297                 : 
     298                 : static bool
     299              18 : check_worker_status(worker_state *wstate)
     300                 : {
     301                 :     int         n;
     302                 : 
     303                 :     /* If any workers (or the postmaster) have died, we have failed. */
     304              44 :     for (n = 0; n < wstate->nworkers; ++n)
     305                 :     {
     306                 :         BgwHandleStatus status;
     307                 :         pid_t       pid;
     308                 : 
     309              26 :         status = GetBackgroundWorkerPid(wstate->handle[n], &pid);
     310              26 :         if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
     311 UBC           0 :             return false;
     312                 :     }
     313                 : 
     314                 :     /* Otherwise, things still look OK. */
     315 CBC          18 :     return true;
     316                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a