Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * bbstreamer_gzip.c
4 : *
5 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/bin/pg_basebackup/bbstreamer_gzip.c
9 : *-------------------------------------------------------------------------
10 : */
11 :
12 : #include "postgres_fe.h"
13 :
14 : #include <unistd.h>
15 :
16 : #ifdef HAVE_LIBZ
17 : #include <zlib.h>
18 : #endif
19 :
20 : #include "bbstreamer.h"
21 : #include "common/logging.h"
22 : #include "common/file_perm.h"
23 : #include "common/string.h"
24 :
25 : #ifdef HAVE_LIBZ
26 : typedef struct bbstreamer_gzip_writer
27 : {
28 : bbstreamer base;
29 : char *pathname;
30 : gzFile gzfile;
31 : } bbstreamer_gzip_writer;
32 :
33 : typedef struct bbstreamer_gzip_decompressor
34 : {
35 : bbstreamer base;
36 : z_stream zstream;
37 : size_t bytes_written;
38 : } bbstreamer_gzip_decompressor;
39 :
40 : static void bbstreamer_gzip_writer_content(bbstreamer *streamer,
41 : bbstreamer_member *member,
42 : const char *data, int len,
43 : bbstreamer_archive_context context);
44 : static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer);
45 : static void bbstreamer_gzip_writer_free(bbstreamer *streamer);
46 : static const char *get_gz_error(gzFile gzf);
47 :
48 : const bbstreamer_ops bbstreamer_gzip_writer_ops = {
49 : .content = bbstreamer_gzip_writer_content,
50 : .finalize = bbstreamer_gzip_writer_finalize,
51 : .free = bbstreamer_gzip_writer_free
52 : };
53 :
54 : static void bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
55 : bbstreamer_member *member,
56 : const char *data, int len,
57 : bbstreamer_archive_context context);
58 : static void bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer);
59 : static void bbstreamer_gzip_decompressor_free(bbstreamer *streamer);
60 : static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
61 : static void gzip_pfree(void *opaque, void *address);
62 :
63 : const bbstreamer_ops bbstreamer_gzip_decompressor_ops = {
64 : .content = bbstreamer_gzip_decompressor_content,
65 : .finalize = bbstreamer_gzip_decompressor_finalize,
66 : .free = bbstreamer_gzip_decompressor_free
67 : };
68 : #endif
69 :
70 : /*
71 : * Create a bbstreamer that just compresses data using gzip, and then writes
72 : * it to a file.
73 : *
74 : * As in the case of bbstreamer_plain_writer_new, pathname is always used
75 : * for error reporting purposes; if file is NULL, it is also the opened and
76 : * closed so that the data may be written there.
77 : */
78 : bbstreamer *
382 rhaas 79 CBC 4 : bbstreamer_gzip_writer_new(char *pathname, FILE *file,
80 : pg_compress_specification *compress)
81 : {
82 : #ifdef HAVE_LIBZ
83 : bbstreamer_gzip_writer *streamer;
84 :
436 85 4 : streamer = palloc0(sizeof(bbstreamer_gzip_writer));
86 4 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
87 : &bbstreamer_gzip_writer_ops;
88 :
89 4 : streamer->pathname = pstrdup(pathname);
90 :
91 4 : if (file == NULL)
92 : {
93 4 : streamer->gzfile = gzopen(pathname, "wb");
94 4 : if (streamer->gzfile == NULL)
366 tgl 95 UBC 0 : pg_fatal("could not create compressed file \"%s\": %m",
96 : pathname);
97 : }
98 : else
99 : {
436 rhaas 100 0 : int fd = dup(fileno(file));
101 :
102 0 : if (fd < 0)
366 tgl 103 0 : pg_fatal("could not duplicate stdout: %m");
104 :
436 rhaas 105 0 : streamer->gzfile = gzdopen(fd, "wb");
106 0 : if (streamer->gzfile == NULL)
366 tgl 107 0 : pg_fatal("could not open output file: %m");
108 : }
109 :
207 michael 110 CBC 4 : if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
366 tgl 111 UBC 0 : pg_fatal("could not set compression level %d: %s",
112 : compress->level, get_gz_error(streamer->gzfile));
113 :
436 rhaas 114 CBC 4 : return &streamer->base;
115 : #else
116 : pg_fatal("this build does not support compression with %s", "gzip");
117 : return NULL; /* keep compiler quiet */
118 : #endif
119 : }
120 :
121 : #ifdef HAVE_LIBZ
122 : /*
123 : * Write archive content to gzip file.
124 : */
125 : static void
126 9282 : bbstreamer_gzip_writer_content(bbstreamer *streamer,
127 : bbstreamer_member *member, const char *data,
128 : int len, bbstreamer_archive_context context)
129 : {
130 : bbstreamer_gzip_writer *mystreamer;
131 :
132 9282 : mystreamer = (bbstreamer_gzip_writer *) streamer;
133 :
134 9282 : if (len == 0)
436 rhaas 135 UBC 0 : return;
136 :
436 rhaas 137 CBC 9282 : errno = 0;
138 9282 : if (gzwrite(mystreamer->gzfile, data, len) != len)
139 : {
140 : /* if write didn't set errno, assume problem is no disk space */
436 rhaas 141 UBC 0 : if (errno == 0)
142 0 : errno = ENOSPC;
366 tgl 143 0 : pg_fatal("could not write to compressed file \"%s\": %s",
144 : mystreamer->pathname, get_gz_error(mystreamer->gzfile));
145 : }
146 : }
147 :
148 : /*
149 : * End-of-archive processing when writing to a gzip file consists of just
150 : * calling gzclose.
151 : *
152 : * It makes no difference whether we opened the file or the caller did it,
153 : * because libz provides no way of avoiding a close on the underling file
154 : * handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to
155 : * work around this issue, so that the behavior from the caller's viewpoint
156 : * is the same as for bbstreamer_plain_writer.
157 : */
158 : static void
436 rhaas 159 CBC 4 : bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
160 : {
161 : bbstreamer_gzip_writer *mystreamer;
162 :
163 4 : mystreamer = (bbstreamer_gzip_writer *) streamer;
164 :
165 4 : errno = 0; /* in case gzclose() doesn't set it */
166 4 : if (gzclose(mystreamer->gzfile) != 0)
366 tgl 167 UBC 0 : pg_fatal("could not close compressed file \"%s\": %m",
168 : mystreamer->pathname);
169 :
436 rhaas 170 CBC 4 : mystreamer->gzfile = NULL;
171 4 : }
172 :
173 : /*
174 : * Free memory associated with this bbstreamer.
175 : */
176 : static void
177 4 : bbstreamer_gzip_writer_free(bbstreamer *streamer)
178 : {
179 : bbstreamer_gzip_writer *mystreamer;
180 :
181 4 : mystreamer = (bbstreamer_gzip_writer *) streamer;
182 :
183 4 : Assert(mystreamer->base.bbs_next == NULL);
184 4 : Assert(mystreamer->gzfile == NULL);
185 :
186 4 : pfree(mystreamer->pathname);
187 4 : pfree(mystreamer);
188 4 : }
189 :
190 : /*
191 : * Helper function for libz error reporting.
192 : */
193 : static const char *
436 rhaas 194 UBC 0 : get_gz_error(gzFile gzf)
195 : {
196 : int errnum;
197 : const char *errmsg;
198 :
199 0 : errmsg = gzerror(gzf, &errnum);
200 0 : if (errnum == Z_ERRNO)
201 0 : return strerror(errno);
202 : else
203 0 : return errmsg;
204 : }
205 : #endif
206 :
207 : /*
208 : * Create a new base backup streamer that performs decompression of gzip
209 : * compressed blocks.
210 : */
211 : bbstreamer *
436 rhaas 212 CBC 1 : bbstreamer_gzip_decompressor_new(bbstreamer *next)
213 : {
214 : #ifdef HAVE_LIBZ
215 : bbstreamer_gzip_decompressor *streamer;
216 : z_stream *zs;
217 :
218 1 : Assert(next != NULL);
219 :
220 1 : streamer = palloc0(sizeof(bbstreamer_gzip_decompressor));
221 1 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
222 : &bbstreamer_gzip_decompressor_ops;
223 :
224 1 : streamer->base.bbs_next = next;
225 1 : initStringInfo(&streamer->base.bbs_buffer);
226 :
227 : /* Initialize internal stream state for decompression */
228 1 : zs = &streamer->zstream;
229 1 : zs->zalloc = gzip_palloc;
230 1 : zs->zfree = gzip_pfree;
231 1 : zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
232 1 : zs->avail_out = streamer->base.bbs_buffer.maxlen;
233 :
234 : /*
235 : * Data compression was initialized using deflateInit2 to request a gzip
236 : * header. Similarly, we are using inflateInit2 to initialize data
237 : * decompression.
238 : *
239 : * Per the documentation for inflateInit2, the second argument is
240 : * "windowBits" and its value must be greater than or equal to the value
241 : * provided while compressing the data, so we are using the maximum
242 : * possible value for safety.
243 : */
244 1 : if (inflateInit2(zs, 15 + 16) != Z_OK)
366 tgl 245 UBC 0 : pg_fatal("could not initialize compression library");
246 :
436 rhaas 247 CBC 1 : return &streamer->base;
248 : #else
249 : pg_fatal("this build does not support compression with %s", "gzip");
250 : return NULL; /* keep compiler quiet */
251 : #endif
252 : }
253 :
254 : #ifdef HAVE_LIBZ
255 : /*
256 : * Decompress the input data to output buffer until we run out of input
257 : * data. Each time the output buffer is full, pass on the decompressed data
258 : * to the next streamer.
259 : */
260 : static void
261 100 : bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
262 : bbstreamer_member *member,
263 : const char *data, int len,
264 : bbstreamer_archive_context context)
265 : {
266 : bbstreamer_gzip_decompressor *mystreamer;
267 : z_stream *zs;
268 :
269 100 : mystreamer = (bbstreamer_gzip_decompressor *) streamer;
270 :
271 100 : zs = &mystreamer->zstream;
272 100 : zs->next_in = (uint8 *) data;
273 100 : zs->avail_in = len;
274 :
275 : /* Process the current chunk */
276 40250 : while (zs->avail_in > 0)
277 : {
278 : int res;
279 :
280 40050 : Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
281 :
282 40050 : zs->next_out = (uint8 *)
283 40050 : mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
284 40050 : zs->avail_out =
285 40050 : mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
286 :
287 : /*
288 : * This call decompresses data starting at zs->next_in and updates
289 : * zs->next_in * and zs->avail_in. It generates output data starting
290 : * at zs->next_out and updates zs->next_out and zs->avail_out
291 : * accordingly.
292 : */
293 40050 : res = inflate(zs, Z_NO_FLUSH);
294 :
295 40050 : if (res == Z_STREAM_ERROR)
436 rhaas 296 UBC 0 : pg_log_error("could not decompress data: %s", zs->msg);
297 :
436 rhaas 298 CBC 40050 : mystreamer->bytes_written =
299 40050 : mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
300 :
301 : /* If output buffer is full then pass data to next streamer */
302 40050 : if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
303 : {
304 39951 : bbstreamer_content(mystreamer->base.bbs_next, member,
305 39951 : mystreamer->base.bbs_buffer.data,
306 : mystreamer->base.bbs_buffer.maxlen, context);
307 39951 : mystreamer->bytes_written = 0;
308 : }
309 : }
310 100 : }
311 :
312 : /*
313 : * End-of-stream processing.
314 : */
315 : static void
316 1 : bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer)
317 : {
318 : bbstreamer_gzip_decompressor *mystreamer;
319 :
320 1 : mystreamer = (bbstreamer_gzip_decompressor *) streamer;
321 :
322 : /*
323 : * End of the stream, if there is some pending data in output buffers then
324 : * we must forward it to next streamer.
325 : */
326 1 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
327 1 : mystreamer->base.bbs_buffer.data,
328 : mystreamer->base.bbs_buffer.maxlen,
329 : BBSTREAMER_UNKNOWN);
330 :
331 1 : bbstreamer_finalize(mystreamer->base.bbs_next);
332 1 : }
333 :
334 : /*
335 : * Free memory.
336 : */
337 : static void
338 1 : bbstreamer_gzip_decompressor_free(bbstreamer *streamer)
339 : {
340 1 : bbstreamer_free(streamer->bbs_next);
341 1 : pfree(streamer->bbs_buffer.data);
342 1 : pfree(streamer);
343 1 : }
344 :
345 : /*
346 : * Wrapper function to adjust the signature of palloc to match what libz
347 : * expects.
348 : */
349 : static void *
350 2 : gzip_palloc(void *opaque, unsigned items, unsigned size)
351 : {
352 2 : return palloc(items * size);
353 : }
354 :
/*
 * zfree callback for libz: release memory previously handed out by
 * gzip_palloc.  libz's opaque pointer is not used.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;
	pfree(address);
}
364 : #endif
|