LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - bbstreamer_lz4.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 88.4 % 121 107 14 107
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 8 8 8
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (180,240] days: 100.0 % 1 1 1
Legend: Lines: hit not hit (240..) days: 88.3 % 120 106 14 106
Function coverage date bins:
(240..) days: 100.0 % 8 8 8

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * bbstreamer_lz4.c
                                  4                 :  *
                                  5                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  6                 :  *
                                  7                 :  * IDENTIFICATION
                                  8                 :  *        src/bin/pg_basebackup/bbstreamer_lz4.c
                                  9                 :  *-------------------------------------------------------------------------
                                 10                 :  */
                                 11                 : 
                                 12                 : #include "postgres_fe.h"
                                 13                 : 
                                 14                 : #include <unistd.h>
                                 15                 : 
                                 16                 : #ifdef USE_LZ4
                                 17                 : #include <lz4frame.h>
                                 18                 : #endif
                                 19                 : 
                                 20                 : #include "bbstreamer.h"
                                 21                 : #include "common/logging.h"
                                 22                 : #include "common/file_perm.h"
                                 23                 : #include "common/string.h"
                                 24                 : 
                                 25                 : #ifdef USE_LZ4
/*
 * Private state for the lz4 streamers in this file.  The same struct is used
 * by both the compressor and the decompressor; each constructor allocates it
 * zeroed and creates only the LZ4F context it needs.
 */
typedef struct bbstreamer_lz4_frame
{
    /*
     * Generic streamer state.  The callbacks receive a bbstreamer * and cast
     * it back to bbstreamer_lz4_frame *, and the constructors hand out
     * &streamer->base.
     */
    bbstreamer  base;

    /* Compression context; created by bbstreamer_lz4_compressor_new(). */
    LZ4F_compressionContext_t cctx;
    /* Decompression context; created by bbstreamer_lz4_decompressor_new(). */
    LZ4F_decompressionContext_t dctx;
    /* Frame preferences (block size, level); filled in by the compressor. */
    LZ4F_preferences_t prefs;

    /* Bytes currently held in base.bbs_buffer, not yet passed downstream. */
    size_t      bytes_written;
    /* Compressor only: set once LZ4F_compressBegin() has emitted the header. */
    bool        header_written;
} bbstreamer_lz4_frame;
                                 37                 : 
/* Callbacks implementing the lz4 compressor; defined further down. */
static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
                                              bbstreamer_member *member,
                                              const char *data, int len,
                                              bbstreamer_archive_context context);
static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);

/* Operations table installed by bbstreamer_lz4_compressor_new(). */
const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
    .content = bbstreamer_lz4_compressor_content,
    .finalize = bbstreamer_lz4_compressor_finalize,
    .free = bbstreamer_lz4_compressor_free
};
                                 50                 : 
/* Callbacks implementing the lz4 decompressor; defined further down. */
static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
                                                bbstreamer_member *member,
                                                const char *data, int len,
                                                bbstreamer_archive_context context);
static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);

/* Operations table installed by bbstreamer_lz4_decompressor_new(). */
const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
    .content = bbstreamer_lz4_decompressor_content,
    .finalize = bbstreamer_lz4_decompressor_finalize,
    .free = bbstreamer_lz4_decompressor_free
};
                                 63                 : #endif
                                 64                 : 
                                 65                 : /*
                                 66                 :  * Create a new base backup streamer that performs lz4 compression of tar
                                 67                 :  * blocks.
                                 68                 :  */
                                 69                 : bbstreamer *
  362 michael                    70 CBC           1 : bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
                                 71                 : {
                                 72                 : #ifdef USE_LZ4
                                 73                 :     bbstreamer_lz4_frame *streamer;
                                 74                 :     LZ4F_errorCode_t ctxError;
                                 75                 :     LZ4F_preferences_t *prefs;
                                 76                 : 
  422 rhaas                      77               1 :     Assert(next != NULL);
                                 78                 : 
                                 79               1 :     streamer = palloc0(sizeof(bbstreamer_lz4_frame));
                                 80               1 :     *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
                                 81                 :         &bbstreamer_lz4_compressor_ops;
                                 82                 : 
                                 83               1 :     streamer->base.bbs_next = next;
                                 84               1 :     initStringInfo(&streamer->base.bbs_buffer);
                                 85               1 :     streamer->header_written = false;
                                 86                 : 
                                 87                 :     /* Initialize stream compression preferences */
                                 88               1 :     prefs = &streamer->prefs;
                                 89               1 :     memset(prefs, 0, sizeof(LZ4F_preferences_t));
                                 90               1 :     prefs->frameInfo.blockSizeID = LZ4F_max256KB;
  207 michael                    91               1 :     prefs->compressionLevel = compress->level;
                                 92                 : 
  422 rhaas                      93               1 :     ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
                                 94               1 :     if (LZ4F_isError(ctxError))
  366 tgl                        95 UBC           0 :         pg_log_error("could not create lz4 compression context: %s",
                                 96                 :                      LZ4F_getErrorName(ctxError));
                                 97                 : 
  422 rhaas                      98 CBC           1 :     return &streamer->base;
                                 99                 : #else
                                100                 :     pg_fatal("this build does not support compression with %s", "LZ4");
                                101                 :     return NULL;                /* keep compiler quiet */
                                102                 : #endif
                                103                 : }
                                104                 : 
                                105                 : #ifdef USE_LZ4
                                106                 : /*
                                107                 :  * Compress the input data to output buffer.
                                108                 :  *
                                109                 :  * Find out the compression bound based on input data length for each
                                110                 :  * invocation to make sure that output buffer has enough capacity to
                                111                 :  * accommodate the compressed data. In case if the output buffer
                                112                 :  * capacity falls short of compression bound then forward the content
                                113                 :  * of output buffer to next streamer and empty the buffer.
                                114                 :  */
                                115                 : static void
                                116            2703 : bbstreamer_lz4_compressor_content(bbstreamer *streamer,
                                117                 :                                   bbstreamer_member *member,
                                118                 :                                   const char *data, int len,
                                119                 :                                   bbstreamer_archive_context context)
                                120                 : {
                                121                 :     bbstreamer_lz4_frame *mystreamer;
                                122                 :     uint8      *next_in,
                                123                 :                *next_out;
                                124                 :     size_t      out_bound,
                                125                 :                 compressed_size,
                                126                 :                 avail_out;
                                127                 : 
                                128            2703 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
                                129            2703 :     next_in = (uint8 *) data;
                                130                 : 
                                131                 :     /* Write header before processing the first input chunk. */
                                132            2703 :     if (!mystreamer->header_written)
                                133                 :     {
                                134               1 :         compressed_size = LZ4F_compressBegin(mystreamer->cctx,
                                135               1 :                                              (uint8 *) mystreamer->base.bbs_buffer.data,
                                136               1 :                                              mystreamer->base.bbs_buffer.maxlen,
                                137               1 :                                              &mystreamer->prefs);
                                138                 : 
                                139               1 :         if (LZ4F_isError(compressed_size))
  422 rhaas                     140 UBC           0 :             pg_log_error("could not write lz4 header: %s",
                                141                 :                          LZ4F_getErrorName(compressed_size));
                                142                 : 
  422 rhaas                     143 CBC           1 :         mystreamer->bytes_written += compressed_size;
                                144               1 :         mystreamer->header_written = true;
                                145                 :     }
                                146                 : 
                                147                 :     /*
                                148                 :      * Update the offset and capacity of output buffer based on number of
                                149                 :      * bytes written to output buffer.
                                150                 :      */
                                151            2703 :     next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
                                152            2703 :     avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
                                153                 : 
                                154                 :     /*
                                155                 :      * Find out the compression bound and make sure that output buffer has the
                                156                 :      * required capacity for the success of LZ4F_compressUpdate. If needed
                                157                 :      * forward the content to next streamer and empty the buffer.
                                158                 :      */
                                159            2703 :     out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
  397                           160            2703 :     if (avail_out < out_bound)
                                161                 :     {
  332 tgl                       162              14 :         bbstreamer_content(mystreamer->base.bbs_next, member,
                                163              14 :                            mystreamer->base.bbs_buffer.data,
                                164              14 :                            mystreamer->bytes_written,
                                165                 :                            context);
                                166                 : 
                                167                 :         /* Enlarge buffer if it falls short of out bound. */
                                168              14 :         if (mystreamer->base.bbs_buffer.maxlen < out_bound)
                                169               1 :             enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
                                170                 : 
                                171              14 :         avail_out = mystreamer->base.bbs_buffer.maxlen;
                                172              14 :         mystreamer->bytes_written = 0;
                                173              14 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
                                174                 :     }
                                175                 : 
                                176                 :     /*
                                177                 :      * This call compresses the data starting at next_in and generates the
                                178                 :      * output starting at next_out. It expects the caller to provide the size
                                179                 :      * of input buffer and capacity of output buffer by providing parameters
                                180                 :      * len and avail_out.
                                181                 :      *
                                182                 :      * It returns the number of bytes compressed to output buffer.
                                183                 :      */
  422 rhaas                     184            2703 :     compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
                                185                 :                                           next_out, avail_out,
                                186                 :                                           next_in, len, NULL);
                                187                 : 
                                188            2703 :     if (LZ4F_isError(compressed_size))
  422 rhaas                     189 UBC           0 :         pg_log_error("could not compress data: %s",
                                190                 :                      LZ4F_getErrorName(compressed_size));
                                191                 : 
  422 rhaas                     192 CBC        2703 :     mystreamer->bytes_written += compressed_size;
                                193            2703 : }
                                194                 : 
                                195                 : /*
                                196                 :  * End-of-stream processing.
                                197                 :  */
                                198                 : static void
                                199               1 : bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
                                200                 : {
                                201                 :     bbstreamer_lz4_frame *mystreamer;
                                202                 :     uint8      *next_out;
                                203                 :     size_t      footer_bound,
                                204                 :                 compressed_size,
                                205                 :                 avail_out;
                                206                 : 
                                207               1 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
                                208                 : 
                                209                 :     /* Find out the footer bound and update the output buffer. */
                                210               1 :     footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
  397                           211               1 :     if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
                                212                 :         footer_bound)
                                213                 :     {
  332 tgl                       214 UBC           0 :         bbstreamer_content(mystreamer->base.bbs_next, NULL,
                                215               0 :                            mystreamer->base.bbs_buffer.data,
                                216               0 :                            mystreamer->bytes_written,
                                217                 :                            BBSTREAMER_UNKNOWN);
                                218                 : 
                                219                 :         /* Enlarge buffer if it falls short of footer bound. */
                                220               0 :         if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
                                221               0 :             enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
                                222                 : 
                                223               0 :         avail_out = mystreamer->base.bbs_buffer.maxlen;
                                224               0 :         mystreamer->bytes_written = 0;
                                225               0 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
                                226                 :     }
                                227                 :     else
                                228                 :     {
  422 rhaas                     229 CBC           1 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
                                230               1 :         avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
                                231                 :     }
                                232                 : 
                                233                 :     /*
                                234                 :      * Finalize the frame and flush whatever data remaining in compression
                                235                 :      * context.
                                236                 :      */
                                237               1 :     compressed_size = LZ4F_compressEnd(mystreamer->cctx,
                                238                 :                                        next_out, avail_out, NULL);
                                239                 : 
                                240               1 :     if (LZ4F_isError(compressed_size))
  422 rhaas                     241 UBC           0 :         pg_log_error("could not end lz4 compression: %s",
                                242                 :                      LZ4F_getErrorName(compressed_size));
                                243                 : 
  422 rhaas                     244 CBC           1 :     mystreamer->bytes_written += compressed_size;
                                245                 : 
                                246               1 :     bbstreamer_content(mystreamer->base.bbs_next, NULL,
                                247               1 :                        mystreamer->base.bbs_buffer.data,
                                248               1 :                        mystreamer->bytes_written,
                                249                 :                        BBSTREAMER_UNKNOWN);
                                250                 : 
                                251               1 :     bbstreamer_finalize(mystreamer->base.bbs_next);
                                252               1 : }
                                253                 : 
                                254                 : /*
                                255                 :  * Free memory.
                                256                 :  */
                                257                 : static void
                                258               1 : bbstreamer_lz4_compressor_free(bbstreamer *streamer)
                                259                 : {
                                260                 :     bbstreamer_lz4_frame *mystreamer;
                                261                 : 
                                262               1 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
                                263               1 :     bbstreamer_free(streamer->bbs_next);
                                264               1 :     LZ4F_freeCompressionContext(mystreamer->cctx);
                                265               1 :     pfree(streamer->bbs_buffer.data);
                                266               1 :     pfree(streamer);
                                267               1 : }
                                268                 : #endif
                                269                 : 
                                270                 : /*
                                271                 :  * Create a new base backup streamer that performs decompression of lz4
                                272                 :  * compressed blocks.
                                273                 :  */
                                274                 : bbstreamer *
                                275               1 : bbstreamer_lz4_decompressor_new(bbstreamer *next)
                                276                 : {
                                277                 : #ifdef USE_LZ4
                                278                 :     bbstreamer_lz4_frame *streamer;
                                279                 :     LZ4F_errorCode_t ctxError;
                                280                 : 
                                281               1 :     Assert(next != NULL);
                                282                 : 
                                283               1 :     streamer = palloc0(sizeof(bbstreamer_lz4_frame));
                                284               1 :     *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
                                285                 :         &bbstreamer_lz4_decompressor_ops;
                                286                 : 
                                287               1 :     streamer->base.bbs_next = next;
                                288               1 :     initStringInfo(&streamer->base.bbs_buffer);
                                289                 : 
                                290                 :     /* Initialize internal stream state for decompression */
                                291               1 :     ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
                                292               1 :     if (LZ4F_isError(ctxError))
  366 tgl                       293 UBC           0 :         pg_fatal("could not initialize compression library: %s",
                                294                 :                  LZ4F_getErrorName(ctxError));
                                295                 : 
  422 rhaas                     296 CBC           1 :     return &streamer->base;
                                297                 : #else
                                298                 :     pg_fatal("this build does not support compression with %s", "LZ4");
                                299                 :     return NULL;                /* keep compiler quiet */
                                300                 : #endif
                                301                 : }
                                302                 : 
                                303                 : #ifdef USE_LZ4
                                304                 : /*
                                305                 :  * Decompress the input data to output buffer until we run out of input
                                306                 :  * data. Each time the output buffer is full, pass on the decompressed data
                                307                 :  * to the next streamer.
                                308                 :  */
                                309                 : static void
                                310              98 : bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
                                311                 :                                     bbstreamer_member *member,
                                312                 :                                     const char *data, int len,
                                313                 :                                     bbstreamer_archive_context context)
                                314                 : {
                                315                 :     bbstreamer_lz4_frame *mystreamer;
                                316                 :     uint8      *next_in,
                                317                 :                *next_out;
                                318                 :     size_t      avail_in,
                                319                 :                 avail_out;
                                320                 : 
                                321              98 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
                                322              98 :     next_in = (uint8 *) data;
                                323              98 :     next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
                                324              98 :     avail_in = len;
                                325              98 :     avail_out = mystreamer->base.bbs_buffer.maxlen;
                                326                 : 
                                327           40050 :     while (avail_in > 0)
                                328                 :     {
                                329                 :         size_t      ret,
                                330                 :                     read_size,
                                331                 :                     out_size;
                                332                 : 
                                333           39952 :         read_size = avail_in;
                                334           39952 :         out_size = avail_out;
                                335                 : 
                                336                 :         /*
                                337                 :          * This call decompresses the data starting at next_in and generates
                                338                 :          * the output data starting at next_out. It expects the caller to
                                339                 :          * provide size of the input buffer and total capacity of the output
                                340                 :          * buffer by providing the read_size and out_size parameters
                                341                 :          * respectively.
                                342                 :          *
                                343                 :          * Per the documentation of LZ4, parameters read_size and out_size
                                344                 :          * behaves as dual parameters. On return, the number of bytes consumed
                                345                 :          * from the input buffer will be written back to read_size and the
                                346                 :          * number of bytes decompressed to output buffer will be written back
                                347                 :          * to out_size respectively.
                                348                 :          */
                                349           39952 :         ret = LZ4F_decompress(mystreamer->dctx,
                                350                 :                               next_out, &out_size,
                                351                 :                               next_in, &read_size, NULL);
                                352                 : 
                                353           39952 :         if (LZ4F_isError(ret))
  422 rhaas                     354 UBC           0 :             pg_log_error("could not decompress data: %s",
                                355                 :                          LZ4F_getErrorName(ret));
                                356                 : 
                                357                 :         /* Update input buffer based on number of bytes consumed */
  422 rhaas                     358 CBC       39952 :         avail_in -= read_size;
                                359           39952 :         next_in += read_size;
                                360                 : 
                                361           39952 :         mystreamer->bytes_written += out_size;
                                362                 : 
                                363                 :         /*
                                364                 :          * If output buffer is full then forward the content to next streamer
                                365                 :          * and update the output buffer.
                                366                 :          */
                                367           39952 :         if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
                                368                 :         {
                                369           39951 :             bbstreamer_content(mystreamer->base.bbs_next, member,
                                370           39951 :                                mystreamer->base.bbs_buffer.data,
                                371                 :                                mystreamer->base.bbs_buffer.maxlen,
                                372                 :                                context);
                                373                 : 
                                374           39951 :             avail_out = mystreamer->base.bbs_buffer.maxlen;
                                375           39951 :             mystreamer->bytes_written = 0;
                                376           39951 :             next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
                                377                 :         }
                                378                 :         else
                                379                 :         {
                                380               1 :             avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
                                381               1 :             next_out += mystreamer->bytes_written;
                                382                 :         }
                                383                 :     }
                                384              98 : }
                                385                 : 
                                386                 : /*
                                387                 :  * End-of-stream processing.
                                388                 :  */
                                389                 : static void
                                390               1 : bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
                                391                 : {
                                392                 :     bbstreamer_lz4_frame *mystreamer;
                                393                 : 
                                394               1 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
                                395                 : 
                                396                 :     /*
                                397                 :      * End of the stream, if there is some pending data in output buffers then
                                398                 :      * we must forward it to next streamer.
                                399                 :      */
                                400               1 :     bbstreamer_content(mystreamer->base.bbs_next, NULL,
                                401               1 :                        mystreamer->base.bbs_buffer.data,
                                402                 :                        mystreamer->base.bbs_buffer.maxlen,
                                403                 :                        BBSTREAMER_UNKNOWN);
                                404                 : 
                                405               1 :     bbstreamer_finalize(mystreamer->base.bbs_next);
                                406               1 : }
                                407                 : 
                                408                 : /*
                                409                 :  * Free memory.
                                410                 :  */
                                411                 : static void
                                412               1 : bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
                                413                 : {
                                414                 :     bbstreamer_lz4_frame *mystreamer;
                                415                 : 
                                416               1 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
                                417               1 :     bbstreamer_free(streamer->bbs_next);
                                418               1 :     LZ4F_freeDecompressionContext(mystreamer->dctx);
                                419               1 :     pfree(streamer->bbs_buffer.data);
                                420               1 :     pfree(streamer);
                                421               1 : }
                                422                 : #endif
        

Generated by: LCOV version v1.16-55-g56c0a2a