LCOV - differential code coverage report
Current view: top level - src/backend/backup - basebackup_lz4.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 90.1 % 81 73 8 73
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 7 7 7
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (180,240] days: 100.0 % 2 2 2
Legend: Lines: hit not hit (240..) days: 89.9 % 79 71 8 71
Function coverage date bins:
(240..) days: 100.0 % 7 7 7

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * basebackup_lz4.c
                                  4                 :  *    Basebackup sink implementing lz4 compression.
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
                                  7                 :  *
                                  8                 :  * IDENTIFICATION
                                  9                 :  *    src/backend/backup/basebackup_lz4.c
                                 10                 :  *
                                 11                 :  *-------------------------------------------------------------------------
                                 12                 :  */
                                 13                 : #include "postgres.h"
                                 14                 : 
                                 15                 : #ifdef USE_LZ4
                                 16                 : #include <lz4frame.h>
                                 17                 : #endif
                                 18                 : 
                                 19                 : #include "backup/basebackup_sink.h"
                                 20                 : 
                                 21                 : #ifdef USE_LZ4
                                 22                 : 
                                 23                 : typedef struct bbsink_lz4
                                 24                 : {
                                 25                 :     /* Common information for all types of sink. */
                                 26                 :     bbsink      base;
                                 27                 : 
                                 28                 :     /* Compression level. */
                                 29                 :     int         compresslevel;
                                 30                 : 
                                 31                 :     LZ4F_compressionContext_t ctx;
                                 32                 :     LZ4F_preferences_t prefs;
                                 33                 : 
                                 34                 :     /* Number of bytes staged in output buffer. */
                                 35                 :     size_t      bytes_written;
                                 36                 : } bbsink_lz4;
                                 37                 : 
                                 38                 : static void bbsink_lz4_begin_backup(bbsink *sink);
                                 39                 : static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
                                 40                 : static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
                                 41                 : static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
                                 42                 : static void bbsink_lz4_end_archive(bbsink *sink);
                                 43                 : static void bbsink_lz4_cleanup(bbsink *sink);
                                 44                 : 
                                 45                 : static const bbsink_ops bbsink_lz4_ops = {
                                 46                 :     .begin_backup = bbsink_lz4_begin_backup,
                                 47                 :     .begin_archive = bbsink_lz4_begin_archive,
                                 48                 :     .archive_contents = bbsink_lz4_archive_contents,
                                 49                 :     .end_archive = bbsink_lz4_end_archive,
                                 50                 :     .begin_manifest = bbsink_forward_begin_manifest,
                                 51                 :     .manifest_contents = bbsink_lz4_manifest_contents,
                                 52                 :     .end_manifest = bbsink_forward_end_manifest,
                                 53                 :     .end_backup = bbsink_forward_end_backup,
                                 54                 :     .cleanup = bbsink_lz4_cleanup
                                 55                 : };
                                 56                 : #endif
                                 57                 : 
                                 58                 : /*
                                 59                 :  * Create a new basebackup sink that performs lz4 compression.
                                 60                 :  */
                                 61                 : bbsink *
  362 michael                    62 CBC           2 : bbsink_lz4_new(bbsink *next, pg_compress_specification *compress)
                                 63                 : {
                                 64                 : #ifndef USE_LZ4
                                 65                 :     ereport(ERROR,
                                 66                 :             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 67                 :              errmsg("lz4 compression is not supported by this build")));
                                 68                 :     return NULL;                /* keep compiler quiet */
                                 69                 : #else
                                 70                 :     bbsink_lz4 *sink;
                                 71                 :     int         compresslevel;
                                 72                 : 
  422 rhaas                      73               2 :     Assert(next != NULL);
                                 74                 : 
  207 michael                    75               2 :     compresslevel = compress->level;
                                 76               2 :     Assert(compresslevel >= 0 && compresslevel <= 12);
                                 77                 : 
  422 rhaas                      78               2 :     sink = palloc0(sizeof(bbsink_lz4));
                                 79               2 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
                                 80               2 :     sink->base.bbs_next = next;
                                 81               2 :     sink->compresslevel = compresslevel;
                                 82                 : 
                                 83               2 :     return &sink->base;
                                 84                 : #endif
                                 85                 : }
                                 86                 : 
                                 87                 : #ifdef USE_LZ4
                                 88                 : 
                                 89                 : /*
                                 90                 :  * Begin backup.
                                 91                 :  */
                                 92                 : static void
                                 93               2 : bbsink_lz4_begin_backup(bbsink *sink)
                                 94                 : {
                                 95               2 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
                                 96                 :     size_t      output_buffer_bound;
                                 97               2 :     LZ4F_preferences_t *prefs = &mysink->prefs;
                                 98                 : 
                                 99                 :     /* Initialize compressor object. */
                                100               2 :     memset(prefs, 0, sizeof(LZ4F_preferences_t));
                                101               2 :     prefs->frameInfo.blockSizeID = LZ4F_max256KB;
                                102               2 :     prefs->compressionLevel = mysink->compresslevel;
                                103                 : 
                                104                 :     /*
                                105                 :      * We need our own buffer, because we're going to pass different data to
                                106                 :      * the next sink than what gets passed to us.
                                107                 :      */
                                108               2 :     mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
                                109                 : 
                                110                 :     /*
                                111                 :      * Since LZ4F_compressUpdate() requires an output buffer at least as large
                                112                 :      * as the value returned by LZ4F_compressBound(), make sure the next
                                113                 :      * sink's bbs_buffer is long enough to accommodate the compressed form of
                                114                 :      * our input buffer.
                                115                 :      */
                                116               2 :     output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
                                117               2 :                                              &mysink->prefs);
                                118                 : 
                                119                 :     /*
                                120                 :      * The buffer length is expected to be a multiple of BLCKSZ, so round up.
                                121                 :      */
                                122               2 :     output_buffer_bound = output_buffer_bound + BLCKSZ -
                                123                 :         (output_buffer_bound % BLCKSZ);
                                124                 : 
                                125               2 :     bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
                                126               2 : }
                                127                 : 
                                128                 : /*
                                129                 :  * Prepare to compress the next archive.
                                130                 :  */
                                131                 : static void
                                132               2 : bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
                                133                 : {
                                134               2 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
                                135                 :     char       *lz4_archive_name;
                                136                 :     LZ4F_errorCode_t ctxError;
                                137                 :     size_t      headerSize;
                                138                 : 
                                139               2 :     ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
                                140               2 :     if (LZ4F_isError(ctxError))
  422 rhaas                     141 UBC           0 :         elog(ERROR, "could not create lz4 compression context: %s",
                                142                 :              LZ4F_getErrorName(ctxError));
                                143                 : 
                                144                 :     /* First of all write the frame header to destination buffer. */
  422 rhaas                     145 CBC           2 :     headerSize = LZ4F_compressBegin(mysink->ctx,
                                146               2 :                                     mysink->base.bbs_next->bbs_buffer,
                                147               2 :                                     mysink->base.bbs_next->bbs_buffer_length,
                                148               2 :                                     &mysink->prefs);
                                149                 : 
                                150               2 :     if (LZ4F_isError(headerSize))
  422 rhaas                     151 UBC           0 :         elog(ERROR, "could not write lz4 header: %s",
                                152                 :              LZ4F_getErrorName(headerSize));
                                153                 : 
                                154                 :     /*
                                155                 :      * We need to write the compressed data after the header in the output
                                156                 :      * buffer. So, make sure to update the notion of bytes written to output
                                157                 :      * buffer.
                                158                 :      */
  422 rhaas                     159 CBC           2 :     mysink->bytes_written += headerSize;
                                160                 : 
                                161                 :     /* Add ".lz4" to the archive name. */
                                162               2 :     lz4_archive_name = psprintf("%s.lz4", archive_name);
                                163               2 :     Assert(sink->bbs_next != NULL);
                                164               2 :     bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
                                165               2 :     pfree(lz4_archive_name);
                                166               2 : }
                                167                 : 
                                168                 : /*
                                169                 :  * Compress the input data to the output buffer until we run out of input
                                170                 :  * data. Each time the output buffer falls below the compression bound for
                                171                 :  * the input buffer, invoke the archive_contents() method for the next sink.
                                172                 :  *
                                173                 :  * Note that since we're compressing the input, it may very commonly happen
                                174                 :  * that we consume all the input data without filling the output buffer. In
                                175                 :  * that case, the compressed representation of the current input data won't
                                176                 :  * actually be sent to the next bbsink until a later call to this function,
                                177                 :  * or perhaps even not until bbsink_lz4_end_archive() is invoked.
                                178                 :  */
                                179                 : static void
                                180            5405 : bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
                                181                 : {
                                182            5405 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
                                183                 :     size_t      compressedSize;
                                184                 :     size_t      avail_in_bound;
                                185                 : 
                                186            5405 :     avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);
                                187                 : 
                                188                 :     /*
                                189                 :      * If the number of available bytes has fallen below the value computed by
                                190                 :      * LZ4F_compressBound(), ask the next sink to process the data so that we
                                191                 :      * can empty the buffer.
                                192                 :      */
  397                           193            5405 :     if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
                                194                 :         avail_in_bound)
                                195                 :     {
  422                           196             195 :         bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
                                197             195 :         mysink->bytes_written = 0;
                                198                 :     }
                                199                 : 
                                200                 :     /*
                                201                 :      * Compress the input buffer and write it into the output buffer.
                                202                 :      */
                                203            5405 :     compressedSize = LZ4F_compressUpdate(mysink->ctx,
                                204            5405 :                                          mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
                                205            5405 :                                          mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
                                206            5405 :                                          (uint8 *) mysink->base.bbs_buffer,
                                207                 :                                          avail_in,
                                208                 :                                          NULL);
                                209                 : 
                                210            5405 :     if (LZ4F_isError(compressedSize))
  422 rhaas                     211 UBC           0 :         elog(ERROR, "could not compress data: %s",
                                212                 :              LZ4F_getErrorName(compressedSize));
                                213                 : 
                                214                 :     /*
                                215                 :      * Update our notion of how many bytes we've written into output buffer.
                                216                 :      */
  422 rhaas                     217 CBC        5405 :     mysink->bytes_written += compressedSize;
                                218            5405 : }
                                219                 : 
                                220                 : /*
                                221                 :  * There might be some data inside lz4's internal buffers; we need to get
                                222                 :  * that flushed out and also finalize the lz4 frame and then get that forwarded
                                223                 :  * to the successor sink as archive content.
                                224                 :  *
                                225                 :  * Then we can end processing for this archive.
                                226                 :  */
                                227                 : static void
                                228               2 : bbsink_lz4_end_archive(bbsink *sink)
                                229                 : {
                                230               2 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
                                231                 :     size_t      compressedSize;
                                232                 :     size_t      lz4_footer_bound;
                                233                 : 
                                234               2 :     lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);
                                235                 : 
                                236               2 :     Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);
                                237                 : 
  397                           238               2 :     if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
                                239                 :         lz4_footer_bound)
                                240                 :     {
  422 rhaas                     241 UBC           0 :         bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
                                242               0 :         mysink->bytes_written = 0;
                                243                 :     }
                                244                 : 
  422 rhaas                     245 CBC           2 :     compressedSize = LZ4F_compressEnd(mysink->ctx,
                                246               2 :                                       mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
                                247               2 :                                       mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
                                248                 :                                       NULL);
                                249                 : 
                                250               2 :     if (LZ4F_isError(compressedSize))
  422 rhaas                     251 UBC           0 :         elog(ERROR, "could not end lz4 compression: %s",
                                252                 :              LZ4F_getErrorName(compressedSize));
                                253                 : 
                                254                 :     /* Update our notion of how many bytes we've written. */
  422 rhaas                     255 CBC           2 :     mysink->bytes_written += compressedSize;
                                256                 : 
                                257                 :     /* Send whatever accumulated output bytes we have. */
                                258               2 :     bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
                                259               2 :     mysink->bytes_written = 0;
                                260                 : 
                                261                 :     /* Release the resources. */
                                262               2 :     LZ4F_freeCompressionContext(mysink->ctx);
                                263               2 :     mysink->ctx = NULL;
                                264                 : 
                                265                 :     /* Pass on the information that this archive has ended. */
                                266               2 :     bbsink_forward_end_archive(sink);
                                267               2 : }
                                268                 : 
                                269                 : /*
                                270                 :  * Manifest contents are not compressed, but we do need to copy them into
                                271                 :  * the successor sink's buffer, because we have our own.
                                272                 :  */
                                273                 : static void
                                274              10 : bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
                                275                 : {
                                276              10 :     memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
                                277              10 :     bbsink_manifest_contents(sink->bbs_next, len);
                                278              10 : }
                                279                 : 
                                280                 : /*
                                281                 :  * In case the backup fails, make sure we free the compression context by
                                282                 :  * calling LZ4F_freeCompressionContext(), if needed, to avoid a memory leak.
                                283                 :  */
                                284                 : static void
                                285               2 : bbsink_lz4_cleanup(bbsink *sink)
                                286                 : {
                                287               2 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
                                288                 : 
                                289               2 :     if (mysink->ctx)
                                290                 :     {
  422 rhaas                     291 UBC           0 :         LZ4F_freeCompressionContext(mysink->ctx);
                                292               0 :         mysink->ctx = NULL;
                                293                 :     }
  422 rhaas                     294 CBC           2 : }
                                295                 : 
                                296                 : #endif
        

Generated by: LCOV version v1.16-55-g56c0a2a