Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * compress_lz4.c
4 : * Routines for archivers to write a LZ4 compressed data stream.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/bin/pg_dump/compress_lz4.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres_fe.h"
15 : #include "pg_backup_utils.h"
16 :
17 : #include "compress_lz4.h"
18 :
19 : #ifdef USE_LZ4
20 : #include <lz4frame.h>
21 :
22 : /*
23 : * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
24 : * Redefine it for installations with a lesser version.
25 : */
26 : #ifndef LZ4F_HEADER_SIZE_MAX
27 : #define LZ4F_HEADER_SIZE_MAX 32
28 : #endif
29 :
30 : /*---------------------------------
31 : * Common to both compression APIs
32 : *---------------------------------
33 : */
34 :
35 : /*
36 : * (de)compression state used by both the Compressor and Stream APIs.
37 : */
38 : typedef struct LZ4State
39 : {
40 : /*
41 : * Used by the Stream API to keep track of the file stream.
42 : */
43 : FILE *fp;
44 :
45 : LZ4F_preferences_t prefs;
46 :
47 : LZ4F_compressionContext_t ctx;
48 : LZ4F_decompressionContext_t dtx;
49 :
50 : /*
51 : * Used by the Stream API's lazy initialization.
52 : */
53 : bool inited;
54 :
55 : /*
56 : * Used by the Stream API to distinguish between compression and
57 : * decompression operations.
58 : */
59 : bool compressing;
60 :
61 : /*
62 : * Used by the Compressor API to mark if the compression headers have been
63 : * written after initialization.
64 : */
65 : bool needs_header_flush;
66 :
67 : size_t buflen;
68 : char *buffer;
69 :
70 : /*
71 : * Used by the Stream API to store already uncompressed data that the
72 : * caller has not consumed.
73 : */
74 : size_t overflowalloclen;
75 : size_t overflowlen;
76 : char *overflowbuf;
77 :
78 : /*
79 : * Used by both APIs to keep track of the compressed data length stored in
80 : * the buffer.
81 : */
82 : size_t compressedlen;
83 :
84 : /*
85 : * Used by both APIs to keep track of error codes.
86 : */
87 : size_t errcode;
88 : } LZ4State;
89 :
90 : /*
91 : * LZ4State_compression_init
92 : * Initialize the required LZ4State members for compression.
93 : *
94 : * Write the LZ4 frame header in a buffer keeping track of its length. Users of
95 : * this function can choose when and how to write the header to a file stream.
96 : *
97 : * Returns true on success. In case of a failure returns false, and stores the
98 : * error code in state->errcode.
99 : */
100 : static bool
8 tomas.vondra 101 GNC 52 : LZ4State_compression_init(LZ4State *state)
102 : {
103 : size_t status;
104 :
105 52 : state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
106 :
107 : /*
108 : * LZ4F_compressBegin requires a buffer that is greater or equal to
109 : * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
110 : */
111 52 : if (state->buflen < LZ4F_HEADER_SIZE_MAX)
8 tomas.vondra 112 UNC 0 : state->buflen = LZ4F_HEADER_SIZE_MAX;
113 :
8 tomas.vondra 114 GNC 52 : status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
115 52 : if (LZ4F_isError(status))
116 : {
8 tomas.vondra 117 UNC 0 : state->errcode = status;
118 0 : return false;
119 : }
120 :
8 tomas.vondra 121 GNC 52 : state->buffer = pg_malloc(state->buflen);
122 52 : status = LZ4F_compressBegin(state->ctx,
123 52 : state->buffer, state->buflen,
124 52 : &state->prefs);
125 52 : if (LZ4F_isError(status))
126 : {
8 tomas.vondra 127 UNC 0 : state->errcode = status;
128 0 : return false;
129 : }
130 :
8 tomas.vondra 131 GNC 52 : state->compressedlen = status;
132 :
133 52 : return true;
134 : }
135 :
136 : /*----------------------
137 : * Compressor API
138 : *----------------------
139 : */
140 :
141 : /* Private routines that support LZ4 compressed data I/O */
142 :
143 : static void
45 144 26 : ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
145 : {
146 : size_t r;
147 : size_t readbuflen;
148 : char *outbuf;
149 : char *readbuf;
8 150 26 : LZ4F_decompressionContext_t ctx = NULL;
151 : LZ4F_decompressOptions_t dec_opt;
152 : LZ4F_errorCode_t status;
153 :
154 26 : memset(&dec_opt, 0, sizeof(dec_opt));
155 26 : status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
156 26 : if (LZ4F_isError(status))
8 tomas.vondra 157 UNC 0 : pg_fatal("could not create LZ4 decompression context: %s",
158 : LZ4F_getErrorName(status));
159 :
8 tomas.vondra 160 GNC 26 : outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
161 26 : readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
162 26 : readbuflen = DEFAULT_IO_BUFFER_SIZE;
163 78 : while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
164 : {
165 : char *readp;
166 : char *readend;
167 :
168 : /* Process one chunk */
169 52 : readp = readbuf;
170 52 : readend = readbuf + r;
171 104 : while (readp < readend)
172 : {
173 52 : size_t out_size = DEFAULT_IO_BUFFER_SIZE;
174 52 : size_t read_size = readend - readp;
175 :
176 52 : memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
177 52 : status = LZ4F_decompress(ctx, outbuf, &out_size,
178 : readp, &read_size, &dec_opt);
179 52 : if (LZ4F_isError(status))
8 tomas.vondra 180 UNC 0 : pg_fatal("could not decompress: %s",
181 : LZ4F_getErrorName(status));
182 :
8 tomas.vondra 183 GNC 52 : ahwrite(outbuf, 1, out_size, AH);
184 52 : readp += read_size;
185 : }
186 : }
187 :
188 26 : pg_free(outbuf);
189 26 : pg_free(readbuf);
190 :
191 26 : status = LZ4F_freeDecompressionContext(ctx);
192 26 : if (LZ4F_isError(status))
8 tomas.vondra 193 UNC 0 : pg_fatal("could not free LZ4 decompression context: %s",
194 : LZ4F_getErrorName(status));
45 tomas.vondra 195 GNC 26 : }
196 :
197 : static void
198 54 : WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
199 : const void *data, size_t dLen)
200 : {
8 201 54 : LZ4State *state = (LZ4State *) cs->private_data;
202 54 : size_t remaining = dLen;
203 : size_t status;
204 : size_t chunk;
205 :
206 : /* Write the header if not yet written. */
207 54 : if (state->needs_header_flush)
208 : {
209 25 : cs->writeF(AH, state->buffer, state->compressedlen);
210 25 : state->needs_header_flush = false;
211 : }
212 :
213 108 : while (remaining > 0)
214 : {
215 :
216 54 : if (remaining > DEFAULT_IO_BUFFER_SIZE)
8 tomas.vondra 217 UNC 0 : chunk = DEFAULT_IO_BUFFER_SIZE;
218 : else
8 tomas.vondra 219 GNC 54 : chunk = remaining;
220 :
221 54 : remaining -= chunk;
222 54 : status = LZ4F_compressUpdate(state->ctx,
223 54 : state->buffer, state->buflen,
224 : data, chunk, NULL);
225 :
226 54 : if (LZ4F_isError(status))
8 tomas.vondra 227 UNC 0 : pg_fatal("failed to LZ4 compress data: %s",
228 : LZ4F_getErrorName(status));
229 :
8 tomas.vondra 230 GNC 54 : cs->writeF(AH, state->buffer, status);
231 :
232 54 : data = ((char *) data) + chunk;
233 : }
45 234 54 : }
235 :
236 : static void
237 52 : EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
238 : {
8 239 52 : LZ4State *state = (LZ4State *) cs->private_data;
240 : size_t status;
241 :
242 : /* Nothing needs to be done */
243 52 : if (!state)
244 26 : return;
245 :
246 : /*
247 : * Write the header if not yet written. The caller is not required to call
248 : * writeData if the relation does not contain any data. Thus it is
249 : * possible to reach here without having flushed the header. Do it before
250 : * ending the compression.
251 : */
252 26 : if (state->needs_header_flush)
253 1 : cs->writeF(AH, state->buffer, state->compressedlen);
254 :
255 26 : status = LZ4F_compressEnd(state->ctx,
256 26 : state->buffer, state->buflen,
257 : NULL);
258 26 : if (LZ4F_isError(status))
8 tomas.vondra 259 UNC 0 : pg_fatal("failed to end compression: %s",
260 : LZ4F_getErrorName(status));
261 :
8 tomas.vondra 262 GNC 26 : cs->writeF(AH, state->buffer, status);
263 :
264 26 : status = LZ4F_freeCompressionContext(state->ctx);
265 26 : if (LZ4F_isError(status))
8 tomas.vondra 266 UNC 0 : pg_fatal("failed to end compression: %s",
267 : LZ4F_getErrorName(status));
268 :
8 tomas.vondra 269 GNC 26 : pg_free(state->buffer);
270 26 : pg_free(state);
271 :
272 26 : cs->private_data = NULL;
273 : }
274 :
275 : /*
276 : * Public routines that support LZ4 compressed data I/O
277 : */
278 : void
45 279 52 : InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
280 : {
281 : LZ4State *state;
282 :
283 52 : cs->readData = ReadDataFromArchiveLZ4;
284 52 : cs->writeData = WriteDataToArchiveLZ4;
285 52 : cs->end = EndCompressorLZ4;
286 :
287 52 : cs->compression_spec = compression_spec;
288 :
289 : /*
290 : * Read operations have access to the whole input. No state needs to be
291 : * carried between calls.
292 : */
8 293 52 : if (cs->readF)
294 26 : return;
295 :
296 26 : state = pg_malloc0(sizeof(*state));
297 26 : if (cs->compression_spec.level >= 0)
298 26 : state->prefs.compressionLevel = cs->compression_spec.level;
299 :
300 26 : if (!LZ4State_compression_init(state))
8 tomas.vondra 301 UNC 0 : pg_fatal("could not initialize LZ4 compression: %s",
302 : LZ4F_getErrorName(state->errcode));
303 :
304 : /* Remember that the header has not been written. */
8 tomas.vondra 305 GNC 26 : state->needs_header_flush = true;
306 26 : cs->private_data = state;
307 : }
308 :
309 : /*----------------------
310 : * Compress Stream API
311 : *----------------------
312 : */
313 :
314 :
315 : /*
316 : * LZ4 equivalent to feof() or gzeof(). Return true iff there is no
317 : * decompressed output in the overflow buffer and the end of the backing file
318 : * is reached.
319 : */
320 : static bool
321 2 : LZ4Stream_eof(CompressFileHandle *CFH)
322 : {
323 2 : LZ4State *state = (LZ4State *) CFH->private_data;
324 :
325 2 : return state->overflowlen == 0 && feof(state->fp);
326 : }
327 :
328 : static const char *
8 tomas.vondra 329 UNC 0 : LZ4Stream_get_error(CompressFileHandle *CFH)
330 : {
331 0 : LZ4State *state = (LZ4State *) CFH->private_data;
332 : const char *errmsg;
333 :
334 0 : if (LZ4F_isError(state->errcode))
335 0 : errmsg = LZ4F_getErrorName(state->errcode);
336 : else
45 337 0 : errmsg = strerror(errno);
338 :
339 0 : return errmsg;
340 : }
341 :
342 : /*
343 : * Initialize an already alloc'ed LZ4State struct for subsequent calls.
344 : *
345 : * Creates the necessary contexts for either compresion or decompression. When
346 : * compressing data (indicated by compressing=true), it additionally writes the
347 : * LZ4 header in the output stream.
348 : *
349 : * Returns true on success. In case of a failure returns false, and stores the
350 : * error code in state->errcode.
351 : */
352 : static bool
8 tomas.vondra 353 GNC 1552 : LZ4Stream_init(LZ4State *state, int size, bool compressing)
354 : {
355 : size_t status;
356 :
357 1552 : if (state->inited)
17 358 1499 : return true;
359 :
8 360 53 : state->compressing = compressing;
361 53 : state->inited = true;
362 :
363 : /* When compressing, write LZ4 header to the output stream. */
364 53 : if (state->compressing)
365 : {
366 :
367 26 : if (!LZ4State_compression_init(state))
17 tomas.vondra 368 UNC 0 : return false;
369 :
8 tomas.vondra 370 GNC 26 : if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
371 : {
45 tomas.vondra 372 UNC 0 : errno = (errno) ? errno : ENOSPC;
17 373 0 : return false;
374 : }
375 : }
376 : else
377 : {
8 tomas.vondra 378 GNC 27 : status = LZ4F_createDecompressionContext(&state->dtx, LZ4F_VERSION);
45 379 27 : if (LZ4F_isError(status))
380 : {
8 tomas.vondra 381 UNC 0 : state->errcode = status;
17 382 0 : return false;
383 : }
384 :
8 tomas.vondra 385 GNC 27 : state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
386 27 : state->buffer = pg_malloc(state->buflen);
387 :
388 27 : state->overflowalloclen = state->buflen;
389 27 : state->overflowbuf = pg_malloc(state->overflowalloclen);
390 27 : state->overflowlen = 0;
391 : }
392 :
17 393 53 : return true;
394 : }
395 :
396 : /*
397 : * Read already decompressed content from the overflow buffer into 'ptr' up to
398 : * 'size' bytes, if available. If the eol_flag is set, then stop at the first
399 : * occurrence of the newline char prior to 'size' bytes.
400 : *
401 : * Any unread content in the overflow buffer is moved to the beginning.
402 : *
403 : * Returns the number of bytes read from the overflow buffer (and copied into
404 : * the 'ptr' buffer), or 0 if the overflow buffer is empty.
405 : */
406 : static int
8 407 54 : LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
408 : {
409 : char *p;
45 410 54 : int readlen = 0;
411 :
8 412 54 : if (state->overflowlen == 0)
45 413 53 : return 0;
414 :
8 415 1 : if (state->overflowlen >= size)
45 tomas.vondra 416 UNC 0 : readlen = size;
417 : else
8 tomas.vondra 418 GNC 1 : readlen = state->overflowlen;
419 :
420 1 : if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
421 : /* Include the line terminating char */
422 1 : readlen = p - state->overflowbuf + 1;
423 :
424 1 : memcpy(ptr, state->overflowbuf, readlen);
425 1 : state->overflowlen -= readlen;
426 :
427 1 : if (state->overflowlen > 0)
8 tomas.vondra 428 UNC 0 : memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen);
429 :
45 tomas.vondra 430 GNC 1 : return readlen;
431 : }
432 :
433 : /*
434 : * The workhorse for reading decompressed content out of an LZ4 compressed
435 : * stream.
436 : *
437 : * It will read up to 'ptrsize' decompressed content, or up to the new line
438 : * char if found first when the eol_flag is set. It is possible that the
439 : * decompressed output generated by reading any compressed input via the
440 : * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
441 : * at an overflow buffer within LZ4State. Of course, when the function is
442 : * called, it will first try to consume any decompressed content already
443 : * present in the overflow buffer, before decompressing new content.
444 : *
445 : * Returns the number of bytes of decompressed data copied into the ptr
446 : * buffer, or -1 in case of error.
447 : */
448 : static int
8 449 54 : LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
450 : {
17 451 54 : int dsize = 0;
452 : int rsize;
453 54 : int size = ptrsize;
45 454 54 : bool eol_found = false;
455 :
456 : void *readbuf;
457 :
458 : /* Lazy init */
8 459 54 : if (!LZ4Stream_init(state, size, false /* decompressing */ ))
45 tomas.vondra 460 UNC 0 : return -1;
461 :
462 : /* Verify that there is enough space in the outbuf */
8 tomas.vondra 463 GNC 54 : if (size > state->buflen)
464 : {
8 tomas.vondra 465 UNC 0 : state->buflen = size;
466 0 : state->buffer = pg_realloc(state->buffer, size);
467 : }
468 :
469 : /* use already decompressed content if available */
8 tomas.vondra 470 GNC 54 : dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
45 471 54 : if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
472 1 : return dsize;
473 :
474 53 : readbuf = pg_malloc(size);
475 :
476 : do
477 : {
478 : char *rp;
479 : char *rend;
480 :
8 481 53 : rsize = fread(readbuf, 1, size, state->fp);
482 53 : if (rsize < size && !feof(state->fp))
45 tomas.vondra 483 UNC 0 : return -1;
484 :
45 tomas.vondra 485 GNC 53 : rp = (char *) readbuf;
486 53 : rend = (char *) readbuf + rsize;
487 :
488 79 : while (rp < rend)
489 : {
490 : size_t status;
8 491 26 : size_t outlen = state->buflen;
45 492 26 : size_t read_remain = rend - rp;
493 :
8 494 26 : memset(state->buffer, 0, outlen);
495 26 : status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
496 : rp, &read_remain, NULL);
45 497 26 : if (LZ4F_isError(status))
498 : {
8 tomas.vondra 499 UNC 0 : state->errcode = status;
45 500 0 : return -1;
501 : }
502 :
45 tomas.vondra 503 GNC 26 : rp += read_remain;
504 :
505 : /*
506 : * fill in what space is available in ptr if the eol flag is set,
507 : * either skip if one already found or fill up to EOL if present
508 : * in the outbuf
509 : */
510 26 : if (outlen > 0 && dsize < size && eol_found == false)
511 : {
512 : char *p;
513 26 : size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
514 26 : size_t len = outlen < lib ? outlen : lib;
515 :
516 26 : if (eol_flag &&
8 517 1 : (p = memchr(state->buffer, '\n', outlen)) &&
518 1 : (size_t) (p - state->buffer + 1) <= len)
519 : {
520 1 : len = p - state->buffer + 1;
45 521 1 : eol_found = true;
522 : }
523 :
8 524 26 : memcpy((char *) ptr + dsize, state->buffer, len);
45 525 26 : dsize += len;
526 :
527 : /* move what did not fit, if any, at the beginning of the buf */
528 26 : if (len < outlen)
8 529 1 : memmove(state->buffer, state->buffer + len, outlen - len);
45 530 26 : outlen -= len;
531 : }
532 :
533 : /* if there is available output, save it */
534 26 : if (outlen > 0)
535 : {
8 536 1 : while (state->overflowlen + outlen > state->overflowalloclen)
537 : {
8 tomas.vondra 538 UNC 0 : state->overflowalloclen *= 2;
539 0 : state->overflowbuf = pg_realloc(state->overflowbuf,
540 : state->overflowalloclen);
541 : }
542 :
8 tomas.vondra 543 GNC 1 : memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
544 1 : state->overflowlen += outlen;
545 : }
546 : }
17 547 53 : } while (rsize == size && dsize < size && eol_found == false);
548 :
45 549 53 : pg_free(readbuf);
550 :
17 551 53 : return dsize;
552 : }
553 :
554 : /*
555 : * Compress size bytes from ptr and write them to the stream.
556 : */
557 : static bool
8 558 1498 : LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
559 : {
560 1498 : LZ4State *state = (LZ4State *) CFH->private_data;
561 : size_t status;
45 562 1498 : int remaining = size;
563 :
564 : /* Lazy init */
8 565 1498 : if (!LZ4Stream_init(state, size, true))
17 tomas.vondra 566 UNC 0 : return false;
567 :
45 tomas.vondra 568 GNC 2996 : while (remaining > 0)
569 : {
17 570 1498 : int chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
571 :
45 572 1498 : remaining -= chunk;
573 :
8 574 1498 : status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
575 : ptr, chunk, NULL);
45 576 1498 : if (LZ4F_isError(status))
577 : {
8 tomas.vondra 578 UNC 0 : state->errcode = status;
17 579 0 : return false;
580 : }
581 :
8 tomas.vondra 582 GNC 1498 : if (fwrite(state->buffer, 1, status, state->fp) != status)
583 : {
45 tomas.vondra 584 UNC 0 : errno = (errno) ? errno : ENOSPC;
17 585 0 : return false;
586 : }
587 : }
588 :
17 tomas.vondra 589 GNC 1498 : return true;
590 : }
591 :
592 : /*
593 : * fread() equivalent implementation for LZ4 compressed files.
594 : */
595 : static bool
8 596 51 : LZ4Stream_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
597 : {
598 51 : LZ4State *state = (LZ4State *) CFH->private_data;
599 : int ret;
600 :
601 51 : if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
8 tomas.vondra 602 UNC 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
603 :
17 tomas.vondra 604 GNC 51 : if (rsize)
605 51 : *rsize = (size_t) ret;
606 :
607 51 : return true;
608 : }
609 :
610 : /*
611 : * fgetc() equivalent implementation for LZ4 compressed files.
612 : */
613 : static int
8 tomas.vondra 614 UNC 0 : LZ4Stream_getc(CompressFileHandle *CFH)
615 : {
616 0 : LZ4State *state = (LZ4State *) CFH->private_data;
617 : unsigned char c;
618 :
619 0 : if (LZ4Stream_read_internal(state, &c, 1, false) <= 0)
620 : {
621 0 : if (!LZ4Stream_eof(CFH))
622 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
623 : else
45 624 0 : pg_fatal("could not read from input file: end of file");
625 : }
626 :
627 0 : return c;
628 : }
629 :
630 : /*
631 : * fgets() equivalent implementation for LZ4 compressed files.
632 : */
633 : static char *
8 tomas.vondra 634 GNC 3 : LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
635 : {
636 3 : LZ4State *state = (LZ4State *) CFH->private_data;
637 : int ret;
638 :
639 3 : ret = LZ4Stream_read_internal(state, ptr, size, true);
640 3 : if (ret < 0 || (ret == 0 && !LZ4Stream_eof(CFH)))
8 tomas.vondra 641 UNC 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
642 :
643 : /* Done reading */
17 tomas.vondra 644 GNC 3 : if (ret == 0)
45 645 1 : return NULL;
646 :
647 2 : return ptr;
648 : }
649 :
650 : /*
651 : * Finalize (de)compression of a stream. When compressing it will write any
652 : * remaining content and/or generated footer from the LZ4 API.
653 : */
654 : static bool
8 655 54 : LZ4Stream_close(CompressFileHandle *CFH)
656 : {
657 : FILE *fp;
658 54 : LZ4State *state = (LZ4State *) CFH->private_data;
659 : size_t status;
660 :
661 54 : fp = state->fp;
662 54 : if (state->inited)
663 : {
664 53 : if (state->compressing)
665 : {
666 26 : status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
45 667 26 : if (LZ4F_isError(status))
45 tomas.vondra 668 UNC 0 : pg_fatal("failed to end compression: %s",
669 : LZ4F_getErrorName(status));
8 tomas.vondra 670 GNC 26 : else if (fwrite(state->buffer, 1, status, state->fp) != status)
671 : {
45 tomas.vondra 672 UNC 0 : errno = (errno) ? errno : ENOSPC;
673 0 : WRITE_ERROR_EXIT;
674 : }
675 :
8 tomas.vondra 676 GNC 26 : status = LZ4F_freeCompressionContext(state->ctx);
45 677 26 : if (LZ4F_isError(status))
45 tomas.vondra 678 UNC 0 : pg_fatal("failed to end compression: %s",
679 : LZ4F_getErrorName(status));
680 : }
681 : else
682 : {
8 tomas.vondra 683 GNC 27 : status = LZ4F_freeDecompressionContext(state->dtx);
45 684 27 : if (LZ4F_isError(status))
45 tomas.vondra 685 UNC 0 : pg_fatal("failed to end decompression: %s",
686 : LZ4F_getErrorName(status));
8 tomas.vondra 687 GNC 27 : pg_free(state->overflowbuf);
688 : }
689 :
690 53 : pg_free(state->buffer);
691 : }
692 :
693 54 : pg_free(state);
694 :
17 695 54 : return fclose(fp) == 0;
696 : }
697 :
698 : static bool
8 699 54 : LZ4Stream_open(const char *path, int fd, const char *mode,
700 : CompressFileHandle *CFH)
701 : {
702 : FILE *fp;
703 54 : LZ4State *state = (LZ4State *) CFH->private_data;
704 :
45 705 54 : if (fd >= 0)
45 tomas.vondra 706 UNC 0 : fp = fdopen(fd, mode);
707 : else
45 tomas.vondra 708 GNC 54 : fp = fopen(path, mode);
709 54 : if (fp == NULL)
710 : {
8 tomas.vondra 711 UNC 0 : state->errcode = errno;
17 712 0 : return false;
713 : }
714 :
8 tomas.vondra 715 GNC 54 : state->fp = fp;
716 :
17 717 54 : return true;
718 : }
719 :
720 : static bool
8 721 26 : LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
722 : {
723 : char *fname;
724 : int save_errno;
725 : bool ret;
726 :
45 727 26 : fname = psprintf("%s.lz4", path);
728 26 : ret = CFH->open_func(fname, -1, mode, CFH);
729 :
17 730 26 : save_errno = errno;
45 731 26 : pg_free(fname);
17 732 26 : errno = save_errno;
733 :
45 734 26 : return ret;
735 : }
736 :
737 : /*
738 : * Public routines
739 : */
740 : void
741 54 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
742 : const pg_compress_specification compression_spec)
743 : {
744 : LZ4State *state;
745 :
8 746 54 : CFH->open_func = LZ4Stream_open;
747 54 : CFH->open_write_func = LZ4Stream_open_write;
748 54 : CFH->read_func = LZ4Stream_read;
749 54 : CFH->write_func = LZ4Stream_write;
750 54 : CFH->gets_func = LZ4Stream_gets;
751 54 : CFH->getc_func = LZ4Stream_getc;
752 54 : CFH->eof_func = LZ4Stream_eof;
753 54 : CFH->close_func = LZ4Stream_close;
754 54 : CFH->get_error_func = LZ4Stream_get_error;
755 :
45 756 54 : CFH->compression_spec = compression_spec;
8 757 54 : state = pg_malloc0(sizeof(*state));
45 758 54 : if (CFH->compression_spec.level >= 0)
8 759 54 : state->prefs.compressionLevel = CFH->compression_spec.level;
760 :
761 54 : CFH->private_data = state;
45 762 54 : }
763 : #else /* USE_LZ4 */
764 : void
765 : InitCompressorLZ4(CompressorState *cs,
766 : const pg_compress_specification compression_spec)
767 : {
768 : pg_fatal("this build does not support compression with %s", "LZ4");
769 : }
770 :
771 : void
772 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
773 : const pg_compress_specification compression_spec)
774 : {
775 : pg_fatal("this build does not support compression with %s", "LZ4");
776 : }
777 : #endif /* USE_LZ4 */
|