LCOV - differential code coverage report
Current view: top level - src/backend/backup - basebackup_throttle.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 88.9 % 45 40 5 40
Current Date: 2023-04-08 15:15:32 Functions: 80.0 % 5 4 1 4
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * basebackup_throttle.c
       4                 :  *    Basebackup sink implementing throttling. Data is forwarded to the
       5                 :  *    next base backup sink in the chain at a rate no greater than the
       6                 :  *    configured maximum.
       7                 :  *
       8                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *    src/backend/backup/basebackup_throttle.c
      12                 :  *
      13                 :  *-------------------------------------------------------------------------
      14                 :  */
      15                 : #include "postgres.h"
      16                 : 
      17                 : #include "backup/basebackup_sink.h"
      18                 : #include "miscadmin.h"
      19                 : #include "pgstat.h"
      20                 : #include "storage/latch.h"
      21                 : #include "utils/timestamp.h"
      22                 : 
      23                 : typedef struct bbsink_throttle
      24                 : {
      25                 :     /* Common information for all types of sink. */
      26                 :     bbsink      base;
      27                 : 
      28                 :     /* The actual number of bytes, transfer of which may cause sleep. */
      29                 :     uint64      throttling_sample;
      30                 : 
      31                 :     /* Amount of data already transferred but not yet throttled.  */
      32                 :     int64       throttling_counter;
      33                 : 
      34                 :     /* The minimum time required to transfer throttling_sample bytes. */
      35                 :     TimeOffset  elapsed_min_unit;
      36                 : 
      37                 :     /* The last check of the transfer rate. */
      38                 :     TimestampTz throttled_last;
      39                 : } bbsink_throttle;
      40                 : 
      41                 : static void bbsink_throttle_begin_backup(bbsink *sink);
      42                 : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
      43                 : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
      44                 : static void throttle(bbsink_throttle *sink, size_t increment);
      45                 : 
      46                 : static const bbsink_ops bbsink_throttle_ops = {
      47                 :     .begin_backup = bbsink_throttle_begin_backup,
      48                 :     .begin_archive = bbsink_forward_begin_archive,
      49                 :     .archive_contents = bbsink_throttle_archive_contents,
      50                 :     .end_archive = bbsink_forward_end_archive,
      51                 :     .begin_manifest = bbsink_forward_begin_manifest,
      52                 :     .manifest_contents = bbsink_throttle_manifest_contents,
      53                 :     .end_manifest = bbsink_forward_end_manifest,
      54                 :     .end_backup = bbsink_forward_end_backup,
      55                 :     .cleanup = bbsink_forward_cleanup
      56                 : };
      57                 : 
      58                 : /*
      59                 :  * How frequently to throttle, as a fraction of the specified rate-second.
      60                 :  */
      61                 : #define THROTTLING_FREQUENCY    8
      62                 : 
      63                 : /*
      64                 :  * Create a new basebackup sink that performs throttling and forwards data
      65                 :  * to a successor sink.
      66                 :  */
      67                 : bbsink *
      68 CBC           1 : bbsink_throttle_new(bbsink *next, uint32 maxrate)
      69                 : {
      70                 :     bbsink_throttle *sink;
      71                 : 
      72               1 :     Assert(next != NULL);
      73               1 :     Assert(maxrate > 0);
      74                 : 
      75               1 :     sink = palloc0(sizeof(bbsink_throttle));
      76               1 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
      77               1 :     sink->base.bbs_next = next;
      78                 : 
      79               1 :     sink->throttling_sample =
      80               1 :         (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
      81                 : 
      82                 :     /*
      83                 :      * The minimum amount of time for throttling_sample bytes to be
      84                 :      * transferred.
      85                 :      */
      86               1 :     sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
      87                 : 
      88               1 :     return &sink->base;
      89                 : }
      90                 : 
      91                 : /*
      92                 :  * There's no real work to do here, but we need to record the current time so
      93                 :  * that it can be used for future calculations.
      94                 :  */
      95                 : static void
      96               1 : bbsink_throttle_begin_backup(bbsink *sink)
      97                 : {
      98               1 :     bbsink_throttle *mysink = (bbsink_throttle *) sink;
      99                 : 
     100               1 :     bbsink_forward_begin_backup(sink);
     101                 : 
     102                 :     /* The 'real data' starts now (header was ignored). */
     103               1 :     mysink->throttled_last = GetCurrentTimestamp();
     104               1 : }
     105                 : 
     106                 : /*
     107                 :  * First throttle, and then pass archive contents to next sink.
     108                 :  */
     109                 : static void
     110              19 : bbsink_throttle_archive_contents(bbsink *sink, size_t len)
     111                 : {
     112              19 :     throttle((bbsink_throttle *) sink, len);
     113                 : 
     114              19 :     bbsink_forward_archive_contents(sink, len);
     115              19 : }
     116                 : 
     117                 : /*
     118                 :  * First throttle, and then pass manifest contents to next sink.
     119                 :  */
     120                 : static void
     121 UBC           0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
     122                 : {
     123               0 :     throttle((bbsink_throttle *) sink, len);
     124                 : 
     125               0 :     bbsink_forward_manifest_contents(sink, len);
     126               0 : }
     127                 : 
     128                 : /*
     129                 :  * Increment the network transfer counter by the given number of bytes,
     130                 :  * and sleep if necessary to comply with the requested network transfer
     131                 :  * rate.
     132                 :  */
     133                 : static void
     134 CBC          19 : throttle(bbsink_throttle *sink, size_t increment)
     135                 : {
     136                 :     TimeOffset  elapsed_min;
     137                 : 
     138              19 :     Assert(sink->throttling_counter >= 0);
     139                 : 
     140              19 :     sink->throttling_counter += increment;
     141              19 :     if (sink->throttling_counter < sink->throttling_sample)
     142              14 :         return;
     143                 : 
     144                 :     /* How much time should have elapsed at minimum? */
     145               5 :     elapsed_min = sink->elapsed_min_unit *
     146               5 :         (sink->throttling_counter / sink->throttling_sample);
     147                 : 
     148                 :     /*
     149                 :      * Since the latch could be set repeatedly because of concurrent WAL
     150                 :      * activity, sleep in a loop to ensure enough time has passed.
     151                 :      */
     152                 :     for (;;)
     153               3 :     {
     154                 :         TimeOffset  elapsed,
     155                 :                     sleep;
     156                 :         int         wait_result;
     157                 : 
     158                 :         /* Time elapsed since the last measurement (and possible wake up). */
     159               8 :         elapsed = GetCurrentTimestamp() - sink->throttled_last;
     160                 : 
     161                 :         /* sleep if the transfer is faster than it should be */
     162               8 :         sleep = elapsed_min - elapsed;
     163               8 :         if (sleep <= 0)
     164 UBC           0 :             break;
     165                 : 
     166 CBC           8 :         ResetLatch(MyLatch);
     167                 : 
     168                 :         /* We're eating a potentially set latch, so check for interrupts */
     169               8 :         CHECK_FOR_INTERRUPTS();
     170                 : 
     171                 :         /*
     172                 :          * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
     173                 :          * the maximum time to sleep. Thus the cast to long is safe.
     174                 :          */
     175               8 :         wait_result = WaitLatch(MyLatch,
     176                 :                                 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     177               8 :                                 (long) (sleep / 1000),
     178                 :                                 WAIT_EVENT_BASE_BACKUP_THROTTLE);
     179                 : 
     180               8 :         if (wait_result & WL_LATCH_SET)
     181               3 :             CHECK_FOR_INTERRUPTS();
     182                 : 
     183                 :         /* Done waiting? */
     184               8 :         if (wait_result & WL_TIMEOUT)
     185               5 :             break;
     186                 :     }
     187                 : 
     188                 :     /*
     189                 :      * As we work with integers, only a whole multiple of throttling_sample was
     190                 :      * processed. The rest will be done during the next call of this function.
     191                 :      */
     192               5 :     sink->throttling_counter %= sink->throttling_sample;
     193                 : 
     194                 :     /*
     195                 :      * Time interval for the remaining amount and possible next increments
     196                 :      * starts now.
     197                 :      */
     198               5 :     sink->throttled_last = GetCurrentTimestamp();
     199                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a