LCOV - differential code coverage report
Current view: top level - src/backend/backup - basebackup_lz4.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128 Lines: 90.1 % 81 73 8 73
Current Date: 2024-04-14 14:21:10 Functions: 100.0 % 7 7 7
Baseline: 16@8cea358b128 Branches: 40.6 % 32 13 19 13
Baseline Date: 2024-04-14 14:21:09 Line coverage date bins:
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed (240..) days: 90.1 % 81 73 8 73
Function coverage date bins:
(240..) days: 100.0 % 7 7 7
Branch coverage date bins:
(240..) days: 40.6 % 32 13 19 13

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * basebackup_lz4.c
                                  4                 :                :  *    Basebackup sink implementing lz4 compression.
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
                                  7                 :                :  *
                                  8                 :                :  * IDENTIFICATION
                                  9                 :                :  *    src/backend/backup/basebackup_lz4.c
                                 10                 :                :  *
                                 11                 :                :  *-------------------------------------------------------------------------
                                 12                 :                :  */
                                 13                 :                : #include "postgres.h"
                                 14                 :                : 
                                 15                 :                : #ifdef USE_LZ4
                                 16                 :                : #include <lz4frame.h>
                                 17                 :                : #endif
                                 18                 :                : 
                                 19                 :                : #include "backup/basebackup_sink.h"
                                 20                 :                : 
                                 21                 :                : #ifdef USE_LZ4
                                 22                 :                : 
                                 23                 :                : typedef struct bbsink_lz4
                                 24                 :                : {
                                 25                 :                :     /* Common information for all types of sink. */
                                 26                 :                :     bbsink      base;
                                 27                 :                : 
                                 28                 :                :     /* Compression level. */
                                 29                 :                :     int         compresslevel;
                                 30                 :                : 
                                 31                 :                :     LZ4F_compressionContext_t ctx;
                                 32                 :                :     LZ4F_preferences_t prefs;
                                 33                 :                : 
                                 34                 :                :     /* Number of bytes staged in output buffer. */
                                 35                 :                :     size_t      bytes_written;
                                 36                 :                : } bbsink_lz4;
                                 37                 :                : 
                                 38                 :                : static void bbsink_lz4_begin_backup(bbsink *sink);
                                 39                 :                : static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
                                 40                 :                : static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
                                 41                 :                : static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
                                 42                 :                : static void bbsink_lz4_end_archive(bbsink *sink);
                                 43                 :                : static void bbsink_lz4_cleanup(bbsink *sink);
                                 44                 :                : 
                                 45                 :                : static const bbsink_ops bbsink_lz4_ops = {
                                 46                 :                :     .begin_backup = bbsink_lz4_begin_backup,
                                 47                 :                :     .begin_archive = bbsink_lz4_begin_archive,
                                 48                 :                :     .archive_contents = bbsink_lz4_archive_contents,
                                 49                 :                :     .end_archive = bbsink_lz4_end_archive,
                                 50                 :                :     .begin_manifest = bbsink_forward_begin_manifest,
                                 51                 :                :     .manifest_contents = bbsink_lz4_manifest_contents,
                                 52                 :                :     .end_manifest = bbsink_forward_end_manifest,
                                 53                 :                :     .end_backup = bbsink_forward_end_backup,
                                 54                 :                :     .cleanup = bbsink_lz4_cleanup
                                 55                 :                : };
                                 56                 :                : #endif
                                 57                 :                : 
                                 58                 :                : /*
                                 59                 :                :  * Create a new basebackup sink that performs lz4 compression.
                                 60                 :                :  */
                                 61                 :                : bbsink *
  733 michael@paquier.xyz        62                 :CBC           2 : bbsink_lz4_new(bbsink *next, pg_compress_specification *compress)
                                 63                 :                : {
                                 64                 :                : #ifndef USE_LZ4
                                 65                 :                :     ereport(ERROR,
                                 66                 :                :             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 67                 :                :              errmsg("lz4 compression is not supported by this build")));
                                 68                 :                :     return NULL;                /* keep compiler quiet */
                                 69                 :                : #else
                                 70                 :                :     bbsink_lz4 *sink;
                                 71                 :                :     int         compresslevel;
                                 72                 :                : 
  793 rhaas@postgresql.org       73         [ -  + ]:              2 :     Assert(next != NULL);
                                 74                 :                : 
  578 michael@paquier.xyz        75                 :              2 :     compresslevel = compress->level;
                                 76   [ +  -  -  + ]:              2 :     Assert(compresslevel >= 0 && compresslevel <= 12);
                                 77                 :                : 
  793 rhaas@postgresql.org       78                 :              2 :     sink = palloc0(sizeof(bbsink_lz4));
                                 79                 :              2 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
                                 80                 :              2 :     sink->base.bbs_next = next;
                                 81                 :              2 :     sink->compresslevel = compresslevel;
                                 82                 :                : 
                                 83                 :              2 :     return &sink->base;
                                 84                 :                : #endif
                                 85                 :                : }
                                 86                 :                : 
                                 87                 :                : #ifdef USE_LZ4
                                 88                 :                : 
                                 89                 :                : /*
                                 90                 :                :  * Begin backup.
                                 91                 :                :  */
                                 92                 :                : static void
                                 93                 :              2 : bbsink_lz4_begin_backup(bbsink *sink)
                                 94                 :                : {
                                 95                 :              2 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
                                 96                 :                :     size_t      output_buffer_bound;
                                 97                 :              2 :     LZ4F_preferences_t *prefs = &mysink->prefs;
                                 98                 :                : 
                                 99                 :                :     /* Initialize compressor object. */
                                100                 :              2 :     memset(prefs, 0, sizeof(LZ4F_preferences_t));
                                101                 :              2 :     prefs->frameInfo.blockSizeID = LZ4F_max256KB;
                                102                 :              2 :     prefs->compressionLevel = mysink->compresslevel;
                                103                 :                : 
                                104                 :                :     /*
                                105                 :                :      * We need our own buffer, because we're going to pass different data to
                                106                 :                :      * the next sink than what gets passed to us.
                                107                 :                :      */
                                108                 :              2 :     mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
                                109                 :                : 
                                110                 :                :     /*
                                111                 :                :      * Since LZ4F_compressUpdate() requires the output buffer of size equal or
                                112                 :                :      * greater than that of LZ4F_compressBound(), make sure we have the next
                                113                 :                :      * sink's bbs_buffer of length that can accommodate the compressed input
                                114                 :                :      * buffer.
                                115                 :                :      */
                                116                 :              2 :     output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
                                117                 :              2 :                                              &mysink->prefs);
                                118                 :                : 
                                119                 :                :     /*
                                120                 :                :      * The buffer length is expected to be a multiple of BLCKSZ, so round up.
                                121                 :                :      */
                                122                 :              2 :     output_buffer_bound = output_buffer_bound + BLCKSZ -
                                123                 :                :         (output_buffer_bound % BLCKSZ);
                                124                 :                : 
                                125                 :              2 :     bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
                                126                 :              2 : }
                                127                 :                : 
                                128                 :                : /*
                                129                 :                :  * Prepare to compress the next archive.
                                130                 :                :  */
                                131                 :                : static void
                                132                 :              2 : bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
                                133                 :                : {
                                134                 :              2 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
                                135                 :                :     char       *lz4_archive_name;
                                136                 :                :     LZ4F_errorCode_t ctxError;
                                137                 :                :     size_t      headerSize;
                                138                 :                : 
                                139                 :              2 :     ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
                                140         [ -  + ]:              2 :     if (LZ4F_isError(ctxError))
  793 rhaas@postgresql.org      141         [ #  # ]:UBC           0 :         elog(ERROR, "could not create lz4 compression context: %s",
                                142                 :                :              LZ4F_getErrorName(ctxError));
                                143                 :                : 
                                144                 :                :     /* First of all write the frame header to destination buffer. */
  793 rhaas@postgresql.org      145                 :CBC           2 :     headerSize = LZ4F_compressBegin(mysink->ctx,
                                146                 :              2 :                                     mysink->base.bbs_next->bbs_buffer,
                                147                 :              2 :                                     mysink->base.bbs_next->bbs_buffer_length,
                                148                 :              2 :                                     &mysink->prefs);
                                149                 :                : 
                                150         [ -  + ]:              2 :     if (LZ4F_isError(headerSize))
  793 rhaas@postgresql.org      151         [ #  # ]:UBC           0 :         elog(ERROR, "could not write lz4 header: %s",
                                152                 :                :              LZ4F_getErrorName(headerSize));
                                153                 :                : 
                                154                 :                :     /*
                                155                 :                :      * We need to write the compressed data after the header in the output
                                156                 :                :      * buffer. So, make sure to update the notion of bytes written to output
                                157                 :                :      * buffer.
                                158                 :                :      */
  793 rhaas@postgresql.org      159                 :CBC           2 :     mysink->bytes_written += headerSize;
                                160                 :                : 
                                161                 :                :     /* Add ".lz4" to the archive name. */
                                162                 :              2 :     lz4_archive_name = psprintf("%s.lz4", archive_name);
                                163         [ -  + ]:              2 :     Assert(sink->bbs_next != NULL);
                                164                 :              2 :     bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
                                165                 :              2 :     pfree(lz4_archive_name);
                                166                 :              2 : }
                                167                 :                : 
                                168                 :                : /*
                                169                 :                :  * Compress the input data to the output buffer until we run out of input
                                170                 :                :  * data. Each time the output buffer falls below the compression bound for
                                171                 :                :  * the input buffer, invoke the archive_contents() method for then next sink.
                                172                 :                :  *
                                173                 :                :  * Note that since we're compressing the input, it may very commonly happen
                                174                 :                :  * that we consume all the input data without filling the output buffer. In
                                175                 :                :  * that case, the compressed representation of the current input data won't
                                176                 :                :  * actually be sent to the next bbsink until a later call to this function,
                                177                 :                :  * or perhaps even not until bbsink_lz4_end_archive() is invoked.
                                178                 :                :  */
                                179                 :                : static void
                                180                 :           5395 : bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
                                181                 :                : {
                                182                 :           5395 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
                                183                 :                :     size_t      compressedSize;
                                184                 :                :     size_t      avail_in_bound;
                                185                 :                : 
                                186                 :           5395 :     avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);
                                187                 :                : 
                                188                 :                :     /*
                                189                 :                :      * If the number of available bytes has fallen below the value computed by
                                190                 :                :      * LZ4F_compressBound(), ask the next sink to process the data so that we
                                191                 :                :      * can empty the buffer.
                                192                 :                :      */
  768                           193         [ +  + ]:           5395 :     if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
                                194                 :                :         avail_in_bound)
                                195                 :                :     {
  793                           196                 :            179 :         bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
                                197                 :            179 :         mysink->bytes_written = 0;
                                198                 :                :     }
                                199                 :                : 
                                200                 :                :     /*
                                201                 :                :      * Compress the input buffer and write it into the output buffer.
                                202                 :                :      */
                                203                 :           5395 :     compressedSize = LZ4F_compressUpdate(mysink->ctx,
                                204                 :           5395 :                                          mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
                                205                 :           5395 :                                          mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
                                206                 :           5395 :                                          (uint8 *) mysink->base.bbs_buffer,
                                207                 :                :                                          avail_in,
                                208                 :                :                                          NULL);
                                209                 :                : 
                                210         [ -  + ]:           5395 :     if (LZ4F_isError(compressedSize))
  793 rhaas@postgresql.org      211         [ #  # ]:UBC           0 :         elog(ERROR, "could not compress data: %s",
                                212                 :                :              LZ4F_getErrorName(compressedSize));
                                213                 :                : 
                                214                 :                :     /*
                                215                 :                :      * Update our notion of how many bytes we've written into output buffer.
                                216                 :                :      */
  793 rhaas@postgresql.org      217                 :CBC        5395 :     mysink->bytes_written += compressedSize;
                                218                 :           5395 : }
                                219                 :                : 
                                220                 :                : /*
                                221                 :                :  * There might be some data inside lz4's internal buffers; we need to get
                                222                 :                :  * that flushed out and also finalize the lz4 frame and then get that forwarded
                                223                 :                :  * to the successor sink as archive content.
                                224                 :                :  *
                                225                 :                :  * Then we can end processing for this archive.
                                226                 :                :  */
                                227                 :                : static void
                                228                 :              2 : bbsink_lz4_end_archive(bbsink *sink)
                                229                 :                : {
                                230                 :              2 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
                                231                 :                :     size_t      compressedSize;
                                232                 :                :     size_t      lz4_footer_bound;
                                233                 :                : 
                                234                 :              2 :     lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);
                                235                 :                : 
                                236         [ -  + ]:              2 :     Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);
                                237                 :                : 
  768                           238         [ -  + ]:              2 :     if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
                                239                 :                :         lz4_footer_bound)
                                240                 :                :     {
  793 rhaas@postgresql.org      241                 :UBC           0 :         bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
                                242                 :              0 :         mysink->bytes_written = 0;
                                243                 :                :     }
                                244                 :                : 
  793 rhaas@postgresql.org      245                 :CBC           2 :     compressedSize = LZ4F_compressEnd(mysink->ctx,
                                246                 :              2 :                                       mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
                                247                 :              2 :                                       mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
                                248                 :                :                                       NULL);
                                249                 :                : 
                                250         [ -  + ]:              2 :     if (LZ4F_isError(compressedSize))
  793 rhaas@postgresql.org      251         [ #  # ]:UBC           0 :         elog(ERROR, "could not end lz4 compression: %s",
                                252                 :                :              LZ4F_getErrorName(compressedSize));
                                253                 :                : 
                                254                 :                :     /* Update our notion of how many bytes we've written. */
  793 rhaas@postgresql.org      255                 :CBC           2 :     mysink->bytes_written += compressedSize;
                                256                 :                : 
                                257                 :                :     /* Send whatever accumulated output bytes we have. */
                                258                 :              2 :     bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
                                259                 :              2 :     mysink->bytes_written = 0;
                                260                 :                : 
                                261                 :                :     /* Release the resources. */
                                262                 :              2 :     LZ4F_freeCompressionContext(mysink->ctx);
                                263                 :              2 :     mysink->ctx = NULL;
                                264                 :                : 
                                265                 :                :     /* Pass on the information that this archive has ended. */
                                266                 :              2 :     bbsink_forward_end_archive(sink);
                                267                 :              2 : }
                                268                 :                : 
                                269                 :                : /*
                                270                 :                :  * Manifest contents are not compressed, but we do need to copy them into
                                271                 :                :  * the successor sink's buffer, because we have our own.
                                272                 :                :  */
                                273                 :                : static void
                                274                 :             10 : bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
                                275                 :                : {
                                276                 :             10 :     memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
                                277                 :             10 :     bbsink_manifest_contents(sink->bbs_next, len);
                                278                 :             10 : }
                                279                 :                : 
                                280                 :                : /*
                                281                 :                :  * In case the backup fails, make sure we free the compression context by
                                282                 :                :  * calling LZ4F_freeCompressionContext() if needed to avoid memory leak.
                                283                 :                :  */
                                284                 :                : static void
                                285                 :              2 : bbsink_lz4_cleanup(bbsink *sink)
                                286                 :                : {
                                287                 :              2 :     bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
                                288                 :                : 
                                289         [ -  + ]:              2 :     if (mysink->ctx)
                                290                 :                :     {
  793 rhaas@postgresql.org      291                 :UBC           0 :         LZ4F_freeCompressionContext(mysink->ctx);
                                292                 :              0 :         mysink->ctx = NULL;
                                293                 :                :     }
  793 rhaas@postgresql.org      294                 :CBC           2 : }
                                295                 :                : 
                                296                 :                : #endif
        

Generated by: LCOV version 2.1-beta2-3-g6141622