LCOV - differential code coverage report
Current view: top level - src/backend/backup - basebackup_zstd.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 85.7 % 91 78 1 3 6 3 3 37 4 34 7 41
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 8 8 6 1 1 6
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 [..60] days: 80.0 % 5 4 1 4
Legend: Lines: hit not hit (180,240] days: 66.7 % 3 2 1 2
(240..) days: 86.7 % 83 72 3 6 2 3 37 32 5 38
Function coverage date bins:
(240..) days: 57.1 % 14 8 6 1 1 6

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * basebackup_zstd.c
                                  4                 :  *    Basebackup sink implementing zstd compression.
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
                                  7                 :  *
                                  8                 :  * IDENTIFICATION
                                  9                 :  *    src/backend/backup/basebackup_zstd.c
                                 10                 :  *
                                 11                 :  *-------------------------------------------------------------------------
                                 12                 :  */
                                 13                 : #include "postgres.h"
                                 14                 : 
                                 15                 : #ifdef USE_ZSTD
                                 16                 : #include <zstd.h>
                                 17                 : #endif
                                 18                 : 
                                 19                 : #include "backup/basebackup_sink.h"
                                 20                 : 
                                 21                 : #ifdef USE_ZSTD
                                 22                 : 
                                 23                 : typedef struct bbsink_zstd
                                 24                 : {
                                 25                 :     /* Common information for all types of sink. */
                                 26                 :     bbsink      base;
                                 27                 : 
                                 28                 :     /* Compression options */
                                 29                 :     pg_compress_specification *compress;
                                 30                 : 
                                 31                 :     ZSTD_CCtx  *cctx;
                                 32                 :     ZSTD_outBuffer zstd_outBuf;
                                 33                 : } bbsink_zstd;
                                 34                 : 
                                 35                 : static void bbsink_zstd_begin_backup(bbsink *sink);
                                 36                 : static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
                                 37                 : static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
                                 38                 : static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
                                 39                 : static void bbsink_zstd_end_archive(bbsink *sink);
                                 40                 : static void bbsink_zstd_cleanup(bbsink *sink);
                                 41                 : static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
                                 42                 :                                    TimeLineID endtli);
                                 43                 : 
                                 44                 : static const bbsink_ops bbsink_zstd_ops = {
                                 45                 :     .begin_backup = bbsink_zstd_begin_backup,
                                 46                 :     .begin_archive = bbsink_zstd_begin_archive,
                                 47                 :     .archive_contents = bbsink_zstd_archive_contents,
                                 48                 :     .end_archive = bbsink_zstd_end_archive,
                                 49                 :     .begin_manifest = bbsink_forward_begin_manifest,
                                 50                 :     .manifest_contents = bbsink_zstd_manifest_contents,
                                 51                 :     .end_manifest = bbsink_forward_end_manifest,
                                 52                 :     .end_backup = bbsink_zstd_end_backup,
                                 53                 :     .cleanup = bbsink_zstd_cleanup
                                 54                 : };
                                 55                 : #endif
                                 56                 : 
                                 57                 : /*
                                 58                 :  * Create a new basebackup sink that performs zstd compression.
                                 59                 :  */
                                 60                 : bbsink *
  362 michael                    61 CBC           4 : bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
                                 62                 : {
                                 63                 : #ifndef USE_ZSTD
                                 64                 :     ereport(ERROR,
                                 65                 :             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 66                 :              errmsg("zstd compression is not supported by this build")));
                                 67                 :     return NULL;                /* keep compiler quiet */
                                 68                 : #else
                                 69                 :     bbsink_zstd *sink;
                                 70                 : 
  398 rhaas                      71               4 :     Assert(next != NULL);
                                 72                 : 
                                 73               4 :     sink = palloc0(sizeof(bbsink_zstd));
                                 74               4 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
                                 75               4 :     sink->base.bbs_next = next;
  375                            76               4 :     sink->compress = compress;
                                 77                 : 
  398                            78               4 :     return &sink->base;
                                 79                 : #endif
                                 80                 : }
                                 81                 : 
                                 82                 : #ifdef USE_ZSTD
                                 83                 : 
                                 84                 : /*
                                 85                 :  * Begin backup.
                                 86                 :  */
                                 87                 : static void
                                 88               4 : bbsink_zstd_begin_backup(bbsink *sink)
                                 89                 : {
                                 90               4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                 91                 :     size_t      output_buffer_bound;
                                 92                 :     size_t      ret;
  362 michael                    93               4 :     pg_compress_specification *compress = mysink->compress;
                                 94                 : 
  398 rhaas                      95               4 :     mysink->cctx = ZSTD_createCCtx();
                                 96               4 :     if (!mysink->cctx)
  398 rhaas                      97 UBC           0 :         elog(ERROR, "could not create zstd compression context");
                                 98                 : 
  207 michael                    99 CBC           4 :     ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
                                100                 :                                  compress->level);
                                101               4 :     if (ZSTD_isError(ret))
  207 michael                   102 UBC           0 :         elog(ERROR, "could not set zstd compression level to %d: %s",
                                103                 :              compress->level, ZSTD_getErrorName(ret));
                                104                 : 
  362 michael                   105 CBC           4 :     if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
                                106                 :     {
                                107                 :         /*
                                108                 :          * On older versions of libzstd, this option does not exist, and
                                109                 :          * trying to set it will fail. Similarly for newer versions if they
                                110                 :          * are compiled without threading support.
                                111                 :          */
  375 rhaas                     112               1 :         ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
                                113                 :                                      compress->workers);
                                114               1 :         if (ZSTD_isError(ret))
  375 rhaas                     115 UBC           0 :             ereport(ERROR,
                                116                 :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                117                 :                     errmsg("could not set compression worker count to %d: %s",
                                118                 :                            compress->workers, ZSTD_getErrorName(ret)));
                                119                 :     }
                                120                 : 
    3 tomas.vondra              121 GNC           4 :     if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
                                122                 :     {
                                123               1 :         ret = ZSTD_CCtx_setParameter(mysink->cctx,
                                124                 :                                      ZSTD_c_enableLongDistanceMatching,
                                125               1 :                                      compress->long_distance);
                                126               1 :         if (ZSTD_isError(ret))
    3 tomas.vondra              127 UNC           0 :             ereport(ERROR,
                                128                 :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                129                 :                     errmsg("could not set compression flag for %s: %s",
                                130                 :                            "long", ZSTD_getErrorName(ret)));
                                131                 :     }
                                132                 : 
  398 rhaas                     133 ECB             :     /*
                                134                 :      * We need our own buffer, because we're going to pass different data to
                                135                 :      * the next sink than what gets passed to us.
                                136                 :      */
  398 rhaas                     137 CBC           4 :     mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
  398 rhaas                     138 ECB             : 
  398 rhaas                     139 EUB             :     /*
                                140                 :      * Make sure that the next sink's bbs_buffer is big enough to accommodate
                                141                 :      * the compressed input buffer.
                                142                 :      */
  398 rhaas                     143 GIC           4 :     output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
                                144                 : 
                                145                 :     /*
                                146                 :      * The buffer length is expected to be a multiple of BLCKSZ, so round up.
                                147                 :      */
                                148               4 :     output_buffer_bound = output_buffer_bound + BLCKSZ -
  398 rhaas                     149 ECB             :         (output_buffer_bound % BLCKSZ);
                                150                 : 
  398 rhaas                     151 GIC           4 :     bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
                                152               4 : }
                                153                 : 
                                154                 : /*
  398 rhaas                     155 ECB             :  * Prepare to compress the next archive.
                                156                 :  */
                                157                 : static void
  398 rhaas                     158 GIC           4 : bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
                                159                 : {
  398 rhaas                     160 CBC           4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                161                 :     char       *zstd_archive_name;
                                162                 : 
  398 rhaas                     163 ECB             :     /*
                                164                 :      * At the start of each archive we reset the state to start a new
                                165                 :      * compression operation. The parameters are sticky and they will stick
                                166                 :      * around as we are resetting with option ZSTD_reset_session_only.
                                167                 :      */
  398 rhaas                     168 GIC           4 :     ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
                                169                 : 
  398 rhaas                     170 CBC           4 :     mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
  398 rhaas                     171 GIC           4 :     mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
  398 rhaas                     172 CBC           4 :     mysink->zstd_outBuf.pos = 0;
                                173                 : 
                                174                 :     /* Add ".zst" to the archive name. */
  398 rhaas                     175 GIC           4 :     zstd_archive_name = psprintf("%s.zst", archive_name);
                                176               4 :     Assert(sink->bbs_next != NULL);
                                177               4 :     bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
                                178               4 :     pfree(zstd_archive_name);
                                179               4 : }
  398 rhaas                     180 ECB             : 
                                181                 : /*
                                182                 :  * Compress the input data to the output buffer until we run out of input
                                183                 :  * data. Each time the output buffer falls below the compression bound for
                                184                 :  * the input buffer, invoke the archive_contents() method for the next sink.
                                185                 :  *
                                186                 :  * Note that since we're compressing the input, it may very commonly happen
                                187                 :  * that we consume all the input data without filling the output buffer. In
                                188                 :  * that case, the compressed representation of the current input data won't
                                189                 :  * actually be sent to the next bbsink until a later call to this function,
                                190                 :  * or perhaps even not until bbsink_zstd_end_archive() is invoked.
                                191                 :  */
                                192                 : static void
  398 rhaas                     193 GIC       10810 : bbsink_zstd_archive_contents(bbsink *sink, size_t len)
                                194                 : {
                                195           10810 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                196           10810 :     ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
                                197                 : 
                                198           21650 :     while (inBuf.pos < inBuf.size)
                                199                 :     {
                                200                 :         size_t      yet_to_flush;
                                201           10840 :         size_t      max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
                                202                 : 
                                203                 :         /*
                                204                 :          * If the out buffer is not left with enough space, send the output
  398 rhaas                     205 ECB             :          * buffer to the next sink, and reset it.
                                206                 :          */
  398 rhaas                     207 CBC       10840 :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
  398 rhaas                     208 ECB             :         {
  398 rhaas                     209 GIC         410 :             bbsink_archive_contents(mysink->base.bbs_next,
  398 rhaas                     210 ECB             :                                     mysink->zstd_outBuf.pos);
  398 rhaas                     211 GIC         410 :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
                                212             410 :             mysink->zstd_outBuf.size =
  398 rhaas                     213 CBC         410 :                 mysink->base.bbs_next->bbs_buffer_length;
  398 rhaas                     214 GIC         410 :             mysink->zstd_outBuf.pos = 0;
                                215                 :         }
                                216                 : 
                                217           10840 :         yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
                                218                 :                                             &inBuf, ZSTD_e_continue);
  398 rhaas                     219 ECB             : 
  398 rhaas                     220 GIC       10840 :         if (ZSTD_isError(yet_to_flush))
  398 rhaas                     221 LBC           0 :             elog(ERROR,
                                222                 :                  "could not compress data: %s",
  398 rhaas                     223 ECB             :                  ZSTD_getErrorName(yet_to_flush));
                                224                 :     }
  398 rhaas                     225 CBC       10810 : }
  398 rhaas                     226 ECB             : 
                                227                 : /*
                                228                 :  * There might be some data inside zstd's internal buffers; we need to get that
                                229                 :  * flushed out, also end the zstd frame and then get that forwarded to the
                                230                 :  * successor sink as archive content.
                                231                 :  *
                                232                 :  * Then we can end processing for this archive.
  398 rhaas                     233 EUB             :  */
                                234                 : static void
  398 rhaas                     235 GIC           4 : bbsink_zstd_end_archive(bbsink *sink)
                                236                 : {
  398 rhaas                     237 CBC           4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                238                 :     size_t      yet_to_flush;
                                239                 : 
                                240                 :     do
                                241                 :     {
  398 rhaas                     242 GIC           4 :         ZSTD_inBuffer in = {NULL, 0, 0};
                                243               4 :         size_t      max_needed = ZSTD_compressBound(0);
                                244                 : 
                                245                 :         /*
                                246                 :          * If the out buffer is not left with enough space, send the output
  398 rhaas                     247 ECB             :          * buffer to the next sink, and reset it.
                                248                 :          */
  398 rhaas                     249 CBC           4 :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
                                250                 :         {
  398 rhaas                     251 UIC           0 :             bbsink_archive_contents(mysink->base.bbs_next,
                                252                 :                                     mysink->zstd_outBuf.pos);
                                253               0 :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
  398 rhaas                     254 LBC           0 :             mysink->zstd_outBuf.size =
                                255               0 :                 mysink->base.bbs_next->bbs_buffer_length;
  398 rhaas                     256 UIC           0 :             mysink->zstd_outBuf.pos = 0;
                                257                 :         }
                                258                 : 
  398 rhaas                     259 GIC           4 :         yet_to_flush = ZSTD_compressStream2(mysink->cctx,
                                260                 :                                             &mysink->zstd_outBuf,
  398 rhaas                     261 ECB             :                                             &in, ZSTD_e_end);
                                262                 : 
  398 rhaas                     263 GBC           4 :         if (ZSTD_isError(yet_to_flush))
  398 rhaas                     264 UIC           0 :             elog(ERROR, "could not compress data: %s",
  398 rhaas                     265 EUB             :                  ZSTD_getErrorName(yet_to_flush));
                                266                 : 
  398 rhaas                     267 GBC           4 :     } while (yet_to_flush > 0);
  398 rhaas                     268 EUB             : 
                                269                 :     /* Make sure to pass any remaining bytes to the next sink. */
  398 rhaas                     270 GIC           4 :     if (mysink->zstd_outBuf.pos > 0)
  398 rhaas                     271 CBC           4 :         bbsink_archive_contents(mysink->base.bbs_next,
                                272                 :                                 mysink->zstd_outBuf.pos);
                                273                 : 
                                274                 :     /* Pass on the information that this archive has ended. */
                                275               4 :     bbsink_forward_end_archive(sink);
  398 rhaas                     276 GBC           4 : }
                                277                 : 
                                278                 : /*
  398 rhaas                     279 ECB             :  * Free the resources and context.
                                280                 :  */
                                281                 : static void
  398 rhaas                     282 CBC           4 : bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
  398 rhaas                     283 ECB             :                        TimeLineID endtli)
                                284                 : {
  398 rhaas                     285 GIC           4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
                                286                 : 
  398 rhaas                     287 ECB             :     /* Release the context. */
  398 rhaas                     288 CBC           4 :     if (mysink->cctx)
                                289                 :     {
  398 rhaas                     290 GIC           4 :         ZSTD_freeCCtx(mysink->cctx);
                                291               4 :         mysink->cctx = NULL;
                                292                 :     }
                                293                 : 
  398 rhaas                     294 CBC           4 :     bbsink_forward_end_backup(sink, endptr, endtli);
  398 rhaas                     295 GIC           4 : }
                                296                 : 
  398 rhaas                     297 ECB             : /*
                                298                 :  * Manifest contents are not compressed, but we do need to copy them into
                                299                 :  * the successor sink's buffer, because we have our own.
                                300                 :  */
                                301                 : static void
  398 rhaas                     302 CBC          20 : bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
  398 rhaas                     303 ECB             : {
  398 rhaas                     304 GIC          20 :     memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
                                305              20 :     bbsink_manifest_contents(sink->bbs_next, len);
  398 rhaas                     306 CBC          20 : }
  398 rhaas                     307 ECB             : 
                                308                 : /*
                                309                 :  * In case the backup fails, make sure we free any compression context that
                                310                 :  * got allocated, so that we don't leak memory.
                                311                 :  */
                                312                 : static void
  398 rhaas                     313 GIC           4 : bbsink_zstd_cleanup(bbsink *sink)
  398 rhaas                     314 ECB             : {
  398 rhaas                     315 GIC           4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
  398 rhaas                     316 ECB             : 
                                317                 :     /* Release the context if not already released. */
  398 rhaas                     318 CBC           4 :     if (mysink->cctx)
                                319                 :     {
  398 rhaas                     320 UIC           0 :         ZSTD_freeCCtx(mysink->cctx);
                                321               0 :         mysink->cctx = NULL;
                                322                 :     }
  398 rhaas                     323 GIC           4 : }
                                324                 : 
  398 rhaas                     325 ECB             : #endif
        

Generated by: LCOV version v1.16-55-g56c0a2a