Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * basebackup_throttle.c
4 : * Basebackup sink implementing throttling. Data is forwarded to the
5 : * next base backup sink in the chain at a rate no greater than the
6 : * configured maximum.
7 : *
8 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
9 : *
10 : * IDENTIFICATION
11 : * src/backend/backup/basebackup_throttle.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include "backup/basebackup_sink.h"
18 : #include "miscadmin.h"
19 : #include "pgstat.h"
20 : #include "storage/latch.h"
21 : #include "utils/timestamp.h"
22 :
23 : typedef struct bbsink_throttle
24 : {
25 : /* Common information for all types of sink. */
26 : bbsink base;
27 :
28 : /* The actual number of bytes, transfer of which may cause sleep. */
29 : uint64 throttling_sample;
30 :
31 : /* Amount of data already transferred but not yet throttled. */
32 : int64 throttling_counter;
33 :
34 : /* The minimum time required to transfer throttling_sample bytes. */
35 : TimeOffset elapsed_min_unit;
36 :
37 : /* The last check of the transfer rate. */
38 : TimestampTz throttled_last;
39 : } bbsink_throttle;
40 :
41 : static void bbsink_throttle_begin_backup(bbsink *sink);
42 : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
43 : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
44 : static void throttle(bbsink_throttle *sink, size_t increment);
45 :
46 : static const bbsink_ops bbsink_throttle_ops = {
47 : .begin_backup = bbsink_throttle_begin_backup,
48 : .begin_archive = bbsink_forward_begin_archive,
49 : .archive_contents = bbsink_throttle_archive_contents,
50 : .end_archive = bbsink_forward_end_archive,
51 : .begin_manifest = bbsink_forward_begin_manifest,
52 : .manifest_contents = bbsink_throttle_manifest_contents,
53 : .end_manifest = bbsink_forward_end_manifest,
54 : .end_backup = bbsink_forward_end_backup,
55 : .cleanup = bbsink_forward_cleanup
56 : };
57 :
/*
 * Number of slices each second of the rate budget is divided into.  Both the
 * per-second byte allowance and USECS_PER_SEC are divided by this value, so
 * the rate is checked once per 1/THROTTLING_FREQUENCY of a second's worth of
 * data.
 */
#define THROTTLING_FREQUENCY	8
62 :
63 : /*
64 : * Create a new basebackup sink that performs throttling and forwards data
65 : * to a successor sink.
66 : */
67 : bbsink *
520 rhaas 68 CBC 1 : bbsink_throttle_new(bbsink *next, uint32 maxrate)
69 : {
70 : bbsink_throttle *sink;
71 :
72 1 : Assert(next != NULL);
73 1 : Assert(maxrate > 0);
74 :
75 1 : sink = palloc0(sizeof(bbsink_throttle));
76 1 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
77 1 : sink->base.bbs_next = next;
78 :
79 1 : sink->throttling_sample =
80 1 : (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
81 :
82 : /*
83 : * The minimum amount of time for throttling_sample bytes to be
84 : * transferred.
85 : */
86 1 : sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
87 :
88 1 : return &sink->base;
89 : }
90 :
91 : /*
92 : * There's no real work to do here, but we need to record the current time so
93 : * that it can be used for future calculations.
94 : */
95 : static void
96 1 : bbsink_throttle_begin_backup(bbsink *sink)
97 : {
98 1 : bbsink_throttle *mysink = (bbsink_throttle *) sink;
99 :
100 1 : bbsink_forward_begin_backup(sink);
101 :
102 : /* The 'real data' starts now (header was ignored). */
103 1 : mysink->throttled_last = GetCurrentTimestamp();
104 1 : }
105 :
106 : /*
107 : * First throttle, and then pass archive contents to next sink.
108 : */
109 : static void
110 19 : bbsink_throttle_archive_contents(bbsink *sink, size_t len)
111 : {
112 19 : throttle((bbsink_throttle *) sink, len);
113 :
114 19 : bbsink_forward_archive_contents(sink, len);
115 19 : }
116 :
117 : /*
118 : * First throttle, and then pass manifest contents to next sink.
119 : */
120 : static void
520 rhaas 121 UBC 0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
122 : {
123 0 : throttle((bbsink_throttle *) sink, len);
124 :
510 125 0 : bbsink_forward_manifest_contents(sink, len);
520 126 0 : }
127 :
128 : /*
129 : * Increment the network transfer counter by the given number of bytes,
130 : * and sleep if necessary to comply with the requested network transfer
131 : * rate.
132 : */
133 : static void
520 rhaas 134 CBC 19 : throttle(bbsink_throttle *sink, size_t increment)
135 : {
136 : TimeOffset elapsed_min;
137 :
138 19 : Assert(sink->throttling_counter >= 0);
139 :
140 19 : sink->throttling_counter += increment;
141 19 : if (sink->throttling_counter < sink->throttling_sample)
142 14 : return;
143 :
144 : /* How much time should have elapsed at minimum? */
145 5 : elapsed_min = sink->elapsed_min_unit *
146 5 : (sink->throttling_counter / sink->throttling_sample);
147 :
148 : /*
149 : * Since the latch could be set repeatedly because of concurrently WAL
150 : * activity, sleep in a loop to ensure enough time has passed.
151 : */
152 : for (;;)
153 3 : {
154 : TimeOffset elapsed,
155 : sleep;
156 : int wait_result;
157 :
158 : /* Time elapsed since the last measurement (and possible wake up). */
159 8 : elapsed = GetCurrentTimestamp() - sink->throttled_last;
160 :
161 : /* sleep if the transfer is faster than it should be */
162 8 : sleep = elapsed_min - elapsed;
163 8 : if (sleep <= 0)
520 rhaas 164 UBC 0 : break;
165 :
520 rhaas 166 CBC 8 : ResetLatch(MyLatch);
167 :
168 : /* We're eating a potentially set latch, so check for interrupts */
169 8 : CHECK_FOR_INTERRUPTS();
170 :
171 : /*
172 : * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
173 : * the maximum time to sleep. Thus the cast to long is safe.
174 : */
175 8 : wait_result = WaitLatch(MyLatch,
176 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
177 8 : (long) (sleep / 1000),
178 : WAIT_EVENT_BASE_BACKUP_THROTTLE);
179 :
180 8 : if (wait_result & WL_LATCH_SET)
181 3 : CHECK_FOR_INTERRUPTS();
182 :
183 : /* Done waiting? */
184 8 : if (wait_result & WL_TIMEOUT)
185 5 : break;
186 : }
187 :
188 : /*
189 : * As we work with integers, only whole multiple of throttling_sample was
190 : * processed. The rest will be done during the next call of this function.
191 : */
192 5 : sink->throttling_counter %= sink->throttling_sample;
193 :
194 : /*
195 : * Time interval for the remaining amount and possible next increments
196 : * starts now.
197 : */
198 5 : sink->throttled_last = GetCurrentTimestamp();
199 : }
|