|            TLA  Line data    Source code 
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * compress_zstd.c
       4                 :  *   Routines for archivers to write a Zstd compressed data stream.
       5                 :  *
       6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7                 :  * Portions Copyright (c) 1994, Regents of the University of California
       8                 :  *
       9                 :  * IDENTIFICATION
      10                 :  *     src/bin/pg_dump/compress_zstd.c
      11                 :  *
      12                 :  *-------------------------------------------------------------------------
      13                 :  */
      14                 : 
      15                 : #include "postgres_fe.h"
      16                 : 
      17                 : #include "pg_backup_utils.h"
      18                 : #include "compress_zstd.h"
      19                 : 
      20                 : #ifndef USE_ZSTD
      21                 : 
      22                 : void
      23                 : InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
      24                 : {
      25                 :     pg_fatal("this build does not support compression with %s", "ZSTD");
      26                 : }
      27                 : 
      28                 : void
      29                 : InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
      30                 : {
      31                 :     pg_fatal("this build does not support compression with %s", "ZSTD");
      32                 : }
      33                 : 
      34                 : #else
      35                 : 
      36                 : #include <zstd.h>
      37                 : 
      38                 : typedef struct ZstdCompressorState
      39                 : {
      40                 :     /* This is a normal file to which we read/write compressed data */
      41                 :     FILE       *fp;
      42                 : 
      43                 :     ZSTD_CStream *cstream;
      44                 :     ZSTD_DStream *dstream;
      45                 :     ZSTD_outBuffer output;
      46                 :     ZSTD_inBuffer input;
      47                 : 
      48                 :     /* pointer to a static string like from strerror(), for Zstd_write() */
      49                 :     const char *zstderror;
      50                 : } ZstdCompressorState;
      51                 : 
      52                 : static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
      53                 : static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
      54                 : static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
      55                 :                                    const void *data, size_t dLen);
      56                 : static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
      57                 : 
      58                 : static void
      59 GNC          54 : _Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
      60                 :                            ZSTD_cParameter param, int value, char *paramname)
      61                 : {
      62                 :     size_t      res;
      63                 : 
      64              54 :     res = ZSTD_CCtx_setParameter(cstream, param, value);
      65              54 :     if (ZSTD_isError(res))
      66 UNC           0 :         pg_fatal("could not set compression parameter: \"%s\": %s",
      67                 :                  paramname, ZSTD_getErrorName(res));
      68 GNC          54 : }
      69                 : 
      70                 : /* Return a compression stream with parameters set per argument */
      71                 : static ZSTD_CStream *
      72              53 : _ZstdCStreamParams(pg_compress_specification compress)
      73                 : {
      74                 :     ZSTD_CStream *cstream;
      75                 : 
      76              53 :     cstream = ZSTD_createCStream();
      77              53 :     if (cstream == NULL)
      78 UNC           0 :         pg_fatal("could not initialize compression library");
      79                 : 
      80 GNC          53 :     _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
      81                 :                                compress.level, "level");
      82                 : 
      83              53 :     if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
      84               1 :         _Zstd_CCtx_setParam_or_die(cstream,
      85                 :                                   ZSTD_c_enableLongDistanceMatching,
      86               1 :                                   compress.long_distance, "long");
      87                 : 
      88              53 :     return cstream;
      89                 : }
      90                 : 
      91                 : /* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
      92                 : static void
      93              80 : _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
      94                 : {
      95              80 :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
      96              80 :     ZSTD_inBuffer *input = &zstdcs->input;
      97              80 :     ZSTD_outBuffer *output = &zstdcs->output;
      98                 : 
      99                 :     /* Loop while there's any input or until flushed */
     100              80 :     while (input->pos != input->size || flush)
     101                 :     {
     102                 :         size_t      res;
     103                 : 
     104              80 :         output->pos = 0;
     105              80 :         res = ZSTD_compressStream2(zstdcs->cstream, output,
     106                 :                                    input, flush ? ZSTD_e_end : ZSTD_e_continue);
     107                 : 
     108              80 :         if (ZSTD_isError(res))
     109 UNC           0 :             pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
     110                 : 
     111                 :         /*
     112                 :          * Extra paranoia: avoid zero-length chunks, since a zero length chunk
     113                 :          * is the EOF marker in the custom format. This should never happen
     114                 :          * but...
     115                 :          */
     116 GNC          80 :         if (output->pos > 0)
     117              26 :             cs->writeF(AH, output->dst, output->pos);
     118                 : 
     119              80 :         if (res == 0)
     120              80 :             break;              /* End of frame or all input consumed */
     121                 :     }
     122              80 : }
     123                 : 
     124                 : static void
     125              52 : EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
     126                 : {
     127              52 :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     128                 : 
     129              52 :     if (cs->readF != NULL)
     130                 :     {
     131              26 :         Assert(zstdcs->cstream == NULL);
     132              26 :         ZSTD_freeDStream(zstdcs->dstream);
     133              26 :         pg_free(unconstify(void *, zstdcs->input.src));
     134                 :     }
     135              26 :     else if (cs->writeF != NULL)
     136                 :     {
     137              26 :         Assert(zstdcs->dstream == NULL);
     138              26 :         _ZstdWriteCommon(AH, cs, true);
     139              26 :         ZSTD_freeCStream(zstdcs->cstream);
     140              26 :         pg_free(zstdcs->output.dst);
     141                 :     }
     142                 : 
     143              52 :     pg_free(zstdcs);
     144              52 : }
     145                 : 
     146                 : static void
     147              54 : WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
     148                 :                        const void *data, size_t dLen)
     149                 : {
     150              54 :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     151                 : 
     152              54 :     zstdcs->input.src = data;
     153              54 :     zstdcs->input.size = dLen;
     154              54 :     zstdcs->input.pos = 0;
     155                 : 
     156              54 :     _ZstdWriteCommon(AH, cs, false);
     157              54 : }
     158                 : 
     159                 : static void
     160              26 : ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
     161                 : {
     162              26 :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
     163              26 :     ZSTD_outBuffer *output = &zstdcs->output;
     164              26 :     ZSTD_inBuffer *input = &zstdcs->input;
     165              26 :     size_t      input_allocated_size = ZSTD_DStreamInSize();
     166                 :     size_t      res;
     167                 : 
     168                 :     for (;;)
     169              26 :     {
     170                 :         size_t      cnt;
     171                 : 
     172                 :         /*
     173                 :          * Read compressed data.  Note that readF can resize the buffer; the
     174                 :          * new size is tracked and used for future loops.
     175                 :          */
     176              52 :         input->size = input_allocated_size;
     177              52 :         cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
     178                 : 
     179                 :         /* ensure that readF didn't *shrink* the buffer */
     180              52 :         Assert(input->size >= input_allocated_size);
     181              52 :         input_allocated_size = input->size;
     182              52 :         input->size = cnt;
     183              52 :         input->pos = 0;
     184                 : 
     185              52 :         if (cnt == 0)
     186              26 :             break;
     187                 : 
     188                 :         /* Now decompress */
     189              26 :         while (input->pos < input->size)
     190                 :         {
     191              26 :             output->pos = 0;
     192              26 :             res = ZSTD_decompressStream(zstdcs->dstream, output, input);
     193              26 :             if (ZSTD_isError(res))
     194 UNC           0 :                 pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
     195                 : 
     196                 :             /*
     197                 :              * then write the decompressed data to the output handle
     198                 :              */
     199 GNC          26 :             ((char *) output->dst)[output->pos] = '\0';
     200              26 :             ahwrite(output->dst, 1, output->pos, AH);
     201                 : 
     202              26 :             if (res == 0)
     203              26 :                 break;          /* End of frame */
     204                 :         }
     205                 :     }
     206              26 : }
     207                 : 
     208                 : /* Public routine that supports Zstd compressed data I/O */
     209                 : void
     210              52 : InitCompressorZstd(CompressorState *cs,
     211                 :                    const pg_compress_specification compression_spec)
     212                 : {
     213                 :     ZstdCompressorState *zstdcs;
     214                 : 
     215              52 :     cs->readData = ReadDataFromArchiveZstd;
     216              52 :     cs->writeData = WriteDataToArchiveZstd;
     217              52 :     cs->end = EndCompressorZstd;
     218                 : 
     219              52 :     cs->compression_spec = compression_spec;
     220                 : 
     221              52 :     zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
     222              52 :     cs->private_data = zstdcs;
     223                 : 
     224                 :     /* We expect that exactly one of readF/writeF is specified */
     225              52 :     Assert((cs->readF == NULL) != (cs->writeF == NULL));
     226                 : 
     227              52 :     if (cs->readF != NULL)
     228                 :     {
     229              26 :         zstdcs->dstream = ZSTD_createDStream();
     230              26 :         if (zstdcs->dstream == NULL)
     231 UNC           0 :             pg_fatal("could not initialize compression library");
     232                 : 
     233 GNC          26 :         zstdcs->input.size = ZSTD_DStreamInSize();
     234              26 :         zstdcs->input.src = pg_malloc(zstdcs->input.size);
     235                 : 
     236                 :         /*
     237                 :          * output.size is the buffer size we tell zstd it can output to.
     238                 :          * Allocate an additional byte such that ReadDataFromArchiveZstd() can
     239                 :          * call ahwrite() with a null-terminated string, which is an optimized
     240                 :          * case in ExecuteSqlCommandBuf().
     241                 :          */
     242              26 :         zstdcs->output.size = ZSTD_DStreamOutSize();
     243              26 :         zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
     244                 :     }
     245              26 :     else if (cs->writeF != NULL)
     246                 :     {
     247              26 :         zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
     248                 : 
     249              26 :         zstdcs->output.size = ZSTD_CStreamOutSize();
     250              26 :         zstdcs->output.dst = pg_malloc(zstdcs->output.size);
     251              26 :         zstdcs->output.pos = 0;
     252                 :     }
     253              52 : }
     254                 : 
     255                 : /*
     256                 :  * Compressed stream API
     257                 :  */
     258                 : 
     259                 : static bool
     260              94 : Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
     261                 : {
     262              94 :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     263              94 :     ZSTD_inBuffer *input = &zstdcs->input;
     264              94 :     ZSTD_outBuffer *output = &zstdcs->output;
     265              94 :     size_t      input_allocated_size = ZSTD_DStreamInSize();
     266                 :     size_t      res,
     267                 :                 cnt;
     268                 : 
     269              94 :     output->size = size;
     270              94 :     output->dst = ptr;
     271              94 :     output->pos = 0;
     272                 : 
     273                 :     for (;;)
     274                 :     {
     275             120 :         Assert(input->pos <= input->size);
     276             120 :         Assert(input->size <= input_allocated_size);
     277                 : 
     278                 :         /*
     279                 :          * If the input is completely consumed, start back at the beginning
     280                 :          */
     281             120 :         if (input->pos == input->size)
     282                 :         {
     283                 :             /* input->size is size produced by "fread" */
     284              79 :             input->size = 0;
     285                 :             /* input->pos is position consumed by decompress */
     286              79 :             input->pos = 0;
     287                 :         }
     288                 : 
     289                 :         /* read compressed data if we must produce more input */
     290             120 :         if (input->pos == input->size)
     291                 :         {
     292              79 :             cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
     293              79 :             input->size = cnt;
     294                 : 
     295              79 :             Assert(cnt <= input_allocated_size);
     296                 : 
     297                 :             /* If we have no more input to consume, we're done */
     298              79 :             if (cnt == 0)
     299              52 :                 break;
     300                 :         }
     301                 : 
     302              68 :         while (input->pos < input->size)
     303                 :         {
     304                 :             /* now decompress */
     305              68 :             res = ZSTD_decompressStream(zstdcs->dstream, output, input);
     306                 : 
     307              68 :             if (ZSTD_isError(res))
     308 UNC           0 :                 pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
     309                 : 
     310 GNC          68 :             if (output->pos == output->size)
     311              42 :                 break;          /* No more room for output */
     312                 : 
     313              26 :             if (res == 0)
     314              26 :                 break;          /* End of frame */
     315                 :         }
     316                 : 
     317              68 :         if (output->pos == output->size)
     318              42 :             break;              /* We read all the data that fits */
     319                 :     }
     320                 : 
     321              94 :     if (rdsize != NULL)
     322              94 :         *rdsize = output->pos;
     323                 : 
     324              94 :     return true;
     325                 : }
     326                 : 
     327                 : static bool
     328            1498 : Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
     329                 : {
     330            1498 :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     331            1498 :     ZSTD_inBuffer *input = &zstdcs->input;
     332            1498 :     ZSTD_outBuffer *output = &zstdcs->output;
     333                 :     size_t      res,
     334                 :                 cnt;
     335                 : 
     336            1498 :     input->src = ptr;
     337            1498 :     input->size = size;
     338            1498 :     input->pos = 0;
     339                 : 
     340                 :     /* Consume all input, to be flushed later */
     341            2996 :     while (input->pos != input->size)
     342                 :     {
     343            1498 :         output->pos = 0;
     344            1498 :         res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
     345            1498 :         if (ZSTD_isError(res))
     346                 :         {
     347 UNC           0 :             zstdcs->zstderror = ZSTD_getErrorName(res);
     348               0 :             return false;
     349                 :         }
     350                 : 
     351 GNC        1498 :         cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
     352            1498 :         if (cnt != output->pos)
     353                 :         {
     354 UNC           0 :             zstdcs->zstderror = strerror(errno);
     355               0 :             return false;
     356                 :         }
     357                 :     }
     358                 : 
     359 GNC        1498 :     return size;
     360                 : }
     361                 : 
     362                 : static int
     363 UNC           0 : Zstd_getc(CompressFileHandle *CFH)
     364                 : {
     365               0 :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     366                 :     int         ret;
     367                 : 
     368               0 :     if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
     369                 :     {
     370               0 :         if (feof(zstdcs->fp))
     371               0 :             pg_fatal("could not read from input file: end of file");
     372                 :         else
     373               0 :             pg_fatal("could not read from input file: %m");
     374                 :     }
     375               0 :     return ret;
     376                 : }
     377                 : 
     378                 : static char *
     379 GNC           3 : Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
     380                 : {
     381                 :     int         i;
     382                 : 
     383               3 :     Assert(len > 0);
     384                 : 
     385                 :     /*
     386                 :      * Read one byte at a time until newline or EOF. This is only used to read
     387                 :      * the list of LOs, and the I/O is buffered anyway.
     388                 :      */
     389              43 :     for (i = 0; i < len - 1; ++i)
     390                 :     {
     391                 :         size_t      readsz;
     392                 : 
     393              43 :         if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
     394               3 :             break;
     395              43 :         if (readsz != 1)
     396               1 :             break;
     397              42 :         if (buf[i] == '\n')
     398                 :         {
     399               2 :             ++i;
     400               2 :             break;
     401                 :         }
     402                 :     }
     403               3 :     buf[i] = '\0';
     404               3 :     return i > 0 ? buf : NULL;
     405                 : }
     406                 : 
     407                 : static bool
     408              54 : Zstd_close(CompressFileHandle *CFH)
     409                 : {
     410              54 :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     411                 : 
     412              54 :     if (zstdcs->cstream)
     413                 :     {
     414                 :         size_t      res,
     415                 :                     cnt;
     416              27 :         ZSTD_inBuffer *input = &zstdcs->input;
     417              27 :         ZSTD_outBuffer *output = &zstdcs->output;
     418                 : 
     419                 :         /* Loop until the compression buffers are fully consumed */
     420                 :         for (;;)
     421                 :         {
     422              27 :             output->pos = 0;
     423              27 :             res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
     424              27 :             if (ZSTD_isError(res))
     425                 :             {
     426 UNC           0 :                 zstdcs->zstderror = ZSTD_getErrorName(res);
     427               0 :                 return false;
     428                 :             }
     429                 : 
     430 GNC          27 :             cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
     431              27 :             if (cnt != output->pos)
     432                 :             {
     433 UNC           0 :                 zstdcs->zstderror = strerror(errno);
     434               0 :                 return false;
     435                 :             }
     436                 : 
     437 GNC          27 :             if (res == 0)
     438              27 :                 break;          /* End of frame */
     439                 :         }
     440                 : 
     441              27 :         ZSTD_freeCStream(zstdcs->cstream);
     442              27 :         pg_free(zstdcs->output.dst);
     443                 :     }
     444                 : 
     445              54 :     if (zstdcs->dstream)
     446                 :     {
     447              27 :         ZSTD_freeDStream(zstdcs->dstream);
     448              27 :         pg_free(unconstify(void *, zstdcs->input.src));
     449                 :     }
     450                 : 
     451              54 :     if (fclose(zstdcs->fp) != 0)
     452 UNC           0 :         return false;
     453                 : 
     454 GNC          54 :     pg_free(zstdcs);
     455              54 :     return true;
     456                 : }
     457                 : 
     458                 : static bool
     459               1 : Zstd_eof(CompressFileHandle *CFH)
     460                 : {
     461               1 :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     462                 : 
     463               1 :     return feof(zstdcs->fp);
     464                 : }
     465                 : 
     466                 : static bool
     467              54 : Zstd_open(const char *path, int fd, const char *mode,
     468                 :           CompressFileHandle *CFH)
     469                 : {
     470                 :     FILE       *fp;
     471                 :     ZstdCompressorState *zstdcs;
     472                 : 
     473              54 :     if (fd >= 0)
     474 UNC           0 :         fp = fdopen(fd, mode);
     475                 :     else
     476 GNC          54 :         fp = fopen(path, mode);
     477                 : 
     478              54 :     if (fp == NULL)
     479 UNC           0 :         return false;
     480                 : 
     481 GNC          54 :     zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
     482              54 :     CFH->private_data = zstdcs;
     483              54 :     zstdcs->fp = fp;
     484                 : 
     485              54 :     if (mode[0] == 'r')
     486                 :     {
     487              27 :         zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
     488              27 :         zstdcs->dstream = ZSTD_createDStream();
     489              27 :         if (zstdcs->dstream == NULL)
     490 UNC           0 :             pg_fatal("could not initialize compression library");
     491                 :     }
     492 GNC          27 :     else if (mode[0] == 'w' || mode[0] == 'a')
     493                 :     {
     494              27 :         zstdcs->output.size = ZSTD_CStreamOutSize();
     495              27 :         zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
     496              27 :         zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
     497              27 :         if (zstdcs->cstream == NULL)
     498 UNC           0 :             pg_fatal("could not initialize compression library");
     499                 :     }
     500                 :     else
     501               0 :         pg_fatal("unhandled mode");
     502                 : 
     503 GNC          54 :     return true;
     504                 : }
     505                 : 
     506                 : static bool
     507              26 : Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
     508                 : {
     509                 :     char        fname[MAXPGPATH];
     510                 : 
     511              26 :     sprintf(fname, "%s.zst", path);
     512              26 :     return CFH->open_func(fname, -1, mode, CFH);
     513                 : }
     514                 : 
     515                 : static const char *
     516 UNC           0 : Zstd_get_error(CompressFileHandle *CFH)
     517                 : {
     518               0 :     ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
     519                 : 
     520               0 :     return zstdcs->zstderror;
     521                 : }
     522                 : 
     523                 : void
     524 GNC          54 : InitCompressFileHandleZstd(CompressFileHandle *CFH,
     525                 :                            const pg_compress_specification compression_spec)
     526                 : {
     527              54 :     CFH->open_func = Zstd_open;
     528              54 :     CFH->open_write_func = Zstd_open_write;
     529              54 :     CFH->read_func = Zstd_read;
     530              54 :     CFH->write_func = Zstd_write;
     531              54 :     CFH->gets_func = Zstd_gets;
     532              54 :     CFH->getc_func = Zstd_getc;
     533              54 :     CFH->close_func = Zstd_close;
     534              54 :     CFH->eof_func = Zstd_eof;
     535              54 :     CFH->get_error_func = Zstd_get_error;
     536                 : 
     537              54 :     CFH->compression_spec = compression_spec;
     538                 : 
     539              54 :     CFH->private_data = NULL;
     540              54 : }
     541                 : 
     542                 : #endif                          /* USE_ZSTD */
         |