Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * basebackup_zstd.c
4 : * Basebackup sink implementing zstd compression.
5 : *
6 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/backup/basebackup_zstd.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #ifdef USE_ZSTD
16 : #include <zstd.h>
17 : #endif
18 :
19 : #include "backup/basebackup_sink.h"
20 :
21 : #ifdef USE_ZSTD
22 :
23 : typedef struct bbsink_zstd
24 : {
25 : /* Common information for all types of sink. */
26 : bbsink base;
27 :
28 : /* Compression options */
29 : pg_compress_specification *compress;
30 :
31 : ZSTD_CCtx *cctx;
32 : ZSTD_outBuffer zstd_outBuf;
33 : } bbsink_zstd;
34 :
35 : static void bbsink_zstd_begin_backup(bbsink *sink);
36 : static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
37 : static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
38 : static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
39 : static void bbsink_zstd_end_archive(bbsink *sink);
40 : static void bbsink_zstd_cleanup(bbsink *sink);
41 : static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
42 : TimeLineID endtli);
43 :
44 : static const bbsink_ops bbsink_zstd_ops = {
45 : .begin_backup = bbsink_zstd_begin_backup,
46 : .begin_archive = bbsink_zstd_begin_archive,
47 : .archive_contents = bbsink_zstd_archive_contents,
48 : .end_archive = bbsink_zstd_end_archive,
49 : .begin_manifest = bbsink_forward_begin_manifest,
50 : .manifest_contents = bbsink_zstd_manifest_contents,
51 : .end_manifest = bbsink_forward_end_manifest,
52 : .end_backup = bbsink_zstd_end_backup,
53 : .cleanup = bbsink_zstd_cleanup
54 : };
55 : #endif
56 :
57 : /*
58 : * Create a new basebackup sink that performs zstd compression.
59 : */
60 : bbsink *
362 michael 61 CBC 4 : bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
62 : {
63 : #ifndef USE_ZSTD
64 : ereport(ERROR,
65 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
66 : errmsg("zstd compression is not supported by this build")));
67 : return NULL; /* keep compiler quiet */
68 : #else
69 : bbsink_zstd *sink;
70 :
398 rhaas 71 4 : Assert(next != NULL);
72 :
73 4 : sink = palloc0(sizeof(bbsink_zstd));
74 4 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
75 4 : sink->base.bbs_next = next;
375 76 4 : sink->compress = compress;
77 :
398 78 4 : return &sink->base;
79 : #endif
80 : }
81 :
82 : #ifdef USE_ZSTD
83 :
84 : /*
85 : * Begin backup.
86 : */
87 : static void
88 4 : bbsink_zstd_begin_backup(bbsink *sink)
89 : {
90 4 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
91 : size_t output_buffer_bound;
92 : size_t ret;
362 michael 93 4 : pg_compress_specification *compress = mysink->compress;
94 :
398 rhaas 95 4 : mysink->cctx = ZSTD_createCCtx();
96 4 : if (!mysink->cctx)
398 rhaas 97 UBC 0 : elog(ERROR, "could not create zstd compression context");
98 :
207 michael 99 CBC 4 : ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
100 : compress->level);
101 4 : if (ZSTD_isError(ret))
207 michael 102 UBC 0 : elog(ERROR, "could not set zstd compression level to %d: %s",
103 : compress->level, ZSTD_getErrorName(ret));
104 :
362 michael 105 CBC 4 : if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
106 : {
107 : /*
108 : * On older versions of libzstd, this option does not exist, and
109 : * trying to set it will fail. Similarly for newer versions if they
110 : * are compiled without threading support.
111 : */
375 rhaas 112 1 : ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
113 : compress->workers);
114 1 : if (ZSTD_isError(ret))
375 rhaas 115 UBC 0 : ereport(ERROR,
116 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117 : errmsg("could not set compression worker count to %d: %s",
118 : compress->workers, ZSTD_getErrorName(ret)));
119 : }
120 :
3 tomas.vondra 121 GNC 4 : if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
122 : {
123 1 : ret = ZSTD_CCtx_setParameter(mysink->cctx,
124 : ZSTD_c_enableLongDistanceMatching,
125 1 : compress->long_distance);
126 1 : if (ZSTD_isError(ret))
3 tomas.vondra 127 UNC 0 : ereport(ERROR,
128 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
129 : errmsg("could not set compression flag for %s: %s",
130 : "long", ZSTD_getErrorName(ret)));
131 : }
132 :
398 rhaas 133 ECB : /*
134 : * We need our own buffer, because we're going to pass different data to
135 : * the next sink than what gets passed to us.
136 : */
398 rhaas 137 CBC 4 : mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
398 rhaas 138 ECB :
398 rhaas 139 EUB : /*
140 : * Make sure that the next sink's bbs_buffer is big enough to accommodate
141 : * the compressed input buffer.
142 : */
398 rhaas 143 GIC 4 : output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
144 :
145 : /*
146 : * The buffer length is expected to be a multiple of BLCKSZ, so round up.
147 : */
148 4 : output_buffer_bound = output_buffer_bound + BLCKSZ -
398 rhaas 149 ECB : (output_buffer_bound % BLCKSZ);
150 :
398 rhaas 151 GIC 4 : bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
152 4 : }
153 :
154 : /*
398 rhaas 155 ECB : * Prepare to compress the next archive.
156 : */
157 : static void
398 rhaas 158 GIC 4 : bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
159 : {
398 rhaas 160 CBC 4 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
161 : char *zstd_archive_name;
162 :
398 rhaas 163 ECB : /*
164 : * At the start of each archive we reset the state to start a new
165 : * compression operation. The parameters are sticky and they will stick
166 : * around as we are resetting with option ZSTD_reset_session_only.
167 : */
398 rhaas 168 GIC 4 : ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
169 :
398 rhaas 170 CBC 4 : mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
398 rhaas 171 GIC 4 : mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
398 rhaas 172 CBC 4 : mysink->zstd_outBuf.pos = 0;
173 :
174 : /* Add ".zst" to the archive name. */
398 rhaas 175 GIC 4 : zstd_archive_name = psprintf("%s.zst", archive_name);
176 4 : Assert(sink->bbs_next != NULL);
177 4 : bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
178 4 : pfree(zstd_archive_name);
179 4 : }
398 rhaas 180 ECB :
181 : /*
182 : * Compress the input data to the output buffer until we run out of input
183 : * data. Each time the output buffer falls below the compression bound for
184 : * the input buffer, invoke the archive_contents() method for the next sink.
185 : *
186 : * Note that since we're compressing the input, it may very commonly happen
187 : * that we consume all the input data without filling the output buffer. In
188 : * that case, the compressed representation of the current input data won't
189 : * actually be sent to the next bbsink until a later call to this function,
190 : * or perhaps even not until bbsink_zstd_end_archive() is invoked.
191 : */
192 : static void
398 rhaas 193 GIC 10810 : bbsink_zstd_archive_contents(bbsink *sink, size_t len)
194 : {
195 10810 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
196 10810 : ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
197 :
198 21650 : while (inBuf.pos < inBuf.size)
199 : {
200 : size_t yet_to_flush;
201 10840 : size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
202 :
203 : /*
204 : * If the out buffer is not left with enough space, send the output
398 rhaas 205 ECB : * buffer to the next sink, and reset it.
206 : */
398 rhaas 207 CBC 10840 : if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
398 rhaas 208 ECB : {
398 rhaas 209 GIC 410 : bbsink_archive_contents(mysink->base.bbs_next,
398 rhaas 210 ECB : mysink->zstd_outBuf.pos);
398 rhaas 211 GIC 410 : mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
212 410 : mysink->zstd_outBuf.size =
398 rhaas 213 CBC 410 : mysink->base.bbs_next->bbs_buffer_length;
398 rhaas 214 GIC 410 : mysink->zstd_outBuf.pos = 0;
215 : }
216 :
217 10840 : yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
218 : &inBuf, ZSTD_e_continue);
398 rhaas 219 ECB :
398 rhaas 220 GIC 10840 : if (ZSTD_isError(yet_to_flush))
398 rhaas 221 LBC 0 : elog(ERROR,
222 : "could not compress data: %s",
398 rhaas 223 ECB : ZSTD_getErrorName(yet_to_flush));
224 : }
398 rhaas 225 CBC 10810 : }
398 rhaas 226 ECB :
227 : /*
228 : * There might be some data inside zstd's internal buffers; we need to get that
229 : * flushed out, also end the zstd frame and then get that forwarded to the
230 : * successor sink as archive content.
231 : *
232 : * Then we can end processing for this archive.
398 rhaas 233 EUB : */
234 : static void
398 rhaas 235 GIC 4 : bbsink_zstd_end_archive(bbsink *sink)
236 : {
398 rhaas 237 CBC 4 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
238 : size_t yet_to_flush;
239 :
240 : do
241 : {
398 rhaas 242 GIC 4 : ZSTD_inBuffer in = {NULL, 0, 0};
243 4 : size_t max_needed = ZSTD_compressBound(0);
244 :
245 : /*
246 : * If the out buffer is not left with enough space, send the output
398 rhaas 247 ECB : * buffer to the next sink, and reset it.
248 : */
398 rhaas 249 CBC 4 : if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
250 : {
398 rhaas 251 UIC 0 : bbsink_archive_contents(mysink->base.bbs_next,
252 : mysink->zstd_outBuf.pos);
253 0 : mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
398 rhaas 254 LBC 0 : mysink->zstd_outBuf.size =
255 0 : mysink->base.bbs_next->bbs_buffer_length;
398 rhaas 256 UIC 0 : mysink->zstd_outBuf.pos = 0;
257 : }
258 :
398 rhaas 259 GIC 4 : yet_to_flush = ZSTD_compressStream2(mysink->cctx,
260 : &mysink->zstd_outBuf,
398 rhaas 261 ECB : &in, ZSTD_e_end);
262 :
398 rhaas 263 GBC 4 : if (ZSTD_isError(yet_to_flush))
398 rhaas 264 UIC 0 : elog(ERROR, "could not compress data: %s",
398 rhaas 265 EUB : ZSTD_getErrorName(yet_to_flush));
266 :
398 rhaas 267 GBC 4 : } while (yet_to_flush > 0);
398 rhaas 268 EUB :
269 : /* Make sure to pass any remaining bytes to the next sink. */
398 rhaas 270 GIC 4 : if (mysink->zstd_outBuf.pos > 0)
398 rhaas 271 CBC 4 : bbsink_archive_contents(mysink->base.bbs_next,
272 : mysink->zstd_outBuf.pos);
273 :
274 : /* Pass on the information that this archive has ended. */
275 4 : bbsink_forward_end_archive(sink);
398 rhaas 276 GBC 4 : }
277 :
278 : /*
398 rhaas 279 ECB : * Free the resources and context.
280 : */
281 : static void
398 rhaas 282 CBC 4 : bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
398 rhaas 283 ECB : TimeLineID endtli)
284 : {
398 rhaas 285 GIC 4 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
286 :
398 rhaas 287 ECB : /* Release the context. */
398 rhaas 288 CBC 4 : if (mysink->cctx)
289 : {
398 rhaas 290 GIC 4 : ZSTD_freeCCtx(mysink->cctx);
291 4 : mysink->cctx = NULL;
292 : }
293 :
398 rhaas 294 CBC 4 : bbsink_forward_end_backup(sink, endptr, endtli);
398 rhaas 295 GIC 4 : }
296 :
398 rhaas 297 ECB : /*
298 : * Manifest contents are not compressed, but we do need to copy them into
299 : * the successor sink's buffer, because we have our own.
300 : */
301 : static void
398 rhaas 302 CBC 20 : bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
398 rhaas 303 ECB : {
398 rhaas 304 GIC 20 : memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
305 20 : bbsink_manifest_contents(sink->bbs_next, len);
398 rhaas 306 CBC 20 : }
398 rhaas 307 ECB :
308 : /*
309 : * In case the backup fails, make sure we free any compression context that
310 : * got allocated, so that we don't leak memory.
311 : */
312 : static void
398 rhaas 313 GIC 4 : bbsink_zstd_cleanup(bbsink *sink)
398 rhaas 314 ECB : {
398 rhaas 315 GIC 4 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
398 rhaas 316 ECB :
317 : /* Release the context if not already released. */
398 rhaas 318 CBC 4 : if (mysink->cctx)
319 : {
398 rhaas 320 UIC 0 : ZSTD_freeCCtx(mysink->cctx);
321 0 : mysink->cctx = NULL;
322 : }
398 rhaas 323 GIC 4 : }
324 :
398 rhaas 325 ECB : #endif
|