Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_receivewal.c - receive streaming WAL data and write it
4 : * to a local file.
5 : *
6 : * Author: Magnus Hagander <magnus@hagander.net>
7 : *
8 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
9 : *
10 : * IDENTIFICATION
11 : * src/bin/pg_basebackup/pg_receivewal.c
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #include "postgres_fe.h"
16 :
17 : #include <dirent.h>
18 : #include <limits.h>
19 : #include <signal.h>
20 : #include <sys/stat.h>
21 : #include <unistd.h>
22 :
23 : #ifdef USE_LZ4
24 : #include <lz4frame.h>
25 : #endif
26 : #ifdef HAVE_LIBZ
27 : #include <zlib.h>
28 : #endif
29 :
30 : #include "access/xlog_internal.h"
31 : #include "common/file_perm.h"
32 : #include "common/logging.h"
33 : #include "fe_utils/option_utils.h"
34 : #include "getopt_long.h"
35 : #include "libpq-fe.h"
36 : #include "receivelog.h"
37 : #include "streamutil.h"
38 :
39 : /* Time to sleep between reconnection attempts */
40 : #define RECONNECT_SLEEP_TIME 5
41 :
42 : /* Global options */
43 : static char *basedir = NULL;
44 : static int verbose = 0;
45 : static int compresslevel = 0;
46 : static bool noloop = false;
47 : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
48 : static volatile sig_atomic_t time_to_stop = false;
49 : static bool do_create_slot = false;
50 : static bool slot_exists_ok = false;
51 : static bool do_drop_slot = false;
52 : static bool do_sync = true;
53 : static bool synchronous = false;
54 : static char *replication_slot = NULL;
55 : static pg_compress_algorithm compression_algorithm = PG_COMPRESSION_NONE;
56 : static XLogRecPtr endpos = InvalidXLogRecPtr;
57 :
58 :
59 : static void usage(void);
60 : static DIR *get_destination_dir(char *dest_folder);
61 : static void close_destination_dir(DIR *dest_dir, char *dest_folder);
62 : static XLogRecPtr FindStreamingStart(uint32 *tli);
63 : static void StreamLog(void);
64 : static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline,
65 : bool segment_finished);
4183 magnus 66 ECB :
67 : static void
1562 peter 68 CBC 9 : disconnect_atexit(void)
1562 peter 69 ECB : {
1562 peter 70 CBC 9 : if (conn != NULL)
1562 peter 71 GIC 3 : PQfinish(conn);
72 9 : }
3346 magnus 73 ECB :
74 : static void
4183 magnus 75 CBC 1 : usage(void)
76 : {
2158 peter_e 77 1 : printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n"),
4183 magnus 78 ECB : progname);
4183 magnus 79 CBC 1 : printf(_("Usage:\n"));
80 1 : printf(_(" %s [OPTION]...\n"), progname);
3953 peter_e 81 1 : printf(_("\nOptions:\n"));
2158 82 1 : printf(_(" -D, --directory=DIR receive write-ahead log files into this directory\n"));
2036 83 1 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
2762 84 1 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
3904 alvherre 85 1 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
1988 rhaas 86 GIC 1 : printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
3101 peter_e 87 CBC 1 : printf(_(" -s, --status-interval=SECS\n"
3101 peter_e 88 ECB : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
3101 peter_e 89 CBC 1 : printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
2158 90 1 : printf(_(" --synchronous flush write-ahead log immediately after writing\n"));
3904 alvherre 91 1 : printf(_(" -v, --verbose output verbose messages\n"));
3904 alvherre 92 GIC 1 : printf(_(" -V, --version output version information, then exit\n"));
361 michael 93 CBC 1 : printf(_(" -Z, --compress=METHOD[:DETAIL]\n"
361 michael 94 ECB : " compress as specified\n"));
3904 alvherre 95 CBC 1 : printf(_(" -?, --help show this help, then exit\n"));
4183 magnus 96 1 : printf(_("\nConnection options:\n"));
3695 heikki.linnakangas 97 1 : printf(_(" -d, --dbname=CONNSTR connection string\n"));
3904 alvherre 98 1 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
99 1 : printf(_(" -p, --port=PORT database server port number\n"));
100 1 : printf(_(" -U, --username=NAME connect as specified database user\n"));
101 1 : printf(_(" -w, --no-password never prompt for password\n"));
102 1 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
3107 andres 103 1 : printf(_("\nOptional actions:\n"));
104 1 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
105 1 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
1136 peter 106 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
1136 peter 107 GIC 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
4183 magnus 108 1 : }
109 :
361 michael 110 ECB :
521 111 : /*
112 : * Check if the filename looks like a WAL file, letting caller know if this
113 : * WAL segment is partial and/or compressed.
114 : */
115 : static bool
521 michael 116 GIC 23 : is_xlogfilename(const char *filename, bool *ispartial,
117 : pg_compress_algorithm *wal_compression_algorithm)
118 : {
521 michael 119 GBC 23 : size_t fname_len = strlen(filename);
521 michael 120 GIC 23 : size_t xlog_pattern_len = strspn(filename, "0123456789ABCDEF");
121 :
122 : /* File does not look like a WAL file */
521 michael 123 CBC 23 : if (xlog_pattern_len != XLOG_FNAME_LEN)
521 michael 124 GIC 14 : return false;
125 :
126 : /* File looks like a completed uncompressed WAL file */
127 9 : if (fname_len == XLOG_FNAME_LEN)
128 : {
521 michael 129 LBC 0 : *ispartial = false;
362 130 0 : *wal_compression_algorithm = PG_COMPRESSION_NONE;
521 michael 131 UIC 0 : return true;
132 : }
133 :
521 michael 134 ECB : /* File looks like a completed gzip-compressed WAL file */
521 michael 135 GIC 9 : if (fname_len == XLOG_FNAME_LEN + strlen(".gz") &&
521 michael 136 CBC 2 : strcmp(filename + XLOG_FNAME_LEN, ".gz") == 0)
521 michael 137 ECB : {
521 michael 138 GIC 2 : *ispartial = false;
362 139 2 : *wal_compression_algorithm = PG_COMPRESSION_GZIP;
521 michael 140 CBC 2 : return true;
521 michael 141 ECB : }
142 :
143 : /* File looks like a completed LZ4-compressed WAL file */
520 michael 144 GIC 7 : if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") &&
145 1 : strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0)
146 : {
147 1 : *ispartial = false;
362 148 1 : *wal_compression_algorithm = PG_COMPRESSION_LZ4;
520 149 1 : return true;
150 : }
151 :
521 michael 152 ECB : /* File looks like a partial uncompressed WAL file */
521 michael 153 CBC 6 : if (fname_len == XLOG_FNAME_LEN + strlen(".partial") &&
521 michael 154 GIC 3 : strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0)
155 : {
156 3 : *ispartial = true;
362 michael 157 CBC 3 : *wal_compression_algorithm = PG_COMPRESSION_NONE;
521 158 3 : return true;
159 : }
521 michael 160 ECB :
161 : /* File looks like a partial gzip-compressed WAL file */
521 michael 162 GBC 3 : if (fname_len == XLOG_FNAME_LEN + strlen(".gz.partial") &&
163 2 : strcmp(filename + XLOG_FNAME_LEN, ".gz.partial") == 0)
521 michael 164 EUB : {
521 michael 165 GIC 2 : *ispartial = true;
362 michael 166 CBC 2 : *wal_compression_algorithm = PG_COMPRESSION_GZIP;
521 michael 167 GIC 2 : return true;
168 : }
169 :
170 : /* File looks like a partial LZ4-compressed WAL file */
520 171 1 : if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") &&
172 1 : strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0)
173 : {
520 michael 174 CBC 1 : *ispartial = true;
362 michael 175 GIC 1 : *wal_compression_algorithm = PG_COMPRESSION_LZ4;
520 176 1 : return true;
177 : }
520 michael 178 ECB :
521 179 : /* File does not look like something we know */
521 michael 180 LBC 0 : return false;
521 michael 181 EUB : }
182 :
4183 magnus 183 ECB : static bool
3734 heikki.linnakangas 184 GIC 182 : stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
185 : {
186 : static uint32 prevtimeline = 0;
187 : static XLogRecPtr prevpos = InvalidXLogRecPtr;
188 :
189 : /* we assume that we get called once at the end of each segment */
3971 magnus 190 182 : if (verbose && segment_finished)
1469 peter 191 CBC 6 : pg_log_info("finished segment at %X/%X (timeline %u)",
192 : LSN_FORMAT_ARGS(xlogpos),
1469 peter 193 ECB : timeline);
4183 magnus 194 :
2036 peter_e 195 GBC 182 : if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos)
2036 peter_e 196 ECB : {
2036 peter_e 197 GIC 12 : if (verbose)
1469 peter 198 12 : pg_log_info("stopped log streaming at %X/%X (timeline %u)",
199 : LSN_FORMAT_ARGS(xlogpos),
200 : timeline);
2036 peter_e 201 12 : time_to_stop = true;
202 12 : return true;
203 : }
204 :
205 : /*
206 : * Note that we report the previous, not current, position here. After a
3623 heikki.linnakangas 207 ECB : * timeline switch, xlogpos points to the beginning of the segment because
208 : * that's where we always begin streaming. Reporting the end of previous
209 : * timeline isn't totally accurate, because the next timeline can begin
210 : * slightly before the end of the WAL that we received on the previous
211 : * timeline, but it's close enough for reporting purposes.
3734 212 : */
2063 peter_e 213 CBC 170 : if (verbose && prevtimeline != 0 && prevtimeline != timeline)
1469 peter 214 GIC 1 : pg_log_info("switched to timeline %u at %X/%X",
1469 peter 215 ECB : timeline,
216 : LSN_FORMAT_ARGS(prevpos));
3734 heikki.linnakangas 217 :
3734 heikki.linnakangas 218 GIC 170 : prevtimeline = timeline;
219 170 : prevpos = xlogpos;
220 :
2036 peter_e 221 170 : if (time_to_stop)
222 : {
2063 peter_e 223 UIC 0 : if (verbose)
1469 peter 224 LBC 0 : pg_log_info("received interrupt signal, exiting");
4183 magnus 225 UIC 0 : return true;
4183 magnus 226 ECB : }
4183 magnus 227 GIC 170 : return false;
228 : }
229 :
230 :
3107 andres 231 ECB : /*
232 : * Get destination directory.
233 : */
234 : static DIR *
3107 andres 235 GIC 14 : get_destination_dir(char *dest_folder)
236 : {
237 : DIR *dir;
238 :
239 14 : Assert(dest_folder != NULL);
240 14 : dir = opendir(dest_folder);
241 14 : if (dir == NULL)
366 tgl 242 UIC 0 : pg_fatal("could not open directory \"%s\": %m", dest_folder);
243 :
3107 andres 244 GIC 14 : return dir;
245 : }
246 :
247 :
248 : /*
249 : * Close existing directory.
250 : */
3107 andres 251 ECB : static void
3107 andres 252 GBC 14 : close_destination_dir(DIR *dest_dir, char *dest_folder)
253 : {
3107 andres 254 GIC 14 : Assert(dest_dir != NULL && dest_folder != NULL);
255 14 : if (closedir(dest_dir))
366 tgl 256 UBC 0 : pg_fatal("could not close directory \"%s\": %m", dest_folder);
3107 andres 257 GBC 14 : }
3107 andres 258 EUB :
259 :
4183 magnus 260 : /*
261 : * Determine starting location for streaming, based on any existing xlog
3734 heikki.linnakangas 262 : * segments in the directory. We start at the end of the last one that is
263 : * complete (size matches wal segment size), on the timeline with highest ID.
264 : *
265 : * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
266 : */
4183 magnus 267 ECB : static XLogRecPtr
3734 heikki.linnakangas 268 CBC 7 : FindStreamingStart(uint32 *tli)
269 : {
270 : DIR *dir;
271 : struct dirent *dirent;
3941 heikki.linnakangas 272 GIC 7 : XLogSegNo high_segno = 0;
3734 273 7 : uint32 high_tli = 0;
3485 274 7 : bool high_ispartial = false;
4183 magnus 275 ECB :
3107 andres 276 GIC 7 : dir = get_destination_dir(basedir);
4183 magnus 277 ECB :
3306 bruce 278 CBC 30 : while (errno = 0, (dirent = readdir(dir)) != NULL)
4183 magnus 279 EUB : {
280 : uint32 tli;
3941 heikki.linnakangas 281 ECB : XLogSegNo segno;
362 michael 282 EUB : pg_compress_algorithm wal_compression_algorithm;
283 : bool ispartial;
4183 magnus 284 ECB :
521 michael 285 CBC 23 : if (!is_xlogfilename(dirent->d_name,
286 : &ispartial, &wal_compression_algorithm))
4183 magnus 287 GBC 14 : continue;
4183 magnus 288 EUB :
289 : /*
290 : * Looks like an xlog file. Parse its position.
291 : */
2028 andres 292 GIC 9 : XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
293 :
294 : /*
3485 heikki.linnakangas 295 ECB : * Check that the segment has the right size, if it's supposed to be
2273 magnus 296 : * completed. For non-compressed segments just check the on-disk size
520 michael 297 : * and see if it matches a completed segment. For gzip-compressed
298 : * segments, look at the last 4 bytes of the compressed file, which is
521 299 : * where the uncompressed size is located for files with a size lower
300 : * than 4GB, and then compare it to the size of a completed segment.
521 michael 301 EUB : * The 4 last bytes correspond to the ISIZE member according to
302 : * http://www.zlib.org/rfc-gzip.html.
520 303 : *
304 : * For LZ4-compressed segments, uncompress the file in a throw-away
305 : * buffer keeping track of the uncompressed size, then compare it to
520 michael 306 ECB : * the size of a completed segment. Per its protocol, LZ4 does not
307 : * store the uncompressed size of an object by default. contentSize
308 : * is one possible way to do that, but we need to rely on a method
309 : * where WAL segments could have been compressed by a different source
310 : * than pg_receivewal, like an archive_command with lz4.
311 : */
362 michael 312 CBC 9 : if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_NONE)
4183 magnus 313 UIC 0 : {
314 : struct stat statbuf;
315 : char fullpath[MAXPGPATH * 2];
4183 magnus 316 ECB :
3485 heikki.linnakangas 317 UIC 0 : snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
318 0 : if (stat(fullpath, &statbuf) != 0)
366 tgl 319 0 : pg_fatal("could not stat file \"%s\": %m", fullpath);
3485 heikki.linnakangas 320 ECB :
2028 andres 321 LBC 0 : if (statbuf.st_size != WalSegSz)
322 : {
927 peter 323 0 : pg_log_warning("segment file \"%s\" has incorrect size %lld, skipping",
927 peter 324 ECB : dirent->d_name, (long long int) statbuf.st_size);
4183 magnus 325 UBC 0 : continue;
326 : }
4183 magnus 327 ECB : }
362 michael 328 CBC 9 : else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_GZIP)
2273 magnus 329 GBC 2 : {
330 : int fd;
331 : char buf[4];
2153 bruce 332 ECB : int bytes_out;
333 : char fullpath[MAXPGPATH * 2];
334 : int r;
335 :
2273 magnus 336 GIC 2 : snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
337 :
1668 michael 338 2 : fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
2273 magnus 339 CBC 2 : if (fd < 0)
366 tgl 340 LBC 0 : pg_fatal("could not open compressed file \"%s\": %m",
366 tgl 341 EUB : fullpath);
2153 bruce 342 GIC 2 : if (lseek(fd, (off_t) (-4), SEEK_END) < 0)
366 tgl 343 UIC 0 : pg_fatal("could not seek in compressed file \"%s\": %m",
366 tgl 344 ECB : fullpath);
1726 michael 345 CBC 2 : r = read(fd, (char *) buf, sizeof(buf));
1726 michael 346 GIC 2 : if (r != sizeof(buf))
347 : {
1726 michael 348 LBC 0 : if (r < 0)
366 tgl 349 0 : pg_fatal("could not read compressed file \"%s\": %m",
366 tgl 350 ECB : fullpath);
351 : else
366 tgl 352 LBC 0 : pg_fatal("could not read compressed file \"%s\": read %d of %zu",
366 tgl 353 ECB : fullpath, r, sizeof(buf));
354 : }
2273 magnus 355 :
2273 magnus 356 CBC 2 : close(fd);
2273 magnus 357 GIC 2 : bytes_out = (buf[3] << 24) | (buf[2] << 16) |
2153 bruce 358 CBC 2 : (buf[1] << 8) | buf[0];
2273 magnus 359 EUB :
2028 andres 360 GIC 2 : if (bytes_out != WalSegSz)
361 : {
1469 peter 362 UIC 0 : pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %d, skipping",
1469 peter 363 ECB : dirent->d_name, bytes_out);
2273 magnus 364 LBC 0 : continue;
365 : }
366 : }
362 michael 367 GIC 7 : else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_LZ4)
368 : {
369 : #ifdef USE_LZ4
370 : #define LZ4_CHUNK_SZ 64 * 1024 /* 64kB as maximum chunk size read */
371 : int fd;
372 : ssize_t r;
520 373 1 : size_t uncompressed_size = 0;
520 michael 374 ECB : char fullpath[MAXPGPATH * 2];
375 : char *outbuf;
376 : char *readbuf;
520 michael 377 CBC 1 : LZ4F_decompressionContext_t ctx = NULL;
520 michael 378 ECB : LZ4F_decompressOptions_t dec_opt;
379 : LZ4F_errorCode_t status;
380 :
520 michael 381 CBC 1 : memset(&dec_opt, 0, sizeof(dec_opt));
520 michael 382 GBC 1 : snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
383 :
520 michael 384 GIC 1 : fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
520 michael 385 CBC 1 : if (fd < 0)
366 tgl 386 UIC 0 : pg_fatal("could not open file \"%s\": %m", fullpath);
520 michael 387 EUB :
520 michael 388 GIC 1 : status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
520 michael 389 GBC 1 : if (LZ4F_isError(status))
366 tgl 390 UIC 0 : pg_fatal("could not create LZ4 decompression context: %s",
391 : LZ4F_getErrorName(status));
392 :
520 michael 393 GIC 1 : outbuf = pg_malloc0(LZ4_CHUNK_SZ);
394 1 : readbuf = pg_malloc0(LZ4_CHUNK_SZ);
395 : do
396 : {
397 : char *readp;
398 : char *readend;
520 michael 399 ECB :
520 michael 400 CBC 2 : r = read(fd, readbuf, LZ4_CHUNK_SZ);
401 2 : if (r < 0)
366 tgl 402 UIC 0 : pg_fatal("could not read file \"%s\": %m", fullpath);
520 michael 403 ECB :
404 : /* Done reading the file */
520 michael 405 CBC 2 : if (r == 0)
520 michael 406 GIC 1 : break;
407 :
408 : /* Process one chunk */
520 michael 409 CBC 1 : readp = readbuf;
520 michael 410 GBC 1 : readend = readbuf + r;
520 michael 411 GIC 17 : while (readp < readend)
520 michael 412 ECB : {
520 michael 413 GIC 16 : size_t out_size = LZ4_CHUNK_SZ;
520 michael 414 CBC 16 : size_t read_size = readend - readp;
415 :
520 michael 416 GIC 16 : memset(outbuf, 0, LZ4_CHUNK_SZ);
417 16 : status = LZ4F_decompress(ctx, outbuf, &out_size,
418 : readp, &read_size, &dec_opt);
419 16 : if (LZ4F_isError(status))
366 tgl 420 UIC 0 : pg_fatal("could not decompress file \"%s\": %s",
421 : fullpath,
422 : LZ4F_getErrorName(status));
520 michael 423 ECB :
520 michael 424 GBC 16 : readp += read_size;
520 michael 425 GIC 16 : uncompressed_size += out_size;
520 michael 426 ECB : }
427 :
428 : /*
429 : * No need to continue reading the file when the
430 : * uncompressed_size exceeds WalSegSz, even if there are still
431 : * data left to read. However, if uncompressed_size is equal
432 : * to WalSegSz, it should verify that there is no more data to
433 : * read.
434 : */
520 michael 435 GIC 1 : } while (uncompressed_size <= WalSegSz && r > 0);
436 :
437 1 : close(fd);
438 1 : pg_free(outbuf);
520 michael 439 CBC 1 : pg_free(readbuf);
440 :
520 michael 441 GIC 1 : status = LZ4F_freeDecompressionContext(ctx);
442 1 : if (LZ4F_isError(status))
366 tgl 443 LBC 0 : pg_fatal("could not free LZ4 decompression context: %s",
444 : LZ4F_getErrorName(status));
445 :
520 michael 446 GIC 1 : if (uncompressed_size != WalSegSz)
447 : {
517 tgl 448 UIC 0 : pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %zu, skipping",
520 michael 449 ECB : dirent->d_name, uncompressed_size);
520 michael 450 UBC 0 : continue;
520 michael 451 ECB : }
452 : #else
197 peter 453 : pg_log_error("cannot check file \"%s\": compression with %s not supported by this build",
454 : dirent->d_name, "LZ4");
520 michael 455 : exit(1);
456 : #endif
457 : }
458 :
459 : /* Looks like a valid segment. Remember that we saw it. */
3485 heikki.linnakangas 460 GIC 9 : if ((segno > high_segno) ||
461 4 : (segno == high_segno && tli > high_tli) ||
3485 heikki.linnakangas 462 GBC 4 : (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
463 : {
3485 heikki.linnakangas 464 GIC 5 : high_segno = segno;
465 5 : high_tli = tli;
466 5 : high_ispartial = ispartial;
467 : }
468 : }
469 :
3306 bruce 470 CBC 7 : if (errno)
366 tgl 471 UBC 0 : pg_fatal("could not read directory \"%s\": %m", basedir);
472 :
3107 andres 473 GIC 7 : close_destination_dir(dir, basedir);
474 :
3941 heikki.linnakangas 475 7 : if (high_segno > 0)
4183 magnus 476 ECB : {
477 : XLogRecPtr high_ptr;
478 :
479 : /*
480 : * Move the starting pointer to the start of the next segment, if the
481 : * highest one we saw was completed. Otherwise start streaming from
482 : * the beginning of the .partial segment.
4175 483 : */
3485 heikki.linnakangas 484 CBC 3 : if (!high_ispartial)
3485 heikki.linnakangas 485 UIC 0 : high_segno++;
4183 magnus 486 ECB :
1735 alvherre 487 GIC 3 : XLogSegNoOffsetToRecPtr(high_segno, 0, WalSegSz, high_ptr);
488 :
3734 heikki.linnakangas 489 3 : *tli = high_tli;
4183 magnus 490 CBC 3 : return high_ptr;
491 : }
492 : else
3734 heikki.linnakangas 493 GIC 4 : return InvalidXLogRecPtr;
494 : }
495 :
496 : /*
497 : * Start the log streaming
4183 magnus 498 ECB : */
499 : static void
4183 magnus 500 CBC 7 : StreamLog(void)
4183 magnus 501 ECB : {
502 : XLogRecPtr serverpos;
503 : TimeLineID servertli;
267 peter 504 GNC 7 : StreamCtl stream = {0};
568 michael 505 ECB : char *sysidentifier;
506 :
507 : /*
508 : * Connect in replication mode to the server
4183 magnus 509 : */
3095 fujii 510 GIC 7 : if (conn == NULL)
3095 fujii 511 UIC 0 : conn = GetConnection();
3969 magnus 512 GIC 7 : if (!conn)
513 : /* Error message already written in GetConnection() */
3969 magnus 514 CBC 1 : return;
4183 magnus 515 ECB :
3670 heikki.linnakangas 516 GIC 7 : if (!CheckServerVersionForStreaming(conn))
517 : {
518 : /*
3670 heikki.linnakangas 519 ECB : * Error message already written in CheckServerVersionForStreaming().
520 : * There's no hope of recovering from a version mismatch, so don't
521 : * retry.
522 : */
1562 peter 523 LBC 0 : exit(1);
3670 heikki.linnakangas 524 ECB : }
525 :
526 : /*
527 : * Identify server, obtaining start LSN position and current timeline ID
3112 andres 528 : * at the same time, necessary if not valid data can be found in the
529 : * existing output directory.
4183 magnus 530 : */
568 michael 531 CBC 7 : if (!RunIdentifySystem(conn, &sysidentifier, &servertli, &serverpos, NULL))
1562 peter 532 UIC 0 : exit(1);
4183 magnus 533 ECB :
534 : /*
530 michael 535 : * Figure out where to start streaming. First scan the local directory.
536 : */
2585 magnus 537 GBC 7 : stream.startpos = FindStreamingStart(&stream.timeline);
538 7 : if (stream.startpos == InvalidXLogRecPtr)
539 : {
540 : /*
530 michael 541 ECB : * Try to get the starting point from the slot if any. This is
542 : * supported in PostgreSQL 15 and newer.
543 : */
530 michael 544 CBC 7 : if (replication_slot != NULL &&
530 michael 545 GIC 3 : PQserverVersion(conn) >= 150000)
546 : {
547 3 : if (!GetSlotInformation(conn, replication_slot, &stream.startpos,
548 : &stream.timeline))
549 : {
550 : /* Error is logged by GetSlotInformation() */
551 1 : return;
552 : }
553 : }
530 michael 554 EUB :
555 : /*
556 : * If it the starting point is still not known, use the current WAL
557 : * flush value as last resort.
558 : */
530 michael 559 GIC 3 : if (stream.startpos == InvalidXLogRecPtr)
560 : {
530 michael 561 CBC 1 : stream.startpos = serverpos;
530 michael 562 GIC 1 : stream.timeline = servertli;
563 : }
564 : }
565 :
566 6 : Assert(stream.startpos != InvalidXLogRecPtr &&
567 : stream.timeline != 0);
568 :
569 : /*
570 : * Always start streaming at the beginning of a segment
571 : */
2028 andres 572 6 : stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
573 :
574 : /*
575 : * Start the replication
576 : */
4183 magnus 577 6 : if (verbose)
1469 peter 578 6 : pg_log_info("starting log streaming at %X/%X (timeline %u)",
579 : LSN_FORMAT_ARGS(stream.startpos),
580 : stream.timeline);
581 :
2585 magnus 582 6 : stream.stream_stop = stop_streaming;
2173 tgl 583 6 : stream.stop_socket = PGINVALID_SOCKET;
2585 magnus 584 6 : stream.standby_message_timeout = standby_message_timeout;
585 6 : stream.synchronous = synchronous;
1988 rhaas 586 6 : stream.do_sync = do_sync;
2585 magnus 587 6 : stream.mark_done = false;
521 michael 588 12 : stream.walmethod = CreateWalDirectoryMethod(basedir,
589 : compression_algorithm,
590 : compresslevel,
2273 magnus 591 6 : stream.do_sync);
2585 592 6 : stream.partial_suffix = ".partial";
2274 593 6 : stream.replication_slot = replication_slot;
568 michael 594 CBC 6 : stream.sysidentifier = sysidentifier;
4183 magnus 595 ECB :
2585 magnus 596 CBC 6 : ReceiveXlogStream(conn, &stream);
597 :
202 rhaas 598 GNC 6 : if (!stream.walmethod->ops->finish(stream.walmethod))
2359 magnus 599 ECB : {
1469 peter 600 LBC 0 : pg_log_info("could not finish writing WAL files: %m");
2359 magnus 601 UIC 0 : return;
2359 magnus 602 ECB : }
603 :
4175 magnus 604 CBC 6 : PQfinish(conn);
1562 peter 605 GIC 6 : conn = NULL;
2357 magnus 606 ECB :
202 rhaas 607 GNC 6 : stream.walmethod->ops->free(stream.walmethod);
4183 magnus 608 ECB : }
609 :
610 : /*
611 : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
612 : * possible moment.
613 : */
614 : #ifndef WIN32
4138 andrew 615 :
4183 magnus 616 : static void
207 tgl 617 UNC 0 : sigexit_handler(SIGNAL_ARGS)
4183 magnus 618 ECB : {
2036 peter_e 619 UIC 0 : time_to_stop = true;
4183 magnus 620 UBC 0 : }
4138 andrew 621 EUB : #endif
622 :
4183 magnus 623 ECB : int
4183 magnus 624 CBC 17 : main(int argc, char **argv)
4183 magnus 625 ECB : {
626 : static struct option long_options[] = {
627 : {"help", no_argument, NULL, '?'},
4183 magnus 628 EUB : {"version", no_argument, NULL, 'V'},
3953 peter_e 629 ECB : {"directory", required_argument, NULL, 'D'},
3695 heikki.linnakangas 630 : {"dbname", required_argument, NULL, 'd'},
2036 peter_e 631 EUB : {"endpos", required_argument, NULL, 'E'},
4183 magnus 632 : {"host", required_argument, NULL, 'h'},
633 : {"port", required_argument, NULL, 'p'},
4183 magnus 634 ECB : {"username", required_argument, NULL, 'U'},
3904 alvherre 635 : {"no-loop", no_argument, NULL, 'n'},
4183 magnus 636 : {"no-password", no_argument, NULL, 'w'},
4183 magnus 637 EUB : {"password", no_argument, NULL, 'W'},
3904 alvherre 638 : {"status-interval", required_argument, NULL, 's'},
3355 rhaas 639 : {"slot", required_argument, NULL, 'S'},
4183 magnus 640 : {"verbose", no_argument, NULL, 'v'},
2273 641 : {"compress", required_argument, NULL, 'Z'},
642 : /* action */
643 : {"create-slot", no_argument, NULL, 1},
3107 andres 644 : {"drop-slot", no_argument, NULL, 2},
2828 645 : {"if-not-exists", no_argument, NULL, 3},
646 : {"synchronous", no_argument, NULL, 4},
1988 rhaas 647 ECB : {"no-sync", no_argument, NULL, 5},
4183 magnus 648 : {NULL, 0, NULL, 0}
649 : };
3782 bruce 650 EUB :
4183 magnus 651 : int c;
652 : int option_index;
3107 andres 653 ECB : char *db_name;
1957 rhaas 654 : uint32 hi,
655 : lo;
361 michael 656 EUB : pg_compress_specification compression_spec;
361 michael 657 GBC 17 : char *compression_detail = NULL;
658 17 : char *compression_algorithm_str = "none";
659 17 : char *error_detail = NULL;
4183 magnus 660 EUB :
1469 peter 661 GBC 17 : pg_logging_init(argv[0]);
4183 magnus 662 CBC 17 : progname = get_progname(argv[0]);
2659 alvherre 663 17 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
664 :
4183 magnus 665 17 : if (argc > 1)
4183 magnus 666 ECB : {
4183 magnus 667 CBC 16 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
4183 magnus 668 ECB : {
4183 magnus 669 CBC 1 : usage();
670 1 : exit(0);
4183 magnus 671 ECB : }
3904 alvherre 672 GBC 15 : else if (strcmp(argv[1], "-V") == 0 ||
673 15 : strcmp(argv[1], "--version") == 0)
4183 magnus 674 EUB : {
2250 rhaas 675 CBC 1 : puts("pg_receivewal (PostgreSQL) " PG_VERSION);
4183 magnus 676 1 : exit(0);
4183 magnus 677 ECB : }
678 : }
679 :
118 peter 680 GNC 66 : while ((c = getopt_long(argc, argv, "d:D:E:h:np:s:S:U:vwWZ:",
4183 magnus 681 CBC 66 : long_options, &option_index)) != -1)
682 : {
683 52 : switch (c)
4183 magnus 684 ECB : {
118 peter 685 UNC 0 : case 'd':
686 0 : connection_string = pg_strdup(optarg);
687 0 : break;
4183 magnus 688 GIC 11 : case 'D':
3841 tgl 689 11 : basedir = pg_strdup(optarg);
4183 magnus 690 11 : break;
118 peter 691 GNC 7 : case 'E':
692 7 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
118 peter 693 UNC 0 : pg_fatal("could not parse end position \"%s\"", optarg);
118 peter 694 GNC 7 : endpos = ((uint64) hi) << 32 | lo;
3695 heikki.linnakangas 695 GIC 7 : break;
4183 magnus 696 LBC 0 : case 'h':
3841 tgl 697 UIC 0 : dbhost = pg_strdup(optarg);
4183 magnus 698 UBC 0 : break;
118 peter 699 GNC 7 : case 'n':
700 7 : noloop = true;
701 7 : break;
4183 magnus 702 UIC 0 : case 'p':
3841 tgl 703 UBC 0 : dbport = pg_strdup(optarg);
4183 magnus 704 0 : break;
4183 magnus 705 LBC 0 : case 's':
624 michael 706 UIC 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
707 : INT_MAX / 1000,
624 michael 708 ECB : &standby_message_timeout))
4183 magnus 709 UIC 0 : exit(1);
624 michael 710 LBC 0 : standby_message_timeout *= 1000;
4183 magnus 711 0 : break;
3355 rhaas 712 GIC 5 : case 'S':
713 5 : replication_slot = pg_strdup(optarg);
3355 rhaas 714 CBC 5 : break;
118 peter 715 UNC 0 : case 'U':
716 0 : dbuser = pg_strdup(optarg);
3969 magnus 717 UIC 0 : break;
4183 magnus 718 GIC 7 : case 'v':
4183 magnus 719 CBC 7 : verbose++;
4183 magnus 720 GIC 7 : break;
118 peter 721 UNC 0 : case 'w':
722 0 : dbgetpassword = -1;
723 0 : break;
724 0 : case 'W':
725 0 : dbgetpassword = 1;
726 0 : break;
2273 magnus 727 CBC 3 : case 'Z':
361 michael 728 3 : parse_compress_options(optarg, &compression_algorithm_str,
361 michael 729 ECB : &compression_detail);
2273 magnus 730 GIC 3 : break;
3107 andres 731 3 : case 1:
732 3 : do_create_slot = true;
733 3 : break;
3107 andres 734 CBC 2 : case 2:
3107 andres 735 GIC 2 : do_drop_slot = true;
3107 andres 736 GBC 2 : break;
3064 fujii 737 UIC 0 : case 3:
2828 andres 738 0 : slot_exists_ok = true;
2828 andres 739 LBC 0 : break;
2828 andres 740 GIC 2 : case 4:
3064 fujii 741 CBC 2 : synchronous = true;
742 2 : break;
1988 rhaas 743 4 : case 5:
1988 rhaas 744 GIC 4 : do_sync = false;
745 4 : break;
4183 magnus 746 1 : default:
366 tgl 747 ECB : /* getopt_long already emitted a complaint */
366 tgl 748 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4183 magnus 749 CBC 1 : exit(1);
4183 magnus 750 EUB : }
751 : }
752 :
753 : /*
754 : * Any non-option arguments?
4183 magnus 755 ECB : */
4183 magnus 756 GIC 14 : if (optind < argc)
4183 magnus 757 ECB : {
1469 peter 758 UIC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
1469 peter 759 ECB : argv[optind]);
366 tgl 760 UIC 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4183 magnus 761 0 : exit(1);
762 : }
763 :
2883 peter_e 764 GIC 14 : if (do_drop_slot && do_create_slot)
3107 andres 765 ECB : {
1469 peter 766 CBC 1 : pg_log_error("cannot use --create-slot together with --drop-slot");
366 tgl 767 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3107 andres 768 GBC 1 : exit(1);
3107 andres 769 ECB : }
770 :
2883 peter_e 771 GIC 13 : if (replication_slot == NULL && (do_drop_slot || do_create_slot))
772 : {
773 : /* translator: second %s is an option name */
1469 peter 774 1 : pg_log_error("%s needs a slot to be specified using --slot",
775 : do_drop_slot ? "--drop-slot" : "--create-slot");
366 tgl 776 CBC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3107 andres 777 1 : exit(1);
778 : }
779 :
1988 rhaas 780 GIC 12 : if (synchronous && !do_sync)
781 : {
1469 peter 782 1 : pg_log_error("cannot use --synchronous together with --no-sync");
366 tgl 783 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
1988 rhaas 784 1 : exit(1);
1988 rhaas 785 ECB : }
1988 rhaas 786 EUB :
787 : /*
788 : * Required arguments
789 : */
2799 andres 790 GIC 11 : if (basedir == NULL && !do_drop_slot && !do_create_slot)
791 : {
1469 peter 792 CBC 1 : pg_log_error("no target directory specified");
366 tgl 793 GBC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4183 magnus 794 GIC 1 : exit(1);
795 : }
796 :
797 : /*
798 : * Compression options
799 : */
361 michael 800 10 : if (!parse_compress_algorithm(compression_algorithm_str,
801 : &compression_algorithm))
197 peter 802 UIC 0 : pg_fatal("unrecognized compression algorithm: \"%s\"",
803 : compression_algorithm_str);
361 michael 804 ECB :
361 michael 805 GIC 10 : parse_compress_specification(compression_algorithm, compression_detail,
806 : &compression_spec);
807 10 : error_detail = validate_compress_specification(&compression_spec);
808 10 : if (error_detail != NULL)
361 michael 809 CBC 1 : pg_fatal("invalid compression specification: %s",
810 : error_detail);
361 michael 811 ECB :
207 michael 812 EUB : /* Extract the compression level */
207 michael 813 GIC 9 : compresslevel = compression_spec.level;
521 michael 814 ECB :
207 michael 815 GBC 9 : if (compression_algorithm == PG_COMPRESSION_ZSTD)
207 michael 816 LBC 0 : pg_fatal("compression with %s is not yet supported", "ZSTD");
817 :
818 : /*
819 : * Check existence of destination folder.
3107 andres 820 ECB : */
2799 andres 821 GIC 9 : if (!do_drop_slot && !do_create_slot)
3107 andres 822 ECB : {
2878 bruce 823 GBC 7 : DIR *dir = get_destination_dir(basedir);
824 :
3107 andres 825 CBC 7 : close_destination_dir(dir, basedir);
826 : }
3107 andres 827 EUB :
3107 andres 828 ECB : /*
829 : * Obtain a connection before doing anything.
830 : */
3107 andres 831 GIC 9 : conn = GetConnection();
3107 andres 832 CBC 9 : if (!conn)
3107 andres 833 EUB : /* error message already written in GetConnection() */
3107 andres 834 UIC 0 : exit(1);
1562 peter 835 GIC 9 : atexit(disconnect_atexit);
836 :
837 : /*
838 : * Trap signals. (Don't do this until after the initial password prompt,
839 : * if one is needed, in GetConnection.)
840 : */
841 : #ifndef WIN32
207 dgustafsson 842 GNC 9 : pqsignal(SIGINT, sigexit_handler);
843 9 : pqsignal(SIGTERM, sigexit_handler);
504 tgl 844 ECB : #endif
845 :
846 : /*
847 : * Run IDENTIFY_SYSTEM to make sure we've successfully have established a
848 : * replication connection and haven't connected using a database specific
849 : * connection.
3107 andres 850 : */
3107 andres 851 GIC 9 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
1562 peter 852 LBC 0 : exit(1);
3107 andres 853 ECB :
854 : /*
855 : * Check that there is a database associated with connection, none should
856 : * be defined in this context.
3107 andres 857 EUB : */
3107 andres 858 GIC 9 : if (db_name)
366 tgl 859 UBC 0 : pg_fatal("replication connection using slot \"%s\" is unexpectedly database specific",
860 : replication_slot);
861 :
862 : /*
863 : * Set umask so that directories/files are created with the same
864 : * permissions as directories/files in the source data directory.
865 : *
866 : * pg_mode_mask is set to owner-only by default and then updated in
867 : * GetConnection() where we get the mode from the server-side with
868 : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
869 : */
568 michael 870 GIC 9 : umask(pg_mode_mask);
871 :
872 : /*
873 : * Drop a replication slot.
874 : */
3107 andres 875 9 : if (do_drop_slot)
876 : {
877 1 : if (verbose)
1418 tgl 878 UIC 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
879 :
3107 andres 880 GIC 1 : if (!DropReplicationSlot(conn, replication_slot))
1562 peter 881 UIC 0 : exit(1);
1562 peter 882 GIC 1 : exit(0);
883 : }
884 :
885 : /* Create a replication slot */
3107 andres 886 8 : if (do_create_slot)
887 : {
888 1 : if (verbose)
1469 peter 889 UIC 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
890 :
2021 peter_e 891 GIC 1 : if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
892 : slot_exists_ok, false))
1562 peter 893 UIC 0 : exit(1);
1562 peter 894 GIC 1 : exit(0);
895 : }
896 :
897 : /* determine remote server's xlog segment size */
568 michael 898 7 : if (!RetrieveWalSegSize(conn))
568 michael 899 UIC 0 : exit(1);
900 :
901 : /*
902 : * Don't close the connection here so that subsequent StreamLog() can
903 : * reuse it.
904 : */
905 :
906 : while (true)
907 : {
3969 magnus 908 GIC 7 : StreamLog();
2036 peter_e 909 7 : if (time_to_stop)
910 : {
911 : /*
912 : * We've been Ctrl-C'ed or end of streaming position has been
913 : * willingly reached, so exit without an error code.
914 : */
3969 magnus 915 6 : exit(0);
916 : }
917 1 : else if (noloop)
366 tgl 918 1 : pg_fatal("disconnected");
919 : else
920 : {
921 : /* translator: check source for value for %d */
1469 peter 922 UIC 0 : pg_log_info("disconnected; waiting %d seconds to try again",
923 : RECONNECT_SLEEP_TIME);
3969 magnus 924 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
925 : }
926 : }
927 : }
|