|  Age         Owner                    Branch data    TLA  Line data    Source code 
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * basebackup_throttle.c
                                  4                 :                :  *    Basebackup sink implementing throttling. Data is forwarded to the
                                  5                 :                :  *    next base backup sink in the chain at a rate no greater than the
                                  6                 :                :  *    configured maximum.
                                  7                 :                :  *
                                  8                 :                :  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
                                  9                 :                :  *
                                 10                 :                :  * IDENTIFICATION
                                 11                 :                :  *    src/backend/backup/basebackup_throttle.c
                                 12                 :                :  *
                                 13                 :                :  *-------------------------------------------------------------------------
                                 14                 :                :  */
                                 15                 :                : #include "postgres.h"
                                 16                 :                : 
                                 17                 :                : #include "backup/basebackup_sink.h"
                                 18                 :                : #include "miscadmin.h"
                                 19                 :                : #include "pgstat.h"
                                 20                 :                : #include "storage/latch.h"
                                 21                 :                : #include "utils/timestamp.h"
                                 22                 :                : 
                                 23                 :                : typedef struct bbsink_throttle
                                 24                 :                : {
                                 25                 :                :     /* Common information for all types of sink. */
                                 26                 :                :     bbsink      base;
                                 27                 :                : 
                                 28                 :                :     /* The actual number of bytes, transfer of which may cause sleep. */
                                 29                 :                :     uint64      throttling_sample;
                                 30                 :                : 
                                 31                 :                :     /* Amount of data already transferred but not yet throttled.  */
                                 32                 :                :     int64       throttling_counter;
                                 33                 :                : 
                                 34                 :                :     /* The minimum time required to transfer throttling_sample bytes. */
                                 35                 :                :     TimeOffset  elapsed_min_unit;
                                 36                 :                : 
                                 37                 :                :     /* The last check of the transfer rate. */
                                 38                 :                :     TimestampTz throttled_last;
                                 39                 :                : } bbsink_throttle;
                                 40                 :                : 
                                 41                 :                : static void bbsink_throttle_begin_backup(bbsink *sink);
                                 42                 :                : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
                                 43                 :                : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
                                 44                 :                : static void throttle(bbsink_throttle *sink, size_t increment);
                                 45                 :                : 
                                 46                 :                : static const bbsink_ops bbsink_throttle_ops = {
                                 47                 :                :     .begin_backup = bbsink_throttle_begin_backup,
                                 48                 :                :     .begin_archive = bbsink_forward_begin_archive,
                                 49                 :                :     .archive_contents = bbsink_throttle_archive_contents,
                                 50                 :                :     .end_archive = bbsink_forward_end_archive,
                                 51                 :                :     .begin_manifest = bbsink_forward_begin_manifest,
                                 52                 :                :     .manifest_contents = bbsink_throttle_manifest_contents,
                                 53                 :                :     .end_manifest = bbsink_forward_end_manifest,
                                 54                 :                :     .end_backup = bbsink_forward_end_backup,
                                 55                 :                :     .cleanup = bbsink_forward_cleanup
                                 56                 :                : };
                                 57                 :                : 
                                 58                 :                : /*
                                 59                 :                :  * How frequently to throttle, as a fraction of the specified rate-second.
                                 60                 :                :  */
                                 61                 :                : #define THROTTLING_FREQUENCY    8
                                 62                 :                : 
                                 63                 :                : /*
                                 64                 :                :  * Create a new basebackup sink that performs throttling and forwards data
                                 65                 :                :  * to a successor sink.
                                 66                 :                :  */
                                 67                 :                : bbsink *
  891 rhaas@postgresql.org       68                 :CBC           1 : bbsink_throttle_new(bbsink *next, uint32 maxrate)
                                 69                 :                : {
                                 70                 :                :     bbsink_throttle *sink;
                                 71                 :                : 
                                 72         [ -  + ]:              1 :     Assert(next != NULL);
                                 73         [ -  + ]:              1 :     Assert(maxrate > 0);
                                 74                 :                : 
                                 75                 :              1 :     sink = palloc0(sizeof(bbsink_throttle));
                                 76                 :              1 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
                                 77                 :              1 :     sink->base.bbs_next = next;
                                 78                 :                : 
                                 79                 :              1 :     sink->throttling_sample =
                                 80                 :              1 :         (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
                                 81                 :                : 
                                 82                 :                :     /*
                                 83                 :                :      * The minimum amount of time for throttling_sample bytes to be
                                 84                 :                :      * transferred.
                                 85                 :                :      */
                                 86                 :              1 :     sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
                                 87                 :                : 
                                 88                 :              1 :     return &sink->base;
                                 89                 :                : }
                                 90                 :                : 
                                 91                 :                : /*
                                 92                 :                :  * There's no real work to do here, but we need to record the current time so
                                 93                 :                :  * that it can be used for future calculations.
                                 94                 :                :  */
                                 95                 :                : static void
                                 96                 :              1 : bbsink_throttle_begin_backup(bbsink *sink)
                                 97                 :                : {
                                 98                 :              1 :     bbsink_throttle *mysink = (bbsink_throttle *) sink;
                                 99                 :                : 
                                100                 :              1 :     bbsink_forward_begin_backup(sink);
                                101                 :                : 
                                102                 :                :     /* The 'real data' starts now (header was ignored). */
                                103                 :              1 :     mysink->throttled_last = GetCurrentTimestamp();
                                104                 :              1 : }
                                105                 :                : 
                                106                 :                : /*
                                107                 :                :  * First throttle, and then pass archive contents to next sink.
                                108                 :                :  */
                                109                 :                : static void
                                110                 :             14 : bbsink_throttle_archive_contents(bbsink *sink, size_t len)
                                111                 :                : {
                                112                 :             14 :     throttle((bbsink_throttle *) sink, len);
                                113                 :                : 
                                114                 :             14 :     bbsink_forward_archive_contents(sink, len);
                                115                 :             14 : }
                                116                 :                : 
                                117                 :                : /*
                                118                 :                :  * First throttle, and then pass manifest contents to next sink.
                                119                 :                :  */
                                120                 :                : static void
  891 rhaas@postgresql.org      121                 :UBC           0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
                                122                 :                : {
                                123                 :              0 :     throttle((bbsink_throttle *) sink, len);
                                124                 :                : 
  881                           125                 :              0 :     bbsink_forward_manifest_contents(sink, len);
  891                           126                 :              0 : }
                                127                 :                : 
                                128                 :                : /*
                                129                 :                :  * Increment the network transfer counter by the given number of bytes,
                                130                 :                :  * and sleep if necessary to comply with the requested network transfer
                                131                 :                :  * rate.
                                132                 :                :  */
                                133                 :                : static void
  891 rhaas@postgresql.org      134                 :CBC          14 : throttle(bbsink_throttle *sink, size_t increment)
                                135                 :                : {
                                136                 :                :     TimeOffset  elapsed_min;
                                137                 :                : 
                                138         [ -  + ]:             14 :     Assert(sink->throttling_counter >= 0);
                                139                 :                : 
                                140                 :             14 :     sink->throttling_counter += increment;
                                141         [ +  + ]:             14 :     if (sink->throttling_counter < sink->throttling_sample)
                                142                 :             10 :         return;
                                143                 :                : 
                                144                 :                :     /* How much time should have elapsed at minimum? */
                                145                 :              4 :     elapsed_min = sink->elapsed_min_unit *
                                146                 :              4 :         (sink->throttling_counter / sink->throttling_sample);
                                147                 :                : 
                                148                 :                :     /*
                                149                 :                :      * Since the latch could be set repeatedly because of concurrently WAL
                                150                 :                :      * activity, sleep in a loop to ensure enough time has passed.
                                151                 :                :      */
                                152                 :                :     for (;;)
  891 rhaas@postgresql.org      153                 :UBC           0 :     {
                                154                 :                :         TimeOffset  elapsed,
                                155                 :                :                     sleep;
                                156                 :                :         int         wait_result;
                                157                 :                : 
                                158                 :                :         /* Time elapsed since the last measurement (and possible wake up). */
  891 rhaas@postgresql.org      159                 :CBC           4 :         elapsed = GetCurrentTimestamp() - sink->throttled_last;
                                160                 :                : 
                                161                 :                :         /* sleep if the transfer is faster than it should be */
                                162                 :              4 :         sleep = elapsed_min - elapsed;
                                163         [ -  + ]:              4 :         if (sleep <= 0)
  891 rhaas@postgresql.org      164                 :UBC           0 :             break;
                                165                 :                : 
  891 rhaas@postgresql.org      166                 :CBC           4 :         ResetLatch(MyLatch);
                                167                 :                : 
                                168                 :                :         /* We're eating a potentially set latch, so check for interrupts */
                                169         [ -  + ]:              4 :         CHECK_FOR_INTERRUPTS();
                                170                 :                : 
                                171                 :                :         /*
                                172                 :                :          * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
                                173                 :                :          * the maximum time to sleep. Thus the cast to long is safe.
                                174                 :                :          */
                                175                 :              4 :         wait_result = WaitLatch(MyLatch,
                                176                 :                :                                 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                177                 :              4 :                                 (long) (sleep / 1000),
                                178                 :                :                                 WAIT_EVENT_BASE_BACKUP_THROTTLE);
                                179                 :                : 
                                180         [ -  + ]:              4 :         if (wait_result & WL_LATCH_SET)
  891 rhaas@postgresql.org      181         [ #  # ]:UBC           0 :             CHECK_FOR_INTERRUPTS();
                                182                 :                : 
                                183                 :                :         /* Done waiting? */
  891 rhaas@postgresql.org      184         [ +  - ]:CBC           4 :         if (wait_result & WL_TIMEOUT)
                                185                 :              4 :             break;
                                186                 :                :     }
                                187                 :                : 
                                188                 :                :     /*
                                189                 :                :      * As we work with integers, only whole multiple of throttling_sample was
                                190                 :                :      * processed. The rest will be done during the next call of this function.
                                191                 :                :      */
                                192                 :              4 :     sink->throttling_counter %= sink->throttling_sample;
                                193                 :                : 
                                194                 :                :     /*
                                195                 :                :      * Time interval for the remaining amount and possible next increments
                                196                 :                :      * starts now.
                                197                 :                :      */
                                198                 :              4 :     sink->throttled_last = GetCurrentTimestamp();
                                199                 :                : }
         |