Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * compress_zstd.c
4 : * Routines for archivers to write a Zstd compressed data stream.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/bin/pg_dump/compress_zstd.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres_fe.h"
16 :
17 : #include "pg_backup_utils.h"
18 : #include "compress_zstd.h"
19 :
20 : #ifndef USE_ZSTD
21 :
22 : void
23 : InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
24 : {
25 : pg_fatal("this build does not support compression with %s", "ZSTD");
26 : }
27 :
28 : void
29 : InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
30 : {
31 : pg_fatal("this build does not support compression with %s", "ZSTD");
32 : }
33 :
34 : #else
35 :
36 : #include <zstd.h>
37 :
38 : typedef struct ZstdCompressorState
39 : {
40 : /* This is a normal file to which we read/write compressed data */
41 : FILE *fp;
42 :
43 : ZSTD_CStream *cstream;
44 : ZSTD_DStream *dstream;
45 : ZSTD_outBuffer output;
46 : ZSTD_inBuffer input;
47 :
48 : /* pointer to a static string like from strerror(), for Zstd_write() */
49 : const char *zstderror;
50 : } ZstdCompressorState;
51 :
52 : static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
53 : static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
54 : static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
55 : const void *data, size_t dLen);
56 : static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
57 :
58 : static void
4 tomas.vondra 59 GNC 54 : _Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
60 : ZSTD_cParameter param, int value, char *paramname)
61 : {
62 : size_t res;
63 :
64 54 : res = ZSTD_CCtx_setParameter(cstream, param, value);
65 54 : if (ZSTD_isError(res))
4 tomas.vondra 66 UNC 0 : pg_fatal("could not set compression parameter: \"%s\": %s",
67 : paramname, ZSTD_getErrorName(res));
4 tomas.vondra 68 GNC 54 : }
69 :
70 : /* Return a compression stream with parameters set per argument */
71 : static ZSTD_CStream *
72 53 : _ZstdCStreamParams(pg_compress_specification compress)
73 : {
74 : ZSTD_CStream *cstream;
75 :
76 53 : cstream = ZSTD_createCStream();
77 53 : if (cstream == NULL)
4 tomas.vondra 78 UNC 0 : pg_fatal("could not initialize compression library");
79 :
4 tomas.vondra 80 GNC 53 : _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
81 : compress.level, "level");
82 :
3 83 53 : if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
84 1 : _Zstd_CCtx_setParam_or_die(cstream,
85 : ZSTD_c_enableLongDistanceMatching,
86 1 : compress.long_distance, "long");
87 :
4 88 53 : return cstream;
89 : }
90 :
91 : /* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
92 : static void
93 80 : _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
94 : {
95 80 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
96 80 : ZSTD_inBuffer *input = &zstdcs->input;
97 80 : ZSTD_outBuffer *output = &zstdcs->output;
98 :
99 : /* Loop while there's any input or until flushed */
100 80 : while (input->pos != input->size || flush)
101 : {
102 : size_t res;
103 :
104 80 : output->pos = 0;
105 80 : res = ZSTD_compressStream2(zstdcs->cstream, output,
106 : input, flush ? ZSTD_e_end : ZSTD_e_continue);
107 :
108 80 : if (ZSTD_isError(res))
4 tomas.vondra 109 UNC 0 : pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
110 :
111 : /*
112 : * Extra paranoia: avoid zero-length chunks, since a zero length chunk
113 : * is the EOF marker in the custom format. This should never happen
114 : * but...
115 : */
4 tomas.vondra 116 GNC 80 : if (output->pos > 0)
117 26 : cs->writeF(AH, output->dst, output->pos);
118 :
119 80 : if (res == 0)
120 80 : break; /* End of frame or all input consumed */
121 : }
122 80 : }
123 :
124 : static void
125 52 : EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
126 : {
127 52 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
128 :
129 52 : if (cs->readF != NULL)
130 : {
131 26 : Assert(zstdcs->cstream == NULL);
132 26 : ZSTD_freeDStream(zstdcs->dstream);
133 26 : pg_free(unconstify(void *, zstdcs->input.src));
134 : }
135 26 : else if (cs->writeF != NULL)
136 : {
137 26 : Assert(zstdcs->dstream == NULL);
138 26 : _ZstdWriteCommon(AH, cs, true);
139 26 : ZSTD_freeCStream(zstdcs->cstream);
140 26 : pg_free(zstdcs->output.dst);
141 : }
142 :
143 52 : pg_free(zstdcs);
144 52 : }
145 :
146 : static void
147 54 : WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
148 : const void *data, size_t dLen)
149 : {
150 54 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
151 :
152 54 : zstdcs->input.src = data;
153 54 : zstdcs->input.size = dLen;
154 54 : zstdcs->input.pos = 0;
155 :
156 54 : _ZstdWriteCommon(AH, cs, false);
157 54 : }
158 :
159 : static void
160 26 : ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
161 : {
162 26 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
163 26 : ZSTD_outBuffer *output = &zstdcs->output;
164 26 : ZSTD_inBuffer *input = &zstdcs->input;
165 26 : size_t input_allocated_size = ZSTD_DStreamInSize();
166 : size_t res;
167 :
168 : for (;;)
169 26 : {
170 : size_t cnt;
171 :
172 : /*
173 : * Read compressed data. Note that readF can resize the buffer; the
174 : * new size is tracked and used for future loops.
175 : */
176 52 : input->size = input_allocated_size;
177 52 : cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
178 :
179 : /* ensure that readF didn't *shrink* the buffer */
180 52 : Assert(input->size >= input_allocated_size);
181 52 : input_allocated_size = input->size;
182 52 : input->size = cnt;
183 52 : input->pos = 0;
184 :
185 52 : if (cnt == 0)
186 26 : break;
187 :
188 : /* Now decompress */
189 26 : while (input->pos < input->size)
190 : {
191 26 : output->pos = 0;
192 26 : res = ZSTD_decompressStream(zstdcs->dstream, output, input);
193 26 : if (ZSTD_isError(res))
4 tomas.vondra 194 UNC 0 : pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
195 :
196 : /*
197 : * then write the decompressed data to the output handle
198 : */
4 tomas.vondra 199 GNC 26 : ((char *) output->dst)[output->pos] = '\0';
200 26 : ahwrite(output->dst, 1, output->pos, AH);
201 :
202 26 : if (res == 0)
203 26 : break; /* End of frame */
204 : }
205 : }
206 26 : }
207 :
208 : /* Public routine that supports Zstd compressed data I/O */
209 : void
210 52 : InitCompressorZstd(CompressorState *cs,
211 : const pg_compress_specification compression_spec)
212 : {
213 : ZstdCompressorState *zstdcs;
214 :
215 52 : cs->readData = ReadDataFromArchiveZstd;
216 52 : cs->writeData = WriteDataToArchiveZstd;
217 52 : cs->end = EndCompressorZstd;
218 :
219 52 : cs->compression_spec = compression_spec;
220 :
221 52 : zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
222 52 : cs->private_data = zstdcs;
223 :
224 : /* We expect that exactly one of readF/writeF is specified */
225 52 : Assert((cs->readF == NULL) != (cs->writeF == NULL));
226 :
227 52 : if (cs->readF != NULL)
228 : {
229 26 : zstdcs->dstream = ZSTD_createDStream();
230 26 : if (zstdcs->dstream == NULL)
4 tomas.vondra 231 UNC 0 : pg_fatal("could not initialize compression library");
232 :
4 tomas.vondra 233 GNC 26 : zstdcs->input.size = ZSTD_DStreamInSize();
234 26 : zstdcs->input.src = pg_malloc(zstdcs->input.size);
235 :
236 : /*
237 : * output.size is the buffer size we tell zstd it can output to.
238 : * Allocate an additional byte such that ReadDataFromArchiveZstd() can
239 : * call ahwrite() with a null-terminated string, which is an optimized
240 : * case in ExecuteSqlCommandBuf().
241 : */
242 26 : zstdcs->output.size = ZSTD_DStreamOutSize();
243 26 : zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
244 : }
245 26 : else if (cs->writeF != NULL)
246 : {
247 26 : zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
248 :
249 26 : zstdcs->output.size = ZSTD_CStreamOutSize();
250 26 : zstdcs->output.dst = pg_malloc(zstdcs->output.size);
251 26 : zstdcs->output.pos = 0;
252 : }
253 52 : }
254 :
255 : /*
256 : * Compressed stream API
257 : */
258 :
259 : static bool
260 94 : Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
261 : {
262 94 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
263 94 : ZSTD_inBuffer *input = &zstdcs->input;
264 94 : ZSTD_outBuffer *output = &zstdcs->output;
265 94 : size_t input_allocated_size = ZSTD_DStreamInSize();
266 : size_t res,
267 : cnt;
268 :
269 94 : output->size = size;
270 94 : output->dst = ptr;
271 94 : output->pos = 0;
272 :
273 : for (;;)
274 : {
275 120 : Assert(input->pos <= input->size);
276 120 : Assert(input->size <= input_allocated_size);
277 :
278 : /*
279 : * If the input is completely consumed, start back at the beginning
280 : */
281 120 : if (input->pos == input->size)
282 : {
283 : /* input->size is size produced by "fread" */
284 79 : input->size = 0;
285 : /* input->pos is position consumed by decompress */
286 79 : input->pos = 0;
287 : }
288 :
289 : /* read compressed data if we must produce more input */
290 120 : if (input->pos == input->size)
291 : {
292 79 : cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
293 79 : input->size = cnt;
294 :
295 79 : Assert(cnt <= input_allocated_size);
296 :
297 : /* If we have no more input to consume, we're done */
298 79 : if (cnt == 0)
299 52 : break;
300 : }
301 :
302 68 : while (input->pos < input->size)
303 : {
304 : /* now decompress */
305 68 : res = ZSTD_decompressStream(zstdcs->dstream, output, input);
306 :
307 68 : if (ZSTD_isError(res))
4 tomas.vondra 308 UNC 0 : pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
309 :
4 tomas.vondra 310 GNC 68 : if (output->pos == output->size)
311 42 : break; /* No more room for output */
312 :
313 26 : if (res == 0)
314 26 : break; /* End of frame */
315 : }
316 :
317 68 : if (output->pos == output->size)
318 42 : break; /* We read all the data that fits */
319 : }
320 :
321 94 : if (rdsize != NULL)
322 94 : *rdsize = output->pos;
323 :
324 94 : return true;
325 : }
326 :
327 : static bool
328 1498 : Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
329 : {
330 1498 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
331 1498 : ZSTD_inBuffer *input = &zstdcs->input;
332 1498 : ZSTD_outBuffer *output = &zstdcs->output;
333 : size_t res,
334 : cnt;
335 :
336 1498 : input->src = ptr;
337 1498 : input->size = size;
338 1498 : input->pos = 0;
339 :
340 : /* Consume all input, to be flushed later */
341 2996 : while (input->pos != input->size)
342 : {
343 1498 : output->pos = 0;
344 1498 : res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
345 1498 : if (ZSTD_isError(res))
346 : {
4 tomas.vondra 347 UNC 0 : zstdcs->zstderror = ZSTD_getErrorName(res);
348 0 : return false;
349 : }
350 :
4 tomas.vondra 351 GNC 1498 : cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
352 1498 : if (cnt != output->pos)
353 : {
4 tomas.vondra 354 UNC 0 : zstdcs->zstderror = strerror(errno);
355 0 : return false;
356 : }
357 : }
358 :
4 tomas.vondra 359 GNC 1498 : return size;
360 : }
361 :
362 : static int
4 tomas.vondra 363 UNC 0 : Zstd_getc(CompressFileHandle *CFH)
364 : {
365 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
366 : int ret;
367 :
368 0 : if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
369 : {
370 0 : if (feof(zstdcs->fp))
371 0 : pg_fatal("could not read from input file: end of file");
372 : else
373 0 : pg_fatal("could not read from input file: %m");
374 : }
375 0 : return ret;
376 : }
377 :
378 : static char *
4 tomas.vondra 379 GNC 3 : Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
380 : {
381 : int i;
382 :
383 3 : Assert(len > 0);
384 :
385 : /*
386 : * Read one byte at a time until newline or EOF. This is only used to read
387 : * the list of LOs, and the I/O is buffered anyway.
388 : */
389 43 : for (i = 0; i < len - 1; ++i)
390 : {
391 : size_t readsz;
392 :
393 43 : if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
394 3 : break;
395 43 : if (readsz != 1)
396 1 : break;
397 42 : if (buf[i] == '\n')
398 : {
399 2 : ++i;
400 2 : break;
401 : }
402 : }
403 3 : buf[i] = '\0';
404 3 : return i > 0 ? buf : NULL;
405 : }
406 :
407 : static bool
408 54 : Zstd_close(CompressFileHandle *CFH)
409 : {
410 54 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
411 :
412 54 : if (zstdcs->cstream)
413 : {
414 : size_t res,
415 : cnt;
416 27 : ZSTD_inBuffer *input = &zstdcs->input;
417 27 : ZSTD_outBuffer *output = &zstdcs->output;
418 :
419 : /* Loop until the compression buffers are fully consumed */
420 : for (;;)
421 : {
422 27 : output->pos = 0;
423 27 : res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
424 27 : if (ZSTD_isError(res))
425 : {
4 tomas.vondra 426 UNC 0 : zstdcs->zstderror = ZSTD_getErrorName(res);
427 0 : return false;
428 : }
429 :
4 tomas.vondra 430 GNC 27 : cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
431 27 : if (cnt != output->pos)
432 : {
4 tomas.vondra 433 UNC 0 : zstdcs->zstderror = strerror(errno);
434 0 : return false;
435 : }
436 :
4 tomas.vondra 437 GNC 27 : if (res == 0)
438 27 : break; /* End of frame */
439 : }
440 :
441 27 : ZSTD_freeCStream(zstdcs->cstream);
442 27 : pg_free(zstdcs->output.dst);
443 : }
444 :
445 54 : if (zstdcs->dstream)
446 : {
447 27 : ZSTD_freeDStream(zstdcs->dstream);
448 27 : pg_free(unconstify(void *, zstdcs->input.src));
449 : }
450 :
451 54 : if (fclose(zstdcs->fp) != 0)
4 tomas.vondra 452 UNC 0 : return false;
453 :
4 tomas.vondra 454 GNC 54 : pg_free(zstdcs);
455 54 : return true;
456 : }
457 :
458 : static bool
459 1 : Zstd_eof(CompressFileHandle *CFH)
460 : {
461 1 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
462 :
463 1 : return feof(zstdcs->fp);
464 : }
465 :
466 : static bool
467 54 : Zstd_open(const char *path, int fd, const char *mode,
468 : CompressFileHandle *CFH)
469 : {
470 : FILE *fp;
471 : ZstdCompressorState *zstdcs;
472 :
473 54 : if (fd >= 0)
4 tomas.vondra 474 UNC 0 : fp = fdopen(fd, mode);
475 : else
4 tomas.vondra 476 GNC 54 : fp = fopen(path, mode);
477 :
478 54 : if (fp == NULL)
4 tomas.vondra 479 UNC 0 : return false;
480 :
4 tomas.vondra 481 GNC 54 : zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
482 54 : CFH->private_data = zstdcs;
483 54 : zstdcs->fp = fp;
484 :
485 54 : if (mode[0] == 'r')
486 : {
487 27 : zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
488 27 : zstdcs->dstream = ZSTD_createDStream();
489 27 : if (zstdcs->dstream == NULL)
4 tomas.vondra 490 UNC 0 : pg_fatal("could not initialize compression library");
491 : }
4 tomas.vondra 492 GNC 27 : else if (mode[0] == 'w' || mode[0] == 'a')
493 : {
494 27 : zstdcs->output.size = ZSTD_CStreamOutSize();
495 27 : zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
496 27 : zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
497 27 : if (zstdcs->cstream == NULL)
4 tomas.vondra 498 UNC 0 : pg_fatal("could not initialize compression library");
499 : }
500 : else
501 0 : pg_fatal("unhandled mode");
502 :
4 tomas.vondra 503 GNC 54 : return true;
504 : }
505 :
506 : static bool
507 26 : Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
508 : {
509 : char fname[MAXPGPATH];
510 :
511 26 : sprintf(fname, "%s.zst", path);
512 26 : return CFH->open_func(fname, -1, mode, CFH);
513 : }
514 :
515 : static const char *
4 tomas.vondra 516 UNC 0 : Zstd_get_error(CompressFileHandle *CFH)
517 : {
518 0 : ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
519 :
520 0 : return zstdcs->zstderror;
521 : }
522 :
523 : void
4 tomas.vondra 524 GNC 54 : InitCompressFileHandleZstd(CompressFileHandle *CFH,
525 : const pg_compress_specification compression_spec)
526 : {
527 54 : CFH->open_func = Zstd_open;
528 54 : CFH->open_write_func = Zstd_open_write;
529 54 : CFH->read_func = Zstd_read;
530 54 : CFH->write_func = Zstd_write;
531 54 : CFH->gets_func = Zstd_gets;
532 54 : CFH->getc_func = Zstd_getc;
533 54 : CFH->close_func = Zstd_close;
534 54 : CFH->eof_func = Zstd_eof;
535 54 : CFH->get_error_func = Zstd_get_error;
536 :
537 54 : CFH->compression_spec = compression_spec;
538 :
539 54 : CFH->private_data = NULL;
540 54 : }
541 :
542 : #endif /* USE_ZSTD */
|