Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * compress_lz4.c
4 : : * Routines for archivers to write a LZ4 compressed data stream.
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/bin/pg_dump/compress_lz4.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : : #include "postgres_fe.h"
15 : :
16 : : #include "compress_lz4.h"
17 : : #include "pg_backup_utils.h"
18 : :
19 : : #ifdef USE_LZ4
20 : : #include <lz4frame.h>
21 : :
22 : : /*
23 : : * LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
24 : : * Redefine it for installations with a lesser version.
25 : : */
26 : : #ifndef LZ4F_HEADER_SIZE_MAX
27 : : #define LZ4F_HEADER_SIZE_MAX 32
28 : : #endif
29 : :
30 : : /*---------------------------------
31 : : * Common to both compression APIs
32 : : *---------------------------------
33 : : */
34 : :
35 : : /*
36 : : * (de)compression state used by both the Compressor and Stream APIs.
37 : : */
38 : : typedef struct LZ4State
39 : : {
40 : : /*
41 : : * Used by the Stream API to keep track of the file stream.
42 : : */
43 : : FILE *fp;
44 : :
45 : : LZ4F_preferences_t prefs;
46 : :
47 : : LZ4F_compressionContext_t ctx;
48 : : LZ4F_decompressionContext_t dtx;
49 : :
50 : : /*
51 : : * Used by the Stream API's lazy initialization.
52 : : */
53 : : bool inited;
54 : :
55 : : /*
56 : : * Used by the Stream API to distinguish between compression and
57 : : * decompression operations.
58 : : */
59 : : bool compressing;
60 : :
61 : : /*
62 : : * Used by the Compressor API to mark if the compression headers have been
63 : : * written after initialization.
64 : : */
65 : : bool needs_header_flush;
66 : :
67 : : size_t buflen;
68 : : char *buffer;
69 : :
70 : : /*
71 : : * Used by the Stream API to store already uncompressed data that the
72 : : * caller has not consumed.
73 : : */
74 : : size_t overflowalloclen;
75 : : size_t overflowlen;
76 : : char *overflowbuf;
77 : :
78 : : /*
79 : : * Used by both APIs to keep track of the compressed data length stored in
80 : : * the buffer.
81 : : */
82 : : size_t compressedlen;
83 : :
84 : : /*
85 : : * Used by both APIs to keep track of error codes.
86 : : */
87 : : size_t errcode;
88 : : } LZ4State;
89 : :
90 : : /*
91 : : * LZ4State_compression_init
92 : : * Initialize the required LZ4State members for compression.
93 : : *
94 : : * Write the LZ4 frame header in a buffer keeping track of its length. Users of
95 : : * this function can choose when and how to write the header to a file stream.
96 : : *
97 : : * Returns true on success. In case of a failure returns false, and stores the
98 : : * error code in state->errcode.
99 : : */
100 : : static bool
379 tomas.vondra@postgre 101 :CBC 64 : LZ4State_compression_init(LZ4State *state)
102 : : {
103 : : size_t status;
104 : :
105 : 64 : state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
106 : :
107 : : /*
108 : : * LZ4F_compressBegin requires a buffer that is greater or equal to
109 : : * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
110 : : */
111 [ - + ]: 64 : if (state->buflen < LZ4F_HEADER_SIZE_MAX)
379 tomas.vondra@postgre 112 :UBC 0 : state->buflen = LZ4F_HEADER_SIZE_MAX;
113 : :
379 tomas.vondra@postgre 114 :CBC 64 : status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
115 [ - + ]: 64 : if (LZ4F_isError(status))
116 : : {
379 tomas.vondra@postgre 117 :UBC 0 : state->errcode = status;
118 : 0 : return false;
119 : : }
120 : :
379 tomas.vondra@postgre 121 :CBC 64 : state->buffer = pg_malloc(state->buflen);
122 : 64 : status = LZ4F_compressBegin(state->ctx,
123 : 64 : state->buffer, state->buflen,
124 : 64 : &state->prefs);
125 [ - + ]: 64 : if (LZ4F_isError(status))
126 : : {
379 tomas.vondra@postgre 127 :UBC 0 : state->errcode = status;
128 : 0 : return false;
129 : : }
130 : :
379 tomas.vondra@postgre 131 :CBC 64 : state->compressedlen = status;
132 : :
133 : 64 : return true;
134 : : }
135 : :
136 : : /*----------------------
137 : : * Compressor API
138 : : *----------------------
139 : : */
140 : :
141 : : /* Private routines that support LZ4 compressed data I/O */
142 : :
143 : : static void
416 144 : 32 : ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
145 : : {
146 : : size_t r;
147 : : size_t readbuflen;
148 : : char *outbuf;
149 : : char *readbuf;
379 150 : 32 : LZ4F_decompressionContext_t ctx = NULL;
151 : : LZ4F_decompressOptions_t dec_opt;
152 : : LZ4F_errorCode_t status;
153 : :
154 : 32 : memset(&dec_opt, 0, sizeof(dec_opt));
155 : 32 : status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
156 [ - + ]: 32 : if (LZ4F_isError(status))
379 tomas.vondra@postgre 157 :UBC 0 : pg_fatal("could not create LZ4 decompression context: %s",
158 : : LZ4F_getErrorName(status));
159 : :
379 tomas.vondra@postgre 160 :CBC 32 : outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
161 : 32 : readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
162 : 32 : readbuflen = DEFAULT_IO_BUFFER_SIZE;
163 [ + + ]: 96 : while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
164 : : {
165 : : char *readp;
166 : : char *readend;
167 : :
168 : : /* Process one chunk */
169 : 64 : readp = readbuf;
170 : 64 : readend = readbuf + r;
171 [ + + ]: 131 : while (readp < readend)
172 : : {
173 : 67 : size_t out_size = DEFAULT_IO_BUFFER_SIZE;
174 : 67 : size_t read_size = readend - readp;
175 : :
176 : 67 : memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
177 : 67 : status = LZ4F_decompress(ctx, outbuf, &out_size,
178 : : readp, &read_size, &dec_opt);
179 [ - + ]: 67 : if (LZ4F_isError(status))
379 tomas.vondra@postgre 180 :UBC 0 : pg_fatal("could not decompress: %s",
181 : : LZ4F_getErrorName(status));
182 : :
379 tomas.vondra@postgre 183 :CBC 67 : ahwrite(outbuf, 1, out_size, AH);
184 : 67 : readp += read_size;
185 : : }
186 : : }
187 : :
188 : 32 : pg_free(outbuf);
189 : 32 : pg_free(readbuf);
190 : :
191 : 32 : status = LZ4F_freeDecompressionContext(ctx);
192 [ - + ]: 32 : if (LZ4F_isError(status))
379 tomas.vondra@postgre 193 :UBC 0 : pg_fatal("could not free LZ4 decompression context: %s",
194 : : LZ4F_getErrorName(status));
416 tomas.vondra@postgre 195 :CBC 32 : }
196 : :
197 : : static void
198 : 61 : WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
199 : : const void *data, size_t dLen)
200 : : {
379 201 : 61 : LZ4State *state = (LZ4State *) cs->private_data;
202 : 61 : size_t remaining = dLen;
203 : : size_t status;
204 : : size_t chunk;
205 : :
206 : : /* Write the header if not yet written. */
207 [ + + ]: 61 : if (state->needs_header_flush)
208 : : {
209 : 31 : cs->writeF(AH, state->buffer, state->compressedlen);
210 : 31 : state->needs_header_flush = false;
211 : : }
212 : :
213 [ + + ]: 125 : while (remaining > 0)
214 : : {
215 : :
216 [ + + ]: 64 : if (remaining > DEFAULT_IO_BUFFER_SIZE)
217 : 3 : chunk = DEFAULT_IO_BUFFER_SIZE;
218 : : else
219 : 61 : chunk = remaining;
220 : :
221 : 64 : remaining -= chunk;
222 : 64 : status = LZ4F_compressUpdate(state->ctx,
223 : 64 : state->buffer, state->buflen,
224 : : data, chunk, NULL);
225 : :
226 [ - + ]: 64 : if (LZ4F_isError(status))
331 peter@eisentraut.org 227 :UBC 0 : pg_fatal("could not compress data: %s",
228 : : LZ4F_getErrorName(status));
229 : :
379 tomas.vondra@postgre 230 :CBC 64 : cs->writeF(AH, state->buffer, status);
231 : :
232 : 64 : data = ((char *) data) + chunk;
233 : : }
416 234 : 61 : }
235 : :
236 : : static void
237 : 64 : EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
238 : : {
379 239 : 64 : LZ4State *state = (LZ4State *) cs->private_data;
240 : : size_t status;
241 : :
242 : : /* Nothing needs to be done */
243 [ + + ]: 64 : if (!state)
244 : 32 : return;
245 : :
246 : : /*
247 : : * Write the header if not yet written. The caller is not required to call
248 : : * writeData if the relation does not contain any data. Thus it is
249 : : * possible to reach here without having flushed the header. Do it before
250 : : * ending the compression.
251 : : */
252 [ + + ]: 32 : if (state->needs_header_flush)
253 : 1 : cs->writeF(AH, state->buffer, state->compressedlen);
254 : :
255 : 32 : status = LZ4F_compressEnd(state->ctx,
256 : 32 : state->buffer, state->buflen,
257 : : NULL);
258 [ - + ]: 32 : if (LZ4F_isError(status))
331 peter@eisentraut.org 259 :UBC 0 : pg_fatal("could not end compression: %s",
260 : : LZ4F_getErrorName(status));
261 : :
379 tomas.vondra@postgre 262 :CBC 32 : cs->writeF(AH, state->buffer, status);
263 : :
264 : 32 : status = LZ4F_freeCompressionContext(state->ctx);
265 [ - + ]: 32 : if (LZ4F_isError(status))
331 peter@eisentraut.org 266 :UBC 0 : pg_fatal("could not end compression: %s",
267 : : LZ4F_getErrorName(status));
268 : :
379 tomas.vondra@postgre 269 :CBC 32 : pg_free(state->buffer);
270 : 32 : pg_free(state);
271 : :
272 : 32 : cs->private_data = NULL;
273 : : }
274 : :
275 : : /*
276 : : * Public routines that support LZ4 compressed data I/O
277 : : */
278 : : void
416 279 : 64 : InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
280 : : {
281 : : LZ4State *state;
282 : :
283 : 64 : cs->readData = ReadDataFromArchiveLZ4;
284 : 64 : cs->writeData = WriteDataToArchiveLZ4;
285 : 64 : cs->end = EndCompressorLZ4;
286 : :
287 : 64 : cs->compression_spec = compression_spec;
288 : :
289 : : /*
290 : : * Read operations have access to the whole input. No state needs to be
291 : : * carried between calls.
292 : : */
379 293 [ + + ]: 64 : if (cs->readF)
294 : 32 : return;
295 : :
296 : 32 : state = pg_malloc0(sizeof(*state));
297 [ + - ]: 32 : if (cs->compression_spec.level >= 0)
298 : 32 : state->prefs.compressionLevel = cs->compression_spec.level;
299 : :
300 [ - + ]: 32 : if (!LZ4State_compression_init(state))
379 tomas.vondra@postgre 301 :UBC 0 : pg_fatal("could not initialize LZ4 compression: %s",
302 : : LZ4F_getErrorName(state->errcode));
303 : :
304 : : /* Remember that the header has not been written. */
379 tomas.vondra@postgre 305 :CBC 32 : state->needs_header_flush = true;
306 : 32 : cs->private_data = state;
307 : : }
308 : :
309 : : /*----------------------
310 : : * Compress Stream API
311 : : *----------------------
312 : : */
313 : :
314 : :
315 : : /*
316 : : * LZ4 equivalent to feof() or gzeof(). Return true iff there is no
317 : : * decompressed output in the overflow buffer and the end of the backing file
318 : : * is reached.
319 : : */
320 : : static bool
379 tomas.vondra@postgre 321 :LBC (2) : LZ4Stream_eof(CompressFileHandle *CFH)
322 : : {
323 : (2) : LZ4State *state = (LZ4State *) CFH->private_data;
324 : :
325 [ # # # # ]: (2) : return state->overflowlen == 0 && feof(state->fp);
326 : : }
327 : :
328 : : static const char *
379 tomas.vondra@postgre 329 :UBC 0 : LZ4Stream_get_error(CompressFileHandle *CFH)
330 : : {
331 : 0 : LZ4State *state = (LZ4State *) CFH->private_data;
332 : : const char *errmsg;
333 : :
334 [ # # ]: 0 : if (LZ4F_isError(state->errcode))
335 : 0 : errmsg = LZ4F_getErrorName(state->errcode);
336 : : else
416 337 : 0 : errmsg = strerror(errno);
338 : :
339 : 0 : return errmsg;
340 : : }
341 : :
342 : : /*
343 : : * Initialize an already alloc'ed LZ4State struct for subsequent calls.
344 : : *
345 : : * Creates the necessary contexts for either compression or decompression. When
346 : : * compressing data (indicated by compressing=true), it additionally writes the
347 : : * LZ4 header in the output stream.
348 : : *
349 : : * Returns true on success. In case of a failure returns false, and stores the
350 : : * error code in state->errcode.
351 : : */
352 : : static bool
379 tomas.vondra@postgre 353 :CBC 1712 : LZ4Stream_init(LZ4State *state, int size, bool compressing)
354 : : {
355 : : size_t status;
356 : :
357 [ + + ]: 1712 : if (state->inited)
388 358 : 1648 : return true;
359 : :
379 360 : 64 : state->compressing = compressing;
361 : 64 : state->inited = true;
362 : :
363 : : /* When compressing, write LZ4 header to the output stream. */
364 [ + + ]: 64 : if (state->compressing)
365 : : {
366 : :
367 [ - + ]: 32 : if (!LZ4State_compression_init(state))
388 tomas.vondra@postgre 368 :UBC 0 : return false;
369 : :
379 tomas.vondra@postgre 370 [ - + ]:CBC 32 : if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
371 : : {
416 tomas.vondra@postgre 372 [ # # ]:UBC 0 : errno = (errno) ? errno : ENOSPC;
388 373 : 0 : return false;
374 : : }
375 : : }
376 : : else
377 : : {
379 tomas.vondra@postgre 378 :CBC 32 : status = LZ4F_createDecompressionContext(&state->dtx, LZ4F_VERSION);
416 379 [ - + ]: 32 : if (LZ4F_isError(status))
380 : : {
379 tomas.vondra@postgre 381 :UBC 0 : state->errcode = status;
388 382 : 0 : return false;
383 : : }
384 : :
379 tomas.vondra@postgre 385 :CBC 32 : state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
386 : 32 : state->buffer = pg_malloc(state->buflen);
387 : :
388 : 32 : state->overflowalloclen = state->buflen;
389 : 32 : state->overflowbuf = pg_malloc(state->overflowalloclen);
390 : 32 : state->overflowlen = 0;
391 : : }
392 : :
388 393 : 64 : return true;
394 : : }
395 : :
396 : : /*
397 : : * Read already decompressed content from the overflow buffer into 'ptr' up to
398 : : * 'size' bytes, if available. If the eol_flag is set, then stop at the first
399 : : * occurrence of the newline char prior to 'size' bytes.
400 : : *
401 : : * Any unread content in the overflow buffer is moved to the beginning.
402 : : *
403 : : * Returns the number of bytes read from the overflow buffer (and copied into
404 : : * the 'ptr' buffer), or 0 if the overflow buffer is empty.
405 : : */
406 : : static int
379 407 : 66 : LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
408 : : {
409 : : char *p;
416 410 : 66 : int readlen = 0;
411 : :
379 412 [ + + ]: 66 : if (state->overflowlen == 0)
416 413 : 63 : return 0;
414 : :
379 415 [ + + ]: 3 : if (state->overflowlen >= size)
416 416 : 2 : readlen = size;
417 : : else
379 418 : 1 : readlen = state->overflowlen;
419 : :
420 [ - + - - ]: 3 : if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
421 : : /* Include the line terminating char */
379 tomas.vondra@postgre 422 :LBC (1) : readlen = p - state->overflowbuf + 1;
423 : :
379 tomas.vondra@postgre 424 :CBC 3 : memcpy(ptr, state->overflowbuf, readlen);
425 : 3 : state->overflowlen -= readlen;
426 : :
427 [ + + ]: 3 : if (state->overflowlen > 0)
428 : 2 : memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen);
429 : :
416 430 : 3 : return readlen;
431 : : }
432 : :
433 : : /*
434 : : * The workhorse for reading decompressed content out of an LZ4 compressed
435 : : * stream.
436 : : *
437 : : * It will read up to 'ptrsize' decompressed content, or up to the new line
438 : : * char if found first when the eol_flag is set. It is possible that the
439 : : * decompressed output generated by reading any compressed input via the
440 : : * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
441 : : * at an overflow buffer within LZ4State. Of course, when the function is
442 : : * called, it will first try to consume any decompressed content already
443 : : * present in the overflow buffer, before decompressing new content.
444 : : *
445 : : * Returns the number of bytes of decompressed data copied into the ptr
446 : : * buffer, or -1 in case of error.
447 : : */
448 : : static int
379 449 : 66 : LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
450 : : {
388 451 : 66 : int dsize = 0;
452 : : int rsize;
453 : 66 : int size = ptrsize;
416 454 : 66 : bool eol_found = false;
455 : :
456 : : void *readbuf;
457 : :
458 : : /* Lazy init */
379 459 [ - + ]: 66 : if (!LZ4Stream_init(state, size, false /* decompressing */ ))
416 tomas.vondra@postgre 460 :UBC 0 : return -1;
461 : :
462 : : /* No work needs to be done for a zero-sized output buffer */
333 tomas.vondra@postgre 463 [ - + ]:CBC 66 : if (size <= 0)
333 tomas.vondra@postgre 464 :UBC 0 : return 0;
465 : :
466 : : /* Verify that there is enough space in the outbuf */
379 tomas.vondra@postgre 467 [ - + ]:CBC 66 : if (size > state->buflen)
468 : : {
379 tomas.vondra@postgre 469 :UBC 0 : state->buflen = size;
470 : 0 : state->buffer = pg_realloc(state->buffer, size);
471 : : }
472 : :
473 : : /* use already decompressed content if available */
379 tomas.vondra@postgre 474 :CBC 66 : dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
416 475 [ + + - + : 66 : if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
- - ]
476 : 2 : return dsize;
477 : :
478 : 64 : readbuf = pg_malloc(size);
479 : :
480 : : do
481 : : {
482 : : char *rp;
483 : : char *rend;
484 : :
379 485 : 67 : rsize = fread(readbuf, 1, size, state->fp);
486 [ + + - + ]: 67 : if (rsize < size && !feof(state->fp))
416 tomas.vondra@postgre 487 :UBC 0 : return -1;
488 : :
416 tomas.vondra@postgre 489 :CBC 67 : rp = (char *) readbuf;
490 : 67 : rend = (char *) readbuf + rsize;
491 : :
492 [ + + ]: 104 : while (rp < rend)
493 : : {
494 : : size_t status;
379 495 : 37 : size_t outlen = state->buflen;
416 496 : 37 : size_t read_remain = rend - rp;
497 : :
379 498 : 37 : memset(state->buffer, 0, outlen);
499 : 37 : status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
500 : : rp, &read_remain, NULL);
416 501 [ - + ]: 37 : if (LZ4F_isError(status))
502 : : {
379 tomas.vondra@postgre 503 :UBC 0 : state->errcode = status;
416 504 : 0 : return -1;
505 : : }
506 : :
416 tomas.vondra@postgre 507 :CBC 37 : rp += read_remain;
508 : :
509 : : /*
510 : : * fill in what space is available in ptr if the eol flag is set,
511 : : * either skip if one already found or fill up to EOL if present
512 : : * in the outbuf
513 : : */
514 [ + + + + : 37 : if (outlen > 0 && dsize < size && eol_found == false)
+ - ]
515 : : {
516 : : char *p;
517 [ + - ]: 31 : size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
518 : 31 : size_t len = outlen < lib ? outlen : lib;
519 : :
520 [ - + ]: 31 : if (eol_flag &&
379 tomas.vondra@postgre 521 [ # # ]:LBC (1) : (p = memchr(state->buffer, '\n', outlen)) &&
522 [ # # ]: (1) : (size_t) (p - state->buffer + 1) <= len)
523 : : {
524 : (1) : len = p - state->buffer + 1;
416 525 : (1) : eol_found = true;
526 : : }
527 : :
379 tomas.vondra@postgre 528 :CBC 31 : memcpy((char *) ptr + dsize, state->buffer, len);
416 529 : 31 : dsize += len;
530 : :
531 : : /* move what did not fit, if any, at the beginning of the buf */
532 [ - + ]: 31 : if (len < outlen)
379 tomas.vondra@postgre 533 :LBC (1) : memmove(state->buffer, state->buffer + len, outlen - len);
416 tomas.vondra@postgre 534 :CBC 31 : outlen -= len;
535 : : }
536 : :
537 : : /* if there is available output, save it */
538 [ + + ]: 37 : if (outlen > 0)
539 : : {
379 540 [ + + ]: 5 : while (state->overflowlen + outlen > state->overflowalloclen)
541 : : {
542 : 2 : state->overflowalloclen *= 2;
543 : 2 : state->overflowbuf = pg_realloc(state->overflowbuf,
544 : : state->overflowalloclen);
545 : : }
546 : :
547 : 3 : memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
548 : 3 : state->overflowlen += outlen;
549 : : }
550 : : }
388 551 [ + + + - : 67 : } while (rsize == size && dsize < size && eol_found == false);
+ - ]
552 : :
416 553 : 64 : pg_free(readbuf);
554 : :
388 555 : 64 : return dsize;
556 : : }
557 : :
558 : : /*
559 : : * Compress size bytes from ptr and write them to the stream.
560 : : */
561 : : static bool
379 562 : 1646 : LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
563 : : {
564 : 1646 : LZ4State *state = (LZ4State *) CFH->private_data;
565 : : size_t status;
416 566 : 1646 : int remaining = size;
567 : :
568 : : /* Lazy init */
379 569 [ - + ]: 1646 : if (!LZ4Stream_init(state, size, true))
388 tomas.vondra@postgre 570 :UBC 0 : return false;
571 : :
416 tomas.vondra@postgre 572 [ + + ]:CBC 3298 : while (remaining > 0)
573 : : {
388 574 : 1652 : int chunk = Min(remaining, DEFAULT_IO_BUFFER_SIZE);
575 : :
416 576 : 1652 : remaining -= chunk;
577 : :
379 578 : 1652 : status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
579 : : ptr, chunk, NULL);
416 580 [ - + ]: 1652 : if (LZ4F_isError(status))
581 : : {
379 tomas.vondra@postgre 582 :UBC 0 : state->errcode = status;
388 583 : 0 : return false;
584 : : }
585 : :
379 tomas.vondra@postgre 586 [ - + ]:CBC 1652 : if (fwrite(state->buffer, 1, status, state->fp) != status)
587 : : {
416 tomas.vondra@postgre 588 [ # # ]:UBC 0 : errno = (errno) ? errno : ENOSPC;
388 589 : 0 : return false;
590 : : }
591 : :
333 tomas.vondra@postgre 592 :CBC 1652 : ptr = ((const char *) ptr) + chunk;
593 : : }
594 : :
388 595 : 1646 : return true;
596 : : }
597 : :
598 : : /*
599 : : * fread() equivalent implementation for LZ4 compressed files.
600 : : */
601 : : static bool
379 602 : 66 : LZ4Stream_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
603 : : {
604 : 66 : LZ4State *state = (LZ4State *) CFH->private_data;
605 : : int ret;
606 : :
607 [ - + ]: 66 : if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
379 tomas.vondra@postgre 608 :UBC 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
609 : :
388 tomas.vondra@postgre 610 [ + - ]:CBC 66 : if (rsize)
611 : 66 : *rsize = (size_t) ret;
612 : :
613 : 66 : return true;
614 : : }
615 : :
616 : : /*
617 : : * fgetc() equivalent implementation for LZ4 compressed files.
618 : : */
619 : : static int
379 tomas.vondra@postgre 620 :UBC 0 : LZ4Stream_getc(CompressFileHandle *CFH)
621 : : {
622 : 0 : LZ4State *state = (LZ4State *) CFH->private_data;
623 : : unsigned char c;
624 : :
625 [ # # ]: 0 : if (LZ4Stream_read_internal(state, &c, 1, false) <= 0)
626 : : {
627 [ # # ]: 0 : if (!LZ4Stream_eof(CFH))
628 : 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
629 : : else
416 630 : 0 : pg_fatal("could not read from input file: end of file");
631 : : }
632 : :
633 : 0 : return c;
634 : : }
635 : :
636 : : /*
637 : : * fgets() equivalent implementation for LZ4 compressed files.
638 : : */
639 : : static char *
379 tomas.vondra@postgre 640 :LBC (3) : LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
641 : : {
642 : (3) : LZ4State *state = (LZ4State *) CFH->private_data;
643 : : int ret;
644 : :
333 645 : (3) : ret = LZ4Stream_read_internal(state, ptr, size - 1, true);
379 646 [ # # # # : (3) : if (ret < 0 || (ret == 0 && !LZ4Stream_eof(CFH)))
# # ]
379 tomas.vondra@postgre 647 :UBC 0 : pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
648 : :
649 : : /* Done reading */
388 tomas.vondra@postgre 650 [ # # ]:LBC (3) : if (ret == 0)
416 651 : (1) : return NULL;
652 : :
653 : : /*
654 : : * Our caller expects the return string to be NULL terminated and we know
655 : : * that ret is greater than zero.
656 : : */
333 657 : (2) : ptr[ret - 1] = '\0';
658 : :
416 659 : (2) : return ptr;
660 : : }
661 : :
662 : : /*
663 : : * Finalize (de)compression of a stream. When compressing it will write any
664 : : * remaining content and/or generated footer from the LZ4 API.
665 : : */
666 : : static bool
379 tomas.vondra@postgre 667 :CBC 65 : LZ4Stream_close(CompressFileHandle *CFH)
668 : : {
669 : : FILE *fp;
670 : 65 : LZ4State *state = (LZ4State *) CFH->private_data;
671 : : size_t status;
672 : :
673 : 65 : fp = state->fp;
674 [ + + ]: 65 : if (state->inited)
675 : : {
676 [ + + ]: 64 : if (state->compressing)
677 : : {
678 : 32 : status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
416 679 [ - + ]: 32 : if (LZ4F_isError(status))
331 peter@eisentraut.org 680 :UBC 0 : pg_fatal("could not end compression: %s",
681 : : LZ4F_getErrorName(status));
379 tomas.vondra@postgre 682 [ - + ]:CBC 32 : else if (fwrite(state->buffer, 1, status, state->fp) != status)
683 : : {
416 tomas.vondra@postgre 684 [ # # ]:UBC 0 : errno = (errno) ? errno : ENOSPC;
685 : 0 : WRITE_ERROR_EXIT;
686 : : }
687 : :
379 tomas.vondra@postgre 688 :CBC 32 : status = LZ4F_freeCompressionContext(state->ctx);
416 689 [ - + ]: 32 : if (LZ4F_isError(status))
331 peter@eisentraut.org 690 :UBC 0 : pg_fatal("could not end compression: %s",
691 : : LZ4F_getErrorName(status));
692 : : }
693 : : else
694 : : {
379 tomas.vondra@postgre 695 :CBC 32 : status = LZ4F_freeDecompressionContext(state->dtx);
416 696 [ - + ]: 32 : if (LZ4F_isError(status))
331 peter@eisentraut.org 697 :UBC 0 : pg_fatal("could not end decompression: %s",
698 : : LZ4F_getErrorName(status));
379 tomas.vondra@postgre 699 :CBC 32 : pg_free(state->overflowbuf);
700 : : }
701 : :
702 : 64 : pg_free(state->buffer);
703 : : }
704 : :
705 : 65 : pg_free(state);
706 : :
388 707 : 65 : return fclose(fp) == 0;
708 : : }
709 : :
710 : : static bool
379 711 : 65 : LZ4Stream_open(const char *path, int fd, const char *mode,
712 : : CompressFileHandle *CFH)
713 : : {
714 : : FILE *fp;
715 : 65 : LZ4State *state = (LZ4State *) CFH->private_data;
716 : :
416 717 [ - + ]: 65 : if (fd >= 0)
416 tomas.vondra@postgre 718 :UBC 0 : fp = fdopen(fd, mode);
719 : : else
416 tomas.vondra@postgre 720 :CBC 65 : fp = fopen(path, mode);
721 [ - + ]: 65 : if (fp == NULL)
722 : : {
379 tomas.vondra@postgre 723 :UBC 0 : state->errcode = errno;
388 724 : 0 : return false;
725 : : }
726 : :
379 tomas.vondra@postgre 727 :CBC 65 : state->fp = fp;
728 : :
388 729 : 65 : return true;
730 : : }
731 : :
732 : : static bool
379 733 : 32 : LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
734 : : {
735 : : char *fname;
736 : : int save_errno;
737 : : bool ret;
738 : :
416 739 : 32 : fname = psprintf("%s.lz4", path);
740 : 32 : ret = CFH->open_func(fname, -1, mode, CFH);
741 : :
388 742 : 32 : save_errno = errno;
416 743 : 32 : pg_free(fname);
388 744 : 32 : errno = save_errno;
745 : :
416 746 : 32 : return ret;
747 : : }
748 : :
749 : : /*
750 : : * Public routines
751 : : */
752 : : void
753 : 65 : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
754 : : const pg_compress_specification compression_spec)
755 : : {
756 : : LZ4State *state;
757 : :
379 758 : 65 : CFH->open_func = LZ4Stream_open;
759 : 65 : CFH->open_write_func = LZ4Stream_open_write;
760 : 65 : CFH->read_func = LZ4Stream_read;
761 : 65 : CFH->write_func = LZ4Stream_write;
762 : 65 : CFH->gets_func = LZ4Stream_gets;
763 : 65 : CFH->getc_func = LZ4Stream_getc;
764 : 65 : CFH->eof_func = LZ4Stream_eof;
765 : 65 : CFH->close_func = LZ4Stream_close;
766 : 65 : CFH->get_error_func = LZ4Stream_get_error;
767 : :
416 768 : 65 : CFH->compression_spec = compression_spec;
379 769 : 65 : state = pg_malloc0(sizeof(*state));
416 770 [ + - ]: 65 : if (CFH->compression_spec.level >= 0)
379 771 : 65 : state->prefs.compressionLevel = CFH->compression_spec.level;
772 : :
773 : 65 : CFH->private_data = state;
416 774 : 65 : }
775 : : #else /* USE_LZ4 */
776 : : void
777 : : InitCompressorLZ4(CompressorState *cs,
778 : : const pg_compress_specification compression_spec)
779 : : {
780 : : pg_fatal("this build does not support compression with %s", "LZ4");
781 : : }
782 : :
783 : : void
784 : : InitCompressFileHandleLZ4(CompressFileHandle *CFH,
785 : : const pg_compress_specification compression_spec)
786 : : {
787 : : pg_fatal("this build does not support compression with %s", "LZ4");
788 : : }
789 : : #endif /* USE_LZ4 */
|