Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * bbstreamer_lz4.c
4 : : *
5 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/bin/pg_basebackup/bbstreamer_lz4.c
9 : : *-------------------------------------------------------------------------
10 : : */
11 : :
12 : : #include "postgres_fe.h"
13 : :
14 : : #include <unistd.h>
15 : :
16 : : #ifdef USE_LZ4
17 : : #include <lz4frame.h>
18 : : #endif
19 : :
20 : : #include "bbstreamer.h"
21 : : #include "common/file_perm.h"
22 : : #include "common/logging.h"
23 : : #include "common/string.h"
24 : :
25 : : #ifdef USE_LZ4
26 : : typedef struct bbstreamer_lz4_frame
27 : : {
28 : : bbstreamer base;
29 : :
30 : : LZ4F_compressionContext_t cctx;
31 : : LZ4F_decompressionContext_t dctx;
32 : : LZ4F_preferences_t prefs;
33 : :
34 : : size_t bytes_written;
35 : : bool header_written;
36 : : } bbstreamer_lz4_frame;
37 : :
38 : : static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
39 : : bbstreamer_member *member,
40 : : const char *data, int len,
41 : : bbstreamer_archive_context context);
42 : : static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
43 : : static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);
44 : :
45 : : const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
46 : : .content = bbstreamer_lz4_compressor_content,
47 : : .finalize = bbstreamer_lz4_compressor_finalize,
48 : : .free = bbstreamer_lz4_compressor_free
49 : : };
50 : :
51 : : static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
52 : : bbstreamer_member *member,
53 : : const char *data, int len,
54 : : bbstreamer_archive_context context);
55 : : static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
56 : : static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);
57 : :
58 : : const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
59 : : .content = bbstreamer_lz4_decompressor_content,
60 : : .finalize = bbstreamer_lz4_decompressor_finalize,
61 : : .free = bbstreamer_lz4_decompressor_free
62 : : };
63 : : #endif
64 : :
65 : : /*
66 : : * Create a new base backup streamer that performs lz4 compression of tar
67 : : * blocks.
68 : : */
69 : : bbstreamer *
733 michael@paquier.xyz 70 :CBC 1 : bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
71 : : {
72 : : #ifdef USE_LZ4
73 : : bbstreamer_lz4_frame *streamer;
74 : : LZ4F_errorCode_t ctxError;
75 : : LZ4F_preferences_t *prefs;
76 : :
793 rhaas@postgresql.org 77 [ - + ]: 1 : Assert(next != NULL);
78 : :
79 : 1 : streamer = palloc0(sizeof(bbstreamer_lz4_frame));
80 : 1 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
81 : : &bbstreamer_lz4_compressor_ops;
82 : :
83 : 1 : streamer->base.bbs_next = next;
84 : 1 : initStringInfo(&streamer->base.bbs_buffer);
85 : 1 : streamer->header_written = false;
86 : :
87 : : /* Initialize stream compression preferences */
88 : 1 : prefs = &streamer->prefs;
89 : 1 : memset(prefs, 0, sizeof(LZ4F_preferences_t));
90 : 1 : prefs->frameInfo.blockSizeID = LZ4F_max256KB;
578 michael@paquier.xyz 91 : 1 : prefs->compressionLevel = compress->level;
92 : :
793 rhaas@postgresql.org 93 : 1 : ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
94 [ - + ]: 1 : if (LZ4F_isError(ctxError))
737 tgl@sss.pgh.pa.us 95 :UBC 0 : pg_log_error("could not create lz4 compression context: %s",
96 : : LZ4F_getErrorName(ctxError));
97 : :
793 rhaas@postgresql.org 98 :CBC 1 : return &streamer->base;
99 : : #else
100 : : pg_fatal("this build does not support compression with %s", "LZ4");
101 : : return NULL; /* keep compiler quiet */
102 : : #endif
103 : : }
104 : :
105 : : #ifdef USE_LZ4
106 : : /*
107 : : * Compress the input data to output buffer.
108 : : *
109 : : * Find out the compression bound based on input data length for each
110 : : * invocation to make sure that output buffer has enough capacity to
111 : : * accommodate the compressed data. In case if the output buffer
112 : : * capacity falls short of compression bound then forward the content
113 : : * of output buffer to next streamer and empty the buffer.
114 : : */
115 : : static void
116 : 2698 : bbstreamer_lz4_compressor_content(bbstreamer *streamer,
117 : : bbstreamer_member *member,
118 : : const char *data, int len,
119 : : bbstreamer_archive_context context)
120 : : {
121 : : bbstreamer_lz4_frame *mystreamer;
122 : : uint8 *next_in,
123 : : *next_out;
124 : : size_t out_bound,
125 : : compressed_size,
126 : : avail_out;
127 : :
128 : 2698 : mystreamer = (bbstreamer_lz4_frame *) streamer;
129 : 2698 : next_in = (uint8 *) data;
130 : :
131 : : /* Write header before processing the first input chunk. */
132 [ + + ]: 2698 : if (!mystreamer->header_written)
133 : : {
134 : 1 : compressed_size = LZ4F_compressBegin(mystreamer->cctx,
135 : 1 : (uint8 *) mystreamer->base.bbs_buffer.data,
136 : 1 : mystreamer->base.bbs_buffer.maxlen,
137 : 1 : &mystreamer->prefs);
138 : :
139 [ - + ]: 1 : if (LZ4F_isError(compressed_size))
793 rhaas@postgresql.org 140 :UBC 0 : pg_log_error("could not write lz4 header: %s",
141 : : LZ4F_getErrorName(compressed_size));
142 : :
793 rhaas@postgresql.org 143 :CBC 1 : mystreamer->bytes_written += compressed_size;
144 : 1 : mystreamer->header_written = true;
145 : : }
146 : :
147 : : /*
148 : : * Update the offset and capacity of output buffer based on number of
149 : : * bytes written to output buffer.
150 : : */
151 : 2698 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
152 : 2698 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
153 : :
154 : : /*
155 : : * Find out the compression bound and make sure that output buffer has the
156 : : * required capacity for the success of LZ4F_compressUpdate. If needed
157 : : * forward the content to next streamer and empty the buffer.
158 : : */
159 : 2698 : out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
768 160 [ + + ]: 2698 : if (avail_out < out_bound)
161 : : {
703 tgl@sss.pgh.pa.us 162 : 13 : bbstreamer_content(mystreamer->base.bbs_next, member,
163 : 13 : mystreamer->base.bbs_buffer.data,
164 : 13 : mystreamer->bytes_written,
165 : : context);
166 : :
167 : : /* Enlarge buffer if it falls short of out bound. */
168 [ + + ]: 13 : if (mystreamer->base.bbs_buffer.maxlen < out_bound)
169 : 1 : enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
170 : :
171 : 13 : avail_out = mystreamer->base.bbs_buffer.maxlen;
172 : 13 : mystreamer->bytes_written = 0;
173 : 13 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
174 : : }
175 : :
176 : : /*
177 : : * This call compresses the data starting at next_in and generates the
178 : : * output starting at next_out. It expects the caller to provide the size
179 : : * of input buffer and capacity of output buffer by providing parameters
180 : : * len and avail_out.
181 : : *
182 : : * It returns the number of bytes compressed to output buffer.
183 : : */
793 rhaas@postgresql.org 184 : 2698 : compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
185 : : next_out, avail_out,
186 : : next_in, len, NULL);
187 : :
188 [ - + ]: 2698 : if (LZ4F_isError(compressed_size))
793 rhaas@postgresql.org 189 :UBC 0 : pg_log_error("could not compress data: %s",
190 : : LZ4F_getErrorName(compressed_size));
191 : :
793 rhaas@postgresql.org 192 :CBC 2698 : mystreamer->bytes_written += compressed_size;
193 : 2698 : }
194 : :
195 : : /*
196 : : * End-of-stream processing.
197 : : */
198 : : static void
199 : 1 : bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
200 : : {
201 : : bbstreamer_lz4_frame *mystreamer;
202 : : uint8 *next_out;
203 : : size_t footer_bound,
204 : : compressed_size,
205 : : avail_out;
206 : :
207 : 1 : mystreamer = (bbstreamer_lz4_frame *) streamer;
208 : :
209 : : /* Find out the footer bound and update the output buffer. */
210 : 1 : footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
768 211 [ - + ]: 1 : if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
212 : : footer_bound)
213 : : {
703 tgl@sss.pgh.pa.us 214 :UBC 0 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
215 : 0 : mystreamer->base.bbs_buffer.data,
216 : 0 : mystreamer->bytes_written,
217 : : BBSTREAMER_UNKNOWN);
218 : :
219 : : /* Enlarge buffer if it falls short of footer bound. */
220 [ # # ]: 0 : if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
221 : 0 : enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
222 : :
223 : 0 : avail_out = mystreamer->base.bbs_buffer.maxlen;
224 : 0 : mystreamer->bytes_written = 0;
225 : 0 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
226 : : }
227 : : else
228 : : {
793 rhaas@postgresql.org 229 :CBC 1 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
230 : 1 : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
231 : : }
232 : :
233 : : /*
234 : : * Finalize the frame and flush whatever data remaining in compression
235 : : * context.
236 : : */
237 : 1 : compressed_size = LZ4F_compressEnd(mystreamer->cctx,
238 : : next_out, avail_out, NULL);
239 : :
240 [ - + ]: 1 : if (LZ4F_isError(compressed_size))
793 rhaas@postgresql.org 241 :UBC 0 : pg_log_error("could not end lz4 compression: %s",
242 : : LZ4F_getErrorName(compressed_size));
243 : :
793 rhaas@postgresql.org 244 :CBC 1 : mystreamer->bytes_written += compressed_size;
245 : :
246 : 1 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
247 : 1 : mystreamer->base.bbs_buffer.data,
248 : 1 : mystreamer->bytes_written,
249 : : BBSTREAMER_UNKNOWN);
250 : :
251 : 1 : bbstreamer_finalize(mystreamer->base.bbs_next);
252 : 1 : }
253 : :
254 : : /*
255 : : * Free memory.
256 : : */
257 : : static void
258 : 1 : bbstreamer_lz4_compressor_free(bbstreamer *streamer)
259 : : {
260 : : bbstreamer_lz4_frame *mystreamer;
261 : :
262 : 1 : mystreamer = (bbstreamer_lz4_frame *) streamer;
263 : 1 : bbstreamer_free(streamer->bbs_next);
264 : 1 : LZ4F_freeCompressionContext(mystreamer->cctx);
265 : 1 : pfree(streamer->bbs_buffer.data);
266 : 1 : pfree(streamer);
267 : 1 : }
268 : : #endif
269 : :
270 : : /*
271 : : * Create a new base backup streamer that performs decompression of lz4
272 : : * compressed blocks.
273 : : */
274 : : bbstreamer *
275 : 1 : bbstreamer_lz4_decompressor_new(bbstreamer *next)
276 : : {
277 : : #ifdef USE_LZ4
278 : : bbstreamer_lz4_frame *streamer;
279 : : LZ4F_errorCode_t ctxError;
280 : :
281 [ - + ]: 1 : Assert(next != NULL);
282 : :
283 : 1 : streamer = palloc0(sizeof(bbstreamer_lz4_frame));
284 : 1 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
285 : : &bbstreamer_lz4_decompressor_ops;
286 : :
287 : 1 : streamer->base.bbs_next = next;
288 : 1 : initStringInfo(&streamer->base.bbs_buffer);
289 : :
290 : : /* Initialize internal stream state for decompression */
291 : 1 : ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
292 [ - + ]: 1 : if (LZ4F_isError(ctxError))
737 tgl@sss.pgh.pa.us 293 :UBC 0 : pg_fatal("could not initialize compression library: %s",
294 : : LZ4F_getErrorName(ctxError));
295 : :
793 rhaas@postgresql.org 296 :CBC 1 : return &streamer->base;
297 : : #else
298 : : pg_fatal("this build does not support compression with %s", "LZ4");
299 : : return NULL; /* keep compiler quiet */
300 : : #endif
301 : : }
302 : :
303 : : #ifdef USE_LZ4
304 : : /*
305 : : * Decompress the input data to output buffer until we run out of input
306 : : * data. Each time the output buffer is full, pass on the decompressed data
307 : : * to the next streamer.
308 : : */
309 : : static void
310 : 90 : bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
311 : : bbstreamer_member *member,
312 : : const char *data, int len,
313 : : bbstreamer_archive_context context)
314 : : {
315 : : bbstreamer_lz4_frame *mystreamer;
316 : : uint8 *next_in,
317 : : *next_out;
318 : : size_t avail_in,
319 : : avail_out;
320 : :
321 : 90 : mystreamer = (bbstreamer_lz4_frame *) streamer;
322 : 90 : next_in = (uint8 *) data;
323 : 90 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
324 : 90 : avail_in = len;
325 : 90 : avail_out = mystreamer->base.bbs_buffer.maxlen;
326 : :
327 [ + + ]: 40019 : while (avail_in > 0)
328 : : {
329 : : size_t ret,
330 : : read_size,
331 : : out_size;
332 : :
333 : 39929 : read_size = avail_in;
334 : 39929 : out_size = avail_out;
335 : :
336 : : /*
337 : : * This call decompresses the data starting at next_in and generates
338 : : * the output data starting at next_out. It expects the caller to
339 : : * provide size of the input buffer and total capacity of the output
340 : : * buffer by providing the read_size and out_size parameters
341 : : * respectively.
342 : : *
343 : : * Per the documentation of LZ4, parameters read_size and out_size
344 : : * behaves as dual parameters. On return, the number of bytes consumed
345 : : * from the input buffer will be written back to read_size and the
346 : : * number of bytes decompressed to output buffer will be written back
347 : : * to out_size respectively.
348 : : */
349 : 39929 : ret = LZ4F_decompress(mystreamer->dctx,
350 : : next_out, &out_size,
351 : : next_in, &read_size, NULL);
352 : :
353 [ - + ]: 39929 : if (LZ4F_isError(ret))
793 rhaas@postgresql.org 354 :UBC 0 : pg_log_error("could not decompress data: %s",
355 : : LZ4F_getErrorName(ret));
356 : :
357 : : /* Update input buffer based on number of bytes consumed */
793 rhaas@postgresql.org 358 :CBC 39929 : avail_in -= read_size;
359 : 39929 : next_in += read_size;
360 : :
361 : 39929 : mystreamer->bytes_written += out_size;
362 : :
363 : : /*
364 : : * If output buffer is full then forward the content to next streamer
365 : : * and update the output buffer.
366 : : */
367 [ + - ]: 39929 : if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
368 : : {
369 : 39929 : bbstreamer_content(mystreamer->base.bbs_next, member,
370 : 39929 : mystreamer->base.bbs_buffer.data,
371 : : mystreamer->base.bbs_buffer.maxlen,
372 : : context);
373 : :
374 : 39929 : avail_out = mystreamer->base.bbs_buffer.maxlen;
375 : 39929 : mystreamer->bytes_written = 0;
376 : 39929 : next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
377 : : }
378 : : else
379 : : {
793 rhaas@postgresql.org 380 :LBC (1) : avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
381 : (1) : next_out += mystreamer->bytes_written;
382 : : }
383 : : }
793 rhaas@postgresql.org 384 :CBC 90 : }
385 : :
386 : : /*
387 : : * End-of-stream processing.
388 : : */
389 : : static void
390 : 1 : bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
391 : : {
392 : : bbstreamer_lz4_frame *mystreamer;
393 : :
394 : 1 : mystreamer = (bbstreamer_lz4_frame *) streamer;
395 : :
396 : : /*
397 : : * End of the stream, if there is some pending data in output buffers then
398 : : * we must forward it to next streamer.
399 : : */
400 : 1 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
401 : 1 : mystreamer->base.bbs_buffer.data,
402 : : mystreamer->base.bbs_buffer.maxlen,
403 : : BBSTREAMER_UNKNOWN);
404 : :
405 : 1 : bbstreamer_finalize(mystreamer->base.bbs_next);
406 : 1 : }
407 : :
408 : : /*
409 : : * Free memory.
410 : : */
411 : : static void
412 : 1 : bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
413 : : {
414 : : bbstreamer_lz4_frame *mystreamer;
415 : :
416 : 1 : mystreamer = (bbstreamer_lz4_frame *) streamer;
417 : 1 : bbstreamer_free(streamer->bbs_next);
418 : 1 : LZ4F_freeDecompressionContext(mystreamer->dctx);
419 : 1 : pfree(streamer->bbs_buffer.data);
420 : 1 : pfree(streamer);
421 : 1 : }
422 : : #endif
|