TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walmethods.c - implementations of different ways to write received wal
4 : *
5 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/bin/pg_basebackup/walmethods.c
9 : *-------------------------------------------------------------------------
10 : */
11 :
12 : #include "postgres_fe.h"
13 :
14 : #include <sys/stat.h>
15 : #include <time.h>
16 : #include <unistd.h>
17 :
18 : #ifdef USE_LZ4
19 : #include <lz4frame.h>
20 : #endif
21 : #ifdef HAVE_LIBZ
22 : #include <zlib.h>
23 : #endif
24 :
25 : #include "common/file_perm.h"
26 : #include "common/file_utils.h"
27 : #include "common/logging.h"
28 : #include "pgtar.h"
29 : #include "receivelog.h"
30 : #include "streamutil.h"
31 :
32 : /* Size of zlib buffer for .tar.gz */
33 : #define ZLIB_OUT_SIZE 4096
34 :
35 : /* Size of LZ4 input chunk for .lz4 */
36 : #define LZ4_IN_SIZE 4096
37 :
38 : /*-------------------------------------------------------------------------
39 : * WalDirectoryMethod - write wal to a directory looking like pg_wal
40 : *-------------------------------------------------------------------------
41 : */
42 :
43 : static Walfile *dir_open_for_write(WalWriteMethod *wwmethod,
44 : const char *pathname,
45 : const char *temp_suffix,
46 : size_t pad_to_size);
47 : static int dir_close(Walfile *f, WalCloseMethod method);
48 : static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname);
49 : static ssize_t dir_get_file_size(WalWriteMethod *wwmethod,
50 : const char *pathname);
51 : static char *dir_get_file_name(WalWriteMethod *wwmethod,
52 : const char *pathname, const char *temp_suffix);
53 : static ssize_t dir_write(Walfile *f, const void *buf, size_t count);
54 : static int dir_sync(Walfile *f);
55 : static bool dir_finish(WalWriteMethod *wwmethod);
56 : static void dir_free(WalWriteMethod *wwmethod);
57 :
58 : const WalWriteMethodOps WalDirectoryMethodOps = {
59 : .open_for_write = dir_open_for_write,
60 : .close = dir_close,
61 : .existsfile = dir_existsfile,
62 : .get_file_size = dir_get_file_size,
63 : .get_file_name = dir_get_file_name,
64 : .write = dir_write,
65 : .sync = dir_sync,
66 : .finish = dir_finish,
67 : .free = dir_free
68 : };
69 :
70 : /*
71 : * Global static data for this method
72 : */
73 : typedef struct DirectoryMethodData
74 : {
75 : WalWriteMethod base;
76 : char *basedir;
77 : } DirectoryMethodData;
78 :
79 : /*
80 : * Local file handle
81 : */
82 : typedef struct DirectoryMethodFile
83 : {
84 : Walfile base;
85 : int fd;
86 : char *fullpath;
87 : char *temp_suffix;
88 : #ifdef HAVE_LIBZ
89 : gzFile gzfp;
90 : #endif
91 : #ifdef USE_LZ4
92 : LZ4F_compressionContext_t ctx;
93 : size_t lz4bufsize;
94 : void *lz4buf;
95 : #endif
96 : } DirectoryMethodFile;
97 :
98 : #define clear_error(wwmethod) \
99 : ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
100 :
101 : static char *
102 GNC 351 : dir_get_file_name(WalWriteMethod *wwmethod,
103 : const char *pathname, const char *temp_suffix)
104 : {
105 GIC 351 : char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
106 :
107 702 : snprintf(filename, MAXPGPATH, "%s%s%s",
108 : pathname,
109 GNC 351 : wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
110 343 : wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
111 ECB : temp_suffix ? temp_suffix : "");
112 :
113 GIC 351 : return filename;
114 ECB : }
115 :
116 : static Walfile *
117 GNC 139 : dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
118 : const char *temp_suffix, size_t pad_to_size)
119 ECB : {
120 GNC 139 : DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
121 ECB : char tmppath[MAXPGPATH];
122 : char *filename;
123 : int fd;
124 : DirectoryMethodFile *f;
125 : #ifdef HAVE_LIBZ
126 GIC 139 : gzFile gzfp = NULL;
127 : #endif
128 ECB : #ifdef USE_LZ4
129 GIC 139 : LZ4F_compressionContext_t ctx = NULL;
130 139 : size_t lz4bufsize = 0;
131 CBC 139 : void *lz4buf = NULL;
132 : #endif
133 :
134 GNC 139 : clear_error(wwmethod);
135 :
136 139 : filename = dir_get_file_name(wwmethod, pathname, temp_suffix);
137 CBC 139 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
138 : dir_data->basedir, filename);
139 GIC 139 : pg_free(filename);
140 ECB :
141 : /*
142 : * Open a file for non-compressed as well as compressed files. Tracking
143 : * the file descriptor is important for dir_sync() method as gzflush()
144 : * does not do any system calls to fsync() to make changes permanent on
145 : * disk.
146 : */
147 CBC 139 : fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
148 139 : if (fd < 0)
149 : {
150 UNC 0 : wwmethod->lasterrno = errno;
151 UIC 0 : return NULL;
152 : }
153 :
154 : #ifdef HAVE_LIBZ
155 GNC 139 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
156 : {
157 GIC 2 : gzfp = gzdopen(fd, "wb");
158 CBC 2 : if (gzfp == NULL)
159 ECB : {
160 UNC 0 : wwmethod->lasterrno = errno;
161 UBC 0 : close(fd);
162 0 : return NULL;
163 : }
164 :
165 GNC 2 : if (gzsetparams(gzfp, wwmethod->compression_level,
166 ECB : Z_DEFAULT_STRATEGY) != Z_OK)
167 : {
168 UNC 0 : wwmethod->lasterrno = errno;
169 LBC 0 : gzclose(gzfp);
170 UIC 0 : return NULL;
171 EUB : }
172 : }
173 : #endif
174 : #ifdef USE_LZ4
175 GNC 139 : if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
176 ECB : {
177 : size_t ctx_out;
178 : size_t header_size;
179 EUB : LZ4F_preferences_t prefs;
180 :
181 GBC 2 : ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
182 GIC 2 : if (LZ4F_isError(ctx_out))
183 : {
184 UNC 0 : wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);
185 UIC 0 : close(fd);
186 LBC 0 : return NULL;
187 : }
188 :
189 GIC 2 : lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
190 2 : lz4buf = pg_malloc0(lz4bufsize);
191 :
192 ECB : /* assign the compression level, default is 0 */
193 CBC 2 : memset(&prefs, 0, sizeof(prefs));
194 GNC 2 : prefs.compressionLevel = wwmethod->compression_level;
195 EUB :
196 : /* add the header */
197 GBC 2 : header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
198 GIC 2 : if (LZ4F_isError(header_size))
199 : {
200 UNC 0 : wwmethod->lasterrstring = LZ4F_getErrorName(header_size);
201 LBC 0 : (void) LZ4F_freeCompressionContext(ctx);
202 UIC 0 : pg_free(lz4buf);
203 0 : close(fd);
204 LBC 0 : return NULL;
205 ECB : }
206 :
207 GIC 2 : errno = 0;
208 CBC 2 : if (write(fd, lz4buf, header_size) != header_size)
209 ECB : {
210 : /* If write didn't set errno, assume problem is no disk space */
211 UNC 0 : wwmethod->lasterrno = errno ? errno : ENOSPC;
212 UBC 0 : (void) LZ4F_freeCompressionContext(ctx);
213 0 : pg_free(lz4buf);
214 0 : close(fd);
215 0 : return NULL;
216 : }
217 : }
218 ECB : #endif
219 :
220 : /* Do pre-padding on non-compressed files */
221 GNC 139 : if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
222 EUB : {
223 : ssize_t rc;
224 :
225 GNC 94 : rc = pg_pwrite_zeros(fd, pad_to_size, 0);
226 :
227 94 : if (rc < 0)
228 : {
229 UNC 0 : wwmethod->lasterrno = errno;
230 0 : close(fd);
231 0 : return NULL;
232 : }
233 ECB :
234 : /*
235 : * pg_pwrite() (called via pg_pwrite_zeros()) may have moved the file
236 : * position, so reset it (see win32pwrite.c).
237 : */
238 GIC 94 : if (lseek(fd, 0, SEEK_SET) != 0)
239 EUB : {
240 UNC 0 : wwmethod->lasterrno = errno;
241 UBC 0 : close(fd);
242 UIC 0 : return NULL;
243 : }
244 : }
245 :
246 : /*
247 : * fsync WAL file and containing directory, to ensure the file is
248 ECB : * persistently created and zeroed (if padded). That's particularly
249 : * important when using synchronous mode, where the file is modified and
250 EUB : * fsynced in-place, without a directory fsync.
251 : */
252 GNC 139 : if (wwmethod->sync)
253 : {
254 GIC 14 : if (fsync_fname(tmppath, false) != 0 ||
255 7 : fsync_parent_path(tmppath) != 0)
256 : {
257 UNC 0 : wwmethod->lasterrno = errno;
258 : #ifdef HAVE_LIBZ
259 0 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
260 UIC 0 : gzclose(gzfp);
261 : else
262 ECB : #endif
263 : #ifdef USE_LZ4
264 UNC 0 : if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
265 ECB : {
266 UIC 0 : (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
267 UBC 0 : (void) LZ4F_freeCompressionContext(ctx);
268 UIC 0 : pg_free(lz4buf);
269 UBC 0 : close(fd);
270 EUB : }
271 : else
272 : #endif
273 UIC 0 : close(fd);
274 UBC 0 : return NULL;
275 : }
276 EUB : }
277 :
278 GBC 139 : f = pg_malloc0(sizeof(DirectoryMethodFile));
279 EUB : #ifdef HAVE_LIBZ
280 GNC 139 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
281 GIC 2 : f->gzfp = gzfp;
282 : #endif
283 EUB : #ifdef USE_LZ4
284 GNC 139 : if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
285 : {
286 GIC 2 : f->ctx = ctx;
287 2 : f->lz4buf = lz4buf;
288 CBC 2 : f->lz4bufsize = lz4bufsize;
289 : }
290 ECB : #endif
291 :
292 GNC 139 : f->base.wwmethod = wwmethod;
293 139 : f->base.currpos = 0;
294 139 : f->base.pathname = pg_strdup(pathname);
295 GIC 139 : f->fd = fd;
296 139 : f->fullpath = pg_strdup(tmppath);
297 CBC 139 : if (temp_suffix)
298 14 : f->temp_suffix = pg_strdup(temp_suffix);
299 ECB :
300 GNC 139 : return &f->base;
301 : }
302 :
303 ECB : static ssize_t
304 GNC 6281 : dir_write(Walfile *f, const void *buf, size_t count)
305 ECB : {
306 : ssize_t r;
307 CBC 6281 : DirectoryMethodFile *df = (DirectoryMethodFile *) f;
308 ECB :
309 CBC 6281 : Assert(f != NULL);
310 GNC 6281 : clear_error(f->wwmethod);
311 ECB :
312 : #ifdef HAVE_LIBZ
313 GNC 6281 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
314 : {
315 CBC 9 : errno = 0;
316 GIC 9 : r = (ssize_t) gzwrite(df->gzfp, buf, count);
317 9 : if (r != count)
318 ECB : {
319 : /* If write didn't set errno, assume problem is no disk space */
320 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
321 ECB : }
322 : }
323 : else
324 : #endif
325 : #ifdef USE_LZ4
326 GNC 6272 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
327 ECB : {
328 : size_t chunk;
329 : size_t remaining;
330 GIC 9 : const void *inbuf = buf;
331 EUB :
332 GIC 9 : remaining = count;
333 266 : while (remaining > 0)
334 : {
335 : size_t compressed;
336 :
337 CBC 257 : if (remaining > LZ4_IN_SIZE)
338 GIC 248 : chunk = LZ4_IN_SIZE;
339 : else
340 9 : chunk = remaining;
341 ECB :
342 GIC 257 : remaining -= chunk;
343 CBC 257 : compressed = LZ4F_compressUpdate(df->ctx,
344 ECB : df->lz4buf, df->lz4bufsize,
345 : inbuf, chunk,
346 : NULL);
347 :
348 CBC 257 : if (LZ4F_isError(compressed))
349 ECB : {
350 UNC 0 : f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
351 LBC 0 : return -1;
352 : }
353 ECB :
354 CBC 257 : errno = 0;
355 GIC 257 : if (write(df->fd, df->lz4buf, compressed) != compressed)
356 : {
357 : /* If write didn't set errno, assume problem is no disk space */
358 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
359 LBC 0 : return -1;
360 : }
361 EUB :
362 GBC 257 : inbuf = ((char *) inbuf) + chunk;
363 : }
364 :
365 ECB : /* Our caller keeps track of the uncompressed size. */
366 CBC 9 : r = (ssize_t) count;
367 : }
368 : else
369 EUB : #endif
370 : {
371 GIC 6263 : errno = 0;
372 6263 : r = write(df->fd, buf, count);
373 CBC 6263 : if (r != count)
374 : {
375 : /* If write didn't set errno, assume problem is no disk space */
376 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
377 ECB : }
378 : }
379 GIC 6281 : if (r > 0)
380 GNC 6281 : df->base.currpos += r;
381 GIC 6281 : return r;
382 ECB : }
383 :
384 : static int
385 GNC 139 : dir_close(Walfile *f, WalCloseMethod method)
386 ECB : {
387 : int r;
388 GIC 139 : DirectoryMethodFile *df = (DirectoryMethodFile *) f;
389 GNC 139 : DirectoryMethodData *dir_data = (DirectoryMethodData *) f->wwmethod;
390 ECB : char tmppath[MAXPGPATH];
391 : char tmppath2[MAXPGPATH];
392 :
393 GIC 139 : Assert(f != NULL);
394 GNC 139 : clear_error(f->wwmethod);
395 ECB :
396 : #ifdef HAVE_LIBZ
397 GNC 139 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
398 : {
399 CBC 2 : errno = 0; /* in case gzclose() doesn't set it */
400 GIC 2 : r = gzclose(df->gzfp);
401 ECB : }
402 : else
403 : #endif
404 : #ifdef USE_LZ4
405 GNC 137 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
406 : {
407 ECB : size_t compressed;
408 :
409 GIC 2 : compressed = LZ4F_compressEnd(df->ctx,
410 : df->lz4buf, df->lz4bufsize,
411 ECB : NULL);
412 :
413 GIC 2 : if (LZ4F_isError(compressed))
414 : {
415 UNC 0 : f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
416 UIC 0 : return -1;
417 EUB : }
418 :
419 GIC 2 : errno = 0;
420 2 : if (write(df->fd, df->lz4buf, compressed) != compressed)
421 ECB : {
422 : /* If write didn't set errno, assume problem is no disk space */
423 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
424 UIC 0 : return -1;
425 EUB : }
426 :
427 GIC 2 : r = close(df->fd);
428 : }
429 ECB : else
430 : #endif
431 GIC 135 : r = close(df->fd);
432 :
433 CBC 139 : if (r == 0)
434 : {
435 ECB : /* Build path to the current version of the file */
436 GIC 139 : if (method == CLOSE_NORMAL && df->temp_suffix)
437 8 : {
438 ECB : char *filename;
439 : char *filename2;
440 :
441 : /*
442 : * If we have a temp prefix, normal operation is to rename the
443 : * file.
444 : */
445 GNC 8 : filename = dir_get_file_name(f->wwmethod, df->base.pathname,
446 8 : df->temp_suffix);
447 GIC 8 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
448 ECB : dir_data->basedir, filename);
449 CBC 8 : pg_free(filename);
450 ECB :
451 : /* permanent name, so no need for the prefix */
452 GNC 8 : filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL);
453 GIC 8 : snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
454 : dir_data->basedir, filename2);
455 CBC 8 : pg_free(filename2);
456 GNC 8 : if (f->wwmethod->sync)
457 GIC 3 : r = durable_rename(tmppath, tmppath2);
458 ECB : else
459 : {
460 CBC 5 : if (rename(tmppath, tmppath2) != 0)
461 : {
462 UIC 0 : pg_log_error("could not rename file \"%s\" to \"%s\": %m",
463 ECB : tmppath, tmppath2);
464 UIC 0 : r = -1;
465 EUB : }
466 : }
467 : }
468 GIC 131 : else if (method == CLOSE_UNLINK)
469 : {
470 : char *filename;
471 ECB :
472 : /* Unlink the file once it's closed */
473 UNC 0 : filename = dir_get_file_name(f->wwmethod, df->base.pathname,
474 0 : df->temp_suffix);
475 UIC 0 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
476 : dir_data->basedir, filename);
477 UBC 0 : pg_free(filename);
478 0 : r = unlink(tmppath);
479 EUB : }
480 : else
481 : {
482 : /*
483 : * Else either CLOSE_NORMAL and no temp suffix, or
484 : * CLOSE_NO_RENAME. In this case, fsync the file and containing
485 : * directory if sync mode is requested.
486 : */
487 GNC 131 : if (f->wwmethod->sync)
488 : {
489 GIC 4 : r = fsync_fname(df->fullpath, false);
490 4 : if (r == 0)
491 CBC 4 : r = fsync_parent_path(df->fullpath);
492 : }
493 ECB : }
494 : }
495 :
496 GIC 139 : if (r != 0)
497 UNC 0 : f->wwmethod->lasterrno = errno;
498 :
499 : #ifdef USE_LZ4
500 CBC 139 : pg_free(df->lz4buf);
501 EUB : /* supports free on NULL */
502 GIC 139 : LZ4F_freeCompressionContext(df->ctx);
503 : #endif
504 ECB :
505 GNC 139 : pg_free(df->base.pathname);
506 CBC 139 : pg_free(df->fullpath);
507 GNC 139 : pg_free(df->temp_suffix);
508 CBC 139 : pg_free(df);
509 ECB :
510 CBC 139 : return r;
511 ECB : }
512 :
513 : static int
514 UNC 0 : dir_sync(Walfile *f)
515 : {
516 : int r;
517 EUB :
518 UIC 0 : Assert(f != NULL);
519 UNC 0 : clear_error(f->wwmethod);
520 :
521 0 : if (!f->wwmethod->sync)
522 UBC 0 : return 0;
523 :
524 EUB : #ifdef HAVE_LIBZ
525 UNC 0 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
526 : {
527 UIC 0 : if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
528 EUB : {
529 UNC 0 : f->wwmethod->lasterrno = errno;
530 UBC 0 : return -1;
531 : }
532 EUB : }
533 : #endif
534 : #ifdef USE_LZ4
535 UNC 0 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
536 : {
537 UIC 0 : DirectoryMethodFile *df = (DirectoryMethodFile *) f;
538 EUB : size_t compressed;
539 :
540 : /* Flush any internal buffers */
541 UIC 0 : compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
542 0 : if (LZ4F_isError(compressed))
543 : {
544 UNC 0 : f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
545 UBC 0 : return -1;
546 : }
547 EUB :
548 UBC 0 : errno = 0;
549 UIC 0 : if (write(df->fd, df->lz4buf, compressed) != compressed)
550 : {
551 EUB : /* If write didn't set errno, assume problem is no disk space */
552 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
553 UIC 0 : return -1;
554 : }
555 EUB : }
556 : #endif
557 :
558 UIC 0 : r = fsync(((DirectoryMethodFile *) f)->fd);
559 0 : if (r < 0)
560 UNC 0 : f->wwmethod->lasterrno = errno;
561 UBC 0 : return r;
562 EUB : }
563 :
564 : static ssize_t
565 UNC 0 : dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
566 : {
567 0 : DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
568 : struct stat statbuf;
569 EUB : char tmppath[MAXPGPATH];
570 :
571 UBC 0 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
572 : dir_data->basedir, pathname);
573 :
574 UIC 0 : if (stat(tmppath, &statbuf) != 0)
575 EUB : {
576 UNC 0 : wwmethod->lasterrno = errno;
577 UIC 0 : return -1;
578 EUB : }
579 :
580 UBC 0 : return statbuf.st_size;
581 EUB : }
582 :
583 : static bool
584 GNC 96 : dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
585 : {
586 96 : DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
587 : char tmppath[MAXPGPATH];
588 : int fd;
589 ECB :
590 GNC 96 : clear_error(wwmethod);
591 ECB :
592 GIC 96 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
593 : dir_data->basedir, pathname);
594 ECB :
595 CBC 96 : fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
596 96 : if (fd < 0)
597 GBC 96 : return false;
598 UBC 0 : close(fd);
599 UIC 0 : return true;
600 : }
601 :
602 ECB : static bool
603 GNC 90 : dir_finish(WalWriteMethod *wwmethod)
604 ECB : {
605 GNC 90 : clear_error(wwmethod);
606 ECB :
607 GNC 90 : if (wwmethod->sync)
608 ECB : {
609 GNC 4 : DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
610 :
611 : /*
612 : * Files are fsynced when they are closed, but we need to fsync the
613 : * directory entry here as well.
614 : */
615 GIC 4 : if (fsync_fname(dir_data->basedir, true) != 0)
616 ECB : {
617 UNC 0 : wwmethod->lasterrno = errno;
618 UBC 0 : return false;
619 EUB : }
620 : }
621 GIC 90 : return true;
622 ECB : }
623 :
624 : static void
625 GNC 90 : dir_free(WalWriteMethod *wwmethod)
626 : {
627 90 : DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
628 :
629 90 : pg_free(dir_data->basedir);
630 90 : pg_free(wwmethod);
631 90 : }
632 :
633 :
634 : WalWriteMethod *
635 CBC 93 : CreateWalDirectoryMethod(const char *basedir,
636 : pg_compress_algorithm compression_algorithm,
637 ECB : int compression_level, bool sync)
638 : {
639 : DirectoryMethodData *wwmethod;
640 :
641 GNC 93 : wwmethod = pg_malloc0(sizeof(DirectoryMethodData));
642 93 : *((const WalWriteMethodOps **) &wwmethod->base.ops) =
643 : &WalDirectoryMethodOps;
644 93 : wwmethod->base.compression_algorithm = compression_algorithm;
645 93 : wwmethod->base.compression_level = compression_level;
646 93 : wwmethod->base.sync = sync;
647 93 : clear_error(&wwmethod->base);
648 93 : wwmethod->basedir = pg_strdup(basedir);
649 :
650 93 : return &wwmethod->base;
651 : }
652 :
653 :
654 : /*-------------------------------------------------------------------------
655 : * WalTarMethod - write wal to a tar file containing pg_wal contents
656 : *-------------------------------------------------------------------------
657 : */
658 :
659 : static Walfile *tar_open_for_write(WalWriteMethod *wwmethod,
660 : const char *pathname,
661 : const char *temp_suffix,
662 : size_t pad_to_size);
663 : static int tar_close(Walfile *f, WalCloseMethod method);
664 : static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname);
665 : static ssize_t tar_get_file_size(WalWriteMethod *wwmethod,
666 : const char *pathname);
667 : static char *tar_get_file_name(WalWriteMethod *wwmethod,
668 : const char *pathname, const char *temp_suffix);
669 : static ssize_t tar_write(Walfile *f, const void *buf, size_t count);
670 : static int tar_sync(Walfile *f);
671 : static bool tar_finish(WalWriteMethod *wwmethod);
672 : static void tar_free(WalWriteMethod *wwmethod);
673 :
674 : const WalWriteMethodOps WalTarMethodOps = {
675 : .open_for_write = tar_open_for_write,
676 : .close = tar_close,
677 : .existsfile = tar_existsfile,
678 : .get_file_size = tar_get_file_size,
679 : .get_file_name = tar_get_file_name,
680 : .write = tar_write,
681 : .sync = tar_sync,
682 : .finish = tar_finish,
683 : .free = tar_free
684 : };
685 :
686 : typedef struct TarMethodFile
687 : {
688 : Walfile base;
689 : off_t ofs_start; /* Where does the *header* for this file start */
690 : char header[TAR_BLOCK_SIZE];
691 : size_t pad_to_size;
692 : } TarMethodFile;
693 :
694 : typedef struct TarMethodData
695 : {
696 : WalWriteMethod base;
697 : char *tarfilename;
698 : int fd;
699 : TarMethodFile *currentfile;
700 : #ifdef HAVE_LIBZ
701 : z_streamp zp;
702 : void *zlibOut;
703 : #endif
704 : } TarMethodData;
705 :
706 : #ifdef HAVE_LIBZ
707 : static bool
708 5607 : tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count,
709 : bool flush)
710 : {
711 GIC 5607 : tar_data->zp->next_in = buf;
712 CBC 5607 : tar_data->zp->avail_in = count;
713 ECB :
714 GIC 11228 : while (tar_data->zp->avail_in || flush)
715 ECB : {
716 : int r;
717 :
718 GIC 5633 : r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
719 CBC 5633 : if (r == Z_STREAM_ERROR)
720 ECB : {
721 UNC 0 : tar_data->base.lasterrstring = "could not compress data";
722 UBC 0 : return false;
723 EUB : }
724 :
725 GIC 5633 : if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
726 ECB : {
727 GIC 62 : size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
728 ECB :
729 GIC 62 : errno = 0;
730 CBC 62 : if (write(tar_data->fd, tar_data->zlibOut, len) != len)
731 ECB : {
732 : /* If write didn't set errno, assume problem is no disk space */
733 UNC 0 : tar_data->base.lasterrno = errno ? errno : ENOSPC;
734 UBC 0 : return false;
735 EUB : }
736 :
737 GIC 62 : tar_data->zp->next_out = tar_data->zlibOut;
738 CBC 62 : tar_data->zp->avail_out = ZLIB_OUT_SIZE;
739 ECB : }
740 :
741 GIC 5633 : if (r == Z_STREAM_END)
742 CBC 12 : break;
743 ECB : }
744 :
745 GIC 5607 : if (flush)
746 ECB : {
747 : /* Reset the stream for writing */
748 GIC 12 : if (deflateReset(tar_data->zp) != Z_OK)
749 ECB : {
750 UNC 0 : tar_data->base.lasterrstring = "could not reset compression stream";
751 UBC 0 : return false;
752 EUB : }
753 : }
754 :
755 GIC 5607 : return true;
756 ECB : }
757 : #endif
758 :
759 : static ssize_t
760 GNC 14127 : tar_write(Walfile *f, const void *buf, size_t count)
761 ECB : {
762 GNC 14127 : TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
763 : ssize_t r;
764 ECB :
765 GIC 14127 : Assert(f != NULL);
766 GNC 14127 : clear_error(f->wwmethod);
767 ECB :
768 : /* Tarfile will always be positioned at the end */
769 GNC 14127 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
770 : {
771 CBC 8535 : errno = 0;
772 GIC 8535 : r = write(tar_data->fd, buf, count);
773 CBC 8535 : if (r != count)
774 ECB : {
775 : /* If write didn't set errno, assume problem is no disk space */
776 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
777 UIC 0 : return -1;
778 EUB : }
779 GNC 8535 : f->currpos += r;
780 GIC 8535 : return r;
781 ECB : }
782 : #ifdef HAVE_LIBZ
783 GNC 5592 : else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
784 : {
785 5592 : if (!tar_write_compressed_data(tar_data, unconstify(void *, buf),
786 : count, false))
787 UIC 0 : return -1;
788 GNC 5592 : f->currpos += count;
789 GIC 5592 : return count;
790 EUB : }
791 ECB : #endif
792 : else
793 : {
794 : /* Can't happen - compression enabled with no method set */
795 UNC 0 : f->wwmethod->lasterrno = ENOSYS;
796 UIC 0 : return -1;
797 : }
798 EUB : }
799 :
800 : static bool
801 GIC 7 : tar_write_padding_data(TarMethodFile *f, size_t bytes)
802 : {
803 : PGAlignedXLogBlock zerobuf;
804 CBC 7 : size_t bytesleft = bytes;
805 :
806 GIC 7 : memset(zerobuf.data, 0, XLOG_BLCKSZ);
807 CBC 13751 : while (bytesleft)
808 : {
809 13744 : size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
810 GNC 13744 : ssize_t r = tar_write(&f->base, zerobuf.data, bytestowrite);
811 :
812 CBC 13744 : if (r < 0)
813 LBC 0 : return false;
814 GIC 13744 : bytesleft -= r;
815 ECB : }
816 EUB :
817 CBC 7 : return true;
818 : }
819 :
820 ECB : static char *
821 GNC 23 : tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname,
822 : const char *temp_suffix)
823 : {
824 GIC 23 : char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
825 ECB :
826 GIC 23 : snprintf(filename, MAXPGPATH, "%s%s",
827 : pathname, temp_suffix ? temp_suffix : "");
828 ECB :
829 GIC 23 : return filename;
830 ECB : }
831 :
832 : static Walfile *
833 GNC 9 : tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
834 : const char *temp_suffix, size_t pad_to_size)
835 : {
836 9 : TarMethodData *tar_data = (TarMethodData *) wwmethod;
837 : char *tmppath;
838 :
839 9 : clear_error(wwmethod);
840 :
841 GIC 9 : if (tar_data->fd < 0)
842 ECB : {
843 : /*
844 : * We open the tar file only when we first try to write to it.
845 : */
846 GIC 7 : tar_data->fd = open(tar_data->tarfilename,
847 ECB : O_WRONLY | O_CREAT | PG_BINARY,
848 : pg_file_create_mode);
849 GIC 7 : if (tar_data->fd < 0)
850 : {
851 UNC 0 : wwmethod->lasterrno = errno;
852 LBC 0 : return NULL;
853 : }
854 :
855 ECB : #ifdef HAVE_LIBZ
856 GNC 7 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
857 EUB : {
858 GBC 3 : tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
859 GIC 3 : tar_data->zp->zalloc = Z_NULL;
860 3 : tar_data->zp->zfree = Z_NULL;
861 3 : tar_data->zp->opaque = Z_NULL;
862 CBC 3 : tar_data->zp->next_out = tar_data->zlibOut;
863 GIC 3 : tar_data->zp->avail_out = ZLIB_OUT_SIZE;
864 ECB :
865 : /*
866 : * Initialize deflation library. Adding the magic value 16 to the
867 : * default 15 for the windowBits parameter makes the output be
868 : * gzip instead of zlib.
869 : */
870 GNC 3 : if (deflateInit2(tar_data->zp, wwmethod->compression_level,
871 : Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
872 : {
873 UIC 0 : pg_free(tar_data->zp);
874 0 : tar_data->zp = NULL;
875 UNC 0 : wwmethod->lasterrstring =
876 : "could not initialize compression library";
877 LBC 0 : return NULL;
878 : }
879 : }
880 EUB : #endif
881 :
882 : /* There's no tar header itself, the file starts with regular files */
883 : }
884 :
885 GIC 9 : if (tar_data->currentfile != NULL)
886 : {
887 UNC 0 : wwmethod->lasterrstring =
888 : "implementation error: tar files can't have more than one open file";
889 UIC 0 : return NULL;
890 : }
891 :
892 GIC 9 : tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
893 GNC 9 : tar_data->currentfile->base.wwmethod = wwmethod;
894 ECB :
895 GNC 9 : tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix);
896 EUB :
897 : /* Create a header with size set to 0 - we will fill out the size on close */
898 GBC 9 : if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
899 : {
900 UIC 0 : pg_free(tar_data->currentfile);
901 LBC 0 : pg_free(tmppath);
902 0 : tar_data->currentfile = NULL;
903 UNC 0 : wwmethod->lasterrstring = "could not create tar header";
904 LBC 0 : return NULL;
905 : }
906 :
907 CBC 9 : pg_free(tmppath);
908 :
909 EUB : #ifdef HAVE_LIBZ
910 GNC 9 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
911 EUB : {
912 : /* Flush existing data */
913 GNC 3 : if (!tar_write_compressed_data(tar_data, NULL, 0, true))
914 UIC 0 : return NULL;
915 :
916 ECB : /* Turn off compression for header */
917 GIC 3 : if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
918 : {
919 UNC 0 : wwmethod->lasterrstring =
920 : "could not change compression parameters";
921 UIC 0 : return NULL;
922 : }
923 ECB : }
924 EUB : #endif
925 :
926 GIC 9 : tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
927 CBC 9 : if (tar_data->currentfile->ofs_start == -1)
928 : {
929 UNC 0 : wwmethod->lasterrno = errno;
930 UIC 0 : pg_free(tar_data->currentfile);
931 UBC 0 : tar_data->currentfile = NULL;
932 UIC 0 : return NULL;
933 : }
934 GNC 9 : tar_data->currentfile->base.currpos = 0;
935 :
936 9 : if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
937 ECB : {
938 GIC 6 : errno = 0;
939 GBC 6 : if (write(tar_data->fd, tar_data->currentfile->header,
940 EUB : TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
941 : {
942 : /* If write didn't set errno, assume problem is no disk space */
943 UNC 0 : wwmethod->lasterrno = errno ? errno : ENOSPC;
944 LBC 0 : pg_free(tar_data->currentfile);
945 UIC 0 : tar_data->currentfile = NULL;
946 LBC 0 : return NULL;
947 : }
948 ECB : }
949 : #ifdef HAVE_LIBZ
950 GNC 3 : else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
951 : {
952 : /* Write header through the zlib APIs but with no compression */
953 3 : if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
954 EUB : TAR_BLOCK_SIZE, true))
955 UBC 0 : return NULL;
956 EUB :
957 : /* Re-enable compression for the rest of the file */
958 GNC 3 : if (deflateParams(tar_data->zp, wwmethod->compression_level,
959 : Z_DEFAULT_STRATEGY) != Z_OK)
960 ECB : {
961 UNC 0 : wwmethod->lasterrstring = "could not change compression parameters";
962 UIC 0 : return NULL;
963 ECB : }
964 : }
965 EUB : #endif
966 : else
967 : {
968 ECB : /* not reachable */
969 UIC 0 : Assert(false);
970 : }
971 EUB :
972 GNC 9 : tar_data->currentfile->base.pathname = pg_strdup(pathname);
973 :
974 : /*
975 : * Uncompressed files are padded on creation, but for compression we can't
976 : * do that
977 : */
978 GIC 9 : if (pad_to_size)
979 EUB : {
980 GIC 7 : tar_data->currentfile->pad_to_size = pad_to_size;
981 GNC 7 : if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
982 ECB : {
983 : /* Uncompressed, so pad now */
984 GIC 4 : if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
985 UIC 0 : return NULL;
986 : /* Seek back to start */
987 GIC 8 : if (lseek(tar_data->fd,
988 CBC 4 : tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
989 GIC 4 : SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
990 ECB : {
991 UNC 0 : wwmethod->lasterrno = errno;
992 UIC 0 : return NULL;
993 : }
994 ECB :
995 GNC 4 : tar_data->currentfile->base.currpos = 0;
996 : }
997 ECB : }
998 :
999 GNC 9 : return &tar_data->currentfile->base;
1000 : }
1001 EUB :
1002 : static ssize_t
1003 UNC 0 : tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
1004 : {
1005 0 : clear_error(wwmethod);
1006 :
1007 : /* Currently not used, so not supported */
1008 0 : wwmethod->lasterrno = ENOSYS;
1009 LBC 0 : return -1;
1010 : }
1011 :
1012 : static int
1013 GNC 9 : tar_sync(Walfile *f)
1014 ECB : {
1015 GNC 9 : TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
1016 : int r;
1017 ECB :
1018 CBC 9 : Assert(f != NULL);
1019 GNC 9 : clear_error(f->wwmethod);
1020 :
1021 9 : if (!f->wwmethod->sync)
1022 GIC 9 : return 0;
1023 :
1024 EUB : /*
1025 : * Always sync the whole tarfile, because that's all we can do. This makes
1026 : * no sense on compressed files, so just ignore those.
1027 : */
1028 UNC 0 : if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
1029 UBC 0 : return 0;
1030 EUB :
1031 UIC 0 : r = fsync(tar_data->fd);
1032 0 : if (r < 0)
1033 UNC 0 : f->wwmethod->lasterrno = errno;
1034 LBC 0 : return r;
1035 : }
1036 :
1037 : static int
1038 GNC 9 : tar_close(Walfile *f, WalCloseMethod method)
1039 ECB : {
1040 : ssize_t filesize;
1041 : int padding;
1042 GNC 9 : TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
1043 CBC 9 : TarMethodFile *tf = (TarMethodFile *) f;
1044 :
1045 9 : Assert(f != NULL);
1046 GNC 9 : clear_error(f->wwmethod);
1047 EUB :
1048 GIC 9 : if (method == CLOSE_UNLINK)
1049 EUB : {
1050 UNC 0 : if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
1051 : {
1052 0 : f->wwmethod->lasterrstring = "unlink not supported with compression";
1053 UIC 0 : return -1;
1054 : }
1055 :
1056 : /*
1057 : * Unlink the file that we just wrote to the tar. We do this by
1058 EUB : * truncating it to the start of the header. This is safe as we only
1059 : * allow writing of the very last file.
1060 : */
1061 UBC 0 : if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
1062 : {
1063 UNC 0 : f->wwmethod->lasterrno = errno;
1064 UBC 0 : return -1;
1065 EUB : }
1066 :
1067 UNC 0 : pg_free(tf->base.pathname);
1068 UBC 0 : pg_free(tf);
1069 UIC 0 : tar_data->currentfile = NULL;
1070 :
1071 0 : return 0;
1072 : }
1073 :
1074 : /*
1075 : * Pad the file itself with zeroes if necessary. Note that this is
1076 ECB : * different from the tar format padding -- this is the padding we asked
1077 : * for when the file was opened.
1078 : */
1079 GIC 9 : if (tf->pad_to_size)
1080 : {
1081 GNC 7 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1082 : {
1083 : /*
1084 ECB : * A compressed tarfile is padded on close since we cannot know
1085 : * the size of the compressed output until the end.
1086 : */
1087 GNC 3 : size_t sizeleft = tf->pad_to_size - tf->base.currpos;
1088 ECB :
1089 GBC 3 : if (sizeleft)
1090 : {
1091 GIC 3 : if (!tar_write_padding_data(tf, sizeleft))
1092 UIC 0 : return -1;
1093 : }
1094 : }
1095 : else
1096 : {
1097 : /*
1098 ECB : * An uncompressed tarfile was padded on creation, so just adjust
1099 : * the current position as if we seeked to the end.
1100 : */
1101 GNC 4 : tf->base.currpos = tf->pad_to_size;
1102 : }
1103 : }
1104 :
1105 : /*
1106 ECB : * Get the size of the file, and pad out to a multiple of the tar block
1107 : * size.
1108 : */
1109 GNC 9 : filesize = f->currpos;
1110 GBC 9 : padding = tarPaddingBytesRequired(filesize);
1111 GIC 9 : if (padding)
1112 EUB : {
1113 UNC 0 : char zerobuf[TAR_BLOCK_SIZE] = {0};
1114 :
1115 UIC 0 : if (tar_write(f, zerobuf, padding) != padding)
1116 0 : return -1;
1117 ECB : }
1118 :
1119 :
1120 : #ifdef HAVE_LIBZ
1121 GNC 9 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1122 : {
1123 : /* Flush the current buffer */
1124 3 : if (!tar_write_compressed_data(tar_data, NULL, 0, true))
1125 UIC 0 : return -1;
1126 : }
1127 : #endif
1128 :
1129 : /*
1130 ECB : * Now go back and update the header with the correct filesize and
1131 : * possibly also renaming the file. We overwrite the entire current header
1132 : * when done, including the checksum.
1133 : */
1134 GIC 9 : print_tar_number(&(tf->header[124]), 12, filesize);
1135 :
1136 9 : if (method == CLOSE_NORMAL)
1137 :
1138 ECB : /*
1139 : * We overwrite it with what it was before if we have no tempname,
1140 : * since we're going to write the buffer anyway.
1141 : */
1142 GNC 9 : strlcpy(&(tf->header[0]), tf->base.pathname, 100);
1143 EUB :
1144 GBC 9 : print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
1145 GIC 9 : if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
1146 ECB : {
1147 UNC 0 : f->wwmethod->lasterrno = errno;
1148 LBC 0 : return -1;
1149 ECB : }
1150 GNC 9 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
1151 : {
1152 GBC 6 : errno = 0;
1153 6 : if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
1154 : {
1155 : /* If write didn't set errno, assume problem is no disk space */
1156 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
1157 LBC 0 : return -1;
1158 : }
1159 : }
1160 ECB : #ifdef HAVE_LIBZ
1161 GNC 3 : else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1162 EUB : {
1163 : /* Turn off compression */
1164 GIC 3 : if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
1165 : {
1166 UNC 0 : f->wwmethod->lasterrstring = "could not change compression parameters";
1167 LBC 0 : return -1;
1168 : }
1169 EUB :
1170 : /* Overwrite the header, assuming the size will be the same */
1171 GNC 3 : if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
1172 ECB : TAR_BLOCK_SIZE, true))
1173 UIC 0 : return -1;
1174 :
1175 EUB : /* Turn compression back on */
1176 GNC 3 : if (deflateParams(tar_data->zp, f->wwmethod->compression_level,
1177 : Z_DEFAULT_STRATEGY) != Z_OK)
1178 : {
1179 UNC 0 : f->wwmethod->lasterrstring = "could not change compression parameters";
1180 UIC 0 : return -1;
1181 : }
1182 : }
1183 EUB : #endif
1184 : else
1185 : {
1186 : /* not reachable */
1187 LBC 0 : Assert(false);
1188 : }
1189 EUB :
1190 : /* Move file pointer back down to end, so we can write the next file */
1191 GIC 9 : if (lseek(tar_data->fd, 0, SEEK_END) < 0)
1192 : {
1193 UNC 0 : f->wwmethod->lasterrno = errno;
1194 LBC 0 : return -1;
1195 : }
1196 :
1197 EUB : /* Always fsync on close, so the padding gets fsynced */
1198 GIC 9 : if (tar_sync(f) < 0)
1199 : {
1200 : /* XXX this seems pretty bogus; why is only this case fatal? */
1201 UIC 0 : pg_fatal("could not fsync file \"%s\": %s",
1202 : tf->base.pathname, GetLastWalMethodError(f->wwmethod));
1203 ECB : }
1204 :
1205 : /* Clean up and done */
1206 GNC 9 : pg_free(tf->base.pathname);
1207 GIC 9 : pg_free(tf);
1208 9 : tar_data->currentfile = NULL;
1209 :
1210 CBC 9 : return 0;
1211 : }
1212 ECB :
1213 : static bool
1214 GNC 4 : tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
1215 : {
1216 4 : clear_error(wwmethod);
1217 : /* We only deal with new tarfiles, so nothing externally created exists */
1218 CBC 4 : return false;
1219 : }
1220 ECB :
1221 : static bool
1222 GNC 7 : tar_finish(WalWriteMethod *wwmethod)
1223 ECB : {
1224 GNC 7 : TarMethodData *tar_data = (TarMethodData *) wwmethod;
1225 7 : char zerobuf[1024] = {0};
1226 ECB :
1227 GNC 7 : clear_error(wwmethod);
1228 EUB :
1229 GBC 7 : if (tar_data->currentfile)
1230 : {
1231 UNC 0 : if (tar_close(&tar_data->currentfile->base, CLOSE_NORMAL) != 0)
1232 UIC 0 : return false;
1233 ECB : }
1234 :
1235 : /* A tarfile always ends with two empty blocks */
1236 GNC 7 : if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
1237 : {
1238 GBC 4 : errno = 0;
1239 4 : if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
1240 : {
1241 : /* If write didn't set errno, assume problem is no disk space */
1242 UNC 0 : wwmethod->lasterrno = errno ? errno : ENOSPC;
1243 LBC 0 : return false;
1244 : }
1245 ECB : }
1246 : #ifdef HAVE_LIBZ
1247 GNC 3 : else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1248 : {
1249 3 : if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),
1250 : false))
1251 LBC 0 : return false;
1252 ECB :
1253 : /* Also flush all data to make sure the gzip stream is finished */
1254 GBC 3 : tar_data->zp->next_in = NULL;
1255 GIC 3 : tar_data->zp->avail_in = 0;
1256 : while (true)
1257 LBC 0 : {
1258 : int r;
1259 ECB :
1260 GIC 3 : r = deflate(tar_data->zp, Z_FINISH);
1261 EUB :
1262 GBC 3 : if (r == Z_STREAM_ERROR)
1263 : {
1264 UNC 0 : wwmethod->lasterrstring = "could not compress data";
1265 UIC 0 : return false;
1266 ECB : }
1267 GIC 3 : if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
1268 ECB : {
1269 CBC 3 : size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
1270 :
1271 GIC 3 : errno = 0;
1272 3 : if (write(tar_data->fd, tar_data->zlibOut, len) != len)
1273 : {
1274 : /*
1275 EUB : * If write didn't set errno, assume problem is no disk
1276 : * space.
1277 : */
1278 UNC 0 : wwmethod->lasterrno = errno ? errno : ENOSPC;
1279 LBC 0 : return false;
1280 ECB : }
1281 : }
1282 GIC 3 : if (r == Z_STREAM_END)
1283 CBC 3 : break;
1284 : }
1285 EUB :
1286 GBC 3 : if (deflateEnd(tar_data->zp) != Z_OK)
1287 : {
1288 UNC 0 : wwmethod->lasterrstring = "could not close compression stream";
1289 UIC 0 : return false;
1290 : }
1291 : }
1292 : #endif
1293 EUB : else
1294 : {
1295 : /* not reachable */
1296 UIC 0 : Assert(false);
1297 ECB : }
1298 :
1299 EUB : /* sync the empty blocks as well, since they're after the last file */
1300 GNC 7 : if (wwmethod->sync)
1301 EUB : {
1302 UBC 0 : if (fsync(tar_data->fd) != 0)
1303 : {
1304 UNC 0 : wwmethod->lasterrno = errno;
1305 UIC 0 : return false;
1306 ECB : }
1307 : }
1308 EUB :
1309 GBC 7 : if (close(tar_data->fd) != 0)
1310 : {
1311 UNC 0 : wwmethod->lasterrno = errno;
1312 LBC 0 : return false;
1313 : }
1314 ECB :
1315 GIC 7 : tar_data->fd = -1;
1316 EUB :
1317 GNC 7 : if (wwmethod->sync)
1318 : {
1319 UBC 0 : if (fsync_fname(tar_data->tarfilename, false) != 0 ||
1320 0 : fsync_parent_path(tar_data->tarfilename) != 0)
1321 : {
1322 UNC 0 : wwmethod->lasterrno = errno;
1323 UIC 0 : return false;
1324 ECB : }
1325 : }
1326 :
1327 GIC 7 : return true;
1328 ECB : }
1329 :
1330 : static void
1331 GNC 7 : tar_free(WalWriteMethod *wwmethod)
1332 : {
1333 7 : TarMethodData *tar_data = (TarMethodData *) wwmethod;
1334 :
1335 7 : pg_free(tar_data->tarfilename);
1336 : #ifdef HAVE_LIBZ
1337 7 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1338 3 : pg_free(tar_data->zlibOut);
1339 : #endif
1340 7 : pg_free(wwmethod);
1341 7 : }
1342 :
1343 ECB : /*
1344 : * The argument compression_algorithm is currently ignored. It is in place for
1345 : * symmetry with CreateWalDirectoryMethod which uses it for distinguishing
1346 : * between the different compression methods. CreateWalTarMethod and its family
1347 : * of functions handle only zlib compression.
1348 : */
1349 : WalWriteMethod *
1350 CBC 7 : CreateWalTarMethod(const char *tarbase,
1351 ECB : pg_compress_algorithm compression_algorithm,
1352 : int compression_level, bool sync)
1353 : {
1354 : TarMethodData *wwmethod;
1355 GIC 7 : const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
1356 7 : ".tar.gz" : ".tar";
1357 :
1358 GNC 7 : wwmethod = pg_malloc0(sizeof(TarMethodData));
1359 7 : *((const WalWriteMethodOps **) &wwmethod->base.ops) =
1360 : &WalTarMethodOps;
1361 7 : wwmethod->base.compression_algorithm = compression_algorithm;
1362 7 : wwmethod->base.compression_level = compression_level;
1363 7 : wwmethod->base.sync = sync;
1364 7 : clear_error(&wwmethod->base);
1365 :
1366 7 : wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
1367 7 : sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix);
1368 7 : wwmethod->fd = -1;
1369 ECB : #ifdef HAVE_LIBZ
1370 GIC 7 : if (compression_algorithm == PG_COMPRESSION_GZIP)
1371 GNC 3 : wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
1372 ECB : #endif
1373 :
1374 GNC 7 : return &wwmethod->base;
1375 ECB : }
1376 :
1377 : const char *
1378 UNC 0 : GetLastWalMethodError(WalWriteMethod *wwmethod)
1379 EUB : {
1380 UNC 0 : if (wwmethod->lasterrstring)
1381 0 : return wwmethod->lasterrstring;
1382 0 : return strerror(wwmethod->lasterrno);
1383 : }
|