Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_receivewal.c - receive streaming WAL data and write it
4 : : * to a local file.
5 : : *
6 : : * Author: Magnus Hagander <magnus@hagander.net>
7 : : *
8 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
9 : : *
10 : : * IDENTIFICATION
11 : : * src/bin/pg_basebackup/pg_receivewal.c
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres_fe.h"
16 : :
17 : : #include <dirent.h>
18 : : #include <limits.h>
19 : : #include <signal.h>
20 : : #include <sys/stat.h>
21 : : #include <unistd.h>
22 : :
23 : : #ifdef USE_LZ4
24 : : #include <lz4frame.h>
25 : : #endif
26 : : #ifdef HAVE_LIBZ
27 : : #include <zlib.h>
28 : : #endif
29 : :
30 : : #include "access/xlog_internal.h"
31 : : #include "common/file_perm.h"
32 : : #include "common/logging.h"
33 : : #include "fe_utils/option_utils.h"
34 : : #include "getopt_long.h"
35 : : #include "libpq-fe.h"
36 : : #include "receivelog.h"
37 : : #include "streamutil.h"
38 : :
39 : : /* Time to sleep between reconnection attempts */
40 : : #define RECONNECT_SLEEP_TIME 5
41 : :
42 : : /* Global options */
43 : : static char *basedir = NULL;
44 : : static int verbose = 0;
45 : : static int compresslevel = 0;
46 : : static bool noloop = false;
47 : : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
48 : : static volatile sig_atomic_t time_to_stop = false;
49 : : static bool do_create_slot = false;
50 : : static bool slot_exists_ok = false;
51 : : static bool do_drop_slot = false;
52 : : static bool do_sync = true;
53 : : static bool synchronous = false;
54 : : static char *replication_slot = NULL;
55 : : static pg_compress_algorithm compression_algorithm = PG_COMPRESSION_NONE;
56 : : static XLogRecPtr endpos = InvalidXLogRecPtr;
57 : :
58 : :
59 : : static void usage(void);
60 : : static DIR *get_destination_dir(char *dest_folder);
61 : : static void close_destination_dir(DIR *dest_dir, char *dest_folder);
62 : : static XLogRecPtr FindStreamingStart(uint32 *tli);
63 : : static void StreamLog(void);
64 : : static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline,
65 : : bool segment_finished);
66 : :
67 : : static void
1933 peter@eisentraut.org 68 :CBC 9 : disconnect_atexit(void)
69 : : {
70 [ + + ]: 9 : if (conn != NULL)
71 : 3 : PQfinish(conn);
72 : 9 : }
73 : :
74 : : static void
4554 magnus@hagander.net 75 : 1 : usage(void)
76 : : {
2529 peter_e@gmx.net 77 : 1 : printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n"),
78 : : progname);
4554 magnus@hagander.net 79 : 1 : printf(_("Usage:\n"));
80 : 1 : printf(_(" %s [OPTION]...\n"), progname);
4324 peter_e@gmx.net 81 : 1 : printf(_("\nOptions:\n"));
2529 82 : 1 : printf(_(" -D, --directory=DIR receive write-ahead log files into this directory\n"));
2407 83 : 1 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
3133 84 : 1 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
4275 alvherre@alvh.no-ip. 85 : 1 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
2359 rhaas@postgresql.org 86 : 1 : printf(_(" --no-sync do not wait for changes to be written safely to disk\n"));
3472 peter_e@gmx.net 87 : 1 : printf(_(" -s, --status-interval=SECS\n"
88 : : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
89 : 1 : printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
2529 90 : 1 : printf(_(" --synchronous flush write-ahead log immediately after writing\n"));
4275 alvherre@alvh.no-ip. 91 : 1 : printf(_(" -v, --verbose output verbose messages\n"));
92 : 1 : printf(_(" -V, --version output version information, then exit\n"));
732 michael@paquier.xyz 93 : 1 : printf(_(" -Z, --compress=METHOD[:DETAIL]\n"
94 : : " compress as specified\n"));
4275 alvherre@alvh.no-ip. 95 : 1 : printf(_(" -?, --help show this help, then exit\n"));
4554 magnus@hagander.net 96 : 1 : printf(_("\nConnection options:\n"));
4066 heikki.linnakangas@i 97 : 1 : printf(_(" -d, --dbname=CONNSTR connection string\n"));
4275 alvherre@alvh.no-ip. 98 : 1 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
99 : 1 : printf(_(" -p, --port=PORT database server port number\n"));
100 : 1 : printf(_(" -U, --username=NAME connect as specified database user\n"));
101 : 1 : printf(_(" -w, --no-password never prompt for password\n"));
102 : 1 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
3478 andres@anarazel.de 103 : 1 : printf(_("\nOptional actions:\n"));
104 : 1 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
105 : 1 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
1507 peter@eisentraut.org 106 : 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
107 : 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
4554 magnus@hagander.net 108 : 1 : }
109 : :
110 : :
111 : : /*
112 : : * Check if the filename looks like a WAL file, letting caller know if this
113 : : * WAL segment is partial and/or compressed.
114 : : */
115 : : static bool
892 michael@paquier.xyz 116 : 23 : is_xlogfilename(const char *filename, bool *ispartial,
117 : : pg_compress_algorithm *wal_compression_algorithm)
118 : : {
119 : 23 : size_t fname_len = strlen(filename);
120 : 23 : size_t xlog_pattern_len = strspn(filename, "0123456789ABCDEF");
121 : :
122 : : /* File does not look like a WAL file */
123 [ + + ]: 23 : if (xlog_pattern_len != XLOG_FNAME_LEN)
124 : 14 : return false;
125 : :
126 : : /* File looks like a completed uncompressed WAL file */
127 [ - + ]: 9 : if (fname_len == XLOG_FNAME_LEN)
128 : : {
892 michael@paquier.xyz 129 :UBC 0 : *ispartial = false;
733 130 : 0 : *wal_compression_algorithm = PG_COMPRESSION_NONE;
892 131 : 0 : return true;
132 : : }
133 : :
134 : : /* File looks like a completed gzip-compressed WAL file */
892 michael@paquier.xyz 135 [ + + ]:CBC 9 : if (fname_len == XLOG_FNAME_LEN + strlen(".gz") &&
136 [ + - ]: 2 : strcmp(filename + XLOG_FNAME_LEN, ".gz") == 0)
137 : : {
138 : 2 : *ispartial = false;
733 139 : 2 : *wal_compression_algorithm = PG_COMPRESSION_GZIP;
892 140 : 2 : return true;
141 : : }
142 : :
143 : : /* File looks like a completed LZ4-compressed WAL file */
891 144 [ + + ]: 7 : if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") &&
145 [ + - ]: 1 : strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0)
146 : : {
147 : 1 : *ispartial = false;
733 148 : 1 : *wal_compression_algorithm = PG_COMPRESSION_LZ4;
891 149 : 1 : return true;
150 : : }
151 : :
152 : : /* File looks like a partial uncompressed WAL file */
892 153 [ + + ]: 6 : if (fname_len == XLOG_FNAME_LEN + strlen(".partial") &&
154 [ + - ]: 3 : strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0)
155 : : {
156 : 3 : *ispartial = true;
733 157 : 3 : *wal_compression_algorithm = PG_COMPRESSION_NONE;
892 158 : 3 : return true;
159 : : }
160 : :
161 : : /* File looks like a partial gzip-compressed WAL file */
162 [ + + ]: 3 : if (fname_len == XLOG_FNAME_LEN + strlen(".gz.partial") &&
163 [ + - ]: 2 : strcmp(filename + XLOG_FNAME_LEN, ".gz.partial") == 0)
164 : : {
165 : 2 : *ispartial = true;
733 166 : 2 : *wal_compression_algorithm = PG_COMPRESSION_GZIP;
892 167 : 2 : return true;
168 : : }
169 : :
170 : : /* File looks like a partial LZ4-compressed WAL file */
891 171 [ + - ]: 1 : if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") &&
172 [ + - ]: 1 : strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0)
173 : : {
174 : 1 : *ispartial = true;
733 175 : 1 : *wal_compression_algorithm = PG_COMPRESSION_LZ4;
891 176 : 1 : return true;
177 : : }
178 : :
179 : : /* File does not look like something we know */
892 michael@paquier.xyz 180 :UBC 0 : return false;
181 : : }
182 : :
183 : : static bool
4105 heikki.linnakangas@i 184 :CBC 97 : stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
185 : : {
186 : : static uint32 prevtimeline = 0;
187 : : static XLogRecPtr prevpos = InvalidXLogRecPtr;
188 : :
189 : : /* we assume that we get called once at the end of each segment */
4342 magnus@hagander.net 190 [ + - + + ]: 97 : if (verbose && segment_finished)
1840 peter@eisentraut.org 191 : 6 : pg_log_info("finished segment at %X/%X (timeline %u)",
192 : : LSN_FORMAT_ARGS(xlogpos),
193 : : timeline);
194 : :
2407 peter_e@gmx.net 195 [ + - + + ]: 97 : if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos)
196 : : {
197 [ + - ]: 12 : if (verbose)
1840 peter@eisentraut.org 198 : 12 : pg_log_info("stopped log streaming at %X/%X (timeline %u)",
199 : : LSN_FORMAT_ARGS(xlogpos),
200 : : timeline);
2407 peter_e@gmx.net 201 : 12 : time_to_stop = true;
202 : 12 : return true;
203 : : }
204 : :
205 : : /*
206 : : * Note that we report the previous, not current, position here. After a
207 : : * timeline switch, xlogpos points to the beginning of the segment because
208 : : * that's where we always begin streaming. Reporting the end of previous
209 : : * timeline isn't totally accurate, because the next timeline can begin
210 : : * slightly before the end of the WAL that we received on the previous
211 : : * timeline, but it's close enough for reporting purposes.
212 : : */
2434 213 [ + - + + : 85 : if (verbose && prevtimeline != 0 && prevtimeline != timeline)
+ + ]
1840 peter@eisentraut.org 214 : 1 : pg_log_info("switched to timeline %u at %X/%X",
215 : : timeline,
216 : : LSN_FORMAT_ARGS(prevpos));
217 : :
4105 heikki.linnakangas@i 218 : 85 : prevtimeline = timeline;
219 : 85 : prevpos = xlogpos;
220 : :
2407 peter_e@gmx.net 221 [ - + ]: 85 : if (time_to_stop)
222 : : {
2434 peter_e@gmx.net 223 [ # # ]:UBC 0 : if (verbose)
1840 peter@eisentraut.org 224 : 0 : pg_log_info("received interrupt signal, exiting");
4554 magnus@hagander.net 225 : 0 : return true;
226 : : }
4554 magnus@hagander.net 227 :CBC 85 : return false;
228 : : }
229 : :
230 : :
231 : : /*
232 : : * Get destination directory.
233 : : */
234 : : static DIR *
3478 andres@anarazel.de 235 : 14 : get_destination_dir(char *dest_folder)
236 : : {
237 : : DIR *dir;
238 : :
239 [ - + ]: 14 : Assert(dest_folder != NULL);
240 : 14 : dir = opendir(dest_folder);
241 [ - + ]: 14 : if (dir == NULL)
737 tgl@sss.pgh.pa.us 242 :UBC 0 : pg_fatal("could not open directory \"%s\": %m", dest_folder);
243 : :
3478 andres@anarazel.de 244 :CBC 14 : return dir;
245 : : }
246 : :
247 : :
248 : : /*
249 : : * Close existing directory.
250 : : */
251 : : static void
252 : 14 : close_destination_dir(DIR *dest_dir, char *dest_folder)
253 : : {
254 [ + - + - ]: 14 : Assert(dest_dir != NULL && dest_folder != NULL);
255 [ - + ]: 14 : if (closedir(dest_dir))
737 tgl@sss.pgh.pa.us 256 :UBC 0 : pg_fatal("could not close directory \"%s\": %m", dest_folder);
3478 andres@anarazel.de 257 :CBC 14 : }
258 : :
259 : :
260 : : /*
261 : : * Determine starting location for streaming, based on any existing xlog
262 : : * segments in the directory. We start at the end of the last one that is
263 : : * complete (size matches wal segment size), on the timeline with highest ID.
264 : : *
265 : : * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
266 : : */
267 : : static XLogRecPtr
4105 heikki.linnakangas@i 268 : 7 : FindStreamingStart(uint32 *tli)
269 : : {
270 : : DIR *dir;
271 : : struct dirent *dirent;
4312 272 : 7 : XLogSegNo high_segno = 0;
4105 273 : 7 : uint32 high_tli = 0;
3856 274 : 7 : bool high_ispartial = false;
275 : :
3478 andres@anarazel.de 276 : 7 : dir = get_destination_dir(basedir);
277 : :
3677 bruce@momjian.us 278 [ + + ]: 30 : while (errno = 0, (dirent = readdir(dir)) != NULL)
279 : : {
280 : : uint32 tli;
281 : : XLogSegNo segno;
282 : : pg_compress_algorithm wal_compression_algorithm;
283 : : bool ispartial;
284 : :
892 michael@paquier.xyz 285 [ + + ]: 23 : if (!is_xlogfilename(dirent->d_name,
286 : : &ispartial, &wal_compression_algorithm))
4554 magnus@hagander.net 287 : 14 : continue;
288 : :
289 : : /*
290 : : * Looks like an xlog file. Parse its position.
291 : : */
2399 andres@anarazel.de 292 : 9 : XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
293 : :
294 : : /*
295 : : * Check that the segment has the right size, if it's supposed to be
296 : : * completed. For non-compressed segments just check the on-disk size
297 : : * and see if it matches a completed segment. For gzip-compressed
298 : : * segments, look at the last 4 bytes of the compressed file, which is
299 : : * where the uncompressed size is located for files with a size lower
300 : : * than 4GB, and then compare it to the size of a completed segment.
301 : : * The 4 last bytes correspond to the ISIZE member according to
302 : : * http://www.zlib.org/rfc-gzip.html.
303 : : *
304 : : * For LZ4-compressed segments, uncompress the file in a throw-away
305 : : * buffer keeping track of the uncompressed size, then compare it to
306 : : * the size of a completed segment. Per its protocol, LZ4 does not
307 : : * store the uncompressed size of an object by default. contentSize
308 : : * is one possible way to do that, but we need to rely on a method
309 : : * where WAL segments could have been compressed by a different source
310 : : * than pg_receivewal, like an archive_command with lz4.
311 : : */
733 michael@paquier.xyz 312 [ + + - + ]: 9 : if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_NONE)
4554 magnus@hagander.net 313 :UBC 0 : {
314 : : struct stat statbuf;
315 : : char fullpath[MAXPGPATH * 2];
316 : :
3856 heikki.linnakangas@i 317 : 0 : snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
318 [ # # ]: 0 : if (stat(fullpath, &statbuf) != 0)
737 tgl@sss.pgh.pa.us 319 : 0 : pg_fatal("could not stat file \"%s\": %m", fullpath);
320 : :
2399 andres@anarazel.de 321 [ # # ]: 0 : if (statbuf.st_size != WalSegSz)
322 : : {
1298 peter@eisentraut.org 323 : 0 : pg_log_warning("segment file \"%s\" has incorrect size %lld, skipping",
324 : : dirent->d_name, (long long int) statbuf.st_size);
4554 magnus@hagander.net 325 : 0 : continue;
326 : : }
327 : : }
733 michael@paquier.xyz 328 [ + + + + ]:CBC 9 : else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_GZIP)
2644 magnus@hagander.net 329 : 2 : {
330 : : int fd;
331 : : char buf[4];
332 : : int bytes_out;
333 : : char fullpath[MAXPGPATH * 2];
334 : : int r;
335 : :
336 : 2 : snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
337 : :
2039 michael@paquier.xyz 338 : 2 : fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
2644 magnus@hagander.net 339 [ - + ]: 2 : if (fd < 0)
737 tgl@sss.pgh.pa.us 340 :UBC 0 : pg_fatal("could not open compressed file \"%s\": %m",
341 : : fullpath);
2524 bruce@momjian.us 342 [ - + ]:CBC 2 : if (lseek(fd, (off_t) (-4), SEEK_END) < 0)
737 tgl@sss.pgh.pa.us 343 :UBC 0 : pg_fatal("could not seek in compressed file \"%s\": %m",
344 : : fullpath);
2097 michael@paquier.xyz 345 :CBC 2 : r = read(fd, (char *) buf, sizeof(buf));
346 [ - + ]: 2 : if (r != sizeof(buf))
347 : : {
2097 michael@paquier.xyz 348 [ # # ]:UBC 0 : if (r < 0)
737 tgl@sss.pgh.pa.us 349 : 0 : pg_fatal("could not read compressed file \"%s\": %m",
350 : : fullpath);
351 : : else
352 : 0 : pg_fatal("could not read compressed file \"%s\": read %d of %zu",
353 : : fullpath, r, sizeof(buf));
354 : : }
355 : :
2644 magnus@hagander.net 356 :CBC 2 : close(fd);
357 : 2 : bytes_out = (buf[3] << 24) | (buf[2] << 16) |
2524 bruce@momjian.us 358 : 2 : (buf[1] << 8) | buf[0];
359 : :
2399 andres@anarazel.de 360 [ - + ]: 2 : if (bytes_out != WalSegSz)
361 : : {
1840 peter@eisentraut.org 362 :UBC 0 : pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %d, skipping",
363 : : dirent->d_name, bytes_out);
2644 magnus@hagander.net 364 : 0 : continue;
365 : : }
366 : : }
733 michael@paquier.xyz 367 [ + + + - ]:CBC 7 : else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_LZ4)
368 : : {
369 : : #ifdef USE_LZ4
370 : : #define LZ4_CHUNK_SZ 64 * 1024 /* 64kB as maximum chunk size read */
371 : : int fd;
372 : : ssize_t r;
891 373 : 1 : size_t uncompressed_size = 0;
374 : : char fullpath[MAXPGPATH * 2];
375 : : char *outbuf;
376 : : char *readbuf;
377 : 1 : LZ4F_decompressionContext_t ctx = NULL;
378 : : LZ4F_decompressOptions_t dec_opt;
379 : : LZ4F_errorCode_t status;
380 : :
381 : 1 : memset(&dec_opt, 0, sizeof(dec_opt));
382 : 1 : snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
383 : :
384 : 1 : fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
385 [ - + ]: 1 : if (fd < 0)
737 tgl@sss.pgh.pa.us 386 :UBC 0 : pg_fatal("could not open file \"%s\": %m", fullpath);
387 : :
891 michael@paquier.xyz 388 :CBC 1 : status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
389 [ - + ]: 1 : if (LZ4F_isError(status))
737 tgl@sss.pgh.pa.us 390 :UBC 0 : pg_fatal("could not create LZ4 decompression context: %s",
391 : : LZ4F_getErrorName(status));
392 : :
891 michael@paquier.xyz 393 :CBC 1 : outbuf = pg_malloc0(LZ4_CHUNK_SZ);
394 : 1 : readbuf = pg_malloc0(LZ4_CHUNK_SZ);
395 : : do
396 : : {
397 : : char *readp;
398 : : char *readend;
399 : :
400 : 2 : r = read(fd, readbuf, LZ4_CHUNK_SZ);
401 [ - + ]: 2 : if (r < 0)
737 tgl@sss.pgh.pa.us 402 :UBC 0 : pg_fatal("could not read file \"%s\": %m", fullpath);
403 : :
404 : : /* Done reading the file */
891 michael@paquier.xyz 405 [ + + ]:CBC 2 : if (r == 0)
406 : 1 : break;
407 : :
408 : : /* Process one chunk */
409 : 1 : readp = readbuf;
410 : 1 : readend = readbuf + r;
411 [ + + ]: 17 : while (readp < readend)
412 : : {
413 : 16 : size_t out_size = LZ4_CHUNK_SZ;
414 : 16 : size_t read_size = readend - readp;
415 : :
416 : 16 : memset(outbuf, 0, LZ4_CHUNK_SZ);
417 : 16 : status = LZ4F_decompress(ctx, outbuf, &out_size,
418 : : readp, &read_size, &dec_opt);
419 [ - + ]: 16 : if (LZ4F_isError(status))
737 tgl@sss.pgh.pa.us 420 :UBC 0 : pg_fatal("could not decompress file \"%s\": %s",
421 : : fullpath,
422 : : LZ4F_getErrorName(status));
423 : :
891 michael@paquier.xyz 424 :CBC 16 : readp += read_size;
425 : 16 : uncompressed_size += out_size;
426 : : }
427 : :
428 : : /*
429 : : * No need to continue reading the file when the
430 : : * uncompressed_size exceeds WalSegSz, even if there are still
431 : : * data left to read. However, if uncompressed_size is equal
432 : : * to WalSegSz, it should verify that there is no more data to
433 : : * read.
434 : : */
435 [ + - + - ]: 1 : } while (uncompressed_size <= WalSegSz && r > 0);
436 : :
437 : 1 : close(fd);
438 : 1 : pg_free(outbuf);
439 : 1 : pg_free(readbuf);
440 : :
441 : 1 : status = LZ4F_freeDecompressionContext(ctx);
442 [ - + ]: 1 : if (LZ4F_isError(status))
737 tgl@sss.pgh.pa.us 443 :UBC 0 : pg_fatal("could not free LZ4 decompression context: %s",
444 : : LZ4F_getErrorName(status));
445 : :
891 michael@paquier.xyz 446 [ - + ]:CBC 1 : if (uncompressed_size != WalSegSz)
447 : : {
888 tgl@sss.pgh.pa.us 448 :UBC 0 : pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %zu, skipping",
449 : : dirent->d_name, uncompressed_size);
891 michael@paquier.xyz 450 : 0 : continue;
451 : : }
452 : : #else
453 : : pg_log_error("cannot check file \"%s\": compression with %s not supported by this build",
454 : : dirent->d_name, "LZ4");
455 : : exit(1);
456 : : #endif
457 : : }
458 : :
459 : : /* Looks like a valid segment. Remember that we saw it. */
3856 heikki.linnakangas@i 460 [ + + ]:CBC 9 : if ((segno > high_segno) ||
461 [ + - + - ]: 3 : (segno == high_segno && tli > high_tli) ||
462 [ + - + - : 3 : (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
+ - + - ]
463 : : {
464 : 9 : high_segno = segno;
465 : 9 : high_tli = tli;
466 : 9 : high_ispartial = ispartial;
467 : : }
468 : : }
469 : :
3677 bruce@momjian.us 470 [ - + ]: 7 : if (errno)
737 tgl@sss.pgh.pa.us 471 :UBC 0 : pg_fatal("could not read directory \"%s\": %m", basedir);
472 : :
3478 andres@anarazel.de 473 :CBC 7 : close_destination_dir(dir, basedir);
474 : :
4312 heikki.linnakangas@i 475 [ + + ]: 7 : if (high_segno > 0)
476 : : {
477 : : XLogRecPtr high_ptr;
478 : :
479 : : /*
480 : : * Move the starting pointer to the start of the next segment, if the
481 : : * highest one we saw was completed. Otherwise start streaming from
482 : : * the beginning of the .partial segment.
483 : : */
3856 484 [ - + ]: 3 : if (!high_ispartial)
3856 heikki.linnakangas@i 485 :UBC 0 : high_segno++;
486 : :
2106 alvherre@alvh.no-ip. 487 :CBC 3 : XLogSegNoOffsetToRecPtr(high_segno, 0, WalSegSz, high_ptr);
488 : :
4105 heikki.linnakangas@i 489 : 3 : *tli = high_tli;
4554 magnus@hagander.net 490 : 3 : return high_ptr;
491 : : }
492 : : else
4105 heikki.linnakangas@i 493 : 4 : return InvalidXLogRecPtr;
494 : : }
495 : :
496 : : /*
497 : : * Start the log streaming
498 : : */
499 : : static void
4554 magnus@hagander.net 500 : 7 : StreamLog(void)
501 : : {
502 : : XLogRecPtr serverpos;
503 : : TimeLineID servertli;
638 peter@eisentraut.org 504 : 7 : StreamCtl stream = {0};
505 : : char *sysidentifier;
506 : :
507 : : /*
508 : : * Connect in replication mode to the server
509 : : */
3466 fujii@postgresql.org 510 [ - + ]: 7 : if (conn == NULL)
3466 fujii@postgresql.org 511 :UBC 0 : conn = GetConnection();
4340 magnus@hagander.net 512 [ - + ]:CBC 7 : if (!conn)
513 : : /* Error message already written in GetConnection() */
4340 magnus@hagander.net 514 :UBC 0 : return;
515 : :
4041 heikki.linnakangas@i 516 [ - + ]:CBC 7 : if (!CheckServerVersionForStreaming(conn))
517 : : {
518 : : /*
519 : : * Error message already written in CheckServerVersionForStreaming().
520 : : * There's no hope of recovering from a version mismatch, so don't
521 : : * retry.
522 : : */
1933 peter@eisentraut.org 523 :UBC 0 : exit(1);
524 : : }
525 : :
526 : : /*
527 : : * Identify server, obtaining start LSN position and current timeline ID
528 : : * at the same time, necessary if not valid data can be found in the
529 : : * existing output directory.
530 : : */
939 michael@paquier.xyz 531 [ - + ]:CBC 7 : if (!RunIdentifySystem(conn, &sysidentifier, &servertli, &serverpos, NULL))
1933 peter@eisentraut.org 532 :UBC 0 : exit(1);
533 : :
534 : : /*
535 : : * Figure out where to start streaming. First scan the local directory.
536 : : */
2956 magnus@hagander.net 537 :CBC 7 : stream.startpos = FindStreamingStart(&stream.timeline);
538 [ + + ]: 7 : if (stream.startpos == InvalidXLogRecPtr)
539 : : {
540 : : /*
541 : : * Try to get the starting point from the slot if any. This is
542 : : * supported in PostgreSQL 15 and newer.
543 : : */
901 michael@paquier.xyz 544 [ + + + - ]: 7 : if (replication_slot != NULL &&
545 : 3 : PQserverVersion(conn) >= 150000)
546 : : {
547 [ + + ]: 3 : if (!GetSlotInformation(conn, replication_slot, &stream.startpos,
548 : : &stream.timeline))
549 : : {
550 : : /* Error is logged by GetSlotInformation() */
551 : 1 : return;
552 : : }
553 : : }
554 : :
555 : : /*
556 : : * If it the starting point is still not known, use the current WAL
557 : : * flush value as last resort.
558 : : */
559 [ + + ]: 3 : if (stream.startpos == InvalidXLogRecPtr)
560 : : {
561 : 1 : stream.startpos = serverpos;
562 : 1 : stream.timeline = servertli;
563 : : }
564 : : }
565 : :
566 [ + - + - ]: 6 : Assert(stream.startpos != InvalidXLogRecPtr &&
567 : : stream.timeline != 0);
568 : :
569 : : /*
570 : : * Always start streaming at the beginning of a segment
571 : : */
2399 andres@anarazel.de 572 : 6 : stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
573 : :
574 : : /*
575 : : * Start the replication
576 : : */
4554 magnus@hagander.net 577 [ + - ]: 6 : if (verbose)
1840 peter@eisentraut.org 578 : 6 : pg_log_info("starting log streaming at %X/%X (timeline %u)",
579 : : LSN_FORMAT_ARGS(stream.startpos),
580 : : stream.timeline);
581 : :
2956 magnus@hagander.net 582 : 6 : stream.stream_stop = stop_streaming;
2544 tgl@sss.pgh.pa.us 583 : 6 : stream.stop_socket = PGINVALID_SOCKET;
2956 magnus@hagander.net 584 : 6 : stream.standby_message_timeout = standby_message_timeout;
585 : 6 : stream.synchronous = synchronous;
2359 rhaas@postgresql.org 586 : 6 : stream.do_sync = do_sync;
2956 magnus@hagander.net 587 : 6 : stream.mark_done = false;
892 michael@paquier.xyz 588 : 12 : stream.walmethod = CreateWalDirectoryMethod(basedir,
589 : : compression_algorithm,
590 : : compresslevel,
2644 magnus@hagander.net 591 : 6 : stream.do_sync);
2956 592 : 6 : stream.partial_suffix = ".partial";
2645 593 : 6 : stream.replication_slot = replication_slot;
939 michael@paquier.xyz 594 : 6 : stream.sysidentifier = sysidentifier;
595 : :
2956 magnus@hagander.net 596 : 6 : ReceiveXlogStream(conn, &stream);
597 : :
573 rhaas@postgresql.org 598 [ - + ]: 6 : if (!stream.walmethod->ops->finish(stream.walmethod))
599 : : {
1840 peter@eisentraut.org 600 :UBC 0 : pg_log_info("could not finish writing WAL files: %m");
2730 magnus@hagander.net 601 : 0 : return;
602 : : }
603 : :
4546 magnus@hagander.net 604 :CBC 6 : PQfinish(conn);
1933 peter@eisentraut.org 605 : 6 : conn = NULL;
606 : :
573 rhaas@postgresql.org 607 : 6 : stream.walmethod->ops->free(stream.walmethod);
608 : : }
609 : :
610 : : /*
611 : : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
612 : : * possible moment.
613 : : */
614 : : #ifndef WIN32
615 : :
616 : : static void
578 tgl@sss.pgh.pa.us 617 :UBC 0 : sigexit_handler(SIGNAL_ARGS)
618 : : {
2407 peter_e@gmx.net 619 : 0 : time_to_stop = true;
4554 magnus@hagander.net 620 : 0 : }
621 : : #endif
622 : :
623 : : int
4554 magnus@hagander.net 624 :CBC 17 : main(int argc, char **argv)
625 : : {
626 : : static struct option long_options[] = {
627 : : {"help", no_argument, NULL, '?'},
628 : : {"version", no_argument, NULL, 'V'},
629 : : {"directory", required_argument, NULL, 'D'},
630 : : {"dbname", required_argument, NULL, 'd'},
631 : : {"endpos", required_argument, NULL, 'E'},
632 : : {"host", required_argument, NULL, 'h'},
633 : : {"port", required_argument, NULL, 'p'},
634 : : {"username", required_argument, NULL, 'U'},
635 : : {"no-loop", no_argument, NULL, 'n'},
636 : : {"no-password", no_argument, NULL, 'w'},
637 : : {"password", no_argument, NULL, 'W'},
638 : : {"status-interval", required_argument, NULL, 's'},
639 : : {"slot", required_argument, NULL, 'S'},
640 : : {"verbose", no_argument, NULL, 'v'},
641 : : {"compress", required_argument, NULL, 'Z'},
642 : : /* action */
643 : : {"create-slot", no_argument, NULL, 1},
644 : : {"drop-slot", no_argument, NULL, 2},
645 : : {"if-not-exists", no_argument, NULL, 3},
646 : : {"synchronous", no_argument, NULL, 4},
647 : : {"no-sync", no_argument, NULL, 5},
648 : : {NULL, 0, NULL, 0}
649 : : };
650 : :
651 : : int c;
652 : : int option_index;
653 : : char *db_name;
654 : : uint32 hi,
655 : : lo;
656 : : pg_compress_specification compression_spec;
732 michael@paquier.xyz 657 : 17 : char *compression_detail = NULL;
658 : 17 : char *compression_algorithm_str = "none";
659 : 17 : char *error_detail = NULL;
660 : :
1840 peter@eisentraut.org 661 : 17 : pg_logging_init(argv[0]);
4554 magnus@hagander.net 662 : 17 : progname = get_progname(argv[0]);
3030 alvherre@alvh.no-ip. 663 : 17 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
664 : :
4554 magnus@hagander.net 665 [ + + ]: 17 : if (argc > 1)
666 : : {
667 [ + + - + ]: 16 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
668 : : {
669 : 1 : usage();
670 : 1 : exit(0);
671 : : }
4275 alvherre@alvh.no-ip. 672 [ + - ]: 15 : else if (strcmp(argv[1], "-V") == 0 ||
673 [ + + ]: 15 : strcmp(argv[1], "--version") == 0)
674 : : {
2621 rhaas@postgresql.org 675 : 1 : puts("pg_receivewal (PostgreSQL) " PG_VERSION);
4554 magnus@hagander.net 676 : 1 : exit(0);
677 : : }
678 : : }
679 : :
489 peter@eisentraut.org 680 : 66 : while ((c = getopt_long(argc, argv, "d:D:E:h:np:s:S:U:vwWZ:",
4554 magnus@hagander.net 681 [ + + ]: 66 : long_options, &option_index)) != -1)
682 : : {
683 [ - + + - : 52 : switch (c)
+ - - + -
+ - - + +
+ - + +
+ ]
684 : : {
489 peter@eisentraut.org 685 :UBC 0 : case 'd':
686 : 0 : connection_string = pg_strdup(optarg);
687 : 0 : break;
4554 magnus@hagander.net 688 :CBC 11 : case 'D':
4212 tgl@sss.pgh.pa.us 689 : 11 : basedir = pg_strdup(optarg);
4554 magnus@hagander.net 690 : 11 : break;
489 peter@eisentraut.org 691 : 7 : case 'E':
692 [ - + ]: 7 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
489 peter@eisentraut.org 693 :UBC 0 : pg_fatal("could not parse end position \"%s\"", optarg);
489 peter@eisentraut.org 694 :CBC 7 : endpos = ((uint64) hi) << 32 | lo;
4066 heikki.linnakangas@i 695 : 7 : break;
4554 magnus@hagander.net 696 :UBC 0 : case 'h':
4212 tgl@sss.pgh.pa.us 697 : 0 : dbhost = pg_strdup(optarg);
4554 magnus@hagander.net 698 : 0 : break;
489 peter@eisentraut.org 699 :CBC 7 : case 'n':
700 : 7 : noloop = true;
701 : 7 : break;
4554 magnus@hagander.net 702 :UBC 0 : case 'p':
4212 tgl@sss.pgh.pa.us 703 : 0 : dbport = pg_strdup(optarg);
4554 magnus@hagander.net 704 : 0 : break;
705 : 0 : case 's':
995 michael@paquier.xyz 706 [ # # ]: 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
707 : : INT_MAX / 1000,
708 : : &standby_message_timeout))
4554 magnus@hagander.net 709 : 0 : exit(1);
995 michael@paquier.xyz 710 : 0 : standby_message_timeout *= 1000;
4554 magnus@hagander.net 711 : 0 : break;
3726 rhaas@postgresql.org 712 :CBC 5 : case 'S':
713 : 5 : replication_slot = pg_strdup(optarg);
714 : 5 : break;
489 peter@eisentraut.org 715 :UBC 0 : case 'U':
716 : 0 : dbuser = pg_strdup(optarg);
4340 magnus@hagander.net 717 : 0 : break;
4554 magnus@hagander.net 718 :CBC 7 : case 'v':
719 : 7 : verbose++;
720 : 7 : break;
489 peter@eisentraut.org 721 :UBC 0 : case 'w':
722 : 0 : dbgetpassword = -1;
723 : 0 : break;
724 : 0 : case 'W':
725 : 0 : dbgetpassword = 1;
726 : 0 : break;
2644 magnus@hagander.net 727 :CBC 3 : case 'Z':
732 michael@paquier.xyz 728 : 3 : parse_compress_options(optarg, &compression_algorithm_str,
729 : : &compression_detail);
2644 magnus@hagander.net 730 : 3 : break;
3478 andres@anarazel.de 731 : 3 : case 1:
732 : 3 : do_create_slot = true;
733 : 3 : break;
734 : 2 : case 2:
735 : 2 : do_drop_slot = true;
736 : 2 : break;
3435 fujii@postgresql.org 737 :UBC 0 : case 3:
3199 andres@anarazel.de 738 : 0 : slot_exists_ok = true;
739 : 0 : break;
3199 andres@anarazel.de 740 :CBC 2 : case 4:
3435 fujii@postgresql.org 741 : 2 : synchronous = true;
742 : 2 : break;
2359 rhaas@postgresql.org 743 : 4 : case 5:
744 : 4 : do_sync = false;
745 : 4 : break;
4554 magnus@hagander.net 746 : 1 : default:
747 : : /* getopt_long already emitted a complaint */
737 tgl@sss.pgh.pa.us 748 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4554 magnus@hagander.net 749 : 1 : exit(1);
750 : : }
751 : : }
752 : :
753 : : /*
754 : : * Any non-option arguments?
755 : : */
756 [ - + ]: 14 : if (optind < argc)
757 : : {
1840 peter@eisentraut.org 758 :UBC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
759 : : argv[optind]);
737 tgl@sss.pgh.pa.us 760 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4554 magnus@hagander.net 761 : 0 : exit(1);
762 : : }
763 : :
3254 peter_e@gmx.net 764 [ + + + + ]:CBC 14 : if (do_drop_slot && do_create_slot)
765 : : {
1840 peter@eisentraut.org 766 : 1 : pg_log_error("cannot use --create-slot together with --drop-slot");
737 tgl@sss.pgh.pa.us 767 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3478 andres@anarazel.de 768 : 1 : exit(1);
769 : : }
770 : :
3254 peter_e@gmx.net 771 [ + + + - : 13 : if (replication_slot == NULL && (do_drop_slot || do_create_slot))
+ + ]
772 : : {
773 : : /* translator: second %s is an option name */
1840 peter@eisentraut.org 774 [ - + ]: 1 : pg_log_error("%s needs a slot to be specified using --slot",
775 : : do_drop_slot ? "--drop-slot" : "--create-slot");
737 tgl@sss.pgh.pa.us 776 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3478 andres@anarazel.de 777 : 1 : exit(1);
778 : : }
779 : :
2359 rhaas@postgresql.org 780 [ + + + + ]: 12 : if (synchronous && !do_sync)
781 : : {
1840 peter@eisentraut.org 782 : 1 : pg_log_error("cannot use --synchronous together with --no-sync");
737 tgl@sss.pgh.pa.us 783 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2359 rhaas@postgresql.org 784 : 1 : exit(1);
785 : : }
786 : :
787 : : /*
788 : : * Required arguments
789 : : */
3170 andres@anarazel.de 790 [ + + + + : 11 : if (basedir == NULL && !do_drop_slot && !do_create_slot)
+ + ]
791 : : {
1840 peter@eisentraut.org 792 : 1 : pg_log_error("no target directory specified");
737 tgl@sss.pgh.pa.us 793 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4554 magnus@hagander.net 794 : 1 : exit(1);
795 : : }
796 : :
797 : : /*
798 : : * Compression options
799 : : */
732 michael@paquier.xyz 800 [ - + ]: 10 : if (!parse_compress_algorithm(compression_algorithm_str,
801 : : &compression_algorithm))
568 peter@eisentraut.org 802 :UBC 0 : pg_fatal("unrecognized compression algorithm: \"%s\"",
803 : : compression_algorithm_str);
804 : :
732 michael@paquier.xyz 805 :CBC 10 : parse_compress_specification(compression_algorithm, compression_detail,
806 : : &compression_spec);
807 : 10 : error_detail = validate_compress_specification(&compression_spec);
808 [ + + ]: 10 : if (error_detail != NULL)
809 : 1 : pg_fatal("invalid compression specification: %s",
810 : : error_detail);
811 : :
812 : : /* Extract the compression level */
578 813 : 9 : compresslevel = compression_spec.level;
814 : :
815 [ - + ]: 9 : if (compression_algorithm == PG_COMPRESSION_ZSTD)
578 michael@paquier.xyz 816 :UBC 0 : pg_fatal("compression with %s is not yet supported", "ZSTD");
817 : :
818 : : /*
819 : : * Check existence of destination folder.
820 : : */
3170 andres@anarazel.de 821 [ + + + + ]:CBC 9 : if (!do_drop_slot && !do_create_slot)
822 : : {
3249 bruce@momjian.us 823 : 7 : DIR *dir = get_destination_dir(basedir);
824 : :
3478 andres@anarazel.de 825 : 7 : close_destination_dir(dir, basedir);
826 : : }
827 : :
828 : : /*
829 : : * Obtain a connection before doing anything.
830 : : */
831 : 9 : conn = GetConnection();
832 [ - + ]: 9 : if (!conn)
833 : : /* error message already written in GetConnection() */
3478 andres@anarazel.de 834 :UBC 0 : exit(1);
1933 peter@eisentraut.org 835 :CBC 9 : atexit(disconnect_atexit);
836 : :
837 : : /*
838 : : * Trap signals. (Don't do this until after the initial password prompt,
839 : : * if one is needed, in GetConnection.)
840 : : */
841 : : #ifndef WIN32
578 dgustafsson@postgres 842 : 9 : pqsignal(SIGINT, sigexit_handler);
843 : 9 : pqsignal(SIGTERM, sigexit_handler);
844 : : #endif
845 : :
846 : : /*
847 : : * Run IDENTIFY_SYSTEM to make sure we've successfully have established a
848 : : * replication connection and haven't connected using a database specific
849 : : * connection.
850 : : */
3478 andres@anarazel.de 851 [ - + ]: 9 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
1933 peter@eisentraut.org 852 :UBC 0 : exit(1);
853 : :
854 : : /*
855 : : * Check that there is a database associated with connection, none should
856 : : * be defined in this context.
857 : : */
3478 andres@anarazel.de 858 [ - + ]:CBC 9 : if (db_name)
737 tgl@sss.pgh.pa.us 859 :UBC 0 : pg_fatal("replication connection using slot \"%s\" is unexpectedly database specific",
860 : : replication_slot);
861 : :
862 : : /*
863 : : * Set umask so that directories/files are created with the same
864 : : * permissions as directories/files in the source data directory.
865 : : *
866 : : * pg_mode_mask is set to owner-only by default and then updated in
867 : : * GetConnection() where we get the mode from the server-side with
868 : : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
869 : : */
939 michael@paquier.xyz 870 :CBC 9 : umask(pg_mode_mask);
871 : :
872 : : /*
873 : : * Drop a replication slot.
874 : : */
3478 andres@anarazel.de 875 [ + + ]: 9 : if (do_drop_slot)
876 : : {
877 [ - + ]: 1 : if (verbose)
1789 tgl@sss.pgh.pa.us 878 :UBC 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
879 : :
3478 andres@anarazel.de 880 [ - + ]:CBC 1 : if (!DropReplicationSlot(conn, replication_slot))
1933 peter@eisentraut.org 881 :UBC 0 : exit(1);
1933 peter@eisentraut.org 882 :CBC 1 : exit(0);
883 : : }
884 : :
885 : : /* Create a replication slot */
3478 andres@anarazel.de 886 [ + + ]: 8 : if (do_create_slot)
887 : : {
888 [ - + ]: 1 : if (verbose)
1840 peter@eisentraut.org 889 :UBC 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
890 : :
2392 peter_e@gmx.net 891 [ - + ]:CBC 1 : if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
892 : : slot_exists_ok, false))
1933 peter@eisentraut.org 893 :UBC 0 : exit(1);
1933 peter@eisentraut.org 894 :CBC 1 : exit(0);
895 : : }
896 : :
897 : : /* determine remote server's xlog segment size */
939 michael@paquier.xyz 898 [ - + ]: 7 : if (!RetrieveWalSegSize(conn))
939 michael@paquier.xyz 899 :UBC 0 : exit(1);
900 : :
901 : : /*
902 : : * Don't close the connection here so that subsequent StreamLog() can
903 : : * reuse it.
904 : : */
905 : :
906 : : while (true)
907 : : {
4340 magnus@hagander.net 908 : 0 : StreamLog();
2407 peter_e@gmx.net 909 [ + + ]:CBC 7 : if (time_to_stop)
910 : : {
911 : : /*
912 : : * We've been Ctrl-C'ed or end of streaming position has been
913 : : * willingly reached, so exit without an error code.
914 : : */
4340 magnus@hagander.net 915 : 6 : exit(0);
916 : : }
917 [ + - ]: 1 : else if (noloop)
737 tgl@sss.pgh.pa.us 918 : 1 : pg_fatal("disconnected");
919 : : else
920 : : {
921 : : /* translator: check source for value for %d */
1840 peter@eisentraut.org 922 :UBC 0 : pg_log_info("disconnected; waiting %d seconds to try again",
923 : : RECONNECT_SLEEP_TIME);
4340 magnus@hagander.net 924 : 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
925 : : }
926 : : }
927 : : }
|