Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * basebackup_gzip.c
4 : : * Basebackup sink implementing gzip compression.
5 : : *
6 : : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/backup/basebackup_gzip.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #ifdef HAVE_LIBZ
16 : : #include <zlib.h>
17 : : #endif
18 : :
19 : : #include "backup/basebackup_sink.h"
20 : :
21 : : #ifdef HAVE_LIBZ
22 : : typedef struct bbsink_gzip
23 : : {
24 : : /* Common information for all types of sink. */
25 : : bbsink base;
26 : :
27 : : /* Compression level. */
28 : : int compresslevel;
29 : :
30 : : /* Compressed data stream. */
31 : : z_stream zstream;
32 : :
33 : : /* Number of bytes staged in output buffer. */
34 : : size_t bytes_written;
35 : : } bbsink_gzip;
36 : :
37 : : static void bbsink_gzip_begin_backup(bbsink *sink);
38 : : static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
39 : : static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
40 : : static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
41 : : static void bbsink_gzip_end_archive(bbsink *sink);
42 : : static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
43 : : static void gzip_pfree(void *opaque, void *address);
44 : :
45 : : static const bbsink_ops bbsink_gzip_ops = {
46 : : .begin_backup = bbsink_gzip_begin_backup,
47 : : .begin_archive = bbsink_gzip_begin_archive,
48 : : .archive_contents = bbsink_gzip_archive_contents,
49 : : .end_archive = bbsink_gzip_end_archive,
50 : : .begin_manifest = bbsink_forward_begin_manifest,
51 : : .manifest_contents = bbsink_gzip_manifest_contents,
52 : : .end_manifest = bbsink_forward_end_manifest,
53 : : .end_backup = bbsink_forward_end_backup,
54 : : .cleanup = bbsink_forward_cleanup
55 : : };
56 : : #endif
57 : :
58 : : /*
59 : : * Create a new basebackup sink that performs gzip compression.
60 : : */
61 : : bbsink *
733 michael@paquier.xyz 62 :CBC 2 : bbsink_gzip_new(bbsink *next, pg_compress_specification *compress)
63 : : {
64 : : #ifndef HAVE_LIBZ
65 : : ereport(ERROR,
66 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
67 : : errmsg("gzip compression is not supported by this build")));
68 : : return NULL; /* keep compiler quiet */
69 : : #else
70 : : bbsink_gzip *sink;
71 : : int compresslevel;
72 : :
811 rhaas@postgresql.org 73 [ - + ]: 2 : Assert(next != NULL);
74 : :
578 michael@paquier.xyz 75 : 2 : compresslevel = compress->level;
76 [ + + - + : 2 : Assert((compresslevel >= 1 && compresslevel <= 9) ||
- + ]
77 : : compresslevel == Z_DEFAULT_COMPRESSION);
78 : :
811 rhaas@postgresql.org 79 : 2 : sink = palloc0(sizeof(bbsink_gzip));
80 : 2 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
81 : 2 : sink->base.bbs_next = next;
82 : 2 : sink->compresslevel = compresslevel;
83 : :
84 : 2 : return &sink->base;
85 : : #endif
86 : : }
87 : :
88 : : #ifdef HAVE_LIBZ
89 : :
90 : : /*
91 : : * Begin backup.
92 : : */
93 : : static void
94 : 2 : bbsink_gzip_begin_backup(bbsink *sink)
95 : : {
96 : : /*
97 : : * We need our own buffer, because we're going to pass different data to
98 : : * the next sink than what gets passed to us.
99 : : */
100 : 2 : sink->bbs_buffer = palloc(sink->bbs_buffer_length);
101 : :
102 : : /*
103 : : * Since deflate() doesn't require the output buffer to be of any
104 : : * particular size, we can just make it the same size as the input buffer.
105 : : */
106 : 2 : bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
107 : 2 : sink->bbs_buffer_length);
108 : 2 : }
109 : :
110 : : /*
111 : : * Prepare to compress the next archive.
112 : : */
113 : : static void
114 : 2 : bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
115 : : {
116 : 2 : bbsink_gzip *mysink = (bbsink_gzip *) sink;
117 : : char *gz_archive_name;
703 tgl@sss.pgh.pa.us 118 : 2 : z_stream *zs = &mysink->zstream;
119 : :
120 : : /* Initialize compressor object. */
811 rhaas@postgresql.org 121 : 2 : memset(zs, 0, sizeof(z_stream));
122 : 2 : zs->zalloc = gzip_palloc;
123 : 2 : zs->zfree = gzip_pfree;
124 : 2 : zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
125 : 2 : zs->avail_out = sink->bbs_next->bbs_buffer_length;
126 : :
127 : : /*
128 : : * We need to use deflateInit2() rather than deflateInit() here so that we
129 : : * can request a gzip header rather than a zlib header. Otherwise, we want
130 : : * to supply the same values that would have been used by default if we
131 : : * had just called deflateInit().
132 : : *
133 : : * Per the documentation for deflateInit2, the third argument must be
134 : : * Z_DEFLATED; the fourth argument is the number of "window bits", by
135 : : * default 15, but adding 16 gets you a gzip header rather than a zlib
136 : : * header; the fifth argument controls memory usage, and 8 is the default;
137 : : * and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
138 : : */
139 [ - + ]: 2 : if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
140 : : Z_DEFAULT_STRATEGY) != Z_OK)
811 rhaas@postgresql.org 141 [ # # ]:UBC 0 : ereport(ERROR,
142 : : errcode(ERRCODE_INTERNAL_ERROR),
143 : : errmsg("could not initialize compression library"));
144 : :
145 : : /*
146 : : * Add ".gz" to the archive name. Note that the pg_basebackup -z produces
147 : : * archives named ".tar.gz" rather than ".tgz", so we match that here.
148 : : */
811 rhaas@postgresql.org 149 :CBC 2 : gz_archive_name = psprintf("%s.gz", archive_name);
150 [ - + ]: 2 : Assert(sink->bbs_next != NULL);
151 : 2 : bbsink_begin_archive(sink->bbs_next, gz_archive_name);
152 : 2 : pfree(gz_archive_name);
153 : 2 : }
154 : :
155 : : /*
156 : : * Compress the input data to the output buffer until we run out of input
157 : : * data. Each time the output buffer fills up, invoke the archive_contents()
158 : : * method for then next sink.
159 : : *
160 : : * Note that since we're compressing the input, it may very commonly happen
161 : : * that we consume all the input data without filling the output buffer. In
162 : : * that case, the compressed representation of the current input data won't
163 : : * actually be sent to the next bbsink until a later call to this function,
164 : : * or perhaps even not until bbsink_gzip_end_archive() is invoked.
165 : : */
166 : : static void
167 : 5395 : bbsink_gzip_archive_contents(bbsink *sink, size_t len)
168 : : {
169 : 5395 : bbsink_gzip *mysink = (bbsink_gzip *) sink;
703 tgl@sss.pgh.pa.us 170 : 5395 : z_stream *zs = &mysink->zstream;
171 : :
172 : : /* Compress data from input buffer. */
811 rhaas@postgresql.org 173 : 5395 : zs->next_in = (uint8 *) mysink->base.bbs_buffer;
174 : 5395 : zs->avail_in = len;
175 : :
176 [ + + ]: 16266 : while (zs->avail_in > 0)
177 : : {
178 : : int res;
179 : :
180 : : /* Write output data into unused portion of output buffer. */
181 [ - + ]: 5476 : Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
182 : 5476 : zs->next_out = (uint8 *)
183 : 5476 : mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
184 : 5476 : zs->avail_out =
185 : 5476 : mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
186 : :
187 : : /*
188 : : * Try to compress. Note that this will update zs->next_in and
189 : : * zs->avail_in according to how much input data was consumed, and
190 : : * zs->next_out and zs->avail_out according to how many output bytes
191 : : * were produced.
192 : : *
193 : : * According to the zlib documentation, Z_STREAM_ERROR should only
194 : : * occur if we've made a programming error, or if say there's been a
195 : : * memory clobber; we use elog() rather than Assert() here out of an
196 : : * abundance of caution.
197 : : */
198 : 5476 : res = deflate(zs, Z_NO_FLUSH);
199 [ - + ]: 5476 : if (res == Z_STREAM_ERROR)
811 rhaas@postgresql.org 200 [ # # ]:UBC 0 : elog(ERROR, "could not compress data: %s", zs->msg);
201 : :
202 : : /* Update our notion of how many bytes we've written. */
811 rhaas@postgresql.org 203 :CBC 5476 : mysink->bytes_written =
204 : 5476 : mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
205 : :
206 : : /*
207 : : * If the output buffer is full, it's time for the next sink to
208 : : * process the contents.
209 : : */
210 [ + + ]: 5476 : if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
211 : : {
212 : 190 : bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
213 : 190 : mysink->bytes_written = 0;
214 : : }
215 : : }
216 : 5395 : }
217 : :
218 : : /*
219 : : * There might be some data inside zlib's internal buffers; we need to get
220 : : * that flushed out and forwarded to the successor sink as archive content.
221 : : *
222 : : * Then we can end processing for this archive.
223 : : */
224 : : static void
225 : 2 : bbsink_gzip_end_archive(bbsink *sink)
226 : : {
227 : 2 : bbsink_gzip *mysink = (bbsink_gzip *) sink;
703 tgl@sss.pgh.pa.us 228 : 2 : z_stream *zs = &mysink->zstream;
229 : :
230 : : /* There is no more data available. */
811 rhaas@postgresql.org 231 : 2 : zs->next_in = (uint8 *) mysink->base.bbs_buffer;
232 : 2 : zs->avail_in = 0;
233 : :
234 : : while (1)
235 : 3 : {
236 : : int res;
237 : :
238 : : /* Write output data into unused portion of output buffer. */
239 [ - + ]: 5 : Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
240 : 5 : zs->next_out = (uint8 *)
241 : 5 : mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
242 : 5 : zs->avail_out =
243 : 5 : mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
244 : :
245 : : /*
246 : : * As bbsink_gzip_archive_contents, but pass Z_FINISH since there is
247 : : * no more input.
248 : : */
249 : 5 : res = deflate(zs, Z_FINISH);
250 [ - + ]: 5 : if (res == Z_STREAM_ERROR)
811 rhaas@postgresql.org 251 [ # # ]:UBC 0 : elog(ERROR, "could not compress data: %s", zs->msg);
252 : :
253 : : /* Update our notion of how many bytes we've written. */
811 rhaas@postgresql.org 254 :CBC 5 : mysink->bytes_written =
255 : 5 : mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
256 : :
257 : : /*
258 : : * Apparently we had no data in the output buffer and deflate() was
259 : : * not able to add any. We must be done.
260 : : */
261 [ + + ]: 5 : if (mysink->bytes_written == 0)
262 : 2 : break;
263 : :
264 : : /* Send whatever accumulated output bytes we have. */
265 : 3 : bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
266 : 3 : mysink->bytes_written = 0;
267 : : }
268 : :
269 : : /* Must also pass on the information that this archive has ended. */
270 : 2 : bbsink_forward_end_archive(sink);
271 : 2 : }
272 : :
273 : : /*
274 : : * Manifest contents are not compressed, but we do need to copy them into
275 : : * the successor sink's buffer, because we have our own.
276 : : */
277 : : static void
278 : 10 : bbsink_gzip_manifest_contents(bbsink *sink, size_t len)
279 : : {
280 : 10 : memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
281 : 10 : bbsink_manifest_contents(sink->bbs_next, len);
282 : 10 : }
283 : :
284 : : /*
285 : : * Wrapper function to adjust the signature of palloc to match what libz
286 : : * expects.
287 : : */
288 : : static void *
289 : 10 : gzip_palloc(void *opaque, unsigned items, unsigned size)
290 : : {
291 : 10 : return palloc(items * size);
292 : : }
293 : :
294 : : /*
295 : : * Wrapper function to adjust the signature of pfree to match what libz
296 : : * expects.
297 : : */
298 : : static void
811 rhaas@postgresql.org 299 :UBC 0 : gzip_pfree(void *opaque, void *address)
300 : : {
301 : 0 : pfree(address);
302 : 0 : }
303 : :
304 : : #endif
|