Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * bbstreamer_zstd.c
4 : : *
5 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/bin/pg_basebackup/bbstreamer_zstd.c
9 : : *-------------------------------------------------------------------------
10 : : */
11 : :
12 : : #include "postgres_fe.h"
13 : :
14 : : #include <unistd.h>
15 : :
16 : : #ifdef USE_ZSTD
17 : : #include <zstd.h>
18 : : #endif
19 : :
20 : : #include "bbstreamer.h"
21 : : #include "common/logging.h"
22 : :
23 : : #ifdef USE_ZSTD
24 : :
25 : : typedef struct bbstreamer_zstd_frame
26 : : {
27 : : bbstreamer base;
28 : :
29 : : ZSTD_CCtx *cctx;
30 : : ZSTD_DCtx *dctx;
31 : : ZSTD_outBuffer zstd_outBuf;
32 : : } bbstreamer_zstd_frame;
33 : :
34 : : static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
35 : : bbstreamer_member *member,
36 : : const char *data, int len,
37 : : bbstreamer_archive_context context);
38 : : static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
39 : : static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
40 : :
41 : : const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
42 : : .content = bbstreamer_zstd_compressor_content,
43 : : .finalize = bbstreamer_zstd_compressor_finalize,
44 : : .free = bbstreamer_zstd_compressor_free
45 : : };
46 : :
47 : : static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
48 : : bbstreamer_member *member,
49 : : const char *data, int len,
50 : : bbstreamer_archive_context context);
51 : : static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
52 : : static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
53 : :
54 : : const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
55 : : .content = bbstreamer_zstd_decompressor_content,
56 : : .finalize = bbstreamer_zstd_decompressor_finalize,
57 : : .free = bbstreamer_zstd_decompressor_free
58 : : };
59 : : #endif
60 : :
61 : : /*
62 : : * Create a new base backup streamer that performs zstd compression of tar
63 : : * blocks.
64 : : */
65 : : bbstreamer *
733 michael@paquier.xyz 66 :CBC 3 : bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
67 : : {
68 : : #ifdef USE_ZSTD
69 : : bbstreamer_zstd_frame *streamer;
70 : : size_t ret;
71 : :
769 rhaas@postgresql.org 72 [ - + ]: 3 : Assert(next != NULL);
73 : :
74 : 3 : streamer = palloc0(sizeof(bbstreamer_zstd_frame));
75 : :
76 : 3 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
77 : : &bbstreamer_zstd_compressor_ops;
78 : :
79 : 3 : streamer->base.bbs_next = next;
80 : 3 : initStringInfo(&streamer->base.bbs_buffer);
81 : 3 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
82 : :
83 : 3 : streamer->cctx = ZSTD_createCCtx();
84 [ - + ]: 3 : if (!streamer->cctx)
737 tgl@sss.pgh.pa.us 85 :UBC 0 : pg_fatal("could not create zstd compression context");
86 : :
87 : : /* Set compression level */
578 michael@paquier.xyz 88 :CBC 3 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
89 : : compress->level);
90 [ - + ]: 3 : if (ZSTD_isError(ret))
578 michael@paquier.xyz 91 :UBC 0 : pg_fatal("could not set zstd compression level to %d: %s",
92 : : compress->level, ZSTD_getErrorName(ret));
93 : :
94 : : /* Set # of workers, if specified */
733 michael@paquier.xyz 95 [ + + ]:CBC 3 : if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
96 : : {
97 : : /*
98 : : * On older versions of libzstd, this option does not exist, and
99 : : * trying to set it will fail. Similarly for newer versions if they
100 : : * are compiled without threading support.
101 : : */
746 rhaas@postgresql.org 102 : 1 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
103 : : compress->workers);
104 [ - + ]: 1 : if (ZSTD_isError(ret))
737 tgl@sss.pgh.pa.us 105 :UBC 0 : pg_fatal("could not set compression worker count to %d: %s",
106 : : compress->workers, ZSTD_getErrorName(ret));
107 : : }
108 : :
374 tomas.vondra@postgre 109 [ + + ]:CBC 3 : if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
110 : : {
111 : 1 : ret = ZSTD_CCtx_setParameter(streamer->cctx,
112 : : ZSTD_c_enableLongDistanceMatching,
113 : 1 : compress->long_distance);
114 [ - + ]: 1 : if (ZSTD_isError(ret))
115 : : {
331 peter@eisentraut.org 116 :UBC 0 : pg_log_error("could not enable long-distance mode: %s",
117 : : ZSTD_getErrorName(ret));
374 tomas.vondra@postgre 118 : 0 : exit(1);
119 : : }
120 : : }
121 : :
122 : : /* Initialize the ZSTD output buffer. */
769 rhaas@postgresql.org 123 :CBC 3 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
124 : 3 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
125 : 3 : streamer->zstd_outBuf.pos = 0;
126 : :
127 : 3 : return &streamer->base;
128 : : #else
129 : : pg_fatal("this build does not support compression with %s", "ZSTD");
130 : : return NULL; /* keep compiler quiet */
131 : : #endif
132 : : }
133 : :
134 : : #ifdef USE_ZSTD
135 : : /*
136 : : * Compress the input data to output buffer.
137 : : *
138 : : * Find out the compression bound based on input data length for each
139 : : * invocation to make sure that output buffer has enough capacity to
140 : : * accommodate the compressed data. In case if the output buffer
141 : : * capacity falls short of compression bound then forward the content
142 : : * of output buffer to next streamer and empty the buffer.
143 : : */
144 : : static void
145 : 8094 : bbstreamer_zstd_compressor_content(bbstreamer *streamer,
146 : : bbstreamer_member *member,
147 : : const char *data, int len,
148 : : bbstreamer_archive_context context)
149 : : {
150 : 8094 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
151 : 8094 : ZSTD_inBuffer inBuf = {data, len, 0};
152 : :
153 [ + + ]: 24289 : while (inBuf.pos < inBuf.size)
154 : : {
155 : : size_t yet_to_flush;
156 : 8101 : size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
157 : :
158 : : /*
159 : : * If the output buffer is not left with enough space, send the
160 : : * compressed bytes to the next streamer, and empty the buffer.
161 : : */
162 [ + + ]: 8101 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
163 : : max_needed)
164 : : {
165 : 21 : bbstreamer_content(mystreamer->base.bbs_next, member,
166 : 21 : mystreamer->zstd_outBuf.dst,
167 : 21 : mystreamer->zstd_outBuf.pos,
168 : : context);
169 : :
170 : : /* Reset the ZSTD output buffer. */
171 : 21 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
172 : 21 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
173 : 21 : mystreamer->zstd_outBuf.pos = 0;
174 : : }
175 : :
176 : : yet_to_flush =
177 : 8101 : ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
178 : : &inBuf, ZSTD_e_continue);
179 : :
180 [ + - ]: 8101 : if (ZSTD_isError(yet_to_flush))
769 rhaas@postgresql.org 181 :UBC 0 : pg_log_error("could not compress data: %s",
182 : : ZSTD_getErrorName(yet_to_flush));
183 : : }
769 rhaas@postgresql.org 184 :CBC 8094 : }
185 : :
186 : : /*
187 : : * End-of-stream processing.
188 : : */
189 : : static void
190 : 3 : bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
191 : : {
192 : 3 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
193 : : size_t yet_to_flush;
194 : :
195 : : do
196 : : {
197 : 5 : ZSTD_inBuffer in = {NULL, 0, 0};
198 : 5 : size_t max_needed = ZSTD_compressBound(0);
199 : :
200 : : /*
201 : : * If the output buffer is not left with enough space, send the
202 : : * compressed bytes to the next streamer, and empty the buffer.
203 : : */
204 [ + + ]: 5 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
205 : : max_needed)
206 : : {
207 : 2 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
208 : 2 : mystreamer->zstd_outBuf.dst,
209 : 2 : mystreamer->zstd_outBuf.pos,
210 : : BBSTREAMER_UNKNOWN);
211 : :
212 : : /* Reset the ZSTD output buffer. */
213 : 2 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
214 : 2 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
215 : 2 : mystreamer->zstd_outBuf.pos = 0;
216 : : }
217 : :
218 : 5 : yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
219 : : &mystreamer->zstd_outBuf,
220 : : &in, ZSTD_e_end);
221 : :
222 [ - + ]: 5 : if (ZSTD_isError(yet_to_flush))
769 rhaas@postgresql.org 223 :UBC 0 : pg_log_error("could not compress data: %s",
224 : : ZSTD_getErrorName(yet_to_flush));
225 : :
769 rhaas@postgresql.org 226 [ + + ]:CBC 5 : } while (yet_to_flush > 0);
227 : :
228 : : /* Make sure to pass any remaining bytes to the next streamer. */
229 [ + - ]: 3 : if (mystreamer->zstd_outBuf.pos > 0)
230 : 3 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
231 : 3 : mystreamer->zstd_outBuf.dst,
232 : 3 : mystreamer->zstd_outBuf.pos,
233 : : BBSTREAMER_UNKNOWN);
234 : :
235 : 3 : bbstreamer_finalize(mystreamer->base.bbs_next);
236 : 3 : }
237 : :
238 : : /*
239 : : * Free memory.
240 : : */
241 : : static void
242 : 3 : bbstreamer_zstd_compressor_free(bbstreamer *streamer)
243 : : {
244 : 3 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
245 : :
246 : 3 : bbstreamer_free(streamer->bbs_next);
247 : 3 : ZSTD_freeCCtx(mystreamer->cctx);
248 : 3 : pfree(streamer->bbs_buffer.data);
249 : 3 : pfree(streamer);
250 : 3 : }
251 : : #endif
252 : :
253 : : /*
254 : : * Create a new base backup streamer that performs decompression of zstd
255 : : * compressed blocks.
256 : : */
257 : : bbstreamer *
258 : 2 : bbstreamer_zstd_decompressor_new(bbstreamer *next)
259 : : {
260 : : #ifdef USE_ZSTD
261 : : bbstreamer_zstd_frame *streamer;
262 : :
263 [ - + ]: 2 : Assert(next != NULL);
264 : :
265 : 2 : streamer = palloc0(sizeof(bbstreamer_zstd_frame));
266 : 2 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
267 : : &bbstreamer_zstd_decompressor_ops;
268 : :
269 : 2 : streamer->base.bbs_next = next;
270 : 2 : initStringInfo(&streamer->base.bbs_buffer);
271 : 2 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
272 : :
273 : 2 : streamer->dctx = ZSTD_createDCtx();
274 [ - + ]: 2 : if (!streamer->dctx)
737 tgl@sss.pgh.pa.us 275 :UBC 0 : pg_fatal("could not create zstd decompression context");
276 : :
277 : : /* Initialize the ZSTD output buffer. */
769 rhaas@postgresql.org 278 :CBC 2 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
279 : 2 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
280 : 2 : streamer->zstd_outBuf.pos = 0;
281 : :
282 : 2 : return &streamer->base;
283 : : #else
284 : : pg_fatal("this build does not support compression with %s", "ZSTD");
285 : : return NULL; /* keep compiler quiet */
286 : : #endif
287 : : }
288 : :
289 : : #ifdef USE_ZSTD
290 : : /*
291 : : * Decompress the input data to output buffer until we run out of input
292 : : * data. Each time the output buffer is full, pass on the decompressed data
293 : : * to the next streamer.
294 : : */
295 : : static void
296 : 205 : bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
297 : : bbstreamer_member *member,
298 : : const char *data, int len,
299 : : bbstreamer_archive_context context)
300 : : {
301 : 205 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
302 : 205 : ZSTD_inBuffer inBuf = {data, len, 0};
303 : :
304 [ + + ]: 809 : while (inBuf.pos < inBuf.size)
305 : : {
306 : : size_t ret;
307 : :
308 : : /*
309 : : * If output buffer is full then forward the content to next streamer
310 : : * and update the output buffer.
311 : : */
312 [ + + ]: 399 : if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
313 : : {
314 : 310 : bbstreamer_content(mystreamer->base.bbs_next, member,
315 : 310 : mystreamer->zstd_outBuf.dst,
316 : 310 : mystreamer->zstd_outBuf.pos,
317 : : context);
318 : :
319 : : /* Reset the ZSTD output buffer. */
320 : 310 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
321 : 310 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
322 : 310 : mystreamer->zstd_outBuf.pos = 0;
323 : : }
324 : :
325 : 399 : ret = ZSTD_decompressStream(mystreamer->dctx,
326 : : &mystreamer->zstd_outBuf, &inBuf);
327 : :
328 [ + - ]: 399 : if (ZSTD_isError(ret))
737 tgl@sss.pgh.pa.us 329 :UBC 0 : pg_log_error("could not decompress data: %s",
330 : : ZSTD_getErrorName(ret));
331 : : }
769 rhaas@postgresql.org 332 :CBC 205 : }
333 : :
334 : : /*
335 : : * End-of-stream processing.
336 : : */
337 : : static void
338 : 2 : bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
339 : : {
340 : 2 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
341 : :
342 : : /*
343 : : * End of the stream, if there is some pending data in output buffers then
344 : : * we must forward it to next streamer.
345 : : */
346 [ + - ]: 2 : if (mystreamer->zstd_outBuf.pos > 0)
347 : 2 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
348 : 2 : mystreamer->base.bbs_buffer.data,
349 : : mystreamer->base.bbs_buffer.maxlen,
350 : : BBSTREAMER_UNKNOWN);
351 : :
352 : 2 : bbstreamer_finalize(mystreamer->base.bbs_next);
353 : 2 : }
354 : :
355 : : /*
356 : : * Free memory.
357 : : */
358 : : static void
359 : 2 : bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
360 : : {
361 : 2 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
362 : :
363 : 2 : bbstreamer_free(streamer->bbs_next);
364 : 2 : ZSTD_freeDCtx(mystreamer->dctx);
365 : 2 : pfree(streamer->bbs_buffer.data);
366 : 2 : pfree(streamer);
367 : 2 : }
368 : : #endif
|