Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * basebackup_zstd.c
4 : : * Basebackup sink implementing zstd compression.
5 : : *
6 : : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/backup/basebackup_zstd.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #ifdef USE_ZSTD
16 : : #include <zstd.h>
17 : : #endif
18 : :
19 : : #include "backup/basebackup_sink.h"
20 : :
21 : : #ifdef USE_ZSTD
22 : :
23 : : typedef struct bbsink_zstd
24 : : {
25 : : /* Common information for all types of sink. */
26 : : bbsink base;
27 : :
28 : : /* Compression options */
29 : : pg_compress_specification *compress;
30 : :
31 : : ZSTD_CCtx *cctx;
32 : : ZSTD_outBuffer zstd_outBuf;
33 : : } bbsink_zstd;
34 : :
35 : : static void bbsink_zstd_begin_backup(bbsink *sink);
36 : : static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
37 : : static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
38 : : static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
39 : : static void bbsink_zstd_end_archive(bbsink *sink);
40 : : static void bbsink_zstd_cleanup(bbsink *sink);
41 : : static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
42 : : TimeLineID endtli);
43 : :
44 : : static const bbsink_ops bbsink_zstd_ops = {
45 : : .begin_backup = bbsink_zstd_begin_backup,
46 : : .begin_archive = bbsink_zstd_begin_archive,
47 : : .archive_contents = bbsink_zstd_archive_contents,
48 : : .end_archive = bbsink_zstd_end_archive,
49 : : .begin_manifest = bbsink_forward_begin_manifest,
50 : : .manifest_contents = bbsink_zstd_manifest_contents,
51 : : .end_manifest = bbsink_forward_end_manifest,
52 : : .end_backup = bbsink_zstd_end_backup,
53 : : .cleanup = bbsink_zstd_cleanup
54 : : };
55 : : #endif
56 : :
57 : : /*
58 : : * Create a new basebackup sink that performs zstd compression.
59 : : */
60 : : bbsink *
733 michael@paquier.xyz 61 :CBC 4 : bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
62 : : {
63 : : #ifndef USE_ZSTD
64 : : ereport(ERROR,
65 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
66 : : errmsg("zstd compression is not supported by this build")));
67 : : return NULL; /* keep compiler quiet */
68 : : #else
69 : : bbsink_zstd *sink;
70 : :
769 rhaas@postgresql.org 71 [ - + ]: 4 : Assert(next != NULL);
72 : :
73 : 4 : sink = palloc0(sizeof(bbsink_zstd));
74 : 4 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
75 : 4 : sink->base.bbs_next = next;
746 76 : 4 : sink->compress = compress;
77 : :
769 78 : 4 : return &sink->base;
79 : : #endif
80 : : }
81 : :
82 : : #ifdef USE_ZSTD
83 : :
84 : : /*
85 : : * Begin backup.
86 : : */
87 : : static void
88 : 4 : bbsink_zstd_begin_backup(bbsink *sink)
89 : : {
90 : 4 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
91 : : size_t output_buffer_bound;
92 : : size_t ret;
733 michael@paquier.xyz 93 : 4 : pg_compress_specification *compress = mysink->compress;
94 : :
769 rhaas@postgresql.org 95 : 4 : mysink->cctx = ZSTD_createCCtx();
96 [ - + ]: 4 : if (!mysink->cctx)
769 rhaas@postgresql.org 97 [ # # ]:UBC 0 : elog(ERROR, "could not create zstd compression context");
98 : :
578 michael@paquier.xyz 99 :CBC 4 : ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
100 : : compress->level);
101 [ - + ]: 4 : if (ZSTD_isError(ret))
578 michael@paquier.xyz 102 [ # # ]:UBC 0 : elog(ERROR, "could not set zstd compression level to %d: %s",
103 : : compress->level, ZSTD_getErrorName(ret));
104 : :
733 michael@paquier.xyz 105 [ + + ]:CBC 4 : if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
106 : : {
107 : : /*
108 : : * On older versions of libzstd, this option does not exist, and
109 : : * trying to set it will fail. Similarly for newer versions if they
110 : : * are compiled without threading support.
111 : : */
746 rhaas@postgresql.org 112 : 1 : ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
113 : : compress->workers);
114 [ - + ]: 1 : if (ZSTD_isError(ret))
746 rhaas@postgresql.org 115 [ # # ]:UBC 0 : ereport(ERROR,
116 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117 : : errmsg("could not set compression worker count to %d: %s",
118 : : compress->workers, ZSTD_getErrorName(ret)));
119 : : }
120 : :
374 tomas.vondra@postgre 121 [ + + ]:CBC 4 : if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
122 : : {
123 : 1 : ret = ZSTD_CCtx_setParameter(mysink->cctx,
124 : : ZSTD_c_enableLongDistanceMatching,
125 : 1 : compress->long_distance);
126 [ - + ]: 1 : if (ZSTD_isError(ret))
374 tomas.vondra@postgre 127 [ # # ]:UBC 0 : ereport(ERROR,
128 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
129 : : errmsg("could not enable long-distance mode: %s",
130 : : ZSTD_getErrorName(ret)));
131 : : }
132 : :
133 : : /*
134 : : * We need our own buffer, because we're going to pass different data to
135 : : * the next sink than what gets passed to us.
136 : : */
769 rhaas@postgresql.org 137 :CBC 4 : mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
138 : :
139 : : /*
140 : : * Make sure that the next sink's bbs_buffer is big enough to accommodate
141 : : * the compressed input buffer.
142 : : */
143 : 4 : output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
144 : :
145 : : /*
146 : : * The buffer length is expected to be a multiple of BLCKSZ, so round up.
147 : : */
148 : 4 : output_buffer_bound = output_buffer_bound + BLCKSZ -
149 : : (output_buffer_bound % BLCKSZ);
150 : :
151 : 4 : bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
152 : 4 : }
153 : :
154 : : /*
155 : : * Prepare to compress the next archive.
156 : : */
157 : : static void
158 : 4 : bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
159 : : {
160 : 4 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
161 : : char *zstd_archive_name;
162 : :
163 : : /*
164 : : * At the start of each archive we reset the state to start a new
165 : : * compression operation. The parameters are sticky and they will stick
166 : : * around as we are resetting with option ZSTD_reset_session_only.
167 : : */
168 : 4 : ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
169 : :
170 : 4 : mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
171 : 4 : mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
172 : 4 : mysink->zstd_outBuf.pos = 0;
173 : :
174 : : /* Add ".zst" to the archive name. */
175 : 4 : zstd_archive_name = psprintf("%s.zst", archive_name);
176 [ - + ]: 4 : Assert(sink->bbs_next != NULL);
177 : 4 : bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
178 : 4 : pfree(zstd_archive_name);
179 : 4 : }
180 : :
181 : : /*
182 : : * Compress the input data to the output buffer until we run out of input
183 : : * data. Each time the output buffer falls below the compression bound for
184 : : * the input buffer, invoke the archive_contents() method for the next sink.
185 : : *
186 : : * Note that since we're compressing the input, it may very commonly happen
187 : : * that we consume all the input data without filling the output buffer. In
188 : : * that case, the compressed representation of the current input data won't
189 : : * actually be sent to the next bbsink until a later call to this function,
190 : : * or perhaps even not until bbsink_zstd_end_archive() is invoked.
191 : : */
192 : : static void
193 : 10790 : bbsink_zstd_archive_contents(bbsink *sink, size_t len)
194 : : {
195 : 10790 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
196 : 10790 : ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
197 : :
198 [ + + ]: 21611 : while (inBuf.pos < inBuf.size)
199 : : {
200 : : size_t yet_to_flush;
201 : 10821 : size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
202 : :
203 : : /*
204 : : * If the out buffer is not left with enough space, send the output
205 : : * buffer to the next sink, and reset it.
206 : : */
207 [ + + ]: 10821 : if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
208 : : {
209 : 391 : bbsink_archive_contents(mysink->base.bbs_next,
210 : : mysink->zstd_outBuf.pos);
211 : 391 : mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
212 : 391 : mysink->zstd_outBuf.size =
213 : 391 : mysink->base.bbs_next->bbs_buffer_length;
214 : 391 : mysink->zstd_outBuf.pos = 0;
215 : : }
216 : :
217 : 10821 : yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
218 : : &inBuf, ZSTD_e_continue);
219 : :
220 [ - + ]: 10821 : if (ZSTD_isError(yet_to_flush))
769 rhaas@postgresql.org 221 [ # # ]:UBC 0 : elog(ERROR,
222 : : "could not compress data: %s",
223 : : ZSTD_getErrorName(yet_to_flush));
224 : : }
769 rhaas@postgresql.org 225 :CBC 10790 : }
226 : :
227 : : /*
228 : : * There might be some data inside zstd's internal buffers; we need to get that
229 : : * flushed out, also end the zstd frame and then get that forwarded to the
230 : : * successor sink as archive content.
231 : : *
232 : : * Then we can end processing for this archive.
233 : : */
234 : : static void
235 : 4 : bbsink_zstd_end_archive(bbsink *sink)
236 : : {
237 : 4 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
238 : : size_t yet_to_flush;
239 : :
240 : : do
241 : : {
242 : 4 : ZSTD_inBuffer in = {NULL, 0, 0};
243 : 4 : size_t max_needed = ZSTD_compressBound(0);
244 : :
245 : : /*
246 : : * If the out buffer is not left with enough space, send the output
247 : : * buffer to the next sink, and reset it.
248 : : */
249 [ - + ]: 4 : if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
250 : : {
769 rhaas@postgresql.org 251 :LBC (20) : bbsink_archive_contents(mysink->base.bbs_next,
252 : : mysink->zstd_outBuf.pos);
253 : (20) : mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
254 : (20) : mysink->zstd_outBuf.size =
255 : (20) : mysink->base.bbs_next->bbs_buffer_length;
256 : (20) : mysink->zstd_outBuf.pos = 0;
257 : : }
258 : :
769 rhaas@postgresql.org 259 :CBC 4 : yet_to_flush = ZSTD_compressStream2(mysink->cctx,
260 : : &mysink->zstd_outBuf,
261 : : &in, ZSTD_e_end);
262 : :
263 [ - + ]: 4 : if (ZSTD_isError(yet_to_flush))
769 rhaas@postgresql.org 264 [ # # ]:UBC 0 : elog(ERROR, "could not compress data: %s",
265 : : ZSTD_getErrorName(yet_to_flush));
266 : :
769 rhaas@postgresql.org 267 [ - + ]:CBC 4 : } while (yet_to_flush > 0);
268 : :
269 : : /* Make sure to pass any remaining bytes to the next sink. */
270 [ + - ]: 4 : if (mysink->zstd_outBuf.pos > 0)
271 : 4 : bbsink_archive_contents(mysink->base.bbs_next,
272 : : mysink->zstd_outBuf.pos);
273 : :
274 : : /* Pass on the information that this archive has ended. */
275 : 4 : bbsink_forward_end_archive(sink);
276 : 4 : }
277 : :
278 : : /*
279 : : * Free the resources and context.
280 : : */
281 : : static void
282 : 4 : bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
283 : : TimeLineID endtli)
284 : : {
285 : 4 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
286 : :
287 : : /* Release the context. */
288 [ + - ]: 4 : if (mysink->cctx)
289 : : {
290 : 4 : ZSTD_freeCCtx(mysink->cctx);
291 : 4 : mysink->cctx = NULL;
292 : : }
293 : :
294 : 4 : bbsink_forward_end_backup(sink, endptr, endtli);
295 : 4 : }
296 : :
297 : : /*
298 : : * Manifest contents are not compressed, but we do need to copy them into
299 : : * the successor sink's buffer, because we have our own.
300 : : */
301 : : static void
302 : 20 : bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
303 : : {
304 : 20 : memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
305 : 20 : bbsink_manifest_contents(sink->bbs_next, len);
306 : 20 : }
307 : :
308 : : /*
309 : : * In case the backup fails, make sure we free any compression context that
310 : : * got allocated, so that we don't leak memory.
311 : : */
312 : : static void
313 : 4 : bbsink_zstd_cleanup(bbsink *sink)
314 : : {
315 : 4 : bbsink_zstd *mysink = (bbsink_zstd *) sink;
316 : :
317 : : /* Release the context if not already released. */
318 [ - + ]: 4 : if (mysink->cctx)
319 : : {
769 rhaas@postgresql.org 320 :UBC 0 : ZSTD_freeCCtx(mysink->cctx);
321 : 0 : mysink->cctx = NULL;
322 : : }
769 rhaas@postgresql.org 323 :CBC 4 : }
324 : :
325 : : #endif
|