TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * compress_lz4.c
4 : * Routines for archivers to write a LZ4 compressed data stream.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/bin/pg_dump/compress_lz4.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres_fe.h"
15 : #include "pg_backup_utils.h"
16 :
17 : #include "compress_lz4.h"
18 :
19 : #ifdef USE_LZ4
20 : #include <lz4frame.h>
21 :
22 : /*
23 : * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
24 : * Redefine it for installations with a lesser version.
25 : */
26 : #ifndef LZ4F_HEADER_SIZE_MAX
27 : #define LZ4F_HEADER_SIZE_MAX 32
28 : #endif
29 :
30 : /*---------------------------------
31 : * Common to both compression APIs
32 : *---------------------------------
33 : */
34 :
35 : /*
36 : * (de)compression state used by both the Compressor and Stream APIs.
37 : */
38 : typedef struct LZ4State
39 : {
40 : /*
41 : * Used by the Stream API to keep track of the file stream.
42 : */
43 : FILE *fp;
44 :
45 : LZ4F_preferences_t prefs;
46 :
47 : LZ4F_compressionContext_t ctx;
48 : LZ4F_decompressionContext_t dtx;
49 :
50 : /*
51 : * Used by the Stream API's lazy initialization.
52 : */
53 : bool inited;
54 :
55 : /*
56 : * Used by the Stream API to distinguish between compression and
57 : * decompression operations.
58 : */
59 : bool compressing;
60 :
61 : /*
62 : * Used by the Compressor API to mark if the compression headers have been
63 : * written after initialization.
64 : */
65 : bool needs_header_flush;
66 :
67 : size_t buflen;
68 : char *buffer;
69 :
70 : /*
71 : * Used by the Stream API to store already uncompressed data that the
72 : * caller has not consumed.
73 : */
74 : size_t overflowalloclen;
75 : size_t overflowlen;
76 : char *overflowbuf;
77 :
78 : /*
79 : * Used by both APIs to keep track of the compressed data length stored in
80 : * the buffer.
81 : */
82 : size_t compressedlen;
83 :
84 : /*
85 : * Used by both APIs to keep track of error codes.
86 : */
87 : size_t errcode;
88 : } LZ4State;
89 :
90 : /*
91 : * LZ4State_compression_init
92 : * Initialize the required LZ4State members for compression.
93 : *
94 : * Write the LZ4 frame header in a buffer keeping track of its length. Users of
95 : * this function can choose when and how to write the header to a file stream.
96 : *
97 : * Returns true on success. In case of a failure returns false, and stores the
98 : * error code in state->errcode.
99 : */
100 : static bool
101 GNC 52 : LZ4State_compression_init(LZ4State *state)
102 : {
103 : size_t status;
104 :
105 52 : state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
106 :
107 : /*
108 : * LZ4F_compressBegin requires a buffer that is greater or equal to
109 : * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
110 : */
111 52 : if (state->buflen < LZ4F_HEADER_SIZE_MAX)
112 UNC 0 : state->buflen = LZ4F_HEADER_SIZE_MAX;
113 :
114 GNC 52 : status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
115 52 : if (LZ4F_isError(status))
116 : {
117 UNC 0 : state->errcode = status;
118 0 : return false;
119 : }
120 :
121 GNC 52 : state->buffer = pg_malloc(state->buflen);
122 52 : status = LZ4F_compressBegin(state->ctx,
123 52 : state->buffer, state->buflen,
124 52 : &state->prefs);
125 52 : if (LZ4F_isError(status))
126 : {
127 UNC 0 : state->errcode = status;
128 0 : return false;
129 : }
130 :
131 GNC 52 : state->compressedlen = status;
132 :
133 52 : return true;
134 : }
135 :
136 : /*----------------------
137 : * Compressor API
138 : *----------------------
139 : */
140 :
141 : /* Private routines that support LZ4 compressed data I/O */
142 :
143 : static void
144 26 : ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
145 : {
146 : size_t r;
147 : size_t readbuflen;
148 : char *outbuf;
149 : char *readbuf;
150 26 : LZ4F_decompressionContext_t ctx = NULL;
151 : LZ4F_decompressOptions_t dec_opt;
152 : LZ4F_errorCode_t status;
153 :
154 26 : memset(&dec_opt, 0, sizeof(dec_opt));
155 26 : status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
156 26 : if (LZ4F_isError(status))
157 UNC 0 : pg_fatal("could not create LZ4 decompression context: %s",
158 : LZ4F_getErrorName(status));
159 :
160 GNC 26 : outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
161 26 : readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
162 26 : readbuflen = DEFAULT_IO_BUFFER_SIZE;
163 78 : while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
164 : {
165 : char *readp;
166 : char *readend;
167 :
168 : /* Process one chunk */
169 52 : readp = readbuf;
170 52 : readend = readbuf + r;
171 104 : while (readp < readend)
172 : {
173 52 : size_t out_size = DEFAULT_IO_BUFFER_SIZE;
174 52 : size_t read_size = readend - readp;
175 :
176 52 : memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
177 52 : status = LZ4F_decompress(ctx, outbuf, &out_size,
178 : readp, &read_size, &dec_opt);
179 52 : if (LZ4F_isError(status))
180 UNC 0 : pg_fatal("could not decompress: %s",
181 : LZ4F_getErrorName(status));
182 :
183 GNC 52 : ahwrite(outbuf, 1, out_size, AH);
184 52 : readp += read_size;
185 : }
186 : }
187 :
188 26 : pg_free(outbuf);
189 26 : pg_free(readbuf);
190 :
191 26 : status = LZ4F_freeDecompressionContext(ctx);
192 26 : if (LZ4F_isError(status))
193 UNC 0 : pg_fatal("could not free LZ4 decompression context: %s",
194 : LZ4F_getErrorName(status));
195 GNC 26 : }
196 :
197 : static void
198 54 : WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
199 : const void *data, size_t dLen)
200 : {
201 54 : LZ4State *state = (LZ4State *) cs->private_data;
202 54 : size_t remaining = dLen;
203 : size_t status;
204 : size_t chunk;
205 :
206 : /* Write the header if not yet written. */
207 54 : if (state->needs_header_flush)
208 : {
209 25 : cs->writeF(AH, state->buffer, state->compressedlen);
210 25 : state->needs_header_flush = false;
211 : }
212 :
213 108 : while (remaining > 0)
214 : {
215 :
216 54 : if (remaining > DEFAULT_IO_BUFFER_SIZE)
217 UNC 0 : chunk = DEFAULT_IO_BUFFER_SIZE;
218 : else
219 GNC 54 : chunk = remaining;
220 :
221 54 : remaining -= chunk;
222 54 : status = LZ4F_compressUpdate(state->ctx,
223 54 : state->buffer, state->buflen,
224 : data, chunk, NULL);
225 :
226 54 : if (LZ4F_isError(status))
227 UNC 0 : pg_fatal("failed to LZ4 compress data: %s",
228 : LZ4F_getErrorName(status));
229 :
230 GNC 54 : cs->writeF(AH, state->buffer, status);
231 :
232 54 : data = ((char *) data) + chunk;
233 : }
234 54 : }
235 :
236 : static void
237 52 : EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
238 : {
239 52 : LZ4State *state = (LZ4State *) cs->private_data;
240 : size_t status;
241 :
242 : /* Nothing needs to be done */
243 52 : if (!state)
244 26 : return;
245 :
246 : /*
247 : * Write the header if not yet written. The caller is not required to call
248 : * writeData if the relation does not contain any data. Thus it is
249 : * possible to reach here without having flushed the header. Do it before
250 : * ending the compression.
251 : */
252 26 : if (state->needs_header_flush)
253 1 : cs->writeF(AH, state->buffer, state->compressedlen);
254 :
255 26 : status = LZ4F_compressEnd(state->ctx,
256 26 : state->buffer, state->buflen,
257 : NULL);
258 26 : if (LZ4F_isError(status))
259 UNC 0 : pg_fatal("failed to end compression: %s",
260 : LZ4F_getErrorName(status));
261 :
262 GNC 26 : cs->writeF(AH, state->buffer, status);
263 :
264 26 : status = LZ4F_freeCompressionContext(state->ctx);
265 26 : if (LZ4F_isError(status))
266 UNC 0 : pg_fatal("failed to end compression: %s",
267 : LZ4F_getErrorName(status));
268 :
269 GNC 26 : pg_free(state->buffer);
270 26 : pg_free(state);
271 :
272 26 : cs->private_data = NULL;
273 : }
274 :
275 : /*
276 : * Public routines that support LZ4 compressed data I/O
277 : */
278 : void
279 52 : InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
280 : {
281 : LZ4State *state;
282 :
283 52 : cs->readData = ReadDataFromArchiveLZ4;
284 52 : cs->writeData = WriteDataToArchiveLZ4;
285 52 : cs->end = EndCompressorLZ4;
286 :
287 52 : cs->compression_spec = compression_spec;
288 :
289 : /*
290 : * Read operations have access to the whole input. No state needs to be
291 : * carried between calls.
292 : */
293 52 : if (cs->readF)
294 26 : return;
295 :
296 26 : state = pg_malloc0(sizeof(*state));
297 26 : if (cs->compression_spec.level >= 0)
298 26 : state->prefs.compressionLevel = cs->compression_spec.level;
299 :
300 26 : if (!LZ4State_compression_init(state))
301 UNC 0 : pg_fatal("could not initialize LZ4 compression: %s",
302 : LZ4F_getErrorName(state->errcode));
303 :
304 : /* Remember that the header has not been written. */
305 GNC 26 : state->needs_header_flush = true;
306 26 : cs->private_data = state;
307 : }
308 :
309 : /*----------------------
310 : * Compress Stream API
311 : *----------------------
312 : */
313 :
314 :
315 : /*
316 : * LZ4 equivalent to feof() or gzeof(). Return true iff there is no
317 : * decompressed output in the overflow buffer and the end of the backing file
318 : * is reached.
319 : */
320 : static bool
321 2 : LZ4Stream_eof(CompressFileHandle *CFH)
322 : {
323 2 : LZ4State *state = (LZ4State *) CFH->private_data;
324 :
325 2 : return state->overflowlen == 0 && feof(state->fp);
326 : }
327 :
328 : static const char *
329 UNC 0 : LZ4Stream_get_error(CompressFileHandle *CFH)
330 : {
331 0 : LZ4State *state = (LZ4State *) CFH->private_data;
332 : const char *errmsg;
333 :
334 0 : if (LZ4F_isError(state->errcode))
335 0 : errmsg = LZ4F_getErrorName(state->errcode);
336 : else
337 0 : errmsg = strerror(errno);
338 :
339 0 : return errmsg;
340 : }
341 :
342 : /*
343 : * Initialize an already alloc'ed LZ4State struct for subsequent calls.
344 : *
345 : * Creates the necessary contexts for either compresion or decompression. When
346 : * compressing data (indicated by compressing=true), it additionally writes the
347 : * LZ4 header in the output stream.
348 : *
349 : * Returns true on success. In case of a failure returns false, and stores the
350 : * error code in state->errcode.
351 : */
352 : static bool
353 GNC 1552 : LZ4Stream_init(LZ4State *state, int size, bool compressing)
354 : {
355 : size_t status;
356 :
357 1552 : if (state->inited)
358 1499 : return true;
359 :
360 53 : state->compressing = compressing;
361 53 : state->inited = true;
362 :
363 : /* When compressing, write LZ4 header to the output stream. */
364 53 : if (state->compressing)
365 : {
366 :
367 26 : if (!LZ4State_compression_init(state))
368 UNC 0 : return false;
369 :
370 GNC 26 : if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
371 : {
372 UNC 0 : errno = (errno) ? errno : ENOSPC;
373 0 : return false;
374 : }
375 : }
376 : else
377 : {
378 GNC 27 : status = LZ4F_createDecompressionContext(&state->dtx, LZ4F_VERSION);
379 27 : if (LZ4F_isError(status))
380 : {
381 UNC 0 : state->errcode = status;
382 0 : return false;
383 : }
384 :
385 GNC 27 : state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
386 27 : state->buffer = pg_malloc(state->buflen);
387 :
388 27 : state->overflowalloclen = state->buflen;
389 27 : state->overflowbuf = pg_malloc(state->overflowalloclen);
390 27 : state->overflowlen = 0;
391 : }
392 :
393 53 : return true;
394 : }
395 :
396 : /*
397 : * Read already decompressed content from the overflow buffer into 'ptr' up to
398 : * 'size' bytes, if available. If the eol_flag is set, then stop at the first
399 : * occurrence of the newline char prior to 'size' bytes.
400 : *
401 : * Any unread content in the overflow buffer is moved to the beginning.
402 : *
403 : * Returns the number of bytes read from the overflow buffer (and copied into
404 : * the 'ptr' buffer), or 0 if the overflow buffer is empty.
405 : */
406 : static int
407 54 : LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
408 : {
409 : char *p;
410 54 : int readlen = 0;
411 :
412 54 : if (state->overflowlen == 0)
413 53 : return 0;
414 :
415 1 : if (state->overflowlen >= size)
416 UNC 0 : readlen = size;
417 : else
418 GNC 1 : readlen = state->overflowlen;
419 :
420 1 : if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
421 : /* Include the line terminating char */
422 1 : readlen = p - state->overflowbuf + 1;
423 :
424 1 : memcpy(ptr, state->overflowbuf, readlen);
425 1 : state->overflowlen -= readlen;
426 :
427 1 : if (state->overflowlen > 0)
428 UNC 0 : memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen);
429 :
430 GNC 1 : return readlen;
431 : }
432 :
433 : /*
434 : * The workhorse for reading decompressed content out of an LZ4 compressed
435 : * stream.
436 : *
437 : * It will read up to 'ptrsize' decompressed content, or up to the new line
438 : * char if found first when the eol_flag is set. It is possible that the
439 : * decompressed output generated by reading any compressed input via the
440 : * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
441 : * at an overflow buffer within LZ4State. Of course, when the function is
442 : * called, it will first try to consume any decompressed content already
443 : * present in the overflow buffer, before decompressing new content.
444 : *
445 : * Returns the number of bytes of decompressed data copied into the ptr
446 : * buffer, or -1 in case of error.
447 : */
448 : static int
449 54 : LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
450 : {
451 54 : int dsize = 0;
452 : int rsize;
453 54 : int size = ptrsize;
454 54 : bool eol_found = false;
455 :
456 : void *readbuf;
457 :
458 : /* Lazy init */
459 54 : if (!LZ4Stream_init(state, size, false /* decompressing */ ))
460 UNC 0 : return -1;
461 :
462 : /* Verify that there is enough space in the outbuf */
463 GNC 54 : if (size > state->buflen)
464 : {
465 UNC 0 : state->buflen = size;
466 0 : state->buffer = pg_realloc(state->buffer, size);
467 : }
468 :
469 : /* use already decompressed content if available */
470 GNC 54 : dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
471 54 : if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
472 1 : return dsize;
473 :
474 53 : readbuf = pg_malloc(size);
475 :
476 : do
477 : {
478 : char *rp;
479 : char *rend;
480 :
481 53 : rsize = fread(readbuf, 1, size, state->fp);
482 53 : if (rsize < size && !feof(state->fp))
483 UNC 0 : return -1;
484 :
485 GNC 53 : rp = (char *) readbuf;
486 53 : rend = (char *) readbuf + rsize;
487 :
488 79 : while (rp < rend)
489 : {
490 : size_t status;
491 26 : size_t outlen = state->buflen;
492 26 : size_t read_remain = rend - rp;
493 :
494 26 : memset(state->buffer, 0, outlen);
495 26 : status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
496 : rp, &read_remain, NULL);
497 26 : if (LZ4F_isError(status))
498 : {
499 UNC 0 : state->errcode = status;
500 0 : return -1;
501 : }
502 :
503 GNC 26 : rp += read_remain;
504 :
505 : /*
506 : * fill in what space is available in ptr if the eol flag is set,
507 : * either skip if one already found or fill up to EOL if present
508 : * in the outbuf
509 : */
510 26 : if (outlen > 0 && dsize < size && eol_found == false)
511 : {
512 : char *p;
513 26 : size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
514 26 : size_t len = outlen < lib ? outlen : lib;
515 :
516 26 : if (eol_flag &&
517 1 : (p = memchr(state->buffer, '\n', outlen)) &&
518 1 : (size_t) (p - state->buffer + 1) <= len)
519 : {
520 1 : len = p - state->buffer + 1;
521 1 : eol_found = true;
522 : }
523 :
524 26 : memcpy((char *) ptr + dsize, state->buffer, len);
525 26 : dsize += len;
526 :
527 : /* move what did not fit, if any, at the beginning of the buf */
528 26 : if (len < outlen)
529 1 : memmove(state->buffer, state->buffer + len, outlen - len);
530 26 : outlen -= len;
531 : }
532 :
533 : /* if there is available output, save it */
534 26 : if (outlen > 0)
535 : {
536 1 : while (state->overflowlen + outlen > state->overflowalloclen)
537 : {
538 UNC 0 : state->overflowalloclen *= 2;
539 0 : state->overflowbuf = pg_realloc(state->overflowbuf,
540 : state->overflowalloclen);
541 : }
542 :
543 GNC 1 : memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
544 1 : state->overflowlen += outlen;
545 : }
546 : }
547 53 : } while (rsize == size && dsize < size && eol_found == false);
548 :
549 53 : pg_free(readbuf);
550 :
551 53 : return dsize;
552 : }
553 :
554 : /*
555 : * Compress size bytes from ptr and write them to the stream.
556 : */
557 : static bool
558 1498 : LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
559 : {
560 1498 : LZ4State *state = (LZ4State *) CFH->private_data;
561 : size_t status;
562 1498 : int remaining = size;
563 :
564 : /* Lazy init */
565 1498 : if (!LZ4Stream_init(state, size, true))
566 UNC 0 : return false;
567 :
568 GNC 2996 : while (remaining > 0)
569 : {
570 1498 : int chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
571 :
572 1498 : remaining -= chunk;
573 :
574 1498 : status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
575 : ptr, chunk, NULL);
576 1498 : if (LZ4F_isError(status))
577 : {
578 UNC 0 : state->errcode = status;
579 0 : return false;
580 : }
581 :
582 GNC 1498 : if (fwrite(state->buffer, 1, status, state->fp) != status)
583 : {
584 UNC 0 : errno = (errno) ? errno : ENOSPC;
585 0 : return false;
586 : }
587 : }
588 :
589 GNC 1498 : return true;
590 : }
591 :
592 : /*
593 : * fread() equivalent implementation for LZ4 compressed files.
594 : */
595 : static bool
596 51 : LZ4Stream_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
597 : {
598 51 : LZ4State *state = (LZ4State *) CFH->private_data;
599 : int ret;
600 :
601 51 : if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
602 UNC 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
603 :
604 GNC 51 : if (rsize)
605 51 : *rsize = (size_t) ret;
606 :
607 51 : return true;
608 : }
609 :
610 : /*
611 : * fgetc() equivalent implementation for LZ4 compressed files.
612 : */
613 : static int
614 UNC 0 : LZ4Stream_getc(CompressFileHandle *CFH)
615 : {
616 0 : LZ4State *state = (LZ4State *) CFH->private_data;
617 : unsigned char c;
618 :
619 0 : if (LZ4Stream_read_internal(state, &c, 1, false) <= 0)
620 : {
621 0 : if (!LZ4Stream_eof(CFH))
622 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
623 : else
624 0 : pg_fatal("could not read from input file: end of file");
625 : }
626 :
627 0 : return c;
628 : }
629 :
630 : /*
631 : * fgets() equivalent implementation for LZ4 compressed files.
632 : */
633 : static char *
634 GNC 3 : LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
635 : {
636 3 : LZ4State *state = (LZ4State *) CFH->private_data;
637 : int ret;
638 :
639 3 : ret = LZ4Stream_read_internal(state, ptr, size, true);
640 3 : if (ret < 0 || (ret == 0 && !LZ4Stream_eof(CFH)))
641 UNC 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
642 :
643 : /* Done reading */
644 GNC 3 : if (ret == 0)
645 1 : return NULL;
646 :
647 2 : return ptr;
648 : }
649 :
650 : /*
651 : * Finalize (de)compression of a stream. When compressing it will write any
652 : * remaining content and/or generated footer from the LZ4 API.
653 : */
654 : static bool
655 54 : LZ4Stream_close(CompressFileHandle *CFH)
656 : {
657 : FILE *fp;
658 54 : LZ4State *state = (LZ4State *) CFH->private_data;
659 : size_t status;
660 :
661 54 : fp = state->fp;
662 54 : if (state->inited)
663 : {
664 53 : if (state->compressing)
665 : {
666 26 : status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
667 26 : if (LZ4F_isError(status))
668 UNC 0 : pg_fatal("failed to end compression: %s",
669 : LZ4F_getErrorName(status));
670 GNC 26 : else if (fwrite(state->buffer, 1, status, state->fp) != status)
671 : {
672 UNC 0 : errno = (errno) ? errno : ENOSPC;
673 0 : WRITE_ERROR_EXIT;
674 : }
675 :
676 GNC 26 : status = LZ4F_freeCompressionContext(state->ctx);
677 26 : if (LZ4F_isError(status))
678 UNC 0 : pg_fatal("failed to end compression: %s",
679 : LZ4F_getErrorName(status));
680 : }
681 : else
682 : {
683 GNC 27 : status = LZ4F_freeDecompressionContext(state->dtx);
684 27 : if (LZ4F_isError(status))
685 UNC 0 : pg_fatal("failed to end decompression: %s",
686 : LZ4F_getErrorName(status));
687 GNC 27 : pg_free(state->overflowbuf);
688 : }
689 :
690 53 : pg_free(state->buffer);
691 : }
692 :
693 54 : pg_free(state);
694 :
695 54 : return fclose(fp) == 0;
696 : }
697 :
698 : static bool
699 54 : LZ4Stream_open(const char *path, int fd, const char *mode,
700 : CompressFileHandle *CFH)
701 : {
702 : FILE *fp;
703 54 : LZ4State *state = (LZ4State *) CFH->private_data;
704 :
705 54 : if (fd >= 0)
706 UNC 0 : fp = fdopen(fd, mode);
707 : else
708 GNC 54 : fp = fopen(path, mode);
709 54 : if (fp == NULL)
710 : {
711 UNC 0 : state->errcode = errno;
712 0 : return false;
713 : }
714 :
715 GNC 54 : state->fp = fp;
716 :
717 54 : return true;
718 : }
719 :
720 : static bool
721 26 : LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
722 : {
723 : char *fname;
724 : int save_errno;
725 : bool ret;
726 :
727 26 : fname = psprintf("%s.lz4", path);
728 26 : ret = CFH->open_func(fname, -1, mode, CFH);
729 :
730 26 : save_errno = errno;
731 26 : pg_free(fname);
732 26 : errno = save_errno;
733 :
734 26 : return ret;
735 : }
736 :
737 : /*
738 : * Public routines
739 : */
740 : void
741 54 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
742 : const pg_compress_specification compression_spec)
743 : {
744 : LZ4State *state;
745 :
746 54 : CFH->open_func = LZ4Stream_open;
747 54 : CFH->open_write_func = LZ4Stream_open_write;
748 54 : CFH->read_func = LZ4Stream_read;
749 54 : CFH->write_func = LZ4Stream_write;
750 54 : CFH->gets_func = LZ4Stream_gets;
751 54 : CFH->getc_func = LZ4Stream_getc;
752 54 : CFH->eof_func = LZ4Stream_eof;
753 54 : CFH->close_func = LZ4Stream_close;
754 54 : CFH->get_error_func = LZ4Stream_get_error;
755 :
756 54 : CFH->compression_spec = compression_spec;
757 54 : state = pg_malloc0(sizeof(*state));
758 54 : if (CFH->compression_spec.level >= 0)
759 54 : state->prefs.compressionLevel = CFH->compression_spec.level;
760 :
761 54 : CFH->private_data = state;
762 54 : }
763 : #else /* USE_LZ4 */
764 : void
765 : InitCompressorLZ4(CompressorState *cs,
766 : const pg_compress_specification compression_spec)
767 : {
768 : pg_fatal("this build does not support compression with %s", "LZ4");
769 : }
770 :
771 : void
772 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
773 : const pg_compress_specification compression_spec)
774 : {
775 : pg_fatal("this build does not support compression with %s", "LZ4");
776 : }
777 : #endif /* USE_LZ4 */
|