Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walmethods.c - implementations of different ways to write received wal
4 : *
5 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
6 : *
7 : * IDENTIFICATION
8 : * src/bin/pg_basebackup/walmethods.c
9 : *-------------------------------------------------------------------------
10 : */
11 :
12 : #include "postgres_fe.h"
13 :
14 : #include <sys/stat.h>
15 : #include <time.h>
16 : #include <unistd.h>
17 :
18 : #ifdef USE_LZ4
19 : #include <lz4frame.h>
20 : #endif
21 : #ifdef HAVE_LIBZ
22 : #include <zlib.h>
23 : #endif
24 :
25 : #include "common/file_perm.h"
26 : #include "common/file_utils.h"
27 : #include "common/logging.h"
28 : #include "pgtar.h"
29 : #include "receivelog.h"
30 : #include "streamutil.h"
31 :
32 : /* Size of zlib buffer for .tar.gz */
33 : #define ZLIB_OUT_SIZE 4096
34 :
35 : /* Size of LZ4 input chunk for .lz4 */
36 : #define LZ4_IN_SIZE 4096
37 :
38 : /*-------------------------------------------------------------------------
39 : * WalDirectoryMethod - write wal to a directory looking like pg_wal
40 : *-------------------------------------------------------------------------
41 : */
42 :
43 : static Walfile *dir_open_for_write(WalWriteMethod *wwmethod,
44 : const char *pathname,
45 : const char *temp_suffix,
46 : size_t pad_to_size);
47 : static int dir_close(Walfile *f, WalCloseMethod method);
48 : static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname);
49 : static ssize_t dir_get_file_size(WalWriteMethod *wwmethod,
50 : const char *pathname);
51 : static char *dir_get_file_name(WalWriteMethod *wwmethod,
52 : const char *pathname, const char *temp_suffix);
53 : static ssize_t dir_write(Walfile *f, const void *buf, size_t count);
54 : static int dir_sync(Walfile *f);
55 : static bool dir_finish(WalWriteMethod *wwmethod);
56 : static void dir_free(WalWriteMethod *wwmethod);
57 :
58 : const WalWriteMethodOps WalDirectoryMethodOps = {
59 : .open_for_write = dir_open_for_write,
60 : .close = dir_close,
61 : .existsfile = dir_existsfile,
62 : .get_file_size = dir_get_file_size,
63 : .get_file_name = dir_get_file_name,
64 : .write = dir_write,
65 : .sync = dir_sync,
66 : .finish = dir_finish,
67 : .free = dir_free
68 : };
69 :
70 : /*
71 : * Global static data for this method
72 : */
73 : typedef struct DirectoryMethodData
74 : {
75 : WalWriteMethod base;
76 : char *basedir;
77 : } DirectoryMethodData;
78 :
79 : /*
80 : * Local file handle
81 : */
82 : typedef struct DirectoryMethodFile
83 : {
84 : Walfile base;
85 : int fd;
86 : char *fullpath;
87 : char *temp_suffix;
88 : #ifdef HAVE_LIBZ
89 : gzFile gzfp;
90 : #endif
91 : #ifdef USE_LZ4
92 : LZ4F_compressionContext_t ctx;
93 : size_t lz4bufsize;
94 : void *lz4buf;
95 : #endif
96 : } DirectoryMethodFile;
97 :
98 : #define clear_error(wwmethod) \
99 : ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
100 :
101 : static char *
202 rhaas 102 GNC 351 : dir_get_file_name(WalWriteMethod *wwmethod,
103 : const char *pathname, const char *temp_suffix)
104 : {
628 michael 105 GIC 351 : char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
106 :
107 702 : snprintf(filename, MAXPGPATH, "%s%s%s",
108 : pathname,
202 rhaas 109 GNC 351 : wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
110 343 : wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
628 michael 111 ECB : temp_suffix ? temp_suffix : "");
112 :
628 michael 113 GIC 351 : return filename;
628 michael 114 ECB : }
115 :
116 : static Walfile *
202 rhaas 117 GNC 139 : dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
118 : const char *temp_suffix, size_t pad_to_size)
2359 magnus 119 ECB : {
202 rhaas 120 GNC 139 : DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
508 tgl 121 ECB : char tmppath[MAXPGPATH];
122 : char *filename;
123 : int fd;
2359 magnus 124 : DirectoryMethodFile *f;
125 : #ifdef HAVE_LIBZ
2273 magnus 126 GIC 139 : gzFile gzfp = NULL;
127 : #endif
390 rhaas 128 ECB : #ifdef USE_LZ4
520 michael 129 GIC 139 : LZ4F_compressionContext_t ctx = NULL;
130 139 : size_t lz4bufsize = 0;
520 michael 131 CBC 139 : void *lz4buf = NULL;
132 : #endif
133 :
202 rhaas 134 GNC 139 : clear_error(wwmethod);
135 :
136 139 : filename = dir_get_file_name(wwmethod, pathname, temp_suffix);
628 michael 137 CBC 139 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
138 : dir_data->basedir, filename);
622 michael 139 GIC 139 : pg_free(filename);
2359 magnus 140 ECB :
2273 141 : /*
142 : * Open a file for non-compressed as well as compressed files. Tracking
143 : * the file descriptor is important for dir_sync() method as gzflush()
144 : * does not do any system calls to fsync() to make changes permanent on
145 : * disk.
146 : */
1828 sfrost 147 CBC 139 : fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
2359 magnus 148 139 : if (fd < 0)
149 : {
202 rhaas 150 UNC 0 : wwmethod->lasterrno = errno;
2359 magnus 151 UIC 0 : return NULL;
152 : }
153 :
154 : #ifdef HAVE_LIBZ
202 rhaas 155 GNC 139 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
156 : {
2273 magnus 157 GIC 2 : gzfp = gzdopen(fd, "wb");
2273 magnus 158 CBC 2 : if (gzfp == NULL)
2273 magnus 159 ECB : {
202 rhaas 160 UNC 0 : wwmethod->lasterrno = errno;
2273 magnus 161 UBC 0 : close(fd);
162 0 : return NULL;
163 : }
164 :
202 rhaas 165 GNC 2 : if (gzsetparams(gzfp, wwmethod->compression_level,
2273 magnus 166 ECB : Z_DEFAULT_STRATEGY) != Z_OK)
167 : {
202 rhaas 168 UNC 0 : wwmethod->lasterrno = errno;
2273 magnus 169 LBC 0 : gzclose(gzfp);
2273 magnus 170 UIC 0 : return NULL;
2273 magnus 171 EUB : }
172 : }
173 : #endif
174 : #ifdef USE_LZ4
202 rhaas 175 GNC 139 : if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
520 michael 176 ECB : {
177 : size_t ctx_out;
178 : size_t header_size;
356 michael 179 EUB : LZ4F_preferences_t prefs;
520 180 :
520 michael 181 GBC 2 : ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
520 michael 182 GIC 2 : if (LZ4F_isError(ctx_out))
183 : {
202 rhaas 184 UNC 0 : wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);
520 michael 185 UIC 0 : close(fd);
520 michael 186 LBC 0 : return NULL;
187 : }
188 :
520 michael 189 GIC 2 : lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
190 2 : lz4buf = pg_malloc0(lz4bufsize);
191 :
356 michael 192 ECB : /* assign the compression level, default is 0 */
356 michael 193 CBC 2 : memset(&prefs, 0, sizeof(prefs));
202 rhaas 194 GNC 2 : prefs.compressionLevel = wwmethod->compression_level;
356 michael 195 EUB :
520 196 : /* add the header */
356 michael 197 GBC 2 : header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
520 michael 198 GIC 2 : if (LZ4F_isError(header_size))
199 : {
202 rhaas 200 UNC 0 : wwmethod->lasterrstring = LZ4F_getErrorName(header_size);
520 michael 201 LBC 0 : (void) LZ4F_freeCompressionContext(ctx);
520 michael 202 UIC 0 : pg_free(lz4buf);
203 0 : close(fd);
520 michael 204 LBC 0 : return NULL;
520 michael 205 ECB : }
206 :
520 michael 207 GIC 2 : errno = 0;
520 michael 208 CBC 2 : if (write(fd, lz4buf, header_size) != header_size)
520 michael 209 ECB : {
210 : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 211 UNC 0 : wwmethod->lasterrno = errno ? errno : ENOSPC;
520 michael 212 UBC 0 : (void) LZ4F_freeCompressionContext(ctx);
213 0 : pg_free(lz4buf);
214 0 : close(fd);
215 0 : return NULL;
216 : }
217 : }
520 michael 218 ECB : #endif
2273 magnus 219 :
220 : /* Do pre-padding on non-compressed files */
202 rhaas 221 GNC 139 : if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
2359 magnus 222 EUB : {
223 : ssize_t rc;
224 :
34 michael 225 GNC 94 : rc = pg_pwrite_zeros(fd, pad_to_size, 0);
226 :
152 227 94 : if (rc < 0)
228 : {
152 michael 229 UNC 0 : wwmethod->lasterrno = errno;
230 0 : close(fd);
231 0 : return NULL;
232 : }
2359 magnus 233 ECB :
234 : /*
235 : * pg_pwrite() (called via pg_pwrite_zeros()) may have moved the file
236 : * position, so reset it (see win32pwrite.c).
237 : */
2359 magnus 238 GIC 94 : if (lseek(fd, 0, SEEK_SET) != 0)
2359 magnus 239 EUB : {
202 rhaas 240 UNC 0 : wwmethod->lasterrno = errno;
2359 magnus 241 UBC 0 : close(fd);
2359 magnus 242 UIC 0 : return NULL;
243 : }
244 : }
245 :
246 : /*
247 : * fsync WAL file and containing directory, to ensure the file is
2359 magnus 248 ECB : * persistently created and zeroed (if padded). That's particularly
249 : * important when using synchronous mode, where the file is modified and
2359 magnus 250 EUB : * fsynced in-place, without a directory fsync.
251 : */
202 rhaas 252 GNC 139 : if (wwmethod->sync)
253 : {
1469 peter 254 GIC 14 : if (fsync_fname(tmppath, false) != 0 ||
255 7 : fsync_parent_path(tmppath) != 0)
256 : {
202 rhaas 257 UNC 0 : wwmethod->lasterrno = errno;
258 : #ifdef HAVE_LIBZ
259 0 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
2273 magnus 260 UIC 0 : gzclose(gzfp);
261 : else
520 michael 262 ECB : #endif
263 : #ifdef USE_LZ4
202 rhaas 264 UNC 0 : if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
520 michael 265 ECB : {
520 michael 266 UIC 0 : (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
520 michael 267 UBC 0 : (void) LZ4F_freeCompressionContext(ctx);
520 michael 268 UIC 0 : pg_free(lz4buf);
520 michael 269 UBC 0 : close(fd);
520 michael 270 EUB : }
271 : else
272 : #endif
2273 magnus 273 UIC 0 : close(fd);
2359 magnus 274 UBC 0 : return NULL;
275 : }
2359 magnus 276 EUB : }
277 :
2359 magnus 278 GBC 139 : f = pg_malloc0(sizeof(DirectoryMethodFile));
2273 magnus 279 EUB : #ifdef HAVE_LIBZ
202 rhaas 280 GNC 139 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
2273 magnus 281 GIC 2 : f->gzfp = gzfp;
282 : #endif
390 rhaas 283 EUB : #ifdef USE_LZ4
202 rhaas 284 GNC 139 : if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
285 : {
520 michael 286 GIC 2 : f->ctx = ctx;
287 2 : f->lz4buf = lz4buf;
520 michael 288 CBC 2 : f->lz4bufsize = lz4bufsize;
289 : }
520 michael 290 ECB : #endif
291 :
202 rhaas 292 GNC 139 : f->base.wwmethod = wwmethod;
293 139 : f->base.currpos = 0;
294 139 : f->base.pathname = pg_strdup(pathname);
2359 magnus 295 GIC 139 : f->fd = fd;
296 139 : f->fullpath = pg_strdup(tmppath);
2359 magnus 297 CBC 139 : if (temp_suffix)
298 14 : f->temp_suffix = pg_strdup(temp_suffix);
2359 magnus 299 ECB :
202 rhaas 300 GNC 139 : return &f->base;
301 : }
302 :
2359 magnus 303 ECB : static ssize_t
202 rhaas 304 GNC 6281 : dir_write(Walfile *f, const void *buf, size_t count)
2359 magnus 305 ECB : {
306 : ssize_t r;
2359 magnus 307 CBC 6281 : DirectoryMethodFile *df = (DirectoryMethodFile *) f;
2359 magnus 308 ECB :
2359 magnus 309 CBC 6281 : Assert(f != NULL);
202 rhaas 310 GNC 6281 : clear_error(f->wwmethod);
2359 magnus 311 ECB :
312 : #ifdef HAVE_LIBZ
202 rhaas 313 GNC 6281 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
314 : {
508 tgl 315 CBC 9 : errno = 0;
2273 magnus 316 GIC 9 : r = (ssize_t) gzwrite(df->gzfp, buf, count);
508 tgl 317 9 : if (r != count)
508 tgl 318 ECB : {
319 : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 320 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
508 tgl 321 ECB : }
322 : }
323 : else
520 michael 324 : #endif
325 : #ifdef USE_LZ4
202 rhaas 326 GNC 6272 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
520 michael 327 ECB : {
328 : size_t chunk;
329 : size_t remaining;
520 michael 330 GIC 9 : const void *inbuf = buf;
520 michael 331 EUB :
520 michael 332 GIC 9 : remaining = count;
333 266 : while (remaining > 0)
334 : {
335 : size_t compressed;
336 :
520 michael 337 CBC 257 : if (remaining > LZ4_IN_SIZE)
520 michael 338 GIC 248 : chunk = LZ4_IN_SIZE;
339 : else
340 9 : chunk = remaining;
520 michael 341 ECB :
520 michael 342 GIC 257 : remaining -= chunk;
520 michael 343 CBC 257 : compressed = LZ4F_compressUpdate(df->ctx,
520 michael 344 ECB : df->lz4buf, df->lz4bufsize,
345 : inbuf, chunk,
346 : NULL);
347 :
520 michael 348 CBC 257 : if (LZ4F_isError(compressed))
508 tgl 349 ECB : {
202 rhaas 350 UNC 0 : f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
520 michael 351 LBC 0 : return -1;
352 : }
520 michael 353 ECB :
508 tgl 354 CBC 257 : errno = 0;
520 michael 355 GIC 257 : if (write(df->fd, df->lz4buf, compressed) != compressed)
356 : {
357 : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 358 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
520 michael 359 LBC 0 : return -1;
360 : }
520 michael 361 EUB :
520 michael 362 GBC 257 : inbuf = ((char *) inbuf) + chunk;
363 : }
364 :
520 michael 365 ECB : /* Our caller keeps track of the uncompressed size. */
520 michael 366 CBC 9 : r = (ssize_t) count;
367 : }
368 : else
2273 magnus 369 EUB : #endif
508 tgl 370 : {
508 tgl 371 GIC 6263 : errno = 0;
2273 magnus 372 6263 : r = write(df->fd, buf, count);
508 tgl 373 CBC 6263 : if (r != count)
374 : {
375 : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 376 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
508 tgl 377 ECB : }
378 : }
2359 magnus 379 GIC 6281 : if (r > 0)
202 rhaas 380 GNC 6281 : df->base.currpos += r;
2359 magnus 381 GIC 6281 : return r;
2359 magnus 382 ECB : }
383 :
384 : static int
202 rhaas 385 GNC 139 : dir_close(Walfile *f, WalCloseMethod method)
2359 magnus 386 ECB : {
387 : int r;
2359 magnus 388 GIC 139 : DirectoryMethodFile *df = (DirectoryMethodFile *) f;
202 rhaas 389 GNC 139 : DirectoryMethodData *dir_data = (DirectoryMethodData *) f->wwmethod;
508 tgl 390 ECB : char tmppath[MAXPGPATH];
391 : char tmppath2[MAXPGPATH];
392 :
2359 magnus 393 GIC 139 : Assert(f != NULL);
202 rhaas 394 GNC 139 : clear_error(f->wwmethod);
2359 magnus 395 ECB :
2273 396 : #ifdef HAVE_LIBZ
202 rhaas 397 GNC 139 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
398 : {
508 tgl 399 CBC 2 : errno = 0; /* in case gzclose() doesn't set it */
2273 magnus 400 GIC 2 : r = gzclose(df->gzfp);
508 tgl 401 ECB : }
2273 magnus 402 : else
403 : #endif
404 : #ifdef USE_LZ4
202 rhaas 405 GNC 137 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
406 : {
520 michael 407 ECB : size_t compressed;
408 :
520 michael 409 GIC 2 : compressed = LZ4F_compressEnd(df->ctx,
410 : df->lz4buf, df->lz4bufsize,
520 michael 411 ECB : NULL);
412 :
520 michael 413 GIC 2 : if (LZ4F_isError(compressed))
414 : {
202 rhaas 415 UNC 0 : f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
520 michael 416 UIC 0 : return -1;
508 tgl 417 EUB : }
520 michael 418 :
508 tgl 419 GIC 2 : errno = 0;
520 michael 420 2 : if (write(df->fd, df->lz4buf, compressed) != compressed)
508 tgl 421 ECB : {
422 : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 423 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
520 michael 424 UIC 0 : return -1;
508 tgl 425 EUB : }
520 michael 426 :
520 michael 427 GIC 2 : r = close(df->fd);
428 : }
520 michael 429 ECB : else
430 : #endif
2273 magnus 431 GIC 135 : r = close(df->fd);
432 :
2359 magnus 433 CBC 139 : if (r == 0)
434 : {
2359 magnus 435 ECB : /* Build path to the current version of the file */
2359 magnus 436 GIC 139 : if (method == CLOSE_NORMAL && df->temp_suffix)
437 8 : {
628 michael 438 ECB : char *filename;
439 : char *filename2;
440 :
441 : /*
442 : * If we have a temp prefix, normal operation is to rename the
443 : * file.
444 : */
202 rhaas 445 GNC 8 : filename = dir_get_file_name(f->wwmethod, df->base.pathname,
446 8 : df->temp_suffix);
628 michael 447 GIC 8 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
628 michael 448 ECB : dir_data->basedir, filename);
622 michael 449 CBC 8 : pg_free(filename);
628 michael 450 ECB :
451 : /* permanent name, so no need for the prefix */
202 rhaas 452 GNC 8 : filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL);
628 michael 453 GIC 8 : snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
454 : dir_data->basedir, filename2);
622 michael 455 CBC 8 : pg_free(filename2);
202 rhaas 456 GNC 8 : if (f->wwmethod->sync)
441 andres 457 GIC 3 : r = durable_rename(tmppath, tmppath2);
441 andres 458 ECB : else
459 : {
441 andres 460 CBC 5 : if (rename(tmppath, tmppath2) != 0)
461 : {
441 andres 462 UIC 0 : pg_log_error("could not rename file \"%s\" to \"%s\": %m",
441 andres 463 ECB : tmppath, tmppath2);
441 andres 464 UIC 0 : r = -1;
441 andres 465 EUB : }
466 : }
2359 magnus 467 : }
2359 magnus 468 GIC 131 : else if (method == CLOSE_UNLINK)
469 : {
470 : char *filename;
628 michael 471 ECB :
472 : /* Unlink the file once it's closed */
202 rhaas 473 UNC 0 : filename = dir_get_file_name(f->wwmethod, df->base.pathname,
474 0 : df->temp_suffix);
628 michael 475 UIC 0 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
476 : dir_data->basedir, filename);
622 michael 477 UBC 0 : pg_free(filename);
2359 magnus 478 0 : r = unlink(tmppath);
2359 magnus 479 EUB : }
480 : else
481 : {
482 : /*
483 : * Else either CLOSE_NORMAL and no temp suffix, or
484 : * CLOSE_NO_RENAME. In this case, fsync the file and containing
485 : * directory if sync mode is requested.
486 : */
202 rhaas 487 GNC 131 : if (f->wwmethod->sync)
488 : {
1469 peter 489 GIC 4 : r = fsync_fname(df->fullpath, false);
2359 magnus 490 4 : if (r == 0)
1469 peter 491 CBC 4 : r = fsync_parent_path(df->fullpath);
492 : }
2359 magnus 493 ECB : }
494 : }
495 :
508 tgl 496 GIC 139 : if (r != 0)
202 rhaas 497 UNC 0 : f->wwmethod->lasterrno = errno;
498 :
499 : #ifdef USE_LZ4
520 michael 500 CBC 139 : pg_free(df->lz4buf);
520 michael 501 EUB : /* supports free on NULL */
520 michael 502 GIC 139 : LZ4F_freeCompressionContext(df->ctx);
503 : #endif
520 michael 504 ECB :
202 rhaas 505 GNC 139 : pg_free(df->base.pathname);
2359 magnus 506 CBC 139 : pg_free(df->fullpath);
296 peter 507 GNC 139 : pg_free(df->temp_suffix);
2359 magnus 508 CBC 139 : pg_free(df);
2359 magnus 509 ECB :
2359 magnus 510 CBC 139 : return r;
2359 magnus 511 ECB : }
512 :
513 : static int
202 rhaas 514 UNC 0 : dir_sync(Walfile *f)
515 : {
516 : int r;
508 tgl 517 EUB :
2359 magnus 518 UIC 0 : Assert(f != NULL);
202 rhaas 519 UNC 0 : clear_error(f->wwmethod);
520 :
521 0 : if (!f->wwmethod->sync)
2359 magnus 522 UBC 0 : return 0;
523 :
2273 magnus 524 EUB : #ifdef HAVE_LIBZ
202 rhaas 525 UNC 0 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
526 : {
2273 magnus 527 UIC 0 : if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
508 tgl 528 EUB : {
202 rhaas 529 UNC 0 : f->wwmethod->lasterrno = errno;
2273 magnus 530 UBC 0 : return -1;
531 : }
2273 magnus 532 EUB : }
533 : #endif
534 : #ifdef USE_LZ4
202 rhaas 535 UNC 0 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
536 : {
520 michael 537 UIC 0 : DirectoryMethodFile *df = (DirectoryMethodFile *) f;
520 michael 538 EUB : size_t compressed;
539 :
540 : /* Flush any internal buffers */
520 michael 541 UIC 0 : compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
542 0 : if (LZ4F_isError(compressed))
543 : {
202 rhaas 544 UNC 0 : f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
520 michael 545 UBC 0 : return -1;
546 : }
520 michael 547 EUB :
508 tgl 548 UBC 0 : errno = 0;
520 michael 549 UIC 0 : if (write(df->fd, df->lz4buf, compressed) != compressed)
550 : {
508 tgl 551 EUB : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 552 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
520 michael 553 UIC 0 : return -1;
554 : }
520 michael 555 EUB : }
556 : #endif
557 :
508 tgl 558 UIC 0 : r = fsync(((DirectoryMethodFile *) f)->fd);
559 0 : if (r < 0)
202 rhaas 560 UNC 0 : f->wwmethod->lasterrno = errno;
508 tgl 561 UBC 0 : return r;
2359 magnus 562 EUB : }
563 :
564 : static ssize_t
202 rhaas 565 UNC 0 : dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
566 : {
567 0 : DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
568 : struct stat statbuf;
508 tgl 569 EUB : char tmppath[MAXPGPATH];
570 :
2359 magnus 571 UBC 0 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
572 : dir_data->basedir, pathname);
573 :
2359 magnus 574 UIC 0 : if (stat(tmppath, &statbuf) != 0)
508 tgl 575 EUB : {
202 rhaas 576 UNC 0 : wwmethod->lasterrno = errno;
2359 magnus 577 UIC 0 : return -1;
508 tgl 578 EUB : }
579 :
2359 magnus 580 UBC 0 : return statbuf.st_size;
2359 magnus 581 EUB : }
582 :
583 : static bool
202 rhaas 584 GNC 96 : dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
585 : {
586 96 : DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
587 : char tmppath[MAXPGPATH];
588 : int fd;
2359 magnus 589 ECB :
202 rhaas 590 GNC 96 : clear_error(wwmethod);
508 tgl 591 ECB :
2359 magnus 592 GIC 96 : snprintf(tmppath, sizeof(tmppath), "%s/%s",
593 : dir_data->basedir, pathname);
2359 magnus 594 ECB :
2359 magnus 595 CBC 96 : fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
596 96 : if (fd < 0)
2359 magnus 597 GBC 96 : return false;
2359 magnus 598 UBC 0 : close(fd);
2359 magnus 599 UIC 0 : return true;
600 : }
601 :
2359 magnus 602 ECB : static bool
202 rhaas 603 GNC 90 : dir_finish(WalWriteMethod *wwmethod)
2359 magnus 604 ECB : {
202 rhaas 605 GNC 90 : clear_error(wwmethod);
508 tgl 606 ECB :
202 rhaas 607 GNC 90 : if (wwmethod->sync)
2359 magnus 608 ECB : {
202 rhaas 609 GNC 4 : DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
610 :
611 : /*
612 : * Files are fsynced when they are closed, but we need to fsync the
613 : * directory entry here as well.
614 : */
1469 peter 615 GIC 4 : if (fsync_fname(dir_data->basedir, true) != 0)
508 tgl 616 ECB : {
202 rhaas 617 UNC 0 : wwmethod->lasterrno = errno;
2359 magnus 618 UBC 0 : return false;
508 tgl 619 EUB : }
620 : }
2359 magnus 621 GIC 90 : return true;
2359 magnus 622 ECB : }
623 :
624 : static void
202 rhaas 625 GNC 90 : dir_free(WalWriteMethod *wwmethod)
626 : {
627 90 : DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
628 :
629 90 : pg_free(dir_data->basedir);
630 90 : pg_free(wwmethod);
631 90 : }
632 :
633 :
634 : WalWriteMethod *
521 michael 635 CBC 93 : CreateWalDirectoryMethod(const char *basedir,
636 : pg_compress_algorithm compression_algorithm,
521 michael 637 ECB : int compression_level, bool sync)
638 : {
639 : DirectoryMethodData *wwmethod;
202 rhaas 640 :
202 rhaas 641 GNC 93 : wwmethod = pg_malloc0(sizeof(DirectoryMethodData));
642 93 : *((const WalWriteMethodOps **) &wwmethod->base.ops) =
643 : &WalDirectoryMethodOps;
644 93 : wwmethod->base.compression_algorithm = compression_algorithm;
645 93 : wwmethod->base.compression_level = compression_level;
646 93 : wwmethod->base.sync = sync;
647 93 : clear_error(&wwmethod->base);
648 93 : wwmethod->basedir = pg_strdup(basedir);
649 :
650 93 : return &wwmethod->base;
651 : }
652 :
653 :
654 : /*-------------------------------------------------------------------------
655 : * WalTarMethod - write wal to a tar file containing pg_wal contents
656 : *-------------------------------------------------------------------------
657 : */
658 :
659 : static Walfile *tar_open_for_write(WalWriteMethod *wwmethod,
660 : const char *pathname,
661 : const char *temp_suffix,
662 : size_t pad_to_size);
663 : static int tar_close(Walfile *f, WalCloseMethod method);
664 : static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname);
665 : static ssize_t tar_get_file_size(WalWriteMethod *wwmethod,
666 : const char *pathname);
667 : static char *tar_get_file_name(WalWriteMethod *wwmethod,
668 : const char *pathname, const char *temp_suffix);
669 : static ssize_t tar_write(Walfile *f, const void *buf, size_t count);
670 : static int tar_sync(Walfile *f);
671 : static bool tar_finish(WalWriteMethod *wwmethod);
672 : static void tar_free(WalWriteMethod *wwmethod);
673 :
674 : const WalWriteMethodOps WalTarMethodOps = {
675 : .open_for_write = tar_open_for_write,
676 : .close = tar_close,
677 : .existsfile = tar_existsfile,
678 : .get_file_size = tar_get_file_size,
679 : .get_file_name = tar_get_file_name,
680 : .write = tar_write,
681 : .sync = tar_sync,
682 : .finish = tar_finish,
683 : .free = tar_free
684 : };
685 :
686 : typedef struct TarMethodFile
687 : {
688 : Walfile base;
689 : off_t ofs_start; /* Where does the *header* for this file start */
690 : char header[TAR_BLOCK_SIZE];
691 : size_t pad_to_size;
692 : } TarMethodFile;
693 :
694 : typedef struct TarMethodData
695 : {
696 : WalWriteMethod base;
697 : char *tarfilename;
698 : int fd;
699 : TarMethodFile *currentfile;
700 : #ifdef HAVE_LIBZ
701 : z_streamp zp;
702 : void *zlibOut;
703 : #endif
704 : } TarMethodData;
705 :
706 : #ifdef HAVE_LIBZ
707 : static bool
708 5607 : tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count,
709 : bool flush)
710 : {
2359 magnus 711 GIC 5607 : tar_data->zp->next_in = buf;
2359 magnus 712 CBC 5607 : tar_data->zp->avail_in = count;
2359 magnus 713 ECB :
2359 magnus 714 GIC 11228 : while (tar_data->zp->avail_in || flush)
2359 magnus 715 ECB : {
716 : int r;
717 :
2359 magnus 718 GIC 5633 : r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
2359 magnus 719 CBC 5633 : if (r == Z_STREAM_ERROR)
2359 magnus 720 ECB : {
202 rhaas 721 UNC 0 : tar_data->base.lasterrstring = "could not compress data";
2359 magnus 722 UBC 0 : return false;
2359 magnus 723 EUB : }
724 :
2359 magnus 725 GIC 5633 : if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
2359 magnus 726 ECB : {
2359 magnus 727 GIC 62 : size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
2359 magnus 728 ECB :
1708 michael 729 GIC 62 : errno = 0;
2359 magnus 730 CBC 62 : if (write(tar_data->fd, tar_data->zlibOut, len) != len)
1749 michael 731 ECB : {
732 : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 733 UNC 0 : tar_data->base.lasterrno = errno ? errno : ENOSPC;
2359 magnus 734 UBC 0 : return false;
1749 michael 735 EUB : }
736 :
2359 magnus 737 GIC 62 : tar_data->zp->next_out = tar_data->zlibOut;
2359 magnus 738 CBC 62 : tar_data->zp->avail_out = ZLIB_OUT_SIZE;
2359 magnus 739 ECB : }
740 :
2359 magnus 741 GIC 5633 : if (r == Z_STREAM_END)
2359 magnus 742 CBC 12 : break;
2359 magnus 743 ECB : }
744 :
2359 magnus 745 GIC 5607 : if (flush)
2359 magnus 746 ECB : {
747 : /* Reset the stream for writing */
2359 magnus 748 GIC 12 : if (deflateReset(tar_data->zp) != Z_OK)
2359 magnus 749 ECB : {
202 rhaas 750 UNC 0 : tar_data->base.lasterrstring = "could not reset compression stream";
2359 magnus 751 UBC 0 : return false;
2359 magnus 752 EUB : }
753 : }
754 :
2359 magnus 755 GIC 5607 : return true;
2359 magnus 756 ECB : }
757 : #endif
758 :
759 : static ssize_t
202 rhaas 760 GNC 14127 : tar_write(Walfile *f, const void *buf, size_t count)
2359 magnus 761 ECB : {
202 rhaas 762 GNC 14127 : TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
763 : ssize_t r;
2359 magnus 764 ECB :
2359 magnus 765 GIC 14127 : Assert(f != NULL);
202 rhaas 766 GNC 14127 : clear_error(f->wwmethod);
2359 magnus 767 ECB :
768 : /* Tarfile will always be positioned at the end */
202 rhaas 769 GNC 14127 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
770 : {
508 tgl 771 CBC 8535 : errno = 0;
2359 magnus 772 GIC 8535 : r = write(tar_data->fd, buf, count);
508 tgl 773 CBC 8535 : if (r != count)
508 tgl 774 ECB : {
775 : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 776 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
508 tgl 777 UIC 0 : return -1;
508 tgl 778 EUB : }
202 rhaas 779 GNC 8535 : f->currpos += r;
2359 magnus 780 GIC 8535 : return r;
2359 magnus 781 ECB : }
782 : #ifdef HAVE_LIBZ
202 rhaas 783 GNC 5592 : else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
784 : {
785 5592 : if (!tar_write_compressed_data(tar_data, unconstify(void *, buf),
786 : count, false))
2359 magnus 787 UIC 0 : return -1;
202 rhaas 788 GNC 5592 : f->currpos += count;
2359 magnus 789 GIC 5592 : return count;
2359 magnus 790 EUB : }
457 michael 791 ECB : #endif
2359 magnus 792 : else
793 : {
794 : /* Can't happen - compression enabled with no method set */
202 rhaas 795 UNC 0 : f->wwmethod->lasterrno = ENOSYS;
2359 magnus 796 UIC 0 : return -1;
797 : }
2359 magnus 798 EUB : }
799 :
800 : static bool
2153 bruce 801 GIC 7 : tar_write_padding_data(TarMethodFile *f, size_t bytes)
802 : {
803 : PGAlignedXLogBlock zerobuf;
2359 magnus 804 CBC 7 : size_t bytesleft = bytes;
805 :
1681 tgl 806 GIC 7 : memset(zerobuf.data, 0, XLOG_BLCKSZ);
2359 magnus 807 CBC 13751 : while (bytesleft)
808 : {
1681 tgl 809 13744 : size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
202 rhaas 810 GNC 13744 : ssize_t r = tar_write(&f->base, zerobuf.data, bytestowrite);
811 :
2359 magnus 812 CBC 13744 : if (r < 0)
2359 magnus 813 LBC 0 : return false;
2359 magnus 814 GIC 13744 : bytesleft -= r;
2359 magnus 815 ECB : }
2352 magnus 816 EUB :
2359 magnus 817 CBC 7 : return true;
818 : }
819 :
628 michael 820 ECB : static char *
202 rhaas 821 GNC 23 : tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname,
822 : const char *temp_suffix)
823 : {
628 michael 824 GIC 23 : char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
628 michael 825 ECB :
628 michael 826 GIC 23 : snprintf(filename, MAXPGPATH, "%s%s",
827 : pathname, temp_suffix ? temp_suffix : "");
628 michael 828 ECB :
628 michael 829 GIC 23 : return filename;
628 michael 830 ECB : }
831 :
832 : static Walfile *
202 rhaas 833 GNC 9 : tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
834 : const char *temp_suffix, size_t pad_to_size)
835 : {
836 9 : TarMethodData *tar_data = (TarMethodData *) wwmethod;
837 : char *tmppath;
838 :
839 9 : clear_error(wwmethod);
840 :
2359 magnus 841 GIC 9 : if (tar_data->fd < 0)
2359 magnus 842 ECB : {
843 : /*
844 : * We open the tar file only when we first try to write to it.
845 : */
2359 magnus 846 GIC 7 : tar_data->fd = open(tar_data->tarfilename,
1828 sfrost 847 ECB : O_WRONLY | O_CREAT | PG_BINARY,
848 : pg_file_create_mode);
2359 magnus 849 GIC 7 : if (tar_data->fd < 0)
850 : {
202 rhaas 851 UNC 0 : wwmethod->lasterrno = errno;
2359 magnus 852 LBC 0 : return NULL;
853 : }
854 :
2359 magnus 855 ECB : #ifdef HAVE_LIBZ
202 rhaas 856 GNC 7 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
2359 magnus 857 EUB : {
2359 magnus 858 GBC 3 : tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
2359 magnus 859 GIC 3 : tar_data->zp->zalloc = Z_NULL;
860 3 : tar_data->zp->zfree = Z_NULL;
861 3 : tar_data->zp->opaque = Z_NULL;
2359 magnus 862 CBC 3 : tar_data->zp->next_out = tar_data->zlibOut;
2359 magnus 863 GIC 3 : tar_data->zp->avail_out = ZLIB_OUT_SIZE;
2359 magnus 864 ECB :
865 : /*
866 : * Initialize deflation library. Adding the magic value 16 to the
867 : * default 15 for the windowBits parameter makes the output be
868 : * gzip instead of zlib.
869 : */
202 rhaas 870 GNC 3 : if (deflateInit2(tar_data->zp, wwmethod->compression_level,
871 : Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
872 : {
2359 magnus 873 UIC 0 : pg_free(tar_data->zp);
874 0 : tar_data->zp = NULL;
202 rhaas 875 UNC 0 : wwmethod->lasterrstring =
876 : "could not initialize compression library";
2359 magnus 877 LBC 0 : return NULL;
878 : }
879 : }
2359 magnus 880 EUB : #endif
881 :
882 : /* There's no tar header itself, the file starts with regular files */
883 : }
884 :
2359 magnus 885 GIC 9 : if (tar_data->currentfile != NULL)
886 : {
202 rhaas 887 UNC 0 : wwmethod->lasterrstring =
888 : "implementation error: tar files can't have more than one open file";
2359 magnus 889 UIC 0 : return NULL;
890 : }
891 :
2359 magnus 892 GIC 9 : tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
202 rhaas 893 GNC 9 : tar_data->currentfile->base.wwmethod = wwmethod;
2359 magnus 894 ECB :
202 rhaas 895 GNC 9 : tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix);
2359 magnus 896 EUB :
897 : /* Create a header with size set to 0 - we will fill out the size on close */
2359 magnus 898 GBC 9 : if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
899 : {
2359 magnus 900 UIC 0 : pg_free(tar_data->currentfile);
622 michael 901 LBC 0 : pg_free(tmppath);
2359 magnus 902 0 : tar_data->currentfile = NULL;
202 rhaas 903 UNC 0 : wwmethod->lasterrstring = "could not create tar header";
2359 magnus 904 LBC 0 : return NULL;
905 : }
906 :
622 michael 907 CBC 9 : pg_free(tmppath);
908 :
2359 magnus 909 EUB : #ifdef HAVE_LIBZ
202 rhaas 910 GNC 9 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
2359 magnus 911 EUB : {
912 : /* Flush existing data */
202 rhaas 913 GNC 3 : if (!tar_write_compressed_data(tar_data, NULL, 0, true))
2359 magnus 914 UIC 0 : return NULL;
915 :
2359 magnus 916 ECB : /* Turn off compression for header */
207 michael 917 GIC 3 : if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
918 : {
202 rhaas 919 UNC 0 : wwmethod->lasterrstring =
920 : "could not change compression parameters";
2359 magnus 921 UIC 0 : return NULL;
922 : }
2359 magnus 923 ECB : }
2359 magnus 924 EUB : #endif
925 :
2359 magnus 926 GIC 9 : tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
2359 magnus 927 CBC 9 : if (tar_data->currentfile->ofs_start == -1)
928 : {
202 rhaas 929 UNC 0 : wwmethod->lasterrno = errno;
2359 magnus 930 UIC 0 : pg_free(tar_data->currentfile);
2359 magnus 931 UBC 0 : tar_data->currentfile = NULL;
2359 magnus 932 UIC 0 : return NULL;
933 : }
202 rhaas 934 GNC 9 : tar_data->currentfile->base.currpos = 0;
935 :
936 9 : if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
2359 magnus 937 ECB : {
1708 michael 938 GIC 6 : errno = 0;
1080 rhaas 939 GBC 6 : if (write(tar_data->fd, tar_data->currentfile->header,
1080 rhaas 940 EUB : TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
2359 magnus 941 : {
508 tgl 942 : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 943 UNC 0 : wwmethod->lasterrno = errno ? errno : ENOSPC;
2359 magnus 944 LBC 0 : pg_free(tar_data->currentfile);
2359 magnus 945 UIC 0 : tar_data->currentfile = NULL;
2359 magnus 946 LBC 0 : return NULL;
947 : }
2359 magnus 948 ECB : }
949 : #ifdef HAVE_LIBZ
202 rhaas 950 GNC 3 : else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
951 : {
952 : /* Write header through the zlib APIs but with no compression */
953 3 : if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
1080 rhaas 954 EUB : TAR_BLOCK_SIZE, true))
2359 magnus 955 UBC 0 : return NULL;
2359 magnus 956 EUB :
957 : /* Re-enable compression for the rest of the file */
202 rhaas 958 GNC 3 : if (deflateParams(tar_data->zp, wwmethod->compression_level,
959 : Z_DEFAULT_STRATEGY) != Z_OK)
2359 magnus 960 ECB : {
202 rhaas 961 UNC 0 : wwmethod->lasterrstring = "could not change compression parameters";
2359 magnus 962 UIC 0 : return NULL;
2359 magnus 963 ECB : }
964 : }
2359 magnus 965 EUB : #endif
966 : else
967 : {
457 michael 968 ECB : /* not reachable */
457 michael 969 UIC 0 : Assert(false);
970 : }
2359 magnus 971 EUB :
202 rhaas 972 GNC 9 : tar_data->currentfile->base.pathname = pg_strdup(pathname);
973 :
974 : /*
975 : * Uncompressed files are padded on creation, but for compression we can't
976 : * do that
977 : */
2359 magnus 978 GIC 9 : if (pad_to_size)
2359 magnus 979 EUB : {
2359 magnus 980 GIC 7 : tar_data->currentfile->pad_to_size = pad_to_size;
202 rhaas 981 GNC 7 : if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
2359 magnus 982 ECB : {
983 : /* Uncompressed, so pad now */
508 tgl 984 GIC 4 : if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
508 tgl 985 UIC 0 : return NULL;
986 : /* Seek back to start */
1080 rhaas 987 GIC 8 : if (lseek(tar_data->fd,
1080 rhaas 988 CBC 4 : tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
1080 rhaas 989 GIC 4 : SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
508 tgl 990 ECB : {
202 rhaas 991 UNC 0 : wwmethod->lasterrno = errno;
2359 magnus 992 UIC 0 : return NULL;
993 : }
2359 magnus 994 ECB :
202 rhaas 995 GNC 4 : tar_data->currentfile->base.currpos = 0;
996 : }
2359 magnus 997 ECB : }
998 :
202 rhaas 999 GNC 9 : return &tar_data->currentfile->base;
1000 : }
2359 magnus 1001 EUB :
1002 : static ssize_t
202 rhaas 1003 UNC 0 : tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
1004 : {
1005 0 : clear_error(wwmethod);
1006 :
1007 : /* Currently not used, so not supported */
1008 0 : wwmethod->lasterrno = ENOSYS;
2359 magnus 1009 LBC 0 : return -1;
1010 : }
1011 :
1012 : static int
202 rhaas 1013 GNC 9 : tar_sync(Walfile *f)
2359 magnus 1014 ECB : {
202 rhaas 1015 GNC 9 : TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
1016 : int r;
508 tgl 1017 ECB :
2359 magnus 1018 CBC 9 : Assert(f != NULL);
202 rhaas 1019 GNC 9 : clear_error(f->wwmethod);
1020 :
1021 9 : if (!f->wwmethod->sync)
2357 magnus 1022 GIC 9 : return 0;
1023 :
2359 magnus 1024 EUB : /*
1025 : * Always sync the whole tarfile, because that's all we can do. This makes
1026 : * no sense on compressed files, so just ignore those.
1027 : */
202 rhaas 1028 UNC 0 : if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
2359 magnus 1029 UBC 0 : return 0;
2359 magnus 1030 EUB :
508 tgl 1031 UIC 0 : r = fsync(tar_data->fd);
1032 0 : if (r < 0)
202 rhaas 1033 UNC 0 : f->wwmethod->lasterrno = errno;
508 tgl 1034 LBC 0 : return r;
1035 : }
1036 :
1037 : static int
202 rhaas 1038 GNC 9 : tar_close(Walfile *f, WalCloseMethod method)
2359 magnus 1039 ECB : {
1040 : ssize_t filesize;
1041 : int padding;
202 rhaas 1042 GNC 9 : TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
2359 magnus 1043 CBC 9 : TarMethodFile *tf = (TarMethodFile *) f;
1044 :
1045 9 : Assert(f != NULL);
202 rhaas 1046 GNC 9 : clear_error(f->wwmethod);
2359 magnus 1047 EUB :
2359 magnus 1048 GIC 9 : if (method == CLOSE_UNLINK)
2359 magnus 1049 EUB : {
202 rhaas 1050 UNC 0 : if (f->wwmethod->compression_algorithm != PG_COMPRESSION_NONE)
1051 : {
1052 0 : f->wwmethod->lasterrstring = "unlink not supported with compression";
2359 magnus 1053 UIC 0 : return -1;
1054 : }
1055 :
1056 : /*
1057 : * Unlink the file that we just wrote to the tar. We do this by
2359 magnus 1058 EUB : * truncating it to the start of the header. This is safe as we only
1059 : * allow writing of the very last file.
1060 : */
2359 magnus 1061 UBC 0 : if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
1062 : {
202 rhaas 1063 UNC 0 : f->wwmethod->lasterrno = errno;
2359 magnus 1064 UBC 0 : return -1;
508 tgl 1065 EUB : }
2359 magnus 1066 :
202 rhaas 1067 UNC 0 : pg_free(tf->base.pathname);
2359 magnus 1068 UBC 0 : pg_free(tf);
2359 magnus 1069 UIC 0 : tar_data->currentfile = NULL;
1070 :
1071 0 : return 0;
1072 : }
1073 :
1074 : /*
1075 : * Pad the file itself with zeroes if necessary. Note that this is
2359 magnus 1076 ECB : * different from the tar format padding -- this is the padding we asked
1077 : * for when the file was opened.
1078 : */
2359 magnus 1079 GIC 9 : if (tf->pad_to_size)
1080 : {
202 rhaas 1081 GNC 7 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1082 : {
1083 : /*
2359 magnus 1084 ECB : * A compressed tarfile is padded on close since we cannot know
1085 : * the size of the compressed output until the end.
1086 : */
202 rhaas 1087 GNC 3 : size_t sizeleft = tf->pad_to_size - tf->base.currpos;
2359 magnus 1088 ECB :
2359 magnus 1089 GBC 3 : if (sizeleft)
1090 : {
2359 magnus 1091 GIC 3 : if (!tar_write_padding_data(tf, sizeleft))
2359 magnus 1092 UIC 0 : return -1;
1093 : }
1094 : }
1095 : else
1096 : {
1097 : /*
2359 magnus 1098 ECB : * An uncompressed tarfile was padded on creation, so just adjust
1099 : * the current position as if we seeked to the end.
1100 : */
202 rhaas 1101 GNC 4 : tf->base.currpos = tf->pad_to_size;
1102 : }
1103 : }
1104 :
1105 : /*
1080 rhaas 1106 ECB : * Get the size of the file, and pad out to a multiple of the tar block
1107 : * size.
2359 magnus 1108 : */
202 rhaas 1109 GNC 9 : filesize = f->currpos;
1080 rhaas 1110 GBC 9 : padding = tarPaddingBytesRequired(filesize);
2359 magnus 1111 GIC 9 : if (padding)
2359 magnus 1112 EUB : {
267 peter 1113 UNC 0 : char zerobuf[TAR_BLOCK_SIZE] = {0};
1114 :
2359 magnus 1115 UIC 0 : if (tar_write(f, zerobuf, padding) != padding)
1116 0 : return -1;
2359 magnus 1117 ECB : }
1118 :
1119 :
1120 : #ifdef HAVE_LIBZ
202 rhaas 1121 GNC 9 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1122 : {
1123 : /* Flush the current buffer */
1124 3 : if (!tar_write_compressed_data(tar_data, NULL, 0, true))
2359 magnus 1125 UIC 0 : return -1;
1126 : }
1127 : #endif
1128 :
1129 : /*
2359 magnus 1130 ECB : * Now go back and update the header with the correct filesize and
1131 : * possibly also renaming the file. We overwrite the entire current header
1132 : * when done, including the checksum.
1133 : */
2359 magnus 1134 GIC 9 : print_tar_number(&(tf->header[124]), 12, filesize);
1135 :
1136 9 : if (method == CLOSE_NORMAL)
1137 :
2359 magnus 1138 ECB : /*
1139 : * We overwrite it with what it was before if we have no tempname,
1140 : * since we're going to write the buffer anyway.
1141 : */
202 rhaas 1142 GNC 9 : strlcpy(&(tf->header[0]), tf->base.pathname, 100);
2359 magnus 1143 EUB :
2359 magnus 1144 GBC 9 : print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
2359 magnus 1145 GIC 9 : if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
508 tgl 1146 ECB : {
202 rhaas 1147 UNC 0 : f->wwmethod->lasterrno = errno;
2359 magnus 1148 LBC 0 : return -1;
508 tgl 1149 ECB : }
202 rhaas 1150 GNC 9 : if (f->wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
1151 : {
1708 michael 1152 GBC 6 : errno = 0;
1080 rhaas 1153 6 : if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
1154 : {
1155 : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 1156 UNC 0 : f->wwmethod->lasterrno = errno ? errno : ENOSPC;
2359 magnus 1157 LBC 0 : return -1;
1158 : }
1159 : }
2359 magnus 1160 ECB : #ifdef HAVE_LIBZ
202 rhaas 1161 GNC 3 : else if (f->wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
2359 magnus 1162 EUB : {
1163 : /* Turn off compression */
207 michael 1164 GIC 3 : if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
1165 : {
202 rhaas 1166 UNC 0 : f->wwmethod->lasterrstring = "could not change compression parameters";
2359 magnus 1167 LBC 0 : return -1;
1168 : }
2359 magnus 1169 EUB :
1170 : /* Overwrite the header, assuming the size will be the same */
202 rhaas 1171 GNC 3 : if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
1080 rhaas 1172 ECB : TAR_BLOCK_SIZE, true))
2359 magnus 1173 UIC 0 : return -1;
1174 :
2359 magnus 1175 EUB : /* Turn compression back on */
202 rhaas 1176 GNC 3 : if (deflateParams(tar_data->zp, f->wwmethod->compression_level,
1177 : Z_DEFAULT_STRATEGY) != Z_OK)
1178 : {
202 rhaas 1179 UNC 0 : f->wwmethod->lasterrstring = "could not change compression parameters";
2359 magnus 1180 UIC 0 : return -1;
1181 : }
1182 : }
2359 magnus 1183 EUB : #endif
1184 : else
1185 : {
1186 : /* not reachable */
457 michael 1187 LBC 0 : Assert(false);
1188 : }
2359 magnus 1189 EUB :
1190 : /* Move file pointer back down to end, so we can write the next file */
2359 magnus 1191 GIC 9 : if (lseek(tar_data->fd, 0, SEEK_END) < 0)
1192 : {
202 rhaas 1193 UNC 0 : f->wwmethod->lasterrno = errno;
2359 magnus 1194 LBC 0 : return -1;
1195 : }
1196 :
2359 magnus 1197 EUB : /* Always fsync on close, so the padding gets fsynced */
1748 michael 1198 GIC 9 : if (tar_sync(f) < 0)
1199 : {
1200 : /* XXX this seems pretty bogus; why is only this case fatal? */
366 tgl 1201 UIC 0 : pg_fatal("could not fsync file \"%s\": %s",
1202 : tf->base.pathname, GetLastWalMethodError(f->wwmethod));
508 tgl 1203 ECB : }
2359 magnus 1204 :
1205 : /* Clean up and done */
202 rhaas 1206 GNC 9 : pg_free(tf->base.pathname);
2359 magnus 1207 GIC 9 : pg_free(tf);
1208 9 : tar_data->currentfile = NULL;
1209 :
2359 magnus 1210 CBC 9 : return 0;
1211 : }
2359 magnus 1212 ECB :
1213 : static bool
202 rhaas 1214 GNC 4 : tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
1215 : {
1216 4 : clear_error(wwmethod);
1217 : /* We only deal with new tarfiles, so nothing externally created exists */
2359 magnus 1218 CBC 4 : return false;
1219 : }
2359 magnus 1220 ECB :
1221 : static bool
202 rhaas 1222 GNC 7 : tar_finish(WalWriteMethod *wwmethod)
2359 magnus 1223 ECB : {
202 rhaas 1224 GNC 7 : TarMethodData *tar_data = (TarMethodData *) wwmethod;
267 peter 1225 7 : char zerobuf[1024] = {0};
2359 magnus 1226 ECB :
202 rhaas 1227 GNC 7 : clear_error(wwmethod);
2359 magnus 1228 EUB :
2359 magnus 1229 GBC 7 : if (tar_data->currentfile)
1230 : {
202 rhaas 1231 UNC 0 : if (tar_close(&tar_data->currentfile->base, CLOSE_NORMAL) != 0)
2359 magnus 1232 UIC 0 : return false;
2359 magnus 1233 ECB : }
1234 :
1748 michael 1235 : /* A tarfile always ends with two empty blocks */
202 rhaas 1236 GNC 7 : if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
1237 : {
1708 michael 1238 GBC 4 : errno = 0;
2359 magnus 1239 4 : if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
1240 : {
1241 : /* If write didn't set errno, assume problem is no disk space */
202 rhaas 1242 UNC 0 : wwmethod->lasterrno = errno ? errno : ENOSPC;
2359 magnus 1243 LBC 0 : return false;
1244 : }
2359 magnus 1245 ECB : }
1246 : #ifdef HAVE_LIBZ
202 rhaas 1247 GNC 3 : else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1248 : {
1249 3 : if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),
1250 : false))
2359 magnus 1251 LBC 0 : return false;
2359 magnus 1252 ECB :
1253 : /* Also flush all data to make sure the gzip stream is finished */
2359 magnus 1254 GBC 3 : tar_data->zp->next_in = NULL;
2359 magnus 1255 GIC 3 : tar_data->zp->avail_in = 0;
1256 : while (true)
2359 magnus 1257 LBC 0 : {
1258 : int r;
2359 magnus 1259 ECB :
2359 magnus 1260 GIC 3 : r = deflate(tar_data->zp, Z_FINISH);
2359 magnus 1261 EUB :
2359 magnus 1262 GBC 3 : if (r == Z_STREAM_ERROR)
1263 : {
202 rhaas 1264 UNC 0 : wwmethod->lasterrstring = "could not compress data";
2359 magnus 1265 UIC 0 : return false;
2359 magnus 1266 ECB : }
2359 magnus 1267 GIC 3 : if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
2359 magnus 1268 ECB : {
2359 magnus 1269 CBC 3 : size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
1270 :
1708 michael 1271 GIC 3 : errno = 0;
2359 magnus 1272 3 : if (write(tar_data->fd, tar_data->zlibOut, len) != len)
1273 : {
1274 : /*
1749 michael 1275 EUB : * If write didn't set errno, assume problem is no disk
1276 : * space.
1277 : */
202 rhaas 1278 UNC 0 : wwmethod->lasterrno = errno ? errno : ENOSPC;
2359 magnus 1279 LBC 0 : return false;
1749 michael 1280 ECB : }
1281 : }
2359 magnus 1282 GIC 3 : if (r == Z_STREAM_END)
2359 magnus 1283 CBC 3 : break;
1284 : }
2359 magnus 1285 EUB :
2359 magnus 1286 GBC 3 : if (deflateEnd(tar_data->zp) != Z_OK)
1287 : {
202 rhaas 1288 UNC 0 : wwmethod->lasterrstring = "could not close compression stream";
2359 magnus 1289 UIC 0 : return false;
1290 : }
1291 : }
1292 : #endif
457 michael 1293 EUB : else
1294 : {
1295 : /* not reachable */
457 michael 1296 UIC 0 : Assert(false);
457 michael 1297 ECB : }
1298 :
2359 magnus 1299 EUB : /* sync the empty blocks as well, since they're after the last file */
202 rhaas 1300 GNC 7 : if (wwmethod->sync)
1748 michael 1301 EUB : {
1748 michael 1302 UBC 0 : if (fsync(tar_data->fd) != 0)
1303 : {
202 rhaas 1304 UNC 0 : wwmethod->lasterrno = errno;
1748 michael 1305 UIC 0 : return false;
508 tgl 1306 ECB : }
1307 : }
2359 magnus 1308 EUB :
2359 magnus 1309 GBC 7 : if (close(tar_data->fd) != 0)
1310 : {
202 rhaas 1311 UNC 0 : wwmethod->lasterrno = errno;
2359 magnus 1312 LBC 0 : return false;
1313 : }
2359 magnus 1314 ECB :
2359 magnus 1315 GIC 7 : tar_data->fd = -1;
2359 magnus 1316 EUB :
202 rhaas 1317 GNC 7 : if (wwmethod->sync)
1318 : {
508 tgl 1319 UBC 0 : if (fsync_fname(tar_data->tarfilename, false) != 0 ||
1320 0 : fsync_parent_path(tar_data->tarfilename) != 0)
1321 : {
202 rhaas 1322 UNC 0 : wwmethod->lasterrno = errno;
2359 magnus 1323 UIC 0 : return false;
508 tgl 1324 ECB : }
1325 : }
1326 :
2359 magnus 1327 GIC 7 : return true;
2359 magnus 1328 ECB : }
1329 :
1330 : static void
202 rhaas 1331 GNC 7 : tar_free(WalWriteMethod *wwmethod)
1332 : {
1333 7 : TarMethodData *tar_data = (TarMethodData *) wwmethod;
1334 :
1335 7 : pg_free(tar_data->tarfilename);
1336 : #ifdef HAVE_LIBZ
1337 7 : if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1338 3 : pg_free(tar_data->zlibOut);
1339 : #endif
1340 7 : pg_free(wwmethod);
1341 7 : }
1342 :
521 michael 1343 ECB : /*
1344 : * The argument compression_algorithm is currently ignored. It is in place for
1345 : * symmetry with CreateWalDirectoryMethod which uses it for distinguishing
1346 : * between the different compression methods. CreateWalTarMethod and its family
1347 : * of functions handle only zlib compression.
1348 : */
1349 : WalWriteMethod *
521 michael 1350 CBC 7 : CreateWalTarMethod(const char *tarbase,
362 michael 1351 ECB : pg_compress_algorithm compression_algorithm,
1352 : int compression_level, bool sync)
1353 : {
1354 : TarMethodData *wwmethod;
362 michael 1355 GIC 7 : const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
457 1356 7 : ".tar.gz" : ".tar";
1357 :
202 rhaas 1358 GNC 7 : wwmethod = pg_malloc0(sizeof(TarMethodData));
1359 7 : *((const WalWriteMethodOps **) &wwmethod->base.ops) =
1360 : &WalTarMethodOps;
1361 7 : wwmethod->base.compression_algorithm = compression_algorithm;
1362 7 : wwmethod->base.compression_level = compression_level;
1363 7 : wwmethod->base.sync = sync;
1364 7 : clear_error(&wwmethod->base);
1365 :
1366 7 : wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
1367 7 : sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix);
1368 7 : wwmethod->fd = -1;
2359 magnus 1369 ECB : #ifdef HAVE_LIBZ
362 michael 1370 GIC 7 : if (compression_algorithm == PG_COMPRESSION_GZIP)
202 rhaas 1371 GNC 3 : wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
2359 magnus 1372 ECB : #endif
1373 :
202 rhaas 1374 GNC 7 : return &wwmethod->base;
2359 magnus 1375 ECB : }
1376 :
1377 : const char *
202 rhaas 1378 UNC 0 : GetLastWalMethodError(WalWriteMethod *wwmethod)
2357 magnus 1379 EUB : {
202 rhaas 1380 UNC 0 : if (wwmethod->lasterrstring)
1381 0 : return wwmethod->lasterrstring;
1382 0 : return strerror(wwmethod->lasterrno);
1383 : }
|