Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * bbstreamer_zstd.c
4 : *
5 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/bin/pg_basebackup/bbstreamer_zstd.c
9 : *-------------------------------------------------------------------------
10 : */
11 :
12 : #include "postgres_fe.h"
13 :
14 : #include <unistd.h>
15 :
16 : #ifdef USE_ZSTD
17 : #include <zstd.h>
18 : #endif
19 :
20 : #include "bbstreamer.h"
21 : #include "common/logging.h"
22 :
23 : #ifdef USE_ZSTD
24 :
25 : typedef struct bbstreamer_zstd_frame
26 : {
27 : bbstreamer base;
28 :
29 : ZSTD_CCtx *cctx;
30 : ZSTD_DCtx *dctx;
31 : ZSTD_outBuffer zstd_outBuf;
32 : } bbstreamer_zstd_frame;
33 :
34 : static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
35 : bbstreamer_member *member,
36 : const char *data, int len,
37 : bbstreamer_archive_context context);
38 : static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
39 : static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
40 :
41 : const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
42 : .content = bbstreamer_zstd_compressor_content,
43 : .finalize = bbstreamer_zstd_compressor_finalize,
44 : .free = bbstreamer_zstd_compressor_free
45 : };
46 :
47 : static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
48 : bbstreamer_member *member,
49 : const char *data, int len,
50 : bbstreamer_archive_context context);
51 : static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
52 : static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
53 :
54 : const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
55 : .content = bbstreamer_zstd_decompressor_content,
56 : .finalize = bbstreamer_zstd_decompressor_finalize,
57 : .free = bbstreamer_zstd_decompressor_free
58 : };
59 : #endif
60 :
61 : /*
62 : * Create a new base backup streamer that performs zstd compression of tar
63 : * blocks.
64 : */
65 : bbstreamer *
362 michael 66 CBC 3 : bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
67 : {
68 : #ifdef USE_ZSTD
69 : bbstreamer_zstd_frame *streamer;
70 : size_t ret;
71 :
398 rhaas 72 3 : Assert(next != NULL);
73 :
74 3 : streamer = palloc0(sizeof(bbstreamer_zstd_frame));
75 :
76 3 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
77 : &bbstreamer_zstd_compressor_ops;
78 :
79 3 : streamer->base.bbs_next = next;
80 3 : initStringInfo(&streamer->base.bbs_buffer);
81 3 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
82 :
83 3 : streamer->cctx = ZSTD_createCCtx();
84 3 : if (!streamer->cctx)
366 tgl 85 UBC 0 : pg_fatal("could not create zstd compression context");
86 :
87 : /* Set compression level */
207 michael 88 CBC 3 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
89 : compress->level);
90 3 : if (ZSTD_isError(ret))
207 michael 91 UBC 0 : pg_fatal("could not set zstd compression level to %d: %s",
92 : compress->level, ZSTD_getErrorName(ret));
93 :
94 : /* Set # of workers, if specified */
362 michael 95 CBC 3 : if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
96 : {
97 : /*
98 : * On older versions of libzstd, this option does not exist, and
99 : * trying to set it will fail. Similarly for newer versions if they
100 : * are compiled without threading support.
101 : */
375 rhaas 102 1 : ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
103 : compress->workers);
104 1 : if (ZSTD_isError(ret))
366 tgl 105 UBC 0 : pg_fatal("could not set compression worker count to %d: %s",
106 : compress->workers, ZSTD_getErrorName(ret));
107 : }
108 :
3 tomas.vondra 109 GNC 3 : if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
110 : {
111 1 : ret = ZSTD_CCtx_setParameter(streamer->cctx,
112 : ZSTD_c_enableLongDistanceMatching,
113 1 : compress->long_distance);
114 1 : if (ZSTD_isError(ret))
115 : {
3 tomas.vondra 116 UNC 0 : pg_log_error("could not set compression flag for %s: %s",
117 : "long", ZSTD_getErrorName(ret));
118 0 : exit(1);
119 : }
120 : }
121 :
398 rhaas 122 ECB : /* Initialize the ZSTD output buffer. */
398 rhaas 123 GIC 3 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
398 rhaas 124 CBC 3 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
398 rhaas 125 GIC 3 : streamer->zstd_outBuf.pos = 0;
398 rhaas 126 ECB :
398 rhaas 127 CBC 3 : return &streamer->base;
128 : #else
129 : pg_fatal("this build does not support compression with %s", "ZSTD");
130 : return NULL; /* keep compiler quiet */
398 rhaas 131 EUB : #endif
132 : }
133 :
134 : #ifdef USE_ZSTD
135 : /*
398 rhaas 136 ECB : * Compress the input data to output buffer.
137 : *
138 : * Find out the compression bound based on input data length for each
139 : * invocation to make sure that output buffer has enough capacity to
140 : * accommodate the compressed data. In case if the output buffer
141 : * capacity falls short of compression bound then forward the content
142 : * of output buffer to next streamer and empty the buffer.
143 : */
144 : static void
398 rhaas 145 GIC 8109 : bbstreamer_zstd_compressor_content(bbstreamer *streamer,
146 : bbstreamer_member *member,
147 : const char *data, int len,
148 : bbstreamer_archive_context context)
149 : {
150 8109 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
151 8109 : ZSTD_inBuffer inBuf = {data, len, 0};
152 :
153 24332 : while (inBuf.pos < inBuf.size)
154 : {
155 : size_t yet_to_flush;
156 8114 : size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
157 :
398 rhaas 158 ECB : /*
159 : * If the output buffer is not left with enough space, send the
160 : * compressed bytes to the next streamer, and empty the buffer.
161 : */
398 rhaas 162 GIC 8114 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
398 rhaas 163 ECB : max_needed)
164 : {
398 rhaas 165 GIC 25 : bbstreamer_content(mystreamer->base.bbs_next, member,
398 rhaas 166 CBC 25 : mystreamer->zstd_outBuf.dst,
398 rhaas 167 GIC 25 : mystreamer->zstd_outBuf.pos,
168 : context);
398 rhaas 169 ECB :
170 : /* Reset the ZSTD output buffer. */
398 rhaas 171 GIC 25 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
172 25 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
173 25 : mystreamer->zstd_outBuf.pos = 0;
174 : }
398 rhaas 175 ECB :
176 : yet_to_flush =
398 rhaas 177 GIC 8114 : ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
398 rhaas 178 ECB : &inBuf, ZSTD_e_continue);
179 :
398 rhaas 180 CBC 8114 : if (ZSTD_isError(yet_to_flush))
398 rhaas 181 UIC 0 : pg_log_error("could not compress data: %s",
182 : ZSTD_getErrorName(yet_to_flush));
183 : }
398 rhaas 184 CBC 8109 : }
398 rhaas 185 ECB :
186 : /*
187 : * End-of-stream processing.
188 : */
189 : static void
398 rhaas 190 CBC 3 : bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
191 : {
398 rhaas 192 GIC 3 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
398 rhaas 193 ECB : size_t yet_to_flush;
398 rhaas 194 EUB :
195 : do
196 : {
398 rhaas 197 CBC 3 : ZSTD_inBuffer in = {NULL, 0, 0};
398 rhaas 198 GIC 3 : size_t max_needed = ZSTD_compressBound(0);
199 :
200 : /*
201 : * If the output buffer is not left with enough space, send the
202 : * compressed bytes to the next streamer, and empty the buffer.
398 rhaas 203 ECB : */
398 rhaas 204 GIC 3 : if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
398 rhaas 205 ECB : max_needed)
206 : {
398 rhaas 207 UIC 0 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
208 0 : mystreamer->zstd_outBuf.dst,
209 0 : mystreamer->zstd_outBuf.pos,
398 rhaas 210 ECB : BBSTREAMER_UNKNOWN);
211 :
212 : /* Reset the ZSTD output buffer. */
398 rhaas 213 UIC 0 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
214 0 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
215 0 : mystreamer->zstd_outBuf.pos = 0;
216 : }
398 rhaas 217 ECB :
398 rhaas 218 GIC 3 : yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
219 : &mystreamer->zstd_outBuf,
398 rhaas 220 EUB : &in, ZSTD_e_end);
221 :
398 rhaas 222 GBC 3 : if (ZSTD_isError(yet_to_flush))
398 rhaas 223 UIC 0 : pg_log_error("could not compress data: %s",
224 : ZSTD_getErrorName(yet_to_flush));
225 :
398 rhaas 226 GBC 3 : } while (yet_to_flush > 0);
398 rhaas 227 EUB :
228 : /* Make sure to pass any remaining bytes to the next streamer. */
398 rhaas 229 GIC 3 : if (mystreamer->zstd_outBuf.pos > 0)
230 3 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
398 rhaas 231 CBC 3 : mystreamer->zstd_outBuf.dst,
398 rhaas 232 GIC 3 : mystreamer->zstd_outBuf.pos,
233 : BBSTREAMER_UNKNOWN);
234 :
398 rhaas 235 CBC 3 : bbstreamer_finalize(mystreamer->base.bbs_next);
398 rhaas 236 GBC 3 : }
237 :
238 : /*
398 rhaas 239 ECB : * Free memory.
240 : */
241 : static void
398 rhaas 242 CBC 3 : bbstreamer_zstd_compressor_free(bbstreamer *streamer)
398 rhaas 243 ECB : {
398 rhaas 244 CBC 3 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
398 rhaas 245 ECB :
398 rhaas 246 GIC 3 : bbstreamer_free(streamer->bbs_next);
247 3 : ZSTD_freeCCtx(mystreamer->cctx);
398 rhaas 248 CBC 3 : pfree(streamer->bbs_buffer.data);
249 3 : pfree(streamer);
398 rhaas 250 GIC 3 : }
251 : #endif
252 :
253 : /*
254 : * Create a new base backup streamer that performs decompression of zstd
398 rhaas 255 ECB : * compressed blocks.
256 : */
257 : bbstreamer *
398 rhaas 258 GIC 2 : bbstreamer_zstd_decompressor_new(bbstreamer *next)
398 rhaas 259 ECB : {
390 260 : #ifdef USE_ZSTD
398 261 : bbstreamer_zstd_frame *streamer;
262 :
398 rhaas 263 CBC 2 : Assert(next != NULL);
264 :
398 rhaas 265 GIC 2 : streamer = palloc0(sizeof(bbstreamer_zstd_frame));
266 2 : *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
267 : &bbstreamer_zstd_decompressor_ops;
268 :
269 2 : streamer->base.bbs_next = next;
270 2 : initStringInfo(&streamer->base.bbs_buffer);
398 rhaas 271 CBC 2 : enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
272 :
398 rhaas 273 GIC 2 : streamer->dctx = ZSTD_createDCtx();
274 2 : if (!streamer->dctx)
366 tgl 275 UIC 0 : pg_fatal("could not create zstd decompression context");
398 rhaas 276 ECB :
277 : /* Initialize the ZSTD output buffer. */
398 rhaas 278 CBC 2 : streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
279 2 : streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
398 rhaas 280 GIC 2 : streamer->zstd_outBuf.pos = 0;
281 :
398 rhaas 282 CBC 2 : return &streamer->base;
398 rhaas 283 ECB : #else
284 : pg_fatal("this build does not support compression with %s", "ZSTD");
285 : return NULL; /* keep compiler quiet */
286 : #endif
287 : }
398 rhaas 288 EUB :
289 : #ifdef USE_ZSTD
290 : /*
398 rhaas 291 ECB : * Decompress the input data to output buffer until we run out of input
292 : * data. Each time the output buffer is full, pass on the decompressed data
293 : * to the next streamer.
294 : */
295 : static void
398 rhaas 296 GIC 216 : bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
297 : bbstreamer_member *member,
298 : const char *data, int len,
299 : bbstreamer_archive_context context)
300 : {
301 216 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
302 216 : ZSTD_inBuffer inBuf = {data, len, 0};
303 :
304 839 : while (inBuf.pos < inBuf.size)
305 : {
306 : size_t ret;
307 :
308 : /*
398 rhaas 309 ECB : * If output buffer is full then forward the content to next streamer
310 : * and update the output buffer.
311 : */
398 rhaas 312 GIC 407 : if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
313 : {
398 rhaas 314 CBC 312 : bbstreamer_content(mystreamer->base.bbs_next, member,
315 312 : mystreamer->zstd_outBuf.dst,
398 rhaas 316 GIC 312 : mystreamer->zstd_outBuf.pos,
398 rhaas 317 ECB : context);
318 :
319 : /* Reset the ZSTD output buffer. */
398 rhaas 320 GIC 312 : mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
321 312 : mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
322 312 : mystreamer->zstd_outBuf.pos = 0;
323 : }
324 :
398 rhaas 325 CBC 407 : ret = ZSTD_decompressStream(mystreamer->dctx,
326 : &mystreamer->zstd_outBuf, &inBuf);
398 rhaas 327 ECB :
398 rhaas 328 CBC 407 : if (ZSTD_isError(ret))
366 tgl 329 LBC 0 : pg_log_error("could not decompress data: %s",
330 : ZSTD_getErrorName(ret));
331 : }
398 rhaas 332 GIC 216 : }
398 rhaas 333 ECB :
334 : /*
335 : * End-of-stream processing.
336 : */
337 : static void
398 rhaas 338 CBC 2 : bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
339 : {
398 rhaas 340 GIC 2 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
398 rhaas 341 ECB :
398 rhaas 342 EUB : /*
343 : * End of the stream, if there is some pending data in output buffers then
344 : * we must forward it to next streamer.
398 rhaas 345 ECB : */
398 rhaas 346 GIC 2 : if (mystreamer->zstd_outBuf.pos > 0)
347 2 : bbstreamer_content(mystreamer->base.bbs_next, NULL,
348 2 : mystreamer->base.bbs_buffer.data,
349 : mystreamer->base.bbs_buffer.maxlen,
350 : BBSTREAMER_UNKNOWN);
398 rhaas 351 ECB :
398 rhaas 352 GIC 2 : bbstreamer_finalize(mystreamer->base.bbs_next);
398 rhaas 353 CBC 2 : }
354 :
355 : /*
356 : * Free memory.
357 : */
358 : static void
359 2 : bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
398 rhaas 360 ECB : {
398 rhaas 361 CBC 2 : bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
362 :
398 rhaas 363 GIC 2 : bbstreamer_free(streamer->bbs_next);
364 2 : ZSTD_freeDCtx(mystreamer->dctx);
398 rhaas 365 CBC 2 : pfree(streamer->bbs_buffer.data);
366 2 : pfree(streamer);
398 rhaas 367 GIC 2 : }
368 : #endif
|