Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * compress_zstd.c
4 : : * Routines for archivers to write a Zstd compressed data stream.
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/bin/pg_dump/compress_zstd.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres_fe.h"
16 : :
17 : : #include "compress_zstd.h"
18 : : #include "pg_backup_utils.h"
19 : :
20 : : #ifndef USE_ZSTD
21 : :
22 : : void
23 : : InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
24 : : {
25 : : pg_fatal("this build does not support compression with %s", "ZSTD");
26 : : }
27 : :
28 : : void
29 : : InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
30 : : {
31 : : pg_fatal("this build does not support compression with %s", "ZSTD");
32 : : }
33 : :
34 : : #else
35 : :
36 : : #include <zstd.h>
37 : :
38 : : typedef struct ZstdCompressorState
39 : : {
40 : : /* This is a normal file to which we read/write compressed data */
41 : : FILE *fp;
42 : :
43 : : ZSTD_CStream *cstream;
44 : : ZSTD_DStream *dstream;
45 : : ZSTD_outBuffer output;
46 : : ZSTD_inBuffer input;
47 : :
48 : : /* pointer to a static string like from strerror(), for Zstd_write() */
49 : : const char *zstderror;
50 : : } ZstdCompressorState;
51 : :
52 : : static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
53 : : static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
54 : : static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
55 : : const void *data, size_t dLen);
56 : : static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
57 : :
58 : : static void
375 tomas.vondra@postgre 59 :CBC 66 : _Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
60 : : ZSTD_cParameter param, int value, char *paramname)
61 : : {
62 : : size_t res;
63 : :
64 : 66 : res = ZSTD_CCtx_setParameter(cstream, param, value);
65 [ - + ]: 66 : if (ZSTD_isError(res))
333 alvherre@alvh.no-ip. 66 :UBC 0 : pg_fatal("could not set compression parameter \"%s\": %s",
67 : : paramname, ZSTD_getErrorName(res));
375 tomas.vondra@postgre 68 :CBC 66 : }
69 : :
70 : : /* Return a compression stream with parameters set per argument */
71 : : static ZSTD_CStream *
72 : 65 : _ZstdCStreamParams(pg_compress_specification compress)
73 : : {
74 : : ZSTD_CStream *cstream;
75 : :
76 : 65 : cstream = ZSTD_createCStream();
77 [ - + ]: 65 : if (cstream == NULL)
375 tomas.vondra@postgre 78 :UBC 0 : pg_fatal("could not initialize compression library");
79 : :
375 tomas.vondra@postgre 80 :CBC 65 : _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
81 : : compress.level, "level");
82 : :
374 83 [ + + ]: 65 : if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
84 : 1 : _Zstd_CCtx_setParam_or_die(cstream,
85 : : ZSTD_c_enableLongDistanceMatching,
331 tgl@sss.pgh.pa.us 86 : 1 : compress.long_distance, "long");
87 : :
375 tomas.vondra@postgre 88 : 65 : return cstream;
89 : : }
90 : :
91 : : /* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
92 : : static void
93 : 93 : _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
94 : : {
95 : 93 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
96 : 93 : ZSTD_inBuffer *input = &zstdcs->input;
97 : 93 : ZSTD_outBuffer *output = &zstdcs->output;
98 : :
99 : : /* Loop while there's any input or until flushed */
100 [ + + + - ]: 93 : while (input->pos != input->size || flush)
101 : : {
102 : : size_t res;
103 : :
104 : 93 : output->pos = 0;
105 [ + + ]: 93 : res = ZSTD_compressStream2(zstdcs->cstream, output,
106 : : input, flush ? ZSTD_e_end : ZSTD_e_continue);
107 : :
108 [ - + ]: 93 : if (ZSTD_isError(res))
375 tomas.vondra@postgre 109 :UBC 0 : pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
110 : :
111 : : /*
112 : : * Extra paranoia: avoid zero-length chunks, since a zero length chunk
113 : : * is the EOF marker in the custom format. This should never happen
114 : : * but...
115 : : */
375 tomas.vondra@postgre 116 [ + + ]:CBC 93 : if (output->pos > 0)
117 : 32 : cs->writeF(AH, output->dst, output->pos);
118 : :
119 [ + - ]: 93 : if (res == 0)
120 : 93 : break; /* End of frame or all input consumed */
121 : : }
122 : 93 : }
123 : :
124 : : static void
125 : 64 : EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
126 : : {
127 : 64 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
128 : :
129 [ + + ]: 64 : if (cs->readF != NULL)
130 : : {
131 [ - + ]: 32 : Assert(zstdcs->cstream == NULL);
132 : 32 : ZSTD_freeDStream(zstdcs->dstream);
133 : 32 : pg_free(unconstify(void *, zstdcs->input.src));
134 : : }
135 [ + - ]: 32 : else if (cs->writeF != NULL)
136 : : {
137 [ - + ]: 32 : Assert(zstdcs->dstream == NULL);
138 : 32 : _ZstdWriteCommon(AH, cs, true);
139 : 32 : ZSTD_freeCStream(zstdcs->cstream);
140 : 32 : pg_free(zstdcs->output.dst);
141 : : }
142 : :
143 : 64 : pg_free(zstdcs);
144 : 64 : }
145 : :
146 : : static void
147 : 61 : WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
148 : : const void *data, size_t dLen)
149 : : {
150 : 61 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
151 : :
152 : 61 : zstdcs->input.src = data;
153 : 61 : zstdcs->input.size = dLen;
154 : 61 : zstdcs->input.pos = 0;
155 : :
156 : 61 : _ZstdWriteCommon(AH, cs, false);
157 : 61 : }
158 : :
159 : : static void
160 : 32 : ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
161 : : {
162 : 32 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
163 : 32 : ZSTD_outBuffer *output = &zstdcs->output;
164 : 32 : ZSTD_inBuffer *input = &zstdcs->input;
165 : 32 : size_t input_allocated_size = ZSTD_DStreamInSize();
166 : : size_t res;
167 : :
168 : : for (;;)
169 : 32 : {
170 : : size_t cnt;
171 : :
172 : : /*
173 : : * Read compressed data. Note that readF can resize the buffer; the
174 : : * new size is tracked and used for future loops.
175 : : */
176 : 64 : input->size = input_allocated_size;
177 : 64 : cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
178 : :
179 : : /* ensure that readF didn't *shrink* the buffer */
180 [ - + ]: 64 : Assert(input->size >= input_allocated_size);
181 : 64 : input_allocated_size = input->size;
182 : 64 : input->size = cnt;
183 : 64 : input->pos = 0;
184 : :
185 [ + + ]: 64 : if (cnt == 0)
186 : 32 : break;
187 : :
188 : : /* Now decompress */
189 [ + - ]: 32 : while (input->pos < input->size)
190 : : {
191 : 32 : output->pos = 0;
192 : 32 : res = ZSTD_decompressStream(zstdcs->dstream, output, input);
193 [ - + ]: 32 : if (ZSTD_isError(res))
375 tomas.vondra@postgre 194 :UBC 0 : pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
195 : :
196 : : /*
197 : : * then write the decompressed data to the output handle
198 : : */
375 tomas.vondra@postgre 199 :CBC 32 : ((char *) output->dst)[output->pos] = '\0';
200 : 32 : ahwrite(output->dst, 1, output->pos, AH);
201 : :
202 [ + - ]: 32 : if (res == 0)
203 : 32 : break; /* End of frame */
204 : : }
205 : : }
206 : 32 : }
207 : :
208 : : /* Public routine that supports Zstd compressed data I/O */
209 : : void
210 : 64 : InitCompressorZstd(CompressorState *cs,
211 : : const pg_compress_specification compression_spec)
212 : : {
213 : : ZstdCompressorState *zstdcs;
214 : :
215 : 64 : cs->readData = ReadDataFromArchiveZstd;
216 : 64 : cs->writeData = WriteDataToArchiveZstd;
217 : 64 : cs->end = EndCompressorZstd;
218 : :
219 : 64 : cs->compression_spec = compression_spec;
220 : :
221 : 64 : zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
222 : 64 : cs->private_data = zstdcs;
223 : :
224 : : /* We expect that exactly one of readF/writeF is specified */
225 [ - + ]: 64 : Assert((cs->readF == NULL) != (cs->writeF == NULL));
226 : :
227 [ + + ]: 64 : if (cs->readF != NULL)
228 : : {
229 : 32 : zstdcs->dstream = ZSTD_createDStream();
230 [ - + ]: 32 : if (zstdcs->dstream == NULL)
375 tomas.vondra@postgre 231 :UBC 0 : pg_fatal("could not initialize compression library");
232 : :
375 tomas.vondra@postgre 233 :CBC 32 : zstdcs->input.size = ZSTD_DStreamInSize();
234 : 32 : zstdcs->input.src = pg_malloc(zstdcs->input.size);
235 : :
236 : : /*
237 : : * output.size is the buffer size we tell zstd it can output to.
238 : : * Allocate an additional byte such that ReadDataFromArchiveZstd() can
239 : : * call ahwrite() with a null-terminated string, which is an optimized
240 : : * case in ExecuteSqlCommandBuf().
241 : : */
242 : 32 : zstdcs->output.size = ZSTD_DStreamOutSize();
243 : 32 : zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
244 : : }
245 [ + - ]: 32 : else if (cs->writeF != NULL)
246 : : {
247 : 32 : zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
248 : :
249 : 32 : zstdcs->output.size = ZSTD_CStreamOutSize();
250 : 32 : zstdcs->output.dst = pg_malloc(zstdcs->output.size);
251 : 32 : zstdcs->output.pos = 0;
252 : : }
253 : 64 : }
254 : :
255 : : /*
256 : : * Compressed stream API
257 : : */
258 : :
259 : : static bool
260 : 110 : Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
261 : : {
262 : 110 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
263 : 110 : ZSTD_inBuffer *input = &zstdcs->input;
264 : 110 : ZSTD_outBuffer *output = &zstdcs->output;
265 : 110 : size_t input_allocated_size = ZSTD_DStreamInSize();
266 : : size_t res,
267 : : cnt;
268 : :
269 : 110 : output->size = size;
270 : 110 : output->dst = ptr;
271 : 110 : output->pos = 0;
272 : :
273 : : for (;;)
274 : : {
275 [ - + ]: 142 : Assert(input->pos <= input->size);
276 [ - + ]: 142 : Assert(input->size <= input_allocated_size);
277 : :
278 : : /*
279 : : * If the input is completely consumed, start back at the beginning
280 : : */
281 [ + + ]: 142 : if (input->pos == input->size)
282 : : {
283 : : /* input->size is size produced by "fread" */
284 : 99 : input->size = 0;
285 : : /* input->pos is position consumed by decompress */
286 : 99 : input->pos = 0;
287 : : }
288 : :
289 : : /* read compressed data if we must produce more input */
290 [ + + ]: 142 : if (input->pos == input->size)
291 : : {
292 : 99 : cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
293 : 99 : input->size = cnt;
294 : :
295 [ - + ]: 99 : Assert(cnt <= input_allocated_size);
296 : :
297 : : /* If we have no more input to consume, we're done */
298 [ + + ]: 99 : if (cnt == 0)
299 : 65 : break;
300 : : }
301 : :
302 [ + - ]: 77 : while (input->pos < input->size)
303 : : {
304 : : /* now decompress */
305 : 77 : res = ZSTD_decompressStream(zstdcs->dstream, output, input);
306 : :
307 [ - + ]: 77 : if (ZSTD_isError(res))
375 tomas.vondra@postgre 308 :UBC 0 : pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
309 : :
375 tomas.vondra@postgre 310 [ + + ]:CBC 77 : if (output->pos == output->size)
311 : 45 : break; /* No more room for output */
312 : :
313 [ + - ]: 32 : if (res == 0)
314 : 32 : break; /* End of frame */
315 : : }
316 : :
317 [ + + ]: 77 : if (output->pos == output->size)
318 : 45 : break; /* We read all the data that fits */
319 : : }
320 : :
321 [ + - ]: 110 : if (rdsize != NULL)
322 : 110 : *rdsize = output->pos;
323 : :
324 : 110 : return true;
325 : : }
326 : :
327 : : static bool
328 : 1646 : Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
329 : : {
330 : 1646 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
331 : 1646 : ZSTD_inBuffer *input = &zstdcs->input;
332 : 1646 : ZSTD_outBuffer *output = &zstdcs->output;
333 : : size_t res,
334 : : cnt;
335 : :
336 : 1646 : input->src = ptr;
337 : 1646 : input->size = size;
338 : 1646 : input->pos = 0;
339 : :
340 : : /* Consume all input, to be flushed later */
341 [ + + ]: 3292 : while (input->pos != input->size)
342 : : {
343 : 1646 : output->pos = 0;
344 : 1646 : res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
345 [ - + ]: 1646 : if (ZSTD_isError(res))
346 : : {
375 tomas.vondra@postgre 347 :UBC 0 : zstdcs->zstderror = ZSTD_getErrorName(res);
348 : 0 : return false;
349 : : }
350 : :
375 tomas.vondra@postgre 351 :CBC 1646 : cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
352 [ - + ]: 1646 : if (cnt != output->pos)
353 : : {
375 tomas.vondra@postgre 354 :UBC 0 : zstdcs->zstderror = strerror(errno);
355 : 0 : return false;
356 : : }
357 : : }
358 : :
375 tomas.vondra@postgre 359 :CBC 1646 : return size;
360 : : }
361 : :
362 : : static int
375 tomas.vondra@postgre 363 :UBC 0 : Zstd_getc(CompressFileHandle *CFH)
364 : : {
365 : 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
366 : : int ret;
367 : :
368 [ # # ]: 0 : if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
369 : : {
370 [ # # ]: 0 : if (feof(zstdcs->fp))
371 : 0 : pg_fatal("could not read from input file: end of file");
372 : : else
373 : 0 : pg_fatal("could not read from input file: %m");
374 : : }
375 : 0 : return ret;
376 : : }
377 : :
378 : : static char *
375 tomas.vondra@postgre 379 :CBC 4 : Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
380 : : {
381 : : int i;
382 : :
383 [ - + ]: 4 : Assert(len > 0);
384 : :
385 : : /*
386 : : * Read one byte at a time until newline or EOF. This is only used to read
387 : : * the list of LOs, and the I/O is buffered anyway.
388 : : */
389 [ + - ]: 44 : for (i = 0; i < len - 1; ++i)
390 : : {
391 : : size_t readsz;
392 : :
393 [ - + ]: 44 : if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
375 tomas.vondra@postgre 394 :UBC 0 : break;
375 tomas.vondra@postgre 395 [ + + ]:CBC 44 : if (readsz != 1)
396 : 2 : break;
397 [ + + ]: 42 : if (buf[i] == '\n')
398 : : {
399 : 2 : ++i;
400 : 2 : break;
401 : : }
402 : : }
403 : 4 : buf[i] = '\0';
404 [ + + ]: 4 : return i > 0 ? buf : NULL;
405 : : }
406 : :
407 : : static bool
408 : 67 : Zstd_close(CompressFileHandle *CFH)
409 : : {
410 : 67 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
411 : :
412 [ + + ]: 67 : if (zstdcs->cstream)
413 : : {
414 : : size_t res,
415 : : cnt;
416 : 33 : ZSTD_inBuffer *input = &zstdcs->input;
417 : 33 : ZSTD_outBuffer *output = &zstdcs->output;
418 : :
419 : : /* Loop until the compression buffers are fully consumed */
420 : : for (;;)
421 : : {
422 : 33 : output->pos = 0;
423 : 33 : res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
424 [ - + ]: 33 : if (ZSTD_isError(res))
425 : : {
375 tomas.vondra@postgre 426 :UBC 0 : zstdcs->zstderror = ZSTD_getErrorName(res);
427 : 0 : return false;
428 : : }
429 : :
375 tomas.vondra@postgre 430 :CBC 33 : cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
431 [ - + ]: 33 : if (cnt != output->pos)
432 : : {
375 tomas.vondra@postgre 433 :UBC 0 : zstdcs->zstderror = strerror(errno);
434 : 0 : return false;
435 : : }
436 : :
375 tomas.vondra@postgre 437 [ + - ]:CBC 33 : if (res == 0)
438 : 33 : break; /* End of frame */
439 : : }
440 : :
441 : 33 : ZSTD_freeCStream(zstdcs->cstream);
442 : 33 : pg_free(zstdcs->output.dst);
443 : : }
444 : :
445 [ + + ]: 67 : if (zstdcs->dstream)
446 : : {
447 : 34 : ZSTD_freeDStream(zstdcs->dstream);
448 : 34 : pg_free(unconstify(void *, zstdcs->input.src));
449 : : }
450 : :
451 [ - + ]: 67 : if (fclose(zstdcs->fp) != 0)
375 tomas.vondra@postgre 452 :UBC 0 : return false;
453 : :
375 tomas.vondra@postgre 454 :CBC 67 : pg_free(zstdcs);
455 : 67 : return true;
456 : : }
457 : :
458 : : static bool
459 : 2 : Zstd_eof(CompressFileHandle *CFH)
460 : : {
461 : 2 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
462 : :
463 : 2 : return feof(zstdcs->fp);
464 : : }
465 : :
466 : : static bool
467 : 67 : Zstd_open(const char *path, int fd, const char *mode,
468 : : CompressFileHandle *CFH)
469 : : {
470 : : FILE *fp;
471 : : ZstdCompressorState *zstdcs;
472 : :
473 [ - + ]: 67 : if (fd >= 0)
375 tomas.vondra@postgre 474 :UBC 0 : fp = fdopen(fd, mode);
475 : : else
375 tomas.vondra@postgre 476 :CBC 67 : fp = fopen(path, mode);
477 : :
478 [ - + ]: 67 : if (fp == NULL)
375 tomas.vondra@postgre 479 :UBC 0 : return false;
480 : :
375 tomas.vondra@postgre 481 :CBC 67 : zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
482 : 67 : CFH->private_data = zstdcs;
483 : 67 : zstdcs->fp = fp;
484 : :
485 [ + + ]: 67 : if (mode[0] == 'r')
486 : : {
487 : 34 : zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
488 : 34 : zstdcs->dstream = ZSTD_createDStream();
489 [ - + ]: 34 : if (zstdcs->dstream == NULL)
375 tomas.vondra@postgre 490 :UBC 0 : pg_fatal("could not initialize compression library");
491 : : }
375 tomas.vondra@postgre 492 [ - + - - ]:CBC 33 : else if (mode[0] == 'w' || mode[0] == 'a')
493 : : {
494 : 33 : zstdcs->output.size = ZSTD_CStreamOutSize();
495 : 33 : zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
496 : 33 : zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
497 [ - + ]: 33 : if (zstdcs->cstream == NULL)
375 tomas.vondra@postgre 498 :UBC 0 : pg_fatal("could not initialize compression library");
499 : : }
500 : : else
333 alvherre@alvh.no-ip. 501 : 0 : pg_fatal("unhandled mode \"%s\"", mode);
502 : :
375 tomas.vondra@postgre 503 :CBC 67 : return true;
504 : : }
505 : :
506 : : static bool
507 : 32 : Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
508 : : {
509 : : char fname[MAXPGPATH];
510 : :
511 : 32 : sprintf(fname, "%s.zst", path);
512 : 32 : return CFH->open_func(fname, -1, mode, CFH);
513 : : }
514 : :
515 : : static const char *
375 tomas.vondra@postgre 516 :UBC 0 : Zstd_get_error(CompressFileHandle *CFH)
517 : : {
518 : 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
519 : :
520 : 0 : return zstdcs->zstderror;
521 : : }
522 : :
523 : : void
375 tomas.vondra@postgre 524 :CBC 67 : InitCompressFileHandleZstd(CompressFileHandle *CFH,
525 : : const pg_compress_specification compression_spec)
526 : : {
527 : 67 : CFH->open_func = Zstd_open;
528 : 67 : CFH->open_write_func = Zstd_open_write;
529 : 67 : CFH->read_func = Zstd_read;
530 : 67 : CFH->write_func = Zstd_write;
531 : 67 : CFH->gets_func = Zstd_gets;
532 : 67 : CFH->getc_func = Zstd_getc;
533 : 67 : CFH->close_func = Zstd_close;
534 : 67 : CFH->eof_func = Zstd_eof;
535 : 67 : CFH->get_error_func = Zstd_get_error;
536 : :
537 : 67 : CFH->compression_spec = compression_spec;
538 : :
539 : 67 : CFH->private_data = NULL;
540 : 67 : }
541 : :
542 : : #endif /* USE_ZSTD */
|