LCOV - differential code coverage report
Current view: top level - src/backend/backup - basebackup_zstd.c (source / functions) Coverage Total Hit LBC UBC CBC
Current: Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128 Lines: 85.7 % 91 78 5 8 78
Current Date: 2024-04-14 14:21:10 Functions: 100.0 % 8 8 8
Baseline: 16@8cea358b128 Branches: 45.7 % 46 21 2 23 21
Baseline Date: 2024-04-14 14:21:09 Line coverage date bins:
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed (240..) days: 85.7 % 91 78 5 8 78
Function coverage date bins:
(240..) days: 100.0 % 8 8 8
Branch coverage date bins:
(240..) days: 45.7 % 46 21 2 23 21

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * basebackup_zstd.c
                                  4                 :                :  *    Basebackup sink implementing zstd compression.
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
                                  7                 :                :  *
                                  8                 :                :  * IDENTIFICATION
                                  9                 :                :  *    src/backend/backup/basebackup_zstd.c
                                 10                 :                :  *
                                 11                 :                :  *-------------------------------------------------------------------------
                                 12                 :                :  */
                                 13                 :                : #include "postgres.h"
                                 14                 :                : 
                                 15                 :                : #ifdef USE_ZSTD
                                 16                 :                : #include <zstd.h>
                                 17                 :                : #endif
                                 18                 :                : 
                                 19                 :                : #include "backup/basebackup_sink.h"
                                 20                 :                : 
                                 21                 :                : #ifdef USE_ZSTD
                                 22                 :                : 
                                 23                 :                : typedef struct bbsink_zstd
                                 24                 :                : {
                                 25                 :                :     /* Common information for all types of sink. */
                                 26                 :                :     bbsink      base;
                                 27                 :                : 
                                 28                 :                :     /* Compression options */
                                 29                 :                :     pg_compress_specification *compress;
                                 30                 :                : 
                                 31                 :                :     ZSTD_CCtx  *cctx;
                                 32                 :                :     ZSTD_outBuffer zstd_outBuf;
                                 33                 :                : } bbsink_zstd;
                                 34                 :                : 
                                 35                 :                : static void bbsink_zstd_begin_backup(bbsink *sink);
                                 36                 :                : static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
                                 37                 :                : static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
                                 38                 :                : static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
                                 39                 :                : static void bbsink_zstd_end_archive(bbsink *sink);
                                 40                 :                : static void bbsink_zstd_cleanup(bbsink *sink);
                                 41                 :                : static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
                                 42                 :                :                                    TimeLineID endtli);
                                 43                 :                : 
                                 44                 :                : static const bbsink_ops bbsink_zstd_ops = {
                                 45                 :                :     .begin_backup = bbsink_zstd_begin_backup,
                                 46                 :                :     .begin_archive = bbsink_zstd_begin_archive,
                                 47                 :                :     .archive_contents = bbsink_zstd_archive_contents,
                                 48                 :                :     .end_archive = bbsink_zstd_end_archive,
                                 49                 :                :     .begin_manifest = bbsink_forward_begin_manifest,
                                 50                 :                :     .manifest_contents = bbsink_zstd_manifest_contents,
                                 51                 :                :     .end_manifest = bbsink_forward_end_manifest,
                                 52                 :                :     .end_backup = bbsink_zstd_end_backup,
                                 53                 :                :     .cleanup = bbsink_zstd_cleanup
                                 54                 :                : };
                                 55                 :                : #endif
                                 56                 :                : 
                                 57                 :                : /*
                                 58                 :                :  * Create a new basebackup sink that performs zstd compression.
                                 59                 :                :  */
                                 60                 :                : bbsink *
  733 michael@paquier.xyz        61                 :CBC           4 : bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
                                 62                 :                : {
                                 63                 :                : #ifndef USE_ZSTD
                                 64                 :                :     ereport(ERROR,
                                 65                 :                :             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 66                 :                :              errmsg("zstd compression is not supported by this build")));
                                 67                 :                :     return NULL;                /* keep compiler quiet */
                                 68                 :                : #else
                                 69                 :                :     bbsink_zstd *sink;
                                 70                 :                : 
  769 rhaas@postgresql.org       71         [ -  + ]:              4 :     Assert(next != NULL);
                                 72                 :                : 
                                 73                 :              4 :     sink = palloc0(sizeof(bbsink_zstd));
                                 74                 :              4 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
                                 75                 :              4 :     sink->base.bbs_next = next;
  746                            76                 :              4 :     sink->compress = compress;
                                 77                 :                : 
  769                            78                 :              4 :     return &sink->base;
                                 79                 :                : #endif
                                 80                 :                : }
                                 81                 :                : 
                                 82                 :                : #ifdef USE_ZSTD
                                 83                 :                : 
                                 84                 :                : /*
                                 85                 :                :  * Begin backup.
                                 86                 :                :  */
                                 87                 :                : static void
                                 88                 :              4 : bbsink_zstd_begin_backup(bbsink *sink)
                                 89                 :                : {
                                 90                 :              4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                 91                 :                :     size_t      output_buffer_bound;
                                 92                 :                :     size_t      ret;
  733 michael@paquier.xyz        93                 :              4 :     pg_compress_specification *compress = mysink->compress;
                                 94                 :                : 
  769 rhaas@postgresql.org       95                 :              4 :     mysink->cctx = ZSTD_createCCtx();
                                 96         [ -  + ]:              4 :     if (!mysink->cctx)
  769 rhaas@postgresql.org       97         [ #  # ]:UBC           0 :         elog(ERROR, "could not create zstd compression context");
                                 98                 :                : 
  578 michael@paquier.xyz        99                 :CBC           4 :     ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
                                100                 :                :                                  compress->level);
                                101         [ -  + ]:              4 :     if (ZSTD_isError(ret))
  578 michael@paquier.xyz       102         [ #  # ]:UBC           0 :         elog(ERROR, "could not set zstd compression level to %d: %s",
                                103                 :                :              compress->level, ZSTD_getErrorName(ret));
                                104                 :                : 
  733 michael@paquier.xyz       105         [ +  + ]:CBC           4 :     if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
                                106                 :                :     {
                                107                 :                :         /*
                                108                 :                :          * On older versions of libzstd, this option does not exist, and
                                109                 :                :          * trying to set it will fail. Similarly for newer versions if they
                                110                 :                :          * are compiled without threading support.
                                111                 :                :          */
  746 rhaas@postgresql.org      112                 :              1 :         ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
                                113                 :                :                                      compress->workers);
                                114         [ -  + ]:              1 :         if (ZSTD_isError(ret))
  746 rhaas@postgresql.org      115         [ #  # ]:UBC           0 :             ereport(ERROR,
                                116                 :                :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                117                 :                :                     errmsg("could not set compression worker count to %d: %s",
                                118                 :                :                            compress->workers, ZSTD_getErrorName(ret)));
                                119                 :                :     }
                                120                 :                : 
  374 tomas.vondra@postgre      121         [ +  + ]:CBC           4 :     if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
                                122                 :                :     {
                                123                 :              1 :         ret = ZSTD_CCtx_setParameter(mysink->cctx,
                                124                 :                :                                      ZSTD_c_enableLongDistanceMatching,
                                125                 :              1 :                                      compress->long_distance);
                                126         [ -  + ]:              1 :         if (ZSTD_isError(ret))
  374 tomas.vondra@postgre      127         [ #  # ]:UBC           0 :             ereport(ERROR,
                                128                 :                :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                129                 :                :                     errmsg("could not enable long-distance mode: %s",
                                130                 :                :                            ZSTD_getErrorName(ret)));
                                131                 :                :     }
                                132                 :                : 
                                133                 :                :     /*
                                134                 :                :      * We need our own buffer, because we're going to pass different data to
                                135                 :                :      * the next sink than what gets passed to us.
                                136                 :                :      */
  769 rhaas@postgresql.org      137                 :CBC           4 :     mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
                                138                 :                : 
                                139                 :                :     /*
                                140                 :                :      * Make sure that the next sink's bbs_buffer is big enough to accommodate
                                141                 :                :      * the compressed input buffer.
                                142                 :                :      */
                                143                 :              4 :     output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
                                144                 :                : 
                                145                 :                :     /*
                                146                 :                :      * The buffer length is expected to be a multiple of BLCKSZ, so round up.
                                147                 :                :      */
                                148                 :              4 :     output_buffer_bound = output_buffer_bound + BLCKSZ -
                                149                 :                :         (output_buffer_bound % BLCKSZ);
                                150                 :                : 
                                151                 :              4 :     bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
                                152                 :              4 : }
                                153                 :                : 
                                154                 :                : /*
                                155                 :                :  * Prepare to compress the next archive.
                                156                 :                :  */
                                157                 :                : static void
                                158                 :              4 : bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
                                159                 :                : {
                                160                 :              4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                161                 :                :     char       *zstd_archive_name;
                                162                 :                : 
                                163                 :                :     /*
                                164                 :                :      * At the start of each archive we reset the state to start a new
                                165                 :                :      * compression operation. The parameters are sticky and they will stick
                                166                 :                :      * around as we are resetting with option ZSTD_reset_session_only.
                                167                 :                :      */
                                168                 :              4 :     ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
                                169                 :                : 
                                170                 :              4 :     mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
                                171                 :              4 :     mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
                                172                 :              4 :     mysink->zstd_outBuf.pos = 0;
                                173                 :                : 
                                174                 :                :     /* Add ".zst" to the archive name. */
                                175                 :              4 :     zstd_archive_name = psprintf("%s.zst", archive_name);
                                176         [ -  + ]:              4 :     Assert(sink->bbs_next != NULL);
                                177                 :              4 :     bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
                                178                 :              4 :     pfree(zstd_archive_name);
                                179                 :              4 : }
                                180                 :                : 
                                181                 :                : /*
                                182                 :                :  * Compress the input data to the output buffer until we run out of input
                                183                 :                :  * data. Each time the output buffer falls below the compression bound for
                                184                 :                :  * the input buffer, invoke the archive_contents() method for the next sink.
                                185                 :                :  *
                                186                 :                :  * Note that since we're compressing the input, it may very commonly happen
                                187                 :                :  * that we consume all the input data without filling the output buffer. In
                                188                 :                :  * that case, the compressed representation of the current input data won't
                                189                 :                :  * actually be sent to the next bbsink until a later call to this function,
                                190                 :                :  * or perhaps even not until bbsink_zstd_end_archive() is invoked.
                                191                 :                :  */
                                192                 :                : static void
                                193                 :          10790 : bbsink_zstd_archive_contents(bbsink *sink, size_t len)
                                194                 :                : {
                                195                 :          10790 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                196                 :          10790 :     ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
                                197                 :                : 
                                198         [ +  + ]:          21611 :     while (inBuf.pos < inBuf.size)
                                199                 :                :     {
                                200                 :                :         size_t      yet_to_flush;
                                201                 :          10821 :         size_t      max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
                                202                 :                : 
                                203                 :                :         /*
                                204                 :                :          * If the out buffer is not left with enough space, send the output
                                205                 :                :          * buffer to the next sink, and reset it.
                                206                 :                :          */
                                207         [ +  + ]:          10821 :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
                                208                 :                :         {
                                209                 :            391 :             bbsink_archive_contents(mysink->base.bbs_next,
                                210                 :                :                                     mysink->zstd_outBuf.pos);
                                211                 :            391 :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
                                212                 :            391 :             mysink->zstd_outBuf.size =
                                213                 :            391 :                 mysink->base.bbs_next->bbs_buffer_length;
                                214                 :            391 :             mysink->zstd_outBuf.pos = 0;
                                215                 :                :         }
                                216                 :                : 
                                217                 :          10821 :         yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
                                218                 :                :                                             &inBuf, ZSTD_e_continue);
                                219                 :                : 
                                220         [ -  + ]:          10821 :         if (ZSTD_isError(yet_to_flush))
  769 rhaas@postgresql.org      221         [ #  # ]:UBC           0 :             elog(ERROR,
                                222                 :                :                  "could not compress data: %s",
                                223                 :                :                  ZSTD_getErrorName(yet_to_flush));
                                224                 :                :     }
  769 rhaas@postgresql.org      225                 :CBC       10790 : }
                                226                 :                : 
                                227                 :                : /*
                                228                 :                :  * There might be some data inside zstd's internal buffers; we need to get that
                                229                 :                :  * flushed out, also end the zstd frame and then get that forwarded to the
                                230                 :                :  * successor sink as archive content.
                                231                 :                :  *
                                232                 :                :  * Then we can end processing for this archive.
                                233                 :                :  */
                                234                 :                : static void
                                235                 :              4 : bbsink_zstd_end_archive(bbsink *sink)
                                236                 :                : {
                                237                 :              4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                238                 :                :     size_t      yet_to_flush;
                                239                 :                : 
                                240                 :                :     do
                                241                 :                :     {
                                242                 :              4 :         ZSTD_inBuffer in = {NULL, 0, 0};
                                243                 :              4 :         size_t      max_needed = ZSTD_compressBound(0);
                                244                 :                : 
                                245                 :                :         /*
                                246                 :                :          * If the out buffer is not left with enough space, send the output
                                247                 :                :          * buffer to the next sink, and reset it.
                                248                 :                :          */
                                249         [ -  + ]:              4 :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
                                250                 :                :         {
  769 rhaas@postgresql.org      251                 :LBC        (20) :             bbsink_archive_contents(mysink->base.bbs_next,
                                252                 :                :                                     mysink->zstd_outBuf.pos);
                                253                 :           (20) :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
                                254                 :           (20) :             mysink->zstd_outBuf.size =
                                255                 :           (20) :                 mysink->base.bbs_next->bbs_buffer_length;
                                256                 :           (20) :             mysink->zstd_outBuf.pos = 0;
                                257                 :                :         }
                                258                 :                : 
  769 rhaas@postgresql.org      259                 :CBC           4 :         yet_to_flush = ZSTD_compressStream2(mysink->cctx,
                                260                 :                :                                             &mysink->zstd_outBuf,
                                261                 :                :                                             &in, ZSTD_e_end);
                                262                 :                : 
                                263         [ -  + ]:              4 :         if (ZSTD_isError(yet_to_flush))
  769 rhaas@postgresql.org      264         [ #  # ]:UBC           0 :             elog(ERROR, "could not compress data: %s",
                                265                 :                :                  ZSTD_getErrorName(yet_to_flush));
                                266                 :                : 
  769 rhaas@postgresql.org      267         [ -  + ]:CBC           4 :     } while (yet_to_flush > 0);
                                268                 :                : 
                                269                 :                :     /* Make sure to pass any remaining bytes to the next sink. */
                                270         [ +  - ]:              4 :     if (mysink->zstd_outBuf.pos > 0)
                                271                 :              4 :         bbsink_archive_contents(mysink->base.bbs_next,
                                272                 :                :                                 mysink->zstd_outBuf.pos);
                                273                 :                : 
                                274                 :                :     /* Pass on the information that this archive has ended. */
                                275                 :              4 :     bbsink_forward_end_archive(sink);
                                276                 :              4 : }
                                277                 :                : 
                                278                 :                : /*
                                279                 :                :  * Free the resources and context.
                                280                 :                :  */
                                281                 :                : static void
                                282                 :              4 : bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
                                283                 :                :                        TimeLineID endtli)
                                284                 :                : {
                                285                 :              4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                286                 :                : 
                                287                 :                :     /* Release the context. */
                                288         [ +  - ]:              4 :     if (mysink->cctx)
                                289                 :                :     {
                                290                 :              4 :         ZSTD_freeCCtx(mysink->cctx);
                                291                 :              4 :         mysink->cctx = NULL;
                                292                 :                :     }
                                293                 :                : 
                                294                 :              4 :     bbsink_forward_end_backup(sink, endptr, endtli);
                                295                 :              4 : }
                                296                 :                : 
                                297                 :                : /*
                                298                 :                :  * Manifest contents are not compressed, but we do need to copy them into
                                299                 :                :  * the successor sink's buffer, because we have our own.
                                300                 :                :  */
                                301                 :                : static void
                                302                 :             20 : bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
                                303                 :                : {
                                304                 :             20 :     memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
                                305                 :             20 :     bbsink_manifest_contents(sink->bbs_next, len);
                                306                 :             20 : }
                                307                 :                : 
                                308                 :                : /*
                                309                 :                :  * In case the backup fails, make sure we free any compression context that
                                310                 :                :  * got allocated, so that we don't leak memory.
                                311                 :                :  */
                                312                 :                : static void
                                313                 :              4 : bbsink_zstd_cleanup(bbsink *sink)
                                314                 :                : {
                                315                 :              4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                316                 :                : 
                                317                 :                :     /* Release the context if not already released. */
                                318         [ -  + ]:              4 :     if (mysink->cctx)
                                319                 :                :     {
  769 rhaas@postgresql.org      320                 :UBC           0 :         ZSTD_freeCCtx(mysink->cctx);
                                321                 :              0 :         mysink->cctx = NULL;
                                322                 :                :     }
  769 rhaas@postgresql.org      323                 :CBC           4 : }
                                324                 :                : 
                                325                 :                : #endif
        

Generated by: LCOV version 2.1-beta2-3-g6141622