LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - bbstreamer_lz4.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 88.4 % 121 107 14 107
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 8 8 8
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * bbstreamer_lz4.c
       4                 :  *
       5                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       6                 :  *
       7                 :  * IDENTIFICATION
       8                 :  *        src/bin/pg_basebackup/bbstreamer_lz4.c
       9                 :  *-------------------------------------------------------------------------
      10                 :  */
      11                 : 
      12                 : #include "postgres_fe.h"
      13                 : 
      14                 : #include <unistd.h>
      15                 : 
      16                 : #ifdef USE_LZ4
      17                 : #include <lz4frame.h>
      18                 : #endif
      19                 : 
      20                 : #include "bbstreamer.h"
      21                 : #include "common/logging.h"
      22                 : #include "common/file_perm.h"
      23                 : #include "common/string.h"
      24                 : 
      25                 : #ifdef USE_LZ4
      26                 : typedef struct bbstreamer_lz4_frame
      27                 : {
      28                 :     bbstreamer  base;
      29                 : 
      30                 :     LZ4F_compressionContext_t cctx;
      31                 :     LZ4F_decompressionContext_t dctx;
      32                 :     LZ4F_preferences_t prefs;
      33                 : 
      34                 :     size_t      bytes_written;
      35                 :     bool        header_written;
      36                 : } bbstreamer_lz4_frame;
      37                 : 
      38                 : static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
      39                 :                                               bbstreamer_member *member,
      40                 :                                               const char *data, int len,
      41                 :                                               bbstreamer_archive_context context);
      42                 : static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
      43                 : static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);
      44                 : 
      45                 : const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
      46                 :     .content = bbstreamer_lz4_compressor_content,
      47                 :     .finalize = bbstreamer_lz4_compressor_finalize,
      48                 :     .free = bbstreamer_lz4_compressor_free
      49                 : };
      50                 : 
      51                 : static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
      52                 :                                                 bbstreamer_member *member,
      53                 :                                                 const char *data, int len,
      54                 :                                                 bbstreamer_archive_context context);
      55                 : static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
      56                 : static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);
      57                 : 
      58                 : const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
      59                 :     .content = bbstreamer_lz4_decompressor_content,
      60                 :     .finalize = bbstreamer_lz4_decompressor_finalize,
      61                 :     .free = bbstreamer_lz4_decompressor_free
      62                 : };
      63                 : #endif
      64                 : 
      65                 : /*
      66                 :  * Create a new base backup streamer that performs lz4 compression of tar
      67                 :  * blocks.
      68                 :  */
      69                 : bbstreamer *
      70 CBC           1 : bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
      71                 : {
      72                 : #ifdef USE_LZ4
      73                 :     bbstreamer_lz4_frame *streamer;
      74                 :     LZ4F_errorCode_t ctxError;
      75                 :     LZ4F_preferences_t *prefs;
      76                 : 
      77               1 :     Assert(next != NULL);
      78                 : 
      79               1 :     streamer = palloc0(sizeof(bbstreamer_lz4_frame));
      80               1 :     *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
      81                 :         &bbstreamer_lz4_compressor_ops;
      82                 : 
      83               1 :     streamer->base.bbs_next = next;
      84               1 :     initStringInfo(&streamer->base.bbs_buffer);
      85               1 :     streamer->header_written = false;
      86                 : 
      87                 :     /* Initialize stream compression preferences */
      88               1 :     prefs = &streamer->prefs;
      89               1 :     memset(prefs, 0, sizeof(LZ4F_preferences_t));
      90               1 :     prefs->frameInfo.blockSizeID = LZ4F_max256KB;
      91               1 :     prefs->compressionLevel = compress->level;
      92                 : 
      93               1 :     ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
      94               1 :     if (LZ4F_isError(ctxError))
      95 UBC           0 :         pg_log_error("could not create lz4 compression context: %s",
      96                 :                      LZ4F_getErrorName(ctxError));
      97                 : 
      98 CBC           1 :     return &streamer->base;
      99                 : #else
     100                 :     pg_fatal("this build does not support compression with %s", "LZ4");
     101                 :     return NULL;                /* keep compiler quiet */
     102                 : #endif
     103                 : }
     104                 : 
     105                 : #ifdef USE_LZ4
     106                 : /*
     107                 :  * Compress the input data to output buffer.
     108                 :  *
     109                 :  * Find out the compression bound based on input data length for each
     110                 :  * invocation to make sure that output buffer has enough capacity to
     111                 :  * accommodate the compressed data. In case if the output buffer
     112                 :  * capacity falls short of compression bound then forward the content
     113                 :  * of output buffer to next streamer and empty the buffer.
     114                 :  */
     115                 : static void
     116            2703 : bbstreamer_lz4_compressor_content(bbstreamer *streamer,
     117                 :                                   bbstreamer_member *member,
     118                 :                                   const char *data, int len,
     119                 :                                   bbstreamer_archive_context context)
     120                 : {
     121                 :     bbstreamer_lz4_frame *mystreamer;
     122                 :     uint8      *next_in,
     123                 :                *next_out;
     124                 :     size_t      out_bound,
     125                 :                 compressed_size,
     126                 :                 avail_out;
     127                 : 
     128            2703 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     129            2703 :     next_in = (uint8 *) data;
     130                 : 
     131                 :     /* Write header before processing the first input chunk. */
     132            2703 :     if (!mystreamer->header_written)
     133                 :     {
     134               1 :         compressed_size = LZ4F_compressBegin(mystreamer->cctx,
     135               1 :                                              (uint8 *) mystreamer->base.bbs_buffer.data,
     136               1 :                                              mystreamer->base.bbs_buffer.maxlen,
     137               1 :                                              &mystreamer->prefs);
     138                 : 
     139               1 :         if (LZ4F_isError(compressed_size))
     140 UBC           0 :             pg_log_error("could not write lz4 header: %s",
     141                 :                          LZ4F_getErrorName(compressed_size));
     142                 : 
     143 CBC           1 :         mystreamer->bytes_written += compressed_size;
     144               1 :         mystreamer->header_written = true;
     145                 :     }
     146                 : 
     147                 :     /*
     148                 :      * Update the offset and capacity of output buffer based on number of
     149                 :      * bytes written to output buffer.
     150                 :      */
     151            2703 :     next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     152            2703 :     avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     153                 : 
     154                 :     /*
     155                 :      * Find out the compression bound and make sure that output buffer has the
     156                 :      * required capacity for the success of LZ4F_compressUpdate. If needed
     157                 :      * forward the content to next streamer and empty the buffer.
     158                 :      */
     159            2703 :     out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
     160            2703 :     if (avail_out < out_bound)
     161                 :     {
     162              14 :         bbstreamer_content(mystreamer->base.bbs_next, member,
     163              14 :                            mystreamer->base.bbs_buffer.data,
     164              14 :                            mystreamer->bytes_written,
     165                 :                            context);
     166                 : 
     167                 :         /* Enlarge buffer if it falls short of out bound. */
     168              14 :         if (mystreamer->base.bbs_buffer.maxlen < out_bound)
     169               1 :             enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
     170                 : 
     171              14 :         avail_out = mystreamer->base.bbs_buffer.maxlen;
     172              14 :         mystreamer->bytes_written = 0;
     173              14 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     174                 :     }
     175                 : 
     176                 :     /*
     177                 :      * This call compresses the data starting at next_in and generates the
     178                 :      * output starting at next_out. It expects the caller to provide the size
     179                 :      * of input buffer and capacity of output buffer by providing parameters
     180                 :      * len and avail_out.
     181                 :      *
     182                 :      * It returns the number of bytes compressed to output buffer.
     183                 :      */
     184            2703 :     compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
     185                 :                                           next_out, avail_out,
     186                 :                                           next_in, len, NULL);
     187                 : 
     188            2703 :     if (LZ4F_isError(compressed_size))
     189 UBC           0 :         pg_log_error("could not compress data: %s",
     190                 :                      LZ4F_getErrorName(compressed_size));
     191                 : 
     192 CBC        2703 :     mystreamer->bytes_written += compressed_size;
     193            2703 : }
     194                 : 
     195                 : /*
     196                 :  * End-of-stream processing.
     197                 :  */
     198                 : static void
     199               1 : bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
     200                 : {
     201                 :     bbstreamer_lz4_frame *mystreamer;
     202                 :     uint8      *next_out;
     203                 :     size_t      footer_bound,
     204                 :                 compressed_size,
     205                 :                 avail_out;
     206                 : 
     207               1 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     208                 : 
     209                 :     /* Find out the footer bound and update the output buffer. */
     210               1 :     footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
     211               1 :     if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
     212                 :         footer_bound)
     213                 :     {
     214 UBC           0 :         bbstreamer_content(mystreamer->base.bbs_next, NULL,
     215               0 :                            mystreamer->base.bbs_buffer.data,
     216               0 :                            mystreamer->bytes_written,
     217                 :                            BBSTREAMER_UNKNOWN);
     218                 : 
     219                 :         /* Enlarge buffer if it falls short of footer bound. */
     220               0 :         if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
     221               0 :             enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
     222                 : 
     223               0 :         avail_out = mystreamer->base.bbs_buffer.maxlen;
     224               0 :         mystreamer->bytes_written = 0;
     225               0 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     226                 :     }
     227                 :     else
     228                 :     {
     229 CBC           1 :         next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
     230               1 :         avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     231                 :     }
     232                 : 
     233                 :     /*
     234                 :      * Finalize the frame and flush whatever data remaining in compression
     235                 :      * context.
     236                 :      */
     237               1 :     compressed_size = LZ4F_compressEnd(mystreamer->cctx,
     238                 :                                        next_out, avail_out, NULL);
     239                 : 
     240               1 :     if (LZ4F_isError(compressed_size))
     241 UBC           0 :         pg_log_error("could not end lz4 compression: %s",
     242                 :                      LZ4F_getErrorName(compressed_size));
     243                 : 
     244 CBC           1 :     mystreamer->bytes_written += compressed_size;
     245                 : 
     246               1 :     bbstreamer_content(mystreamer->base.bbs_next, NULL,
     247               1 :                        mystreamer->base.bbs_buffer.data,
     248               1 :                        mystreamer->bytes_written,
     249                 :                        BBSTREAMER_UNKNOWN);
     250                 : 
     251               1 :     bbstreamer_finalize(mystreamer->base.bbs_next);
     252               1 : }
     253                 : 
     254                 : /*
     255                 :  * Free memory.
     256                 :  */
     257                 : static void
     258               1 : bbstreamer_lz4_compressor_free(bbstreamer *streamer)
     259                 : {
     260                 :     bbstreamer_lz4_frame *mystreamer;
     261                 : 
     262               1 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     263               1 :     bbstreamer_free(streamer->bbs_next);
     264               1 :     LZ4F_freeCompressionContext(mystreamer->cctx);
     265               1 :     pfree(streamer->bbs_buffer.data);
     266               1 :     pfree(streamer);
     267               1 : }
     268                 : #endif
     269                 : 
     270                 : /*
     271                 :  * Create a new base backup streamer that performs decompression of lz4
     272                 :  * compressed blocks.
     273                 :  */
     274                 : bbstreamer *
     275               1 : bbstreamer_lz4_decompressor_new(bbstreamer *next)
     276                 : {
     277                 : #ifdef USE_LZ4
     278                 :     bbstreamer_lz4_frame *streamer;
     279                 :     LZ4F_errorCode_t ctxError;
     280                 : 
     281               1 :     Assert(next != NULL);
     282                 : 
     283               1 :     streamer = palloc0(sizeof(bbstreamer_lz4_frame));
     284               1 :     *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
     285                 :         &bbstreamer_lz4_decompressor_ops;
     286                 : 
     287               1 :     streamer->base.bbs_next = next;
     288               1 :     initStringInfo(&streamer->base.bbs_buffer);
     289                 : 
     290                 :     /* Initialize internal stream state for decompression */
     291               1 :     ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
     292               1 :     if (LZ4F_isError(ctxError))
     293 UBC           0 :         pg_fatal("could not initialize compression library: %s",
     294                 :                  LZ4F_getErrorName(ctxError));
     295                 : 
     296 CBC           1 :     return &streamer->base;
     297                 : #else
     298                 :     pg_fatal("this build does not support compression with %s", "LZ4");
     299                 :     return NULL;                /* keep compiler quiet */
     300                 : #endif
     301                 : }
     302                 : 
     303                 : #ifdef USE_LZ4
     304                 : /*
     305                 :  * Decompress the input data to output buffer until we run out of input
     306                 :  * data. Each time the output buffer is full, pass on the decompressed data
     307                 :  * to the next streamer.
     308                 :  */
     309                 : static void
     310              98 : bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
     311                 :                                     bbstreamer_member *member,
     312                 :                                     const char *data, int len,
     313                 :                                     bbstreamer_archive_context context)
     314                 : {
     315                 :     bbstreamer_lz4_frame *mystreamer;
     316                 :     uint8      *next_in,
     317                 :                *next_out;
     318                 :     size_t      avail_in,
     319                 :                 avail_out;
     320                 : 
     321              98 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     322              98 :     next_in = (uint8 *) data;
     323              98 :     next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     324              98 :     avail_in = len;
     325              98 :     avail_out = mystreamer->base.bbs_buffer.maxlen;
     326                 : 
     327           40050 :     while (avail_in > 0)
     328                 :     {
     329                 :         size_t      ret,
     330                 :                     read_size,
     331                 :                     out_size;
     332                 : 
     333           39952 :         read_size = avail_in;
     334           39952 :         out_size = avail_out;
     335                 : 
     336                 :         /*
     337                 :          * This call decompresses the data starting at next_in and generates
     338                 :          * the output data starting at next_out. It expects the caller to
     339                 :          * provide size of the input buffer and total capacity of the output
     340                 :          * buffer by providing the read_size and out_size parameters
     341                 :          * respectively.
     342                 :          *
     343                 :          * Per the documentation of LZ4, parameters read_size and out_size
     344                 :          * behaves as dual parameters. On return, the number of bytes consumed
     345                 :          * from the input buffer will be written back to read_size and the
     346                 :          * number of bytes decompressed to output buffer will be written back
     347                 :          * to out_size respectively.
     348                 :          */
     349           39952 :         ret = LZ4F_decompress(mystreamer->dctx,
     350                 :                               next_out, &out_size,
     351                 :                               next_in, &read_size, NULL);
     352                 : 
     353           39952 :         if (LZ4F_isError(ret))
     354 UBC           0 :             pg_log_error("could not decompress data: %s",
     355                 :                          LZ4F_getErrorName(ret));
     356                 : 
     357                 :         /* Update input buffer based on number of bytes consumed */
     358 CBC       39952 :         avail_in -= read_size;
     359           39952 :         next_in += read_size;
     360                 : 
     361           39952 :         mystreamer->bytes_written += out_size;
     362                 : 
     363                 :         /*
     364                 :          * If output buffer is full then forward the content to next streamer
     365                 :          * and update the output buffer.
     366                 :          */
     367           39952 :         if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
     368                 :         {
     369           39951 :             bbstreamer_content(mystreamer->base.bbs_next, member,
     370           39951 :                                mystreamer->base.bbs_buffer.data,
     371                 :                                mystreamer->base.bbs_buffer.maxlen,
     372                 :                                context);
     373                 : 
     374           39951 :             avail_out = mystreamer->base.bbs_buffer.maxlen;
     375           39951 :             mystreamer->bytes_written = 0;
     376           39951 :             next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
     377                 :         }
     378                 :         else
     379                 :         {
     380               1 :             avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
     381               1 :             next_out += mystreamer->bytes_written;
     382                 :         }
     383                 :     }
     384              98 : }
     385                 : 
     386                 : /*
     387                 :  * End-of-stream processing.
     388                 :  */
     389                 : static void
     390               1 : bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
     391                 : {
     392                 :     bbstreamer_lz4_frame *mystreamer;
     393                 : 
     394               1 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     395                 : 
     396                 :     /*
     397                 :      * End of the stream, if there is some pending data in output buffers then
     398                 :      * we must forward it to next streamer.
     399                 :      */
     400               1 :     bbstreamer_content(mystreamer->base.bbs_next, NULL,
     401               1 :                        mystreamer->base.bbs_buffer.data,
     402                 :                        mystreamer->base.bbs_buffer.maxlen,
     403                 :                        BBSTREAMER_UNKNOWN);
     404                 : 
     405               1 :     bbstreamer_finalize(mystreamer->base.bbs_next);
     406               1 : }
     407                 : 
     408                 : /*
     409                 :  * Free memory.
     410                 :  */
     411                 : static void
     412               1 : bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
     413                 : {
     414                 :     bbstreamer_lz4_frame *mystreamer;
     415                 : 
     416               1 :     mystreamer = (bbstreamer_lz4_frame *) streamer;
     417               1 :     bbstreamer_free(streamer->bbs_next);
     418               1 :     LZ4F_freeDecompressionContext(mystreamer->dctx);
     419               1 :     pfree(streamer->bbs_buffer.data);
     420               1 :     pfree(streamer);
     421               1 : }
     422                 : #endif
        

Generated by: LCOV version v1.16-55-g56c0a2a