Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * basebackup_throttle.c
4 : : * Basebackup sink implementing throttling. Data is forwarded to the
5 : : * next base backup sink in the chain at a rate no greater than the
6 : : * configured maximum.
7 : : *
8 : : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/backup/basebackup_throttle.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : : #include "postgres.h"
16 : :
17 : : #include "backup/basebackup_sink.h"
18 : : #include "miscadmin.h"
19 : : #include "pgstat.h"
20 : : #include "storage/latch.h"
21 : : #include "utils/timestamp.h"
22 : :
23 : : typedef struct bbsink_throttle
24 : : {
25 : : /* Common information for all types of sink. */
26 : : bbsink base;
27 : :
28 : : /* The actual number of bytes, transfer of which may cause sleep. */
29 : : uint64 throttling_sample;
30 : :
31 : : /* Amount of data already transferred but not yet throttled. */
32 : : int64 throttling_counter;
33 : :
34 : : /* The minimum time required to transfer throttling_sample bytes. */
35 : : TimeOffset elapsed_min_unit;
36 : :
37 : : /* The last check of the transfer rate. */
38 : : TimestampTz throttled_last;
39 : : } bbsink_throttle;
40 : :
41 : : static void bbsink_throttle_begin_backup(bbsink *sink);
42 : : static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
43 : : static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
44 : : static void throttle(bbsink_throttle *sink, size_t increment);
45 : :
46 : : static const bbsink_ops bbsink_throttle_ops = {
47 : : .begin_backup = bbsink_throttle_begin_backup,
48 : : .begin_archive = bbsink_forward_begin_archive,
49 : : .archive_contents = bbsink_throttle_archive_contents,
50 : : .end_archive = bbsink_forward_end_archive,
51 : : .begin_manifest = bbsink_forward_begin_manifest,
52 : : .manifest_contents = bbsink_throttle_manifest_contents,
53 : : .end_manifest = bbsink_forward_end_manifest,
54 : : .end_backup = bbsink_forward_end_backup,
55 : : .cleanup = bbsink_forward_cleanup
56 : : };
57 : :
58 : : /*
59 : : * How frequently to throttle, as a fraction of the specified rate-second.
60 : : */
61 : : #define THROTTLING_FREQUENCY 8
62 : :
63 : : /*
64 : : * Create a new basebackup sink that performs throttling and forwards data
65 : : * to a successor sink.
66 : : */
67 : : bbsink *
891 rhaas@postgresql.org 68 :CBC 1 : bbsink_throttle_new(bbsink *next, uint32 maxrate)
69 : : {
70 : : bbsink_throttle *sink;
71 : :
72 [ - + ]: 1 : Assert(next != NULL);
73 [ - + ]: 1 : Assert(maxrate > 0);
74 : :
75 : 1 : sink = palloc0(sizeof(bbsink_throttle));
76 : 1 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
77 : 1 : sink->base.bbs_next = next;
78 : :
79 : 1 : sink->throttling_sample =
80 : 1 : (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
81 : :
82 : : /*
83 : : * The minimum amount of time for throttling_sample bytes to be
84 : : * transferred.
85 : : */
86 : 1 : sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
87 : :
88 : 1 : return &sink->base;
89 : : }
90 : :
91 : : /*
92 : : * There's no real work to do here, but we need to record the current time so
93 : : * that it can be used for future calculations.
94 : : */
95 : : static void
96 : 1 : bbsink_throttle_begin_backup(bbsink *sink)
97 : : {
98 : 1 : bbsink_throttle *mysink = (bbsink_throttle *) sink;
99 : :
100 : 1 : bbsink_forward_begin_backup(sink);
101 : :
102 : : /* The 'real data' starts now (header was ignored). */
103 : 1 : mysink->throttled_last = GetCurrentTimestamp();
104 : 1 : }
105 : :
106 : : /*
107 : : * First throttle, and then pass archive contents to next sink.
108 : : */
109 : : static void
110 : 14 : bbsink_throttle_archive_contents(bbsink *sink, size_t len)
111 : : {
112 : 14 : throttle((bbsink_throttle *) sink, len);
113 : :
114 : 14 : bbsink_forward_archive_contents(sink, len);
115 : 14 : }
116 : :
117 : : /*
118 : : * First throttle, and then pass manifest contents to next sink.
119 : : */
120 : : static void
891 rhaas@postgresql.org 121 :UBC 0 : bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
122 : : {
123 : 0 : throttle((bbsink_throttle *) sink, len);
124 : :
881 125 : 0 : bbsink_forward_manifest_contents(sink, len);
891 126 : 0 : }
127 : :
128 : : /*
129 : : * Increment the network transfer counter by the given number of bytes,
130 : : * and sleep if necessary to comply with the requested network transfer
131 : : * rate.
132 : : */
133 : : static void
891 rhaas@postgresql.org 134 :CBC 14 : throttle(bbsink_throttle *sink, size_t increment)
135 : : {
136 : : TimeOffset elapsed_min;
137 : :
138 [ - + ]: 14 : Assert(sink->throttling_counter >= 0);
139 : :
140 : 14 : sink->throttling_counter += increment;
141 [ + + ]: 14 : if (sink->throttling_counter < sink->throttling_sample)
142 : 10 : return;
143 : :
144 : : /* How much time should have elapsed at minimum? */
145 : 4 : elapsed_min = sink->elapsed_min_unit *
146 : 4 : (sink->throttling_counter / sink->throttling_sample);
147 : :
148 : : /*
149 : : * Since the latch could be set repeatedly because of concurrently WAL
150 : : * activity, sleep in a loop to ensure enough time has passed.
151 : : */
152 : : for (;;)
891 rhaas@postgresql.org 153 :UBC 0 : {
154 : : TimeOffset elapsed,
155 : : sleep;
156 : : int wait_result;
157 : :
158 : : /* Time elapsed since the last measurement (and possible wake up). */
891 rhaas@postgresql.org 159 :CBC 4 : elapsed = GetCurrentTimestamp() - sink->throttled_last;
160 : :
161 : : /* sleep if the transfer is faster than it should be */
162 : 4 : sleep = elapsed_min - elapsed;
163 [ - + ]: 4 : if (sleep <= 0)
891 rhaas@postgresql.org 164 :UBC 0 : break;
165 : :
891 rhaas@postgresql.org 166 :CBC 4 : ResetLatch(MyLatch);
167 : :
168 : : /* We're eating a potentially set latch, so check for interrupts */
169 [ - + ]: 4 : CHECK_FOR_INTERRUPTS();
170 : :
171 : : /*
172 : : * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
173 : : * the maximum time to sleep. Thus the cast to long is safe.
174 : : */
175 : 4 : wait_result = WaitLatch(MyLatch,
176 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
177 : 4 : (long) (sleep / 1000),
178 : : WAIT_EVENT_BASE_BACKUP_THROTTLE);
179 : :
180 [ - + ]: 4 : if (wait_result & WL_LATCH_SET)
891 rhaas@postgresql.org 181 [ # # ]:UBC 0 : CHECK_FOR_INTERRUPTS();
182 : :
183 : : /* Done waiting? */
891 rhaas@postgresql.org 184 [ + - ]:CBC 4 : if (wait_result & WL_TIMEOUT)
185 : 4 : break;
186 : : }
187 : :
188 : : /*
189 : : * As we work with integers, only whole multiple of throttling_sample was
190 : : * processed. The rest will be done during the next call of this function.
191 : : */
192 : 4 : sink->throttling_counter %= sink->throttling_sample;
193 : :
194 : : /*
195 : : * Time interval for the remaining amount and possible next increments
196 : : * starts now.
197 : : */
198 : 4 : sink->throttled_last = GetCurrentTimestamp();
199 : : }
|