TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * bbstreamer_zstd.c
4 : *
5 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/bin/pg_basebackup/bbstreamer_zstd.c
9 : *-------------------------------------------------------------------------
10 : */
11 :
12 : #include "postgres_fe.h"
13 :
14 : #include <unistd.h>
15 :
16 : #ifdef USE_ZSTD
17 : #include <zstd.h>
18 : #endif
19 :
20 : #include "bbstreamer.h"
21 : #include "common/logging.h"
22 :
23 : #ifdef USE_ZSTD
24 :
25 : typedef struct bbstreamer_zstd_frame
26 : {
27 : bbstreamer base;
28 :
29 : ZSTD_CCtx *cctx;
30 : ZSTD_DCtx *dctx;
31 : ZSTD_outBuffer zstd_outBuf;
32 : } bbstreamer_zstd_frame;
33 :
34 : static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
35 : bbstreamer_member *member,
36 : const char *data, int len,
37 : bbstreamer_archive_context context);
38 : static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
39 : static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
40 :
41 : const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
42 : .content = bbstreamer_zstd_compressor_content,
43 : .finalize = bbstreamer_zstd_compressor_finalize,
44 : .free = bbstreamer_zstd_compressor_free
45 : };
46 :
47 : static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
48 : bbstreamer_member *member,
49 : const char *data, int len,
50 : bbstreamer_archive_context context);
51 : static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
52 : static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
53 :
54 : const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
55 : .content = bbstreamer_zstd_decompressor_content,
56 : .finalize = bbstreamer_zstd_decompressor_finalize,
57 : .free = bbstreamer_zstd_decompressor_free
58 : };
59 : #endif
60 :
61 : /*
62 : * Create a new base backup streamer that performs zstd compression of tar
63 : * blocks.
64 : */
65 : bbstreamer *
66 CBC 3 : bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
67 : {
68 : #ifdef USE_ZSTD
69 : bbstreamer_zstd_frame *streamer;
70 : size_t ret;
71 :
72 3 : Assert(next != NULL);
73 :
74 3 : streamer = palloc0(sizeof(bbstreamer_zstd_frame));
75 :
76 3 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
77 : &bbstreamer_zstd_compressor_ops;
78 :
79 3 : streamer->base.bbs_next = next;
80 3 : initStringInfo(&streamer->base.bbs_buffer);
81 3 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
82 :
83 3 : streamer->cctx = ZSTD_createCCtx();
84 3 : if (!streamer->cctx)
85 UBC 0 : pg_fatal("could not create zstd compression context");
86 :
87 : /* Set compression level */
88 CBC 3 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
89 : compress->level);
90 3 : if (ZSTD_isError(ret))
91 UBC 0 : pg_fatal("could not set zstd compression level to %d: %s",
92 : compress->level, ZSTD_getErrorName(ret));
93 :
94 : /* Set # of workers, if specified */
95 CBC 3 : if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
96 : {
97 : /*
98 : * On older versions of libzstd, this option does not exist, and
99 : * trying to set it will fail. Similarly for newer versions if they
100 : * are compiled without threading support.
101 : */
102 1 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
103 : compress->workers);
104 1 : if (ZSTD_isError(ret))
105 UBC 0 : pg_fatal("could not set compression worker count to %d: %s",
106 : compress->workers, ZSTD_getErrorName(ret));
107 : }
108 :
109 GNC 3 : if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
110 : {
111 1 : ret = ZSTD_CCtx_setParameter(streamer->cctx,
112 : ZSTD_c_enableLongDistanceMatching,
113 1 : compress->long_distance);
114 1 : if (ZSTD_isError(ret))
115 : {
116 UNC 0 : pg_log_error("could not set compression flag for %s: %s",
117 : "long", ZSTD_getErrorName(ret));
118 0 : exit(1);
119 : }
120 : }
121 :
122 ECB : /* Initialize the ZSTD output buffer. */
123 GIC 3 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
124 CBC 3 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
125 GIC 3 : streamer->zstd_outBuf.pos = 0;
126 ECB :
127 CBC 3 : return &streamer->base;
128 : #else
129 : pg_fatal("this build does not support compression with %s", "ZSTD");
130 : return NULL; /* keep compiler quiet */
131 EUB : #endif
132 : }
133 :
134 : #ifdef USE_ZSTD
135 : /*
136 ECB : * Compress the input data to output buffer.
137 : *
138 : * Find out the compression bound based on input data length for each
139 : * invocation to make sure that output buffer has enough capacity to
140 : * accommodate the compressed data. In case if the output buffer
141 : * capacity falls short of compression bound then forward the content
142 : * of output buffer to next streamer and empty the buffer.
143 : */
144 : static void
145 GIC 8109 : bbstreamer_zstd_compressor_content(bbstreamer *streamer,
146 : bbstreamer_member *member,
147 : const char *data, int len,
148 : bbstreamer_archive_context context)
149 : {
150 8109 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
151 8109 : ZSTD_inBuffer inBuf = {data, len, 0};
152 :
153 24332 : while (inBuf.pos < inBuf.size)
154 : {
155 : size_t yet_to_flush;
156 8114 : size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
157 :
158 ECB : /*
159 : * If the output buffer is not left with enough space, send the
160 : * compressed bytes to the next streamer, and empty the buffer.
161 : */
162 GIC 8114 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
163 ECB : max_needed)
164 : {
165 GIC 25 : bbstreamer_content(mystreamer->base.bbs_next, member,
166 CBC 25 : mystreamer->zstd_outBuf.dst,
167 GIC 25 : mystreamer->zstd_outBuf.pos,
168 : context);
169 ECB :
170 : /* Reset the ZSTD output buffer. */
171 GIC 25 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
172 25 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
173 25 : mystreamer->zstd_outBuf.pos = 0;
174 : }
175 ECB :
176 : yet_to_flush =
177 GIC 8114 : ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
178 ECB : &inBuf, ZSTD_e_continue);
179 :
180 CBC 8114 : if (ZSTD_isError(yet_to_flush))
181 UIC 0 : pg_log_error("could not compress data: %s",
182 : ZSTD_getErrorName(yet_to_flush));
183 : }
184 CBC 8109 : }
185 ECB :
186 : /*
187 : * End-of-stream processing.
188 : */
189 : static void
190 CBC 3 : bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
191 : {
192 GIC 3 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
193 ECB : size_t yet_to_flush;
194 EUB :
195 : do
196 : {
197 CBC 3 : ZSTD_inBuffer in = {NULL, 0, 0};
198 GIC 3 : size_t max_needed = ZSTD_compressBound(0);
199 :
200 : /*
201 : * If the output buffer is not left with enough space, send the
202 : * compressed bytes to the next streamer, and empty the buffer.
203 ECB : */
204 GIC 3 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
205 ECB : max_needed)
206 : {
207 UIC 0 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
208 0 : mystreamer->zstd_outBuf.dst,
209 0 : mystreamer->zstd_outBuf.pos,
210 ECB : BBSTREAMER_UNKNOWN);
211 :
212 : /* Reset the ZSTD output buffer. */
213 UIC 0 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
214 0 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
215 0 : mystreamer->zstd_outBuf.pos = 0;
216 : }
217 ECB :
218 GIC 3 : yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
219 : &mystreamer->zstd_outBuf,
220 EUB : &in, ZSTD_e_end);
221 :
222 GBC 3 : if (ZSTD_isError(yet_to_flush))
223 UIC 0 : pg_log_error("could not compress data: %s",
224 : ZSTD_getErrorName(yet_to_flush));
225 :
226 GBC 3 : } while (yet_to_flush > 0);
227 EUB :
228 : /* Make sure to pass any remaining bytes to the next streamer. */
229 GIC 3 : if (mystreamer->zstd_outBuf.pos > 0)
230 3 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
231 CBC 3 : mystreamer->zstd_outBuf.dst,
232 GIC 3 : mystreamer->zstd_outBuf.pos,
233 : BBSTREAMER_UNKNOWN);
234 :
235 CBC 3 : bbstreamer_finalize(mystreamer->base.bbs_next);
236 GBC 3 : }
237 :
238 : /*
239 ECB : * Free memory.
240 : */
241 : static void
242 CBC 3 : bbstreamer_zstd_compressor_free(bbstreamer *streamer)
243 ECB : {
244 CBC 3 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
245 ECB :
246 GIC 3 : bbstreamer_free(streamer->bbs_next);
247 3 : ZSTD_freeCCtx(mystreamer->cctx);
248 CBC 3 : pfree(streamer->bbs_buffer.data);
249 3 : pfree(streamer);
250 GIC 3 : }
251 : #endif
252 :
253 : /*
254 : * Create a new base backup streamer that performs decompression of zstd
255 ECB : * compressed blocks.
256 : */
257 : bbstreamer *
258 GIC 2 : bbstreamer_zstd_decompressor_new(bbstreamer *next)
259 ECB : {
260 : #ifdef USE_ZSTD
261 : bbstreamer_zstd_frame *streamer;
262 :
263 CBC 2 : Assert(next != NULL);
264 :
265 GIC 2 : streamer = palloc0(sizeof(bbstreamer_zstd_frame));
266 2 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
267 : &bbstreamer_zstd_decompressor_ops;
268 :
269 2 : streamer->base.bbs_next = next;
270 2 : initStringInfo(&streamer->base.bbs_buffer);
271 CBC 2 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
272 :
273 GIC 2 : streamer->dctx = ZSTD_createDCtx();
274 2 : if (!streamer->dctx)
275 UIC 0 : pg_fatal("could not create zstd decompression context");
276 ECB :
277 : /* Initialize the ZSTD output buffer. */
278 CBC 2 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
279 2 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
280 GIC 2 : streamer->zstd_outBuf.pos = 0;
281 :
282 CBC 2 : return &streamer->base;
283 ECB : #else
284 : pg_fatal("this build does not support compression with %s", "ZSTD");
285 : return NULL; /* keep compiler quiet */
286 : #endif
287 : }
288 EUB :
289 : #ifdef USE_ZSTD
290 : /*
291 ECB : * Decompress the input data to output buffer until we run out of input
292 : * data. Each time the output buffer is full, pass on the decompressed data
293 : * to the next streamer.
294 : */
295 : static void
296 GIC 216 : bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
297 : bbstreamer_member *member,
298 : const char *data, int len,
299 : bbstreamer_archive_context context)
300 : {
301 216 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
302 216 : ZSTD_inBuffer inBuf = {data, len, 0};
303 :
304 839 : while (inBuf.pos < inBuf.size)
305 : {
306 : size_t ret;
307 :
308 : /*
309 ECB : * If output buffer is full then forward the content to next streamer
310 : * and update the output buffer.
311 : */
312 GIC 407 : if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
313 : {
314 CBC 312 : bbstreamer_content(mystreamer->base.bbs_next, member,
315 312 : mystreamer->zstd_outBuf.dst,
316 GIC 312 : mystreamer->zstd_outBuf.pos,
317 ECB : context);
318 :
319 : /* Reset the ZSTD output buffer. */
320 GIC 312 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
321 312 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
322 312 : mystreamer->zstd_outBuf.pos = 0;
323 : }
324 :
325 CBC 407 : ret = ZSTD_decompressStream(mystreamer->dctx,
326 : &mystreamer->zstd_outBuf, &inBuf);
327 ECB :
328 CBC 407 : if (ZSTD_isError(ret))
329 LBC 0 : pg_log_error("could not decompress data: %s",
330 : ZSTD_getErrorName(ret));
331 : }
332 GIC 216 : }
333 ECB :
334 : /*
335 : * End-of-stream processing.
336 : */
337 : static void
338 CBC 2 : bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
339 : {
340 GIC 2 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
341 ECB :
342 EUB : /*
343 : * End of the stream, if there is some pending data in output buffers then
344 : * we must forward it to next streamer.
345 ECB : */
346 GIC 2 : if (mystreamer->zstd_outBuf.pos > 0)
347 2 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
348 2 : mystreamer->base.bbs_buffer.data,
349 : mystreamer->base.bbs_buffer.maxlen,
350 : BBSTREAMER_UNKNOWN);
351 ECB :
352 GIC 2 : bbstreamer_finalize(mystreamer->base.bbs_next);
353 CBC 2 : }
354 :
355 : /*
356 : * Free memory.
357 : */
358 : static void
359 2 : bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
360 ECB : {
361 CBC 2 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
362 :
363 GIC 2 : bbstreamer_free(streamer->bbs_next);
364 2 : ZSTD_freeDCtx(mystreamer->dctx);
365 CBC 2 : pfree(streamer->bbs_buffer.data);
366 2 : pfree(streamer);
367 GIC 2 : }
368 : #endif
|