LCOV - differential code coverage report
Current view: top level - src/backend/backup - basebackup_zstd.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 85.7 % 91 78 1 3 6 3 3 37 4 34 7 41
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 8 8 6 1 1 6
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * basebackup_zstd.c
       4                 :  *    Basebackup sink implementing zstd compression.
       5                 :  *
       6                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
       7                 :  *
       8                 :  * IDENTIFICATION
       9                 :  *    src/backend/backup/basebackup_zstd.c
      10                 :  *
      11                 :  *-------------------------------------------------------------------------
      12                 :  */
      13                 : #include "postgres.h"
      14                 : 
      15                 : #ifdef USE_ZSTD
      16                 : #include <zstd.h>
      17                 : #endif
      18                 : 
      19                 : #include "backup/basebackup_sink.h"
      20                 : 
      21                 : #ifdef USE_ZSTD
      22                 : 
      23                 : typedef struct bbsink_zstd
      24                 : {
      25                 :     /* Common information for all types of sink. */
      26                 :     bbsink      base;
      27                 : 
      28                 :     /* Compression options */
      29                 :     pg_compress_specification *compress;
      30                 : 
      31                 :     ZSTD_CCtx  *cctx;
      32                 :     ZSTD_outBuffer zstd_outBuf;
      33                 : } bbsink_zstd;
      34                 : 
      35                 : static void bbsink_zstd_begin_backup(bbsink *sink);
      36                 : static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
      37                 : static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
      38                 : static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
      39                 : static void bbsink_zstd_end_archive(bbsink *sink);
      40                 : static void bbsink_zstd_cleanup(bbsink *sink);
      41                 : static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
      42                 :                                    TimeLineID endtli);
      43                 : 
      44                 : static const bbsink_ops bbsink_zstd_ops = {
      45                 :     .begin_backup = bbsink_zstd_begin_backup,
      46                 :     .begin_archive = bbsink_zstd_begin_archive,
      47                 :     .archive_contents = bbsink_zstd_archive_contents,
      48                 :     .end_archive = bbsink_zstd_end_archive,
      49                 :     .begin_manifest = bbsink_forward_begin_manifest,
      50                 :     .manifest_contents = bbsink_zstd_manifest_contents,
      51                 :     .end_manifest = bbsink_forward_end_manifest,
      52                 :     .end_backup = bbsink_zstd_end_backup,
      53                 :     .cleanup = bbsink_zstd_cleanup
      54                 : };
      55                 : #endif
      56                 : 
      57                 : /*
      58                 :  * Create a new basebackup sink that performs zstd compression.
      59                 :  */
      60                 : bbsink *
      61 CBC           4 : bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
      62                 : {
      63                 : #ifndef USE_ZSTD
      64                 :     ereport(ERROR,
      65                 :             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      66                 :              errmsg("zstd compression is not supported by this build")));
      67                 :     return NULL;                /* keep compiler quiet */
      68                 : #else
      69                 :     bbsink_zstd *sink;
      70                 : 
      71               4 :     Assert(next != NULL);
      72                 : 
      73               4 :     sink = palloc0(sizeof(bbsink_zstd));
      74               4 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
      75               4 :     sink->base.bbs_next = next;
      76               4 :     sink->compress = compress;
      77                 : 
      78               4 :     return &sink->base;
      79                 : #endif
      80                 : }
      81                 : 
      82                 : #ifdef USE_ZSTD
      83                 : 
      84                 : /*
      85                 :  * Begin backup.
      86                 :  */
      87                 : static void
      88               4 : bbsink_zstd_begin_backup(bbsink *sink)
      89                 : {
      90               4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
      91                 :     size_t      output_buffer_bound;
      92                 :     size_t      ret;
      93               4 :     pg_compress_specification *compress = mysink->compress;
      94                 : 
      95               4 :     mysink->cctx = ZSTD_createCCtx();
      96               4 :     if (!mysink->cctx)
      97 UBC           0 :         elog(ERROR, "could not create zstd compression context");
      98                 : 
      99 CBC           4 :     ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
     100                 :                                  compress->level);
     101               4 :     if (ZSTD_isError(ret))
     102 UBC           0 :         elog(ERROR, "could not set zstd compression level to %d: %s",
     103                 :              compress->level, ZSTD_getErrorName(ret));
     104                 : 
     105 CBC           4 :     if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
     106                 :     {
     107                 :         /*
     108                 :          * On older versions of libzstd, this option does not exist, and
     109                 :          * trying to set it will fail. Similarly for newer versions if they
     110                 :          * are compiled without threading support.
     111                 :          */
     112               1 :         ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
     113                 :                                      compress->workers);
     114               1 :         if (ZSTD_isError(ret))
     115 UBC           0 :             ereport(ERROR,
     116                 :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     117                 :                     errmsg("could not set compression worker count to %d: %s",
     118                 :                            compress->workers, ZSTD_getErrorName(ret)));
     119                 :     }
     120                 : 
     121 GNC           4 :     if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
     122                 :     {
     123               1 :         ret = ZSTD_CCtx_setParameter(mysink->cctx,
     124                 :                                      ZSTD_c_enableLongDistanceMatching,
     125               1 :                                      compress->long_distance);
     126               1 :         if (ZSTD_isError(ret))
     127 UNC           0 :             ereport(ERROR,
     128                 :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     129                 :                     errmsg("could not set compression flag for %s: %s",
     130                 :                            "long", ZSTD_getErrorName(ret)));
     131                 :     }
     132                 : 
     133 ECB             :     /*
     134                 :      * We need our own buffer, because we're going to pass different data to
     135                 :      * the next sink than what gets passed to us.
     136                 :      */
     137 CBC           4 :     mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
     138 ECB             : 
     139 EUB             :     /*
     140                 :      * Make sure that the next sink's bbs_buffer is big enough to accommodate
     141                 :      * the compressed input buffer.
     142                 :      */
     143 GIC           4 :     output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
     144                 : 
     145                 :     /*
     146                 :      * The buffer length is expected to be a multiple of BLCKSZ, so round up.
     147                 :      */
     148               4 :     output_buffer_bound = output_buffer_bound + BLCKSZ -
     149 ECB             :         (output_buffer_bound % BLCKSZ);
     150                 : 
     151 GIC           4 :     bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
     152               4 : }
     153                 : 
     154                 : /*
     155 ECB             :  * Prepare to compress the next archive.
     156                 :  */
     157                 : static void
     158 GIC           4 : bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
     159                 : {
     160 CBC           4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     161                 :     char       *zstd_archive_name;
     162                 : 
     163 ECB             :     /*
     164                 :      * At the start of each archive, reset the session to begin a new
     165                 :      * compression operation. The parameters set earlier are sticky: they
     166                 :      * survive because we reset with ZSTD_reset_session_only.
     167                 :      */
     168 GIC           4 :     ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
     169                 : 
     170 CBC           4 :     mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     171 GIC           4 :     mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
     172 CBC           4 :     mysink->zstd_outBuf.pos = 0;
     173                 : 
     174                 :     /* Add ".zst" to the archive name. */
     175 GIC           4 :     zstd_archive_name = psprintf("%s.zst", archive_name);
     176               4 :     Assert(sink->bbs_next != NULL);
     177               4 :     bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
     178               4 :     pfree(zstd_archive_name);
     179               4 : }
     180 ECB             : 
     181                 : /*
     182                 :  * Compress the input data to the output buffer until we run out of input.
     183                 :  * Each time the output buffer's free space falls below the compression bound
     184                 :  * for the remaining input, invoke archive_contents() for the next sink.
     185                 :  *
     186                 :  * Note that since we're compressing the input, it may very commonly happen
     187                 :  * that we consume all the input data without filling the output buffer. In
     188                 :  * that case, the compressed representation of the current input data won't
     189                 :  * actually be sent to the next bbsink until a later call to this function,
     190                 :  * or perhaps even not until bbsink_zstd_end_archive() is invoked.
     191                 :  */
     192                 : static void
     193 GIC       10810 : bbsink_zstd_archive_contents(bbsink *sink, size_t len)
     194                 : {
     195           10810 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     196           10810 :     ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
     197                 : 
     198           21650 :     while (inBuf.pos < inBuf.size)
     199                 :     {
     200                 :         size_t      yet_to_flush;
     201           10840 :         size_t      max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
     202                 : 
     203                 :         /*
     204                 :          * If the out buffer is not left with enough space, send the output
     205 ECB             :          * buffer to the next sink, and reset it.
     206                 :          */
     207 CBC       10840 :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
     208 ECB             :         {
     209 GIC         410 :             bbsink_archive_contents(mysink->base.bbs_next,
     210 ECB             :                                     mysink->zstd_outBuf.pos);
     211 GIC         410 :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     212             410 :             mysink->zstd_outBuf.size =
     213 CBC         410 :                 mysink->base.bbs_next->bbs_buffer_length;
     214 GIC         410 :             mysink->zstd_outBuf.pos = 0;
     215                 :         }
     216                 : 
     217           10840 :         yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
     218                 :                                             &inBuf, ZSTD_e_continue);
     219 ECB             : 
     220 GIC       10840 :         if (ZSTD_isError(yet_to_flush))
     221 LBC           0 :             elog(ERROR,
     222                 :                  "could not compress data: %s",
     223 ECB             :                  ZSTD_getErrorName(yet_to_flush));
     224                 :     }
     225 CBC       10810 : }
     226 ECB             : 
     227                 : /*
     228                 :  * There might be some data inside zstd's internal buffers; we need to flush
     229                 :  * it out, end the zstd frame, and forward the result to the successor sink
     230                 :  * as archive content.
     231                 :  *
     232                 :  * Then we can end processing for this archive.
     233 EUB             :  */
     234                 : static void
     235 GIC           4 : bbsink_zstd_end_archive(bbsink *sink)
     236                 : {
     237 CBC           4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     238                 :     size_t      yet_to_flush;
     239                 : 
     240                 :     do
     241                 :     {
     242 GIC           4 :         ZSTD_inBuffer in = {NULL, 0, 0};
     243               4 :         size_t      max_needed = ZSTD_compressBound(0);
     244                 : 
     245                 :         /*
     246                 :          * If the out buffer is not left with enough space, send the output
     247 ECB             :          * buffer to the next sink, and reset it.
     248                 :          */
     249 CBC           4 :         if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
     250                 :         {
     251 UIC           0 :             bbsink_archive_contents(mysink->base.bbs_next,
     252                 :                                     mysink->zstd_outBuf.pos);
     253               0 :             mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
     254 LBC           0 :             mysink->zstd_outBuf.size =
     255               0 :                 mysink->base.bbs_next->bbs_buffer_length;
     256 UIC           0 :             mysink->zstd_outBuf.pos = 0;
     257                 :         }
     258                 : 
     259 GIC           4 :         yet_to_flush = ZSTD_compressStream2(mysink->cctx,
     260                 :                                             &mysink->zstd_outBuf,
     261 ECB             :                                             &in, ZSTD_e_end);
     262                 : 
     263 GBC           4 :         if (ZSTD_isError(yet_to_flush))
     264 UIC           0 :             elog(ERROR, "could not compress data: %s",
     265 EUB             :                  ZSTD_getErrorName(yet_to_flush));
     266                 : 
     267 GBC           4 :     } while (yet_to_flush > 0);
     268 EUB             : 
     269                 :     /* Make sure to pass any remaining bytes to the next sink. */
     270 GIC           4 :     if (mysink->zstd_outBuf.pos > 0)
     271 CBC           4 :         bbsink_archive_contents(mysink->base.bbs_next,
     272                 :                                 mysink->zstd_outBuf.pos);
     273                 : 
     274                 :     /* Pass on the information that this archive has ended. */
     275               4 :     bbsink_forward_end_archive(sink);
     276 GBC           4 : }
     277                 : 
     278                 : /*
     279 ECB             :  * Free the compression context, then forward the end of the backup.
     280                 :  */
     281                 : static void
     282 CBC           4 : bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
     283 ECB             :                        TimeLineID endtli)
     284                 : {
     285 GIC           4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     286                 : 
     287 ECB             :     /* Release the context. */
     288 CBC           4 :     if (mysink->cctx)
     289                 :     {
     290 GIC           4 :         ZSTD_freeCCtx(mysink->cctx);
     291               4 :         mysink->cctx = NULL;
     292                 :     }
     293                 : 
     294 CBC           4 :     bbsink_forward_end_backup(sink, endptr, endtli);
     295 GIC           4 : }
     296                 : 
     297 ECB             : /*
     298                 :  * Manifest contents are not compressed, but we do need to copy them into
     299                 :  * the successor sink's buffer, because we have our own.
     300                 :  */
     301                 : static void
     302 CBC          20 : bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
     303 ECB             : {
     304 GIC          20 :     memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
     305              20 :     bbsink_manifest_contents(sink->bbs_next, len);
     306 CBC          20 : }
     307 ECB             : 
     308                 : /*
     309                 :  * In case the backup fails, make sure we free any compression context that
     310                 :  * got allocated, so that we don't leak memory.
     311                 :  */
     312                 : static void
     313 GIC           4 : bbsink_zstd_cleanup(bbsink *sink)
     314 ECB             : {
     315 GIC           4 :     bbsink_zstd *mysink = (bbsink_zstd *) sink;
     316 ECB             : 
     317                 :     /* Release the context if not already released. */
     318 CBC           4 :     if (mysink->cctx)
     319                 :     {
     320 UIC           0 :         ZSTD_freeCCtx(mysink->cctx);
     321               0 :         mysink->cctx = NULL;
     322                 :     }
     323 GIC           4 : }
     324                 : 
     325 ECB             : #endif
        

Generated by: LCOV version v1.16-55-g56c0a2a