LCOV - differential code coverage report
Current view: top level - src/backend/backup - basebackup_throttle.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 88.9 % 45 40 5 40
Current Date: 2023-04-08 17:13:01 Functions: 80.0 % 5 4 1 4
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (240..) days: 88.9 % 45 40 5 40
Legend: Lines: hit not hit Function coverage date bins:
(240..) days: 80.0 % 5 4 1 4

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * basebackup_throttle.c
                                  4                 :  *    Basebackup sink implementing throttling. Data is forwarded to the
                                  5                 :  *    next base backup sink in the chain at a rate no greater than the
                                  6                 :  *    configured maximum.
                                  7                 :  *
                                  8                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
                                  9                 :  *
                                 10                 :  * IDENTIFICATION
                                 11                 :  *    src/backend/backup/basebackup_throttle.c
                                 12                 :  *
                                 13                 :  *-------------------------------------------------------------------------
                                 14                 :  */
                                 15                 : #include "postgres.h"
                                 16                 : 
                                 17                 : #include "backup/basebackup_sink.h"
                                 18                 : #include "miscadmin.h"
                                 19                 : #include "pgstat.h"
                                 20                 : #include "storage/latch.h"
                                 21                 : #include "utils/timestamp.h"
                                 22                 : 
                                 23                 : typedef struct bbsink_throttle
                                 24                 : {
                                 25                 :     /* Common information for all types of sink. */
                                 26                 :     bbsink      base;
                                 27                 : 
                                 28                 :     /* The number of bytes whose transfer may trigger a sleep. */
                                 29                 :     uint64      throttling_sample;
                                 30                 : 
                                 31                 :     /* Amount of data already transferred but not yet throttled.  */
                                 32                 :     int64       throttling_counter;
                                 33                 : 
                                 34                 :     /* The minimum time required to transfer throttling_sample bytes. */
                                 35                 :     TimeOffset  elapsed_min_unit;
                                 36                 : 
                                 37                 :     /* The last check of the transfer rate. */
                                 38                 :     TimestampTz throttled_last;
                                 39                 : } bbsink_throttle;
                                 40                 : 
                                 41                 : static void bbsink_throttle_begin_backup(bbsink *sink);
                                 42                 : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
                                 43                 : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
                                 44                 : static void throttle(bbsink_throttle *sink, size_t increment);
                                 45                 : 
                                 46                 : static const bbsink_ops bbsink_throttle_ops = {
                                 47                 :     .begin_backup = bbsink_throttle_begin_backup,
                                 48                 :     .begin_archive = bbsink_forward_begin_archive,
                                 49                 :     .archive_contents = bbsink_throttle_archive_contents,
                                 50                 :     .end_archive = bbsink_forward_end_archive,
                                 51                 :     .begin_manifest = bbsink_forward_begin_manifest,
                                 52                 :     .manifest_contents = bbsink_throttle_manifest_contents,
                                 53                 :     .end_manifest = bbsink_forward_end_manifest,
                                 54                 :     .end_backup = bbsink_forward_end_backup,
                                 55                 :     .cleanup = bbsink_forward_cleanup
                                 56                 : };
                                 57                 : 
                                 58                 : /*
                                 59                 :  * How frequently to throttle, as a fraction of the specified rate-second.
                                 60                 :  */
                                 61                 : #define THROTTLING_FREQUENCY    8
                                 62                 : 
                                 63                 : /*
                                 64                 :  * Create a new basebackup sink that performs throttling and forwards data
                                 65                 :  * to a successor sink.
                                 66                 :  */
                                 67                 : bbsink *
  520 rhaas                      68 CBC           1 : bbsink_throttle_new(bbsink *next, uint32 maxrate)
                                 69                 : {
                                 70                 :     bbsink_throttle *sink;
                                 71                 : 
                                 72               1 :     Assert(next != NULL);
                                 73               1 :     Assert(maxrate > 0);
                                 74                 : 
                                 75               1 :     sink = palloc0(sizeof(bbsink_throttle));
                                 76               1 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
                                 77               1 :     sink->base.bbs_next = next;
                                 78                 : 
                                 79               1 :     sink->throttling_sample =
                                 80               1 :         (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
                                 81                 : 
                                 82                 :     /*
                                 83                 :      * The minimum amount of time for throttling_sample bytes to be
                                 84                 :      * transferred.
                                 85                 :      */
                                 86               1 :     sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
                                 87                 : 
                                 88               1 :     return &sink->base;
                                 89                 : }
                                 90                 : 
                                 91                 : /*
                                 92                 :  * There's no real work to do here, but we need to record the current time so
                                 93                 :  * that it can be used for future calculations.
                                 94                 :  */
                                 95                 : static void
                                 96               1 : bbsink_throttle_begin_backup(bbsink *sink)
                                 97                 : {
                                 98               1 :     bbsink_throttle *mysink = (bbsink_throttle *) sink;
                                 99                 : 
                                100               1 :     bbsink_forward_begin_backup(sink);
                                101                 : 
                                102                 :     /* The 'real data' starts now (header was ignored). */
                                103               1 :     mysink->throttled_last = GetCurrentTimestamp();
                                104               1 : }
                                105                 : 
                                106                 : /*
                                107                 :  * First throttle, and then pass archive contents to next sink.
                                108                 :  */
                                109                 : static void
                                110              19 : bbsink_throttle_archive_contents(bbsink *sink, size_t len)
                                111                 : {
                                112              19 :     throttle((bbsink_throttle *) sink, len);
                                113                 : 
                                114              19 :     bbsink_forward_archive_contents(sink, len);
                                115              19 : }
                                116                 : 
                                117                 : /*
                                118                 :  * First throttle, and then pass manifest contents to next sink.
                                119                 :  */
                                120                 : static void
  520 rhaas                     121 UBC           0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
                                122                 : {
                                123               0 :     throttle((bbsink_throttle *) sink, len);
                                124                 : 
  510                           125               0 :     bbsink_forward_manifest_contents(sink, len);
  520                           126               0 : }
                                127                 : 
                                128                 : /*
                                129                 :  * Increment the network transfer counter by the given number of bytes,
                                130                 :  * and sleep if necessary to comply with the requested network transfer
                                131                 :  * rate.
                                132                 :  */
                                133                 : static void
  520 rhaas                     134 CBC          19 : throttle(bbsink_throttle *sink, size_t increment)
                                135                 : {
                                136                 :     TimeOffset  elapsed_min;
                                137                 : 
                                138              19 :     Assert(sink->throttling_counter >= 0);
                                139                 : 
                                140              19 :     sink->throttling_counter += increment;
                                141              19 :     if (sink->throttling_counter < sink->throttling_sample)
                                142              14 :         return;
                                143                 : 
                                144                 :     /* How much time should have elapsed at minimum? */
                                145               5 :     elapsed_min = sink->elapsed_min_unit *
                                146               5 :         (sink->throttling_counter / sink->throttling_sample);
                                147                 : 
                                148                 :     /*
                                149                 :      * Since the latch could be set repeatedly because of concurrent WAL
                                150                 :      * activity, sleep in a loop to ensure enough time has passed.
                                151                 :      */
                                152                 :     for (;;)
                                153               3 :     {
                                154                 :         TimeOffset  elapsed,
                                155                 :                     sleep;
                                156                 :         int         wait_result;
                                157                 : 
                                158                 :         /* Time elapsed since the last measurement (and possible wake up). */
                                159               8 :         elapsed = GetCurrentTimestamp() - sink->throttled_last;
                                160                 : 
                                161                 :         /* sleep if the transfer is faster than it should be */
                                162               8 :         sleep = elapsed_min - elapsed;
                                163               8 :         if (sleep <= 0)
  520 rhaas                     164 UBC           0 :             break;
                                165                 : 
  520 rhaas                     166 CBC           8 :         ResetLatch(MyLatch);
                                167                 : 
                                168                 :         /* We're eating a potentially set latch, so check for interrupts */
                                169               8 :         CHECK_FOR_INTERRUPTS();
                                170                 : 
                                171                 :         /*
                                172                 :          * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
                                173                 :          * the maximum time to sleep. Thus the cast to long is safe.
                                174                 :          */
                                175               8 :         wait_result = WaitLatch(MyLatch,
                                176                 :                                 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                177               8 :                                 (long) (sleep / 1000),
                                178                 :                                 WAIT_EVENT_BASE_BACKUP_THROTTLE);
                                179                 : 
                                180               8 :         if (wait_result & WL_LATCH_SET)
                                181               3 :             CHECK_FOR_INTERRUPTS();
                                182                 : 
                                183                 :         /* Done waiting? */
                                184               8 :         if (wait_result & WL_TIMEOUT)
                                185               5 :             break;
                                186                 :     }
                                187                 : 
                                188                 :     /*
                                189                 :      * As we work with integers, only a whole multiple of throttling_sample was
                                190                 :      * processed. The rest will be done during the next call of this function.
                                191                 :      */
                                192               5 :     sink->throttling_counter %= sink->throttling_sample;
                                193                 : 
                                194                 :     /*
                                195                 :      * Time interval for the remaining amount and possible next increments
                                196                 :      * starts now.
                                197                 :      */
                                198               5 :     sink->throttled_last = GetCurrentTimestamp();
                                199                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a