Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * bbstreamer_lz4.c
4 : *
5 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/bin/pg_basebackup/bbstreamer_lz4.c
9 : *-------------------------------------------------------------------------
10 : */
11 :
12 : #include "postgres_fe.h"
13 :
14 : #include <unistd.h>
15 :
16 : #ifdef USE_LZ4
17 : #include <lz4frame.h>
18 : #endif
19 :
20 : #include "bbstreamer.h"
21 : #include "common/logging.h"
22 : #include "common/file_perm.h"
23 : #include "common/string.h"
24 :
25 : #ifdef USE_LZ4
26 : typedef struct bbstreamer_lz4_frame
27 : {
28 : bbstreamer base;
29 :
30 : LZ4F_compressionContext_t cctx;
31 : LZ4F_decompressionContext_t dctx;
32 : LZ4F_preferences_t prefs;
33 :
34 : size_t bytes_written;
35 : bool header_written;
36 : } bbstreamer_lz4_frame;
37 :
38 : static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
39 : bbstreamer_member *member,
40 : const char *data, int len,
41 : bbstreamer_archive_context context);
42 : static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
43 : static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);
44 :
45 : const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
46 : .content = bbstreamer_lz4_compressor_content,
47 : .finalize = bbstreamer_lz4_compressor_finalize,
48 : .free = bbstreamer_lz4_compressor_free
49 : };
50 :
51 : static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
52 : bbstreamer_member *member,
53 : const char *data, int len,
54 : bbstreamer_archive_context context);
55 : static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
56 : static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);
57 :
58 : const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
59 : .content = bbstreamer_lz4_decompressor_content,
60 : .finalize = bbstreamer_lz4_decompressor_finalize,
61 : .free = bbstreamer_lz4_decompressor_free
62 : };
63 : #endif
64 :
65 : /*
66 : * Create a new base backup streamer that performs lz4 compression of tar
67 : * blocks.
68 : */
69 : bbstreamer *
362 michael 70 CBC 1 : bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
71 : {
72 : #ifdef USE_LZ4
73 : bbstreamer_lz4_frame *streamer;
74 : LZ4F_errorCode_t ctxError;
75 : LZ4F_preferences_t *prefs;
76 :
422 rhaas 77 1 : Assert(next != NULL);
78 :
79 1 : streamer = palloc0(sizeof(bbstreamer_lz4_frame));
80 1 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
81 : &bbstreamer_lz4_compressor_ops;
82 :
83 1 : streamer->base.bbs_next = next;
84 1 : initStringInfo(&streamer->base.bbs_buffer);
85 1 : streamer->header_written = false;
86 :
87 : /* Initialize stream compression preferences */
88 1 : prefs = &streamer->prefs;
89 1 : memset(prefs, 0, sizeof(LZ4F_preferences_t));
90 1 : prefs->frameInfo.blockSizeID = LZ4F_max256KB;
207 michael 91 1 : prefs->compressionLevel = compress->level;
92 :
422 rhaas 93 1 : ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
94 1 : if (LZ4F_isError(ctxError))
366 tgl 95 UBC 0 : pg_log_error("could not create lz4 compression context: %s",
96 : LZ4F_getErrorName(ctxError));
97 :
422 rhaas 98 CBC 1 : return &streamer->base;
99 : #else
100 : pg_fatal("this build does not support compression with %s", "LZ4");
101 : return NULL; /* keep compiler quiet */
102 : #endif
103 : }
104 :
105 : #ifdef USE_LZ4
106 : /*
107 : * Compress the input data to output buffer.
108 : *
109 : * Find out the compression bound based on input data length for each
110 : * invocation to make sure that output buffer has enough capacity to
111 : * accommodate the compressed data. In case if the output buffer
112 : * capacity falls short of compression bound then forward the content
113 : * of output buffer to next streamer and empty the buffer.
114 : */
115 : static void
116 2703 : bbstreamer_lz4_compressor_content(bbstreamer *streamer,
117 : bbstreamer_member *member,
118 : const char *data, int len,
119 : bbstreamer_archive_context context)
120 : {
121 : bbstreamer_lz4_frame *mystreamer;
122 : uint8 *next_in,
123 : *next_out;
124 : size_t out_bound,
125 : compressed_size,
126 : avail_out;
127 :
128 2703 : mystreamer = (bbstreamer_lz4_frame *) streamer;
129 2703 : next_in = (uint8 *) data;
130 :
131 : /* Write header before processing the first input chunk. */
132 2703 : if (!mystreamer->header_written)
133 : {
134 1 : compressed_size = LZ4F_compressBegin(mystreamer->cctx,
135 1 : (uint8 *) mystreamer->base.bbs_buffer.data,
136 1 : mystreamer->base.bbs_buffer.maxlen,
137 1 : &mystreamer->prefs);
138 :
139 1 : if (LZ4F_isError(compressed_size))
422 rhaas 140 UBC 0 : pg_log_error("could not write lz4 header: %s",
141 : LZ4F_getErrorName(compressed_size));
142 :
422 rhaas 143 CBC 1 : mystreamer->bytes_written += compressed_size;
144 1 : mystreamer->header_written = true;
145 : }
146 :
147 : /*
148 : * Update the offset and capacity of output buffer based on number of
149 : * bytes written to output buffer.
150 : */
151 2703 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
152 2703 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
153 :
154 : /*
155 : * Find out the compression bound and make sure that output buffer has the
156 : * required capacity for the success of LZ4F_compressUpdate. If needed
157 : * forward the content to next streamer and empty the buffer.
158 : */
159 2703 : out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
397 160 2703 : if (avail_out < out_bound)
161 : {
332 tgl 162 14 : bbstreamer_content(mystreamer->base.bbs_next, member,
163 14 : mystreamer->base.bbs_buffer.data,
164 14 : mystreamer->bytes_written,
165 : context);
166 :
167 : /* Enlarge buffer if it falls short of out bound. */
168 14 : if (mystreamer->base.bbs_buffer.maxlen < out_bound)
169 1 : enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
170 :
171 14 : avail_out = mystreamer->base.bbs_buffer.maxlen;
172 14 : mystreamer->bytes_written = 0;
173 14 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
174 : }
175 :
176 : /*
177 : * This call compresses the data starting at next_in and generates the
178 : * output starting at next_out. It expects the caller to provide the size
179 : * of input buffer and capacity of output buffer by providing parameters
180 : * len and avail_out.
181 : *
182 : * It returns the number of bytes compressed to output buffer.
183 : */
422 rhaas 184 2703 : compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
185 : next_out, avail_out,
186 : next_in, len, NULL);
187 :
188 2703 : if (LZ4F_isError(compressed_size))
422 rhaas 189 UBC 0 : pg_log_error("could not compress data: %s",
190 : LZ4F_getErrorName(compressed_size));
191 :
422 rhaas 192 CBC 2703 : mystreamer->bytes_written += compressed_size;
193 2703 : }
194 :
195 : /*
196 : * End-of-stream processing.
197 : */
198 : static void
199 1 : bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
200 : {
201 : bbstreamer_lz4_frame *mystreamer;
202 : uint8 *next_out;
203 : size_t footer_bound,
204 : compressed_size,
205 : avail_out;
206 :
207 1 : mystreamer = (bbstreamer_lz4_frame *) streamer;
208 :
209 : /* Find out the footer bound and update the output buffer. */
210 1 : footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
397 211 1 : if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
212 : footer_bound)
213 : {
332 tgl 214 UBC 0 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
215 0 : mystreamer->base.bbs_buffer.data,
216 0 : mystreamer->bytes_written,
217 : BBSTREAMER_UNKNOWN);
218 :
219 : /* Enlarge buffer if it falls short of footer bound. */
220 0 : if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
221 0 : enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
222 :
223 0 : avail_out = mystreamer->base.bbs_buffer.maxlen;
224 0 : mystreamer->bytes_written = 0;
225 0 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
226 : }
227 : else
228 : {
422 rhaas 229 CBC 1 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
230 1 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
231 : }
232 :
233 : /*
234 : * Finalize the frame and flush whatever data remaining in compression
235 : * context.
236 : */
237 1 : compressed_size = LZ4F_compressEnd(mystreamer->cctx,
238 : next_out, avail_out, NULL);
239 :
240 1 : if (LZ4F_isError(compressed_size))
422 rhaas 241 UBC 0 : pg_log_error("could not end lz4 compression: %s",
242 : LZ4F_getErrorName(compressed_size));
243 :
422 rhaas 244 CBC 1 : mystreamer->bytes_written += compressed_size;
245 :
246 1 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
247 1 : mystreamer->base.bbs_buffer.data,
248 1 : mystreamer->bytes_written,
249 : BBSTREAMER_UNKNOWN);
250 :
251 1 : bbstreamer_finalize(mystreamer->base.bbs_next);
252 1 : }
253 :
254 : /*
255 : * Free memory.
256 : */
257 : static void
258 1 : bbstreamer_lz4_compressor_free(bbstreamer *streamer)
259 : {
260 : bbstreamer_lz4_frame *mystreamer;
261 :
262 1 : mystreamer = (bbstreamer_lz4_frame *) streamer;
263 1 : bbstreamer_free(streamer->bbs_next);
264 1 : LZ4F_freeCompressionContext(mystreamer->cctx);
265 1 : pfree(streamer->bbs_buffer.data);
266 1 : pfree(streamer);
267 1 : }
268 : #endif
269 :
270 : /*
271 : * Create a new base backup streamer that performs decompression of lz4
272 : * compressed blocks.
273 : */
274 : bbstreamer *
275 1 : bbstreamer_lz4_decompressor_new(bbstreamer *next)
276 : {
277 : #ifdef USE_LZ4
278 : bbstreamer_lz4_frame *streamer;
279 : LZ4F_errorCode_t ctxError;
280 :
281 1 : Assert(next != NULL);
282 :
283 1 : streamer = palloc0(sizeof(bbstreamer_lz4_frame));
284 1 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
285 : &bbstreamer_lz4_decompressor_ops;
286 :
287 1 : streamer->base.bbs_next = next;
288 1 : initStringInfo(&streamer->base.bbs_buffer);
289 :
290 : /* Initialize internal stream state for decompression */
291 1 : ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
292 1 : if (LZ4F_isError(ctxError))
366 tgl 293 UBC 0 : pg_fatal("could not initialize compression library: %s",
294 : LZ4F_getErrorName(ctxError));
295 :
422 rhaas 296 CBC 1 : return &streamer->base;
297 : #else
298 : pg_fatal("this build does not support compression with %s", "LZ4");
299 : return NULL; /* keep compiler quiet */
300 : #endif
301 : }
302 :
303 : #ifdef USE_LZ4
304 : /*
305 : * Decompress the input data to output buffer until we run out of input
306 : * data. Each time the output buffer is full, pass on the decompressed data
307 : * to the next streamer.
308 : */
309 : static void
310 98 : bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
311 : bbstreamer_member *member,
312 : const char *data, int len,
313 : bbstreamer_archive_context context)
314 : {
315 : bbstreamer_lz4_frame *mystreamer;
316 : uint8 *next_in,
317 : *next_out;
318 : size_t avail_in,
319 : avail_out;
320 :
321 98 : mystreamer = (bbstreamer_lz4_frame *) streamer;
322 98 : next_in = (uint8 *) data;
323 98 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
324 98 : avail_in = len;
325 98 : avail_out = mystreamer->base.bbs_buffer.maxlen;
326 :
327 40050 : while (avail_in > 0)
328 : {
329 : size_t ret,
330 : read_size,
331 : out_size;
332 :
333 39952 : read_size = avail_in;
334 39952 : out_size = avail_out;
335 :
336 : /*
337 : * This call decompresses the data starting at next_in and generates
338 : * the output data starting at next_out. It expects the caller to
339 : * provide size of the input buffer and total capacity of the output
340 : * buffer by providing the read_size and out_size parameters
341 : * respectively.
342 : *
343 : * Per the documentation of LZ4, parameters read_size and out_size
344 : * behaves as dual parameters. On return, the number of bytes consumed
345 : * from the input buffer will be written back to read_size and the
346 : * number of bytes decompressed to output buffer will be written back
347 : * to out_size respectively.
348 : */
349 39952 : ret = LZ4F_decompress(mystreamer->dctx,
350 : next_out, &out_size,
351 : next_in, &read_size, NULL);
352 :
353 39952 : if (LZ4F_isError(ret))
422 rhaas 354 UBC 0 : pg_log_error("could not decompress data: %s",
355 : LZ4F_getErrorName(ret));
356 :
357 : /* Update input buffer based on number of bytes consumed */
422 rhaas 358 CBC 39952 : avail_in -= read_size;
359 39952 : next_in += read_size;
360 :
361 39952 : mystreamer->bytes_written += out_size;
362 :
363 : /*
364 : * If output buffer is full then forward the content to next streamer
365 : * and update the output buffer.
366 : */
367 39952 : if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
368 : {
369 39951 : bbstreamer_content(mystreamer->base.bbs_next, member,
370 39951 : mystreamer->base.bbs_buffer.data,
371 : mystreamer->base.bbs_buffer.maxlen,
372 : context);
373 :
374 39951 : avail_out = mystreamer->base.bbs_buffer.maxlen;
375 39951 : mystreamer->bytes_written = 0;
376 39951 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
377 : }
378 : else
379 : {
380 1 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
381 1 : next_out += mystreamer->bytes_written;
382 : }
383 : }
384 98 : }
385 :
386 : /*
387 : * End-of-stream processing.
388 : */
389 : static void
390 1 : bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
391 : {
392 : bbstreamer_lz4_frame *mystreamer;
393 :
394 1 : mystreamer = (bbstreamer_lz4_frame *) streamer;
395 :
396 : /*
397 : * End of the stream, if there is some pending data in output buffers then
398 : * we must forward it to next streamer.
399 : */
400 1 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
401 1 : mystreamer->base.bbs_buffer.data,
402 : mystreamer->base.bbs_buffer.maxlen,
403 : BBSTREAMER_UNKNOWN);
404 :
405 1 : bbstreamer_finalize(mystreamer->base.bbs_next);
406 1 : }
407 :
408 : /*
409 : * Free memory.
410 : */
411 : static void
412 1 : bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
413 : {
414 : bbstreamer_lz4_frame *mystreamer;
415 :
416 1 : mystreamer = (bbstreamer_lz4_frame *) streamer;
417 1 : bbstreamer_free(streamer->bbs_next);
418 1 : LZ4F_freeDecompressionContext(mystreamer->dctx);
419 1 : pfree(streamer->bbs_buffer.data);
420 1 : pfree(streamer);
421 1 : }
422 : #endif
|