LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - pg_receivewal.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 78.1 % 375 293 13 21 39 9 26 161 12 94 44 139 3 39
Current Date: 2023-04-08 15:15:32 Functions: 90.0 % 10 9 1 9 1 8 1
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * pg_receivewal.c - receive streaming WAL data and write it
       4                 :  *                    to a local file.
       5                 :  *
       6                 :  * Author: Magnus Hagander <magnus@hagander.net>
       7                 :  *
       8                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *        src/bin/pg_basebackup/pg_receivewal.c
      12                 :  *-------------------------------------------------------------------------
      13                 :  */
      14                 : 
      15                 : #include "postgres_fe.h"
      16                 : 
      17                 : #include <dirent.h>
      18                 : #include <limits.h>
      19                 : #include <signal.h>
      20                 : #include <sys/stat.h>
      21                 : #include <unistd.h>
      22                 : 
      23                 : #ifdef USE_LZ4
      24                 : #include <lz4frame.h>
      25                 : #endif
      26                 : #ifdef HAVE_LIBZ
      27                 : #include <zlib.h>
      28                 : #endif
      29                 : 
      30                 : #include "access/xlog_internal.h"
      31                 : #include "common/file_perm.h"
      32                 : #include "common/logging.h"
      33                 : #include "fe_utils/option_utils.h"
      34                 : #include "getopt_long.h"
      35                 : #include "libpq-fe.h"
      36                 : #include "receivelog.h"
      37                 : #include "streamutil.h"
      38                 : 
      39                 : /* Time to sleep between reconnection attempts */
      40                 : #define RECONNECT_SLEEP_TIME 5
      41                 : 
      42                 : /* Global options */
      43                 : static char *basedir = NULL;
      44                 : static int  verbose = 0;
      45                 : static int  compresslevel = 0;
      46                 : static bool noloop = false;
      47                 : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
      48                 : static volatile sig_atomic_t time_to_stop = false;
      49                 : static bool do_create_slot = false;
      50                 : static bool slot_exists_ok = false;
      51                 : static bool do_drop_slot = false;
      52                 : static bool do_sync = true;
      53                 : static bool synchronous = false;
      54                 : static char *replication_slot = NULL;
      55                 : static pg_compress_algorithm compression_algorithm = PG_COMPRESSION_NONE;
      56                 : static XLogRecPtr endpos = InvalidXLogRecPtr;
      57                 : 
      58                 : 
      59                 : static void usage(void);
      60                 : static DIR *get_destination_dir(char *dest_folder);
      61                 : static void close_destination_dir(DIR *dest_dir, char *dest_folder);
      62                 : static XLogRecPtr FindStreamingStart(uint32 *tli);
      63                 : static void StreamLog(void);
      64                 : static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline,
      65                 :                            bool segment_finished);
      66 ECB             : 
      67                 : static void
      68 CBC           9 : disconnect_atexit(void)
      69 ECB             : {
      70 CBC           9 :     if (conn != NULL)
      71 GIC           3 :         PQfinish(conn);
      72               9 : }
      73 ECB             : 
      74                 : static void
      75 CBC           1 : usage(void)
      76                 : {
      77               1 :     printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n"),
      78 ECB             :            progname);
      79 CBC           1 :     printf(_("Usage:\n"));
      80               1 :     printf(_("  %s [OPTION]...\n"), progname);
      81               1 :     printf(_("\nOptions:\n"));
      82               1 :     printf(_("  -D, --directory=DIR    receive write-ahead log files into this directory\n"));
      83               1 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
      84               1 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
      85               1 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
      86 GIC           1 :     printf(_("      --no-sync          do not wait for changes to be written safely to disk\n"));
      87 CBC           1 :     printf(_("  -s, --status-interval=SECS\n"
      88 ECB             :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
      89 CBC           1 :     printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
      90               1 :     printf(_("      --synchronous      flush write-ahead log immediately after writing\n"));
      91               1 :     printf(_("  -v, --verbose          output verbose messages\n"));
      92 GIC           1 :     printf(_("  -V, --version          output version information, then exit\n"));
      93 CBC           1 :     printf(_("  -Z, --compress=METHOD[:DETAIL]\n"
      94 ECB             :              "                         compress as specified\n"));
      95 CBC           1 :     printf(_("  -?, --help             show this help, then exit\n"));
      96               1 :     printf(_("\nConnection options:\n"));
      97               1 :     printf(_("  -d, --dbname=CONNSTR   connection string\n"));
      98               1 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
      99               1 :     printf(_("  -p, --port=PORT        database server port number\n"));
     100               1 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
     101               1 :     printf(_("  -w, --no-password      never prompt for password\n"));
     102               1 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
     103               1 :     printf(_("\nOptional actions:\n"));
     104               1 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
     105               1 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
     106               1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     107 GIC           1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     108               1 : }
     109                 : 
     110 ECB             : 
     111                 : /*
     112                 :  * Check if the filename looks like a WAL file, letting caller know if this
     113                 :  * WAL segment is partial and/or compressed.
     114                 :  */
     115                 : static bool
     116 GIC          23 : is_xlogfilename(const char *filename, bool *ispartial,
     117                 :                 pg_compress_algorithm *wal_compression_algorithm)
     118                 : {
     119 GBC          23 :     size_t      fname_len = strlen(filename);
     120 GIC          23 :     size_t      xlog_pattern_len = strspn(filename, "0123456789ABCDEF");
     121                 : 
     122                 :     /* File does not look like a WAL file */
     123 CBC          23 :     if (xlog_pattern_len != XLOG_FNAME_LEN)
     124 GIC          14 :         return false;
     125                 : 
     126                 :     /* File looks like a completed uncompressed WAL file */
     127               9 :     if (fname_len == XLOG_FNAME_LEN)
     128                 :     {
     129 LBC           0 :         *ispartial = false;
     130               0 :         *wal_compression_algorithm = PG_COMPRESSION_NONE;
     131 UIC           0 :         return true;
     132                 :     }
     133                 : 
     134 ECB             :     /* File looks like a completed gzip-compressed WAL file */
     135 GIC           9 :     if (fname_len == XLOG_FNAME_LEN + strlen(".gz") &&
     136 CBC           2 :         strcmp(filename + XLOG_FNAME_LEN, ".gz") == 0)
     137 ECB             :     {
     138 GIC           2 :         *ispartial = false;
     139               2 :         *wal_compression_algorithm = PG_COMPRESSION_GZIP;
     140 CBC           2 :         return true;
     141 ECB             :     }
     142                 : 
     143                 :     /* File looks like a completed LZ4-compressed WAL file */
     144 GIC           7 :     if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") &&
     145               1 :         strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0)
     146                 :     {
     147               1 :         *ispartial = false;
     148               1 :         *wal_compression_algorithm = PG_COMPRESSION_LZ4;
     149               1 :         return true;
     150                 :     }
     151                 : 
     152 ECB             :     /* File looks like a partial uncompressed WAL file */
     153 CBC           6 :     if (fname_len == XLOG_FNAME_LEN + strlen(".partial") &&
     154 GIC           3 :         strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0)
     155                 :     {
     156               3 :         *ispartial = true;
     157 CBC           3 :         *wal_compression_algorithm = PG_COMPRESSION_NONE;
     158               3 :         return true;
     159                 :     }
     160 ECB             : 
     161                 :     /* File looks like a partial gzip-compressed WAL file */
     162 GBC           3 :     if (fname_len == XLOG_FNAME_LEN + strlen(".gz.partial") &&
     163               2 :         strcmp(filename + XLOG_FNAME_LEN, ".gz.partial") == 0)
     164 EUB             :     {
     165 GIC           2 :         *ispartial = true;
     166 CBC           2 :         *wal_compression_algorithm = PG_COMPRESSION_GZIP;
     167 GIC           2 :         return true;
     168                 :     }
     169                 : 
     170                 :     /* File looks like a partial LZ4-compressed WAL file */
     171               1 :     if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") &&
     172               1 :         strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0)
     173                 :     {
     174 CBC           1 :         *ispartial = true;
     175 GIC           1 :         *wal_compression_algorithm = PG_COMPRESSION_LZ4;
     176               1 :         return true;
     177                 :     }
     178 ECB             : 
     179                 :     /* File does not look like something we know */
     180 LBC           0 :     return false;
     181 EUB             : }
     182                 : 
     183 ECB             : static bool
     184 GIC         182 : stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
     185                 : {
     186                 :     static uint32 prevtimeline = 0;
     187                 :     static XLogRecPtr prevpos = InvalidXLogRecPtr;
     188                 : 
     189                 :     /* we assume that we get called once at the end of each segment */
     190             182 :     if (verbose && segment_finished)
     191 CBC           6 :         pg_log_info("finished segment at %X/%X (timeline %u)",
     192                 :                     LSN_FORMAT_ARGS(xlogpos),
     193 ECB             :                     timeline);
     194                 : 
     195 GBC         182 :     if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos)
     196 ECB             :     {
     197 GIC          12 :         if (verbose)
     198              12 :             pg_log_info("stopped log streaming at %X/%X (timeline %u)",
     199                 :                         LSN_FORMAT_ARGS(xlogpos),
     200                 :                         timeline);
     201              12 :         time_to_stop = true;
     202              12 :         return true;
     203                 :     }
     204                 : 
     205                 :     /*
     206                 :      * Note that we report the previous, not current, position here. After a
     207 ECB             :      * timeline switch, xlogpos points to the beginning of the segment because
     208                 :      * that's where we always begin streaming. Reporting the end of previous
     209                 :      * timeline isn't totally accurate, because the next timeline can begin
     210                 :      * slightly before the end of the WAL that we received on the previous
     211                 :      * timeline, but it's close enough for reporting purposes.
     212                 :      */
     213 CBC         170 :     if (verbose && prevtimeline != 0 && prevtimeline != timeline)
     214 GIC           1 :         pg_log_info("switched to timeline %u at %X/%X",
     215 ECB             :                     timeline,
     216                 :                     LSN_FORMAT_ARGS(prevpos));
     217                 : 
     218 GIC         170 :     prevtimeline = timeline;
     219             170 :     prevpos = xlogpos;
     220                 : 
     221             170 :     if (time_to_stop)
     222                 :     {
     223 UIC           0 :         if (verbose)
     224 LBC           0 :             pg_log_info("received interrupt signal, exiting");
     225 UIC           0 :         return true;
     226 ECB             :     }
     227 GIC         170 :     return false;
     228                 : }
     229                 : 
     230                 : 
     231 ECB             : /*
     232                 :  * Get destination directory.
     233                 :  */
     234                 : static DIR *
     235 GIC          14 : get_destination_dir(char *dest_folder)
     236                 : {
     237                 :     DIR        *dir;
     238                 : 
     239              14 :     Assert(dest_folder != NULL);
     240              14 :     dir = opendir(dest_folder);
     241              14 :     if (dir == NULL)
     242 UIC           0 :         pg_fatal("could not open directory \"%s\": %m", dest_folder);
     243                 : 
     244 GIC          14 :     return dir;
     245                 : }
     246                 : 
     247                 : 
     248                 : /*
     249                 :  * Close existing directory.
     250                 :  */
     251 ECB             : static void
     252 GBC          14 : close_destination_dir(DIR *dest_dir, char *dest_folder)
     253                 : {
     254 GIC          14 :     Assert(dest_dir != NULL && dest_folder != NULL);
     255              14 :     if (closedir(dest_dir))
     256 UBC           0 :         pg_fatal("could not close directory \"%s\": %m", dest_folder);
     257 GBC          14 : }
     258 EUB             : 
     259                 : 
     260                 : /*
     261                 :  * Determine starting location for streaming, based on any existing xlog
     262                 :  * segments in the directory. We start at the end of the last one that is
     263                 :  * complete (size matches wal segment size), on the timeline with highest ID.
     264                 :  *
     265                 :  * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
     266                 :  */
     267 ECB             : static XLogRecPtr
     268 CBC           7 : FindStreamingStart(uint32 *tli)
     269                 : {
     270                 :     DIR        *dir;
     271                 :     struct dirent *dirent;
     272 GIC           7 :     XLogSegNo   high_segno = 0;
     273               7 :     uint32      high_tli = 0;
     274               7 :     bool        high_ispartial = false;
     275 ECB             : 
     276 GIC           7 :     dir = get_destination_dir(basedir);
     277 ECB             : 
     278 CBC          30 :     while (errno = 0, (dirent = readdir(dir)) != NULL)
     279 EUB             :     {
     280                 :         uint32      tli;
     281 ECB             :         XLogSegNo   segno;
     282 EUB             :         pg_compress_algorithm wal_compression_algorithm;
     283                 :         bool        ispartial;
     284 ECB             : 
     285 CBC          23 :         if (!is_xlogfilename(dirent->d_name,
     286                 :                              &ispartial, &wal_compression_algorithm))
     287 GBC          14 :             continue;
     288 EUB             : 
     289                 :         /*
     290                 :          * Looks like an xlog file. Parse its position.
     291                 :          */
     292 GIC           9 :         XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
     293                 : 
     294                 :         /*
     295 ECB             :          * Check that the segment has the right size, if it's supposed to be
     296                 :          * completed.  For non-compressed segments just check the on-disk size
     297                 :          * and see if it matches a completed segment.  For gzip-compressed
     298                 :          * segments, look at the last 4 bytes of the compressed file, which is
     299                 :          * where the uncompressed size is located for files with a size lower
     300                 :          * than 4GB, and then compare it to the size of a completed segment.
     301 EUB             :          * The 4 last bytes correspond to the ISIZE member according to
     302                 :          * http://www.zlib.org/rfc-gzip.html.
     303                 :          *
     304                 :          * For LZ4-compressed segments, uncompress the file in a throw-away
     305                 :          * buffer keeping track of the uncompressed size, then compare it to
     306 ECB             :          * the size of a completed segment.  Per its protocol, LZ4 does not
     307                 :          * store the uncompressed size of an object by default.  contentSize
     308                 :          * is one possible way to do that, but we need to rely on a method
     309                 :          * where WAL segments could have been compressed by a different source
     310                 :          * than pg_receivewal, like an archive_command with lz4.
     311                 :          */
     312 CBC           9 :         if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_NONE)
     313 UIC           0 :         {
     314                 :             struct stat statbuf;
     315                 :             char        fullpath[MAXPGPATH * 2];
     316 ECB             : 
     317 UIC           0 :             snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
     318               0 :             if (stat(fullpath, &statbuf) != 0)
     319               0 :                 pg_fatal("could not stat file \"%s\": %m", fullpath);
     320 ECB             : 
     321 LBC           0 :             if (statbuf.st_size != WalSegSz)
     322                 :             {
     323               0 :                 pg_log_warning("segment file \"%s\" has incorrect size %lld, skipping",
     324 ECB             :                                dirent->d_name, (long long int) statbuf.st_size);
     325 UBC           0 :                 continue;
     326                 :             }
     327 ECB             :         }
     328 CBC           9 :         else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_GZIP)
     329 GBC           2 :         {
     330                 :             int         fd;
     331                 :             char        buf[4];
     332 ECB             :             int         bytes_out;
     333                 :             char        fullpath[MAXPGPATH * 2];
     334                 :             int         r;
     335                 : 
     336 GIC           2 :             snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
     337                 : 
     338               2 :             fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
     339 CBC           2 :             if (fd < 0)
     340 LBC           0 :                 pg_fatal("could not open compressed file \"%s\": %m",
     341 EUB             :                          fullpath);
     342 GIC           2 :             if (lseek(fd, (off_t) (-4), SEEK_END) < 0)
     343 UIC           0 :                 pg_fatal("could not seek in compressed file \"%s\": %m",
     344 ECB             :                          fullpath);
     345 CBC           2 :             r = read(fd, (char *) buf, sizeof(buf));
     346 GIC           2 :             if (r != sizeof(buf))
     347                 :             {
     348 LBC           0 :                 if (r < 0)
     349               0 :                     pg_fatal("could not read compressed file \"%s\": %m",
     350 ECB             :                              fullpath);
     351                 :                 else
     352 LBC           0 :                     pg_fatal("could not read compressed file \"%s\": read %d of %zu",
     353 ECB             :                              fullpath, r, sizeof(buf));
     354                 :             }
     355                 : 
     356 CBC           2 :             close(fd);
     357 GIC           2 :             bytes_out = (buf[3] << 24) | (buf[2] << 16) |
     358 CBC           2 :                 (buf[1] << 8) | buf[0];
     359 EUB             : 
     360 GIC           2 :             if (bytes_out != WalSegSz)
     361                 :             {
     362 UIC           0 :                 pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %d, skipping",
     363 ECB             :                                dirent->d_name, bytes_out);
     364 LBC           0 :                 continue;
     365                 :             }
     366                 :         }
     367 GIC           7 :         else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_LZ4)
     368                 :         {
     369                 : #ifdef USE_LZ4
     370                 : #define LZ4_CHUNK_SZ    64 * 1024   /* 64kB as maximum chunk size read */
     371                 :             int         fd;
     372                 :             ssize_t     r;
     373               1 :             size_t      uncompressed_size = 0;
     374 ECB             :             char        fullpath[MAXPGPATH * 2];
     375                 :             char       *outbuf;
     376                 :             char       *readbuf;
     377 CBC           1 :             LZ4F_decompressionContext_t ctx = NULL;
     378 ECB             :             LZ4F_decompressOptions_t dec_opt;
     379                 :             LZ4F_errorCode_t status;
     380                 : 
     381 CBC           1 :             memset(&dec_opt, 0, sizeof(dec_opt));
     382 GBC           1 :             snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
     383                 : 
     384 GIC           1 :             fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
     385 CBC           1 :             if (fd < 0)
     386 UIC           0 :                 pg_fatal("could not open file \"%s\": %m", fullpath);
     387 EUB             : 
     388 GIC           1 :             status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
     389 GBC           1 :             if (LZ4F_isError(status))
     390 UIC           0 :                 pg_fatal("could not create LZ4 decompression context: %s",
     391                 :                          LZ4F_getErrorName(status));
     392                 : 
     393 GIC           1 :             outbuf = pg_malloc0(LZ4_CHUNK_SZ);
     394               1 :             readbuf = pg_malloc0(LZ4_CHUNK_SZ);
     395                 :             do
     396                 :             {
     397                 :                 char       *readp;
     398                 :                 char       *readend;
     399 ECB             : 
     400 CBC           2 :                 r = read(fd, readbuf, LZ4_CHUNK_SZ);
     401               2 :                 if (r < 0)
     402 UIC           0 :                     pg_fatal("could not read file \"%s\": %m", fullpath);
     403 ECB             : 
     404                 :                 /* Done reading the file */
     405 CBC           2 :                 if (r == 0)
     406 GIC           1 :                     break;
     407                 : 
     408                 :                 /* Process one chunk */
     409 CBC           1 :                 readp = readbuf;
     410 GBC           1 :                 readend = readbuf + r;
     411 GIC          17 :                 while (readp < readend)
     412 ECB             :                 {
     413 GIC          16 :                     size_t      out_size = LZ4_CHUNK_SZ;
     414 CBC          16 :                     size_t      read_size = readend - readp;
     415                 : 
     416 GIC          16 :                     memset(outbuf, 0, LZ4_CHUNK_SZ);
     417              16 :                     status = LZ4F_decompress(ctx, outbuf, &out_size,
     418                 :                                              readp, &read_size, &dec_opt);
     419              16 :                     if (LZ4F_isError(status))
     420 UIC           0 :                         pg_fatal("could not decompress file \"%s\": %s",
     421                 :                                  fullpath,
     422                 :                                  LZ4F_getErrorName(status));
     423 ECB             : 
     424 GBC          16 :                     readp += read_size;
     425 GIC          16 :                     uncompressed_size += out_size;
     426 ECB             :                 }
     427                 : 
     428                 :                 /*
     429                 :                  * No need to continue reading the file when the
     430                 :                  * uncompressed_size exceeds WalSegSz, even if there are still
     431                 :                  * data left to read. However, if uncompressed_size is equal
     432                 :                  * to WalSegSz, it should verify that there is no more data to
     433                 :                  * read.
     434                 :                  */
     435 GIC           1 :             } while (uncompressed_size <= WalSegSz && r > 0);
     436                 : 
     437               1 :             close(fd);
     438               1 :             pg_free(outbuf);
     439 CBC           1 :             pg_free(readbuf);
     440                 : 
     441 GIC           1 :             status = LZ4F_freeDecompressionContext(ctx);
     442               1 :             if (LZ4F_isError(status))
     443 LBC           0 :                 pg_fatal("could not free LZ4 decompression context: %s",
     444                 :                          LZ4F_getErrorName(status));
     445                 : 
     446 GIC           1 :             if (uncompressed_size != WalSegSz)
     447                 :             {
     448 UIC           0 :                 pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %zu, skipping",
     449 ECB             :                                dirent->d_name, uncompressed_size);
     450 UBC           0 :                 continue;
     451 ECB             :             }
     452                 : #else
     453                 :             pg_log_error("cannot check file \"%s\": compression with %s not supported by this build",
     454                 :                          dirent->d_name, "LZ4");
     455                 :             exit(1);
     456                 : #endif
     457                 :         }
     458                 : 
     459                 :         /* Looks like a valid segment. Remember that we saw it. */
     460 GIC           9 :         if ((segno > high_segno) ||
     461               4 :             (segno == high_segno && tli > high_tli) ||
     462 GBC           4 :             (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
     463                 :         {
     464 GIC           5 :             high_segno = segno;
     465               5 :             high_tli = tli;
     466               5 :             high_ispartial = ispartial;
     467                 :         }
     468                 :     }
     469                 : 
     470 CBC           7 :     if (errno)
     471 UBC           0 :         pg_fatal("could not read directory \"%s\": %m", basedir);
     472                 : 
     473 GIC           7 :     close_destination_dir(dir, basedir);
     474                 : 
     475               7 :     if (high_segno > 0)
     476 ECB             :     {
     477                 :         XLogRecPtr  high_ptr;
     478                 : 
     479                 :         /*
     480                 :          * Move the starting pointer to the start of the next segment, if the
     481                 :          * highest one we saw was completed. Otherwise start streaming from
     482                 :          * the beginning of the .partial segment.
     483                 :          */
     484 CBC           3 :         if (!high_ispartial)
     485 UIC           0 :             high_segno++;
     486 ECB             : 
     487 GIC           3 :         XLogSegNoOffsetToRecPtr(high_segno, 0, WalSegSz, high_ptr);
     488                 : 
     489               3 :         *tli = high_tli;
     490 CBC           3 :         return high_ptr;
     491                 :     }
     492                 :     else
     493 GIC           4 :         return InvalidXLogRecPtr;
     494                 : }
     495                 : 
     496                 : /*
     497                 :  * Start the log streaming
     498 ECB             :  */
     499                 : static void
     500 CBC           7 : StreamLog(void)
     501 ECB             : {
     502                 :     XLogRecPtr  serverpos;
     503                 :     TimeLineID  servertli;
     504 GNC           7 :     StreamCtl   stream = {0};
     505 ECB             :     char       *sysidentifier;
     506                 : 
     507                 :     /*
     508                 :      * Connect in replication mode to the server
     509                 :      */
     510 GIC           7 :     if (conn == NULL)
     511 UIC           0 :         conn = GetConnection();
     512 GIC           7 :     if (!conn)
     513                 :         /* Error message already written in GetConnection() */
     514 CBC           1 :         return;
     515 ECB             : 
     516 GIC           7 :     if (!CheckServerVersionForStreaming(conn))
     517                 :     {
     518                 :         /*
     519 ECB             :          * Error message already written in CheckServerVersionForStreaming().
     520                 :          * There's no hope of recovering from a version mismatch, so don't
     521                 :          * retry.
     522                 :          */
     523 LBC           0 :         exit(1);
     524 ECB             :     }
     525                 : 
     526                 :     /*
     527                 :      * Identify server, obtaining start LSN position and current timeline ID
     528                 :      * at the same time, necessary if not valid data can be found in the
     529                 :      * existing output directory.
     530                 :      */
     531 CBC           7 :     if (!RunIdentifySystem(conn, &sysidentifier, &servertli, &serverpos, NULL))
     532 UIC           0 :         exit(1);
     533 ECB             : 
     534                 :     /*
     535                 :      * Figure out where to start streaming.  First scan the local directory.
     536                 :      */
     537 GBC           7 :     stream.startpos = FindStreamingStart(&stream.timeline);
     538               7 :     if (stream.startpos == InvalidXLogRecPtr)
     539                 :     {
     540                 :         /*
     541 ECB             :          * Try to get the starting point from the slot if any.  This is
     542                 :          * supported in PostgreSQL 15 and newer.
     543                 :          */
     544 CBC           7 :         if (replication_slot != NULL &&
     545 GIC           3 :             PQserverVersion(conn) >= 150000)
     546                 :         {
     547               3 :             if (!GetSlotInformation(conn, replication_slot, &stream.startpos,
     548                 :                                     &stream.timeline))
     549                 :             {
     550                 :                 /* Error is logged by GetSlotInformation() */
     551               1 :                 return;
     552                 :             }
     553                 :         }
     554 EUB             : 
     555                 :         /*
     556                 :          * If it the starting point is still not known, use the current WAL
     557                 :          * flush value as last resort.
     558                 :          */
     559 GIC           3 :         if (stream.startpos == InvalidXLogRecPtr)
     560                 :         {
     561 CBC           1 :             stream.startpos = serverpos;
     562 GIC           1 :             stream.timeline = servertli;
     563                 :         }
     564                 :     }
     565                 : 
     566               6 :     Assert(stream.startpos != InvalidXLogRecPtr &&
     567                 :            stream.timeline != 0);
     568                 : 
     569                 :     /*
     570                 :      * Always start streaming at the beginning of a segment
     571                 :      */
     572               6 :     stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
     573                 : 
     574                 :     /*
     575                 :      * Start the replication
     576                 :      */
     577               6 :     if (verbose)
     578               6 :         pg_log_info("starting log streaming at %X/%X (timeline %u)",
     579                 :                     LSN_FORMAT_ARGS(stream.startpos),
     580                 :                     stream.timeline);
     581                 : 
     582               6 :     stream.stream_stop = stop_streaming;
     583               6 :     stream.stop_socket = PGINVALID_SOCKET;
     584               6 :     stream.standby_message_timeout = standby_message_timeout;
     585               6 :     stream.synchronous = synchronous;
     586               6 :     stream.do_sync = do_sync;
     587               6 :     stream.mark_done = false;
     588              12 :     stream.walmethod = CreateWalDirectoryMethod(basedir,
     589                 :                                                 compression_algorithm,
     590                 :                                                 compresslevel,
     591               6 :                                                 stream.do_sync);
     592               6 :     stream.partial_suffix = ".partial";
     593               6 :     stream.replication_slot = replication_slot;
     594 CBC           6 :     stream.sysidentifier = sysidentifier;
     595 ECB             : 
     596 CBC           6 :     ReceiveXlogStream(conn, &stream);
     597                 : 
     598 GNC           6 :     if (!stream.walmethod->ops->finish(stream.walmethod))
     599 ECB             :     {
     600 LBC           0 :         pg_log_info("could not finish writing WAL files: %m");
     601 UIC           0 :         return;
     602 ECB             :     }
     603                 : 
     604 CBC           6 :     PQfinish(conn);
     605 GIC           6 :     conn = NULL;
     606 ECB             : 
     607 GNC           6 :     stream.walmethod->ops->free(stream.walmethod);
     608 ECB             : }
     609                 : 
     610                 : /*
     611                 :  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
     612                 :  * possible moment.
     613                 :  */
     614                 : #ifndef WIN32
     615                 : 
     616                 : static void
     617 UNC           0 : sigexit_handler(SIGNAL_ARGS)
     618 ECB             : {
     619 UIC           0 :     time_to_stop = true;
     620 UBC           0 : }
     621 EUB             : #endif
     622                 : 
     623 ECB             : int
     624 CBC          17 : main(int argc, char **argv)
     625 ECB             : {
     626                 :     static struct option long_options[] = {
     627                 :         {"help", no_argument, NULL, '?'},
     628 EUB             :         {"version", no_argument, NULL, 'V'},
     629 ECB             :         {"directory", required_argument, NULL, 'D'},
     630                 :         {"dbname", required_argument, NULL, 'd'},
     631 EUB             :         {"endpos", required_argument, NULL, 'E'},
     632                 :         {"host", required_argument, NULL, 'h'},
     633                 :         {"port", required_argument, NULL, 'p'},
     634 ECB             :         {"username", required_argument, NULL, 'U'},
     635                 :         {"no-loop", no_argument, NULL, 'n'},
     636                 :         {"no-password", no_argument, NULL, 'w'},
     637 EUB             :         {"password", no_argument, NULL, 'W'},
     638                 :         {"status-interval", required_argument, NULL, 's'},
     639                 :         {"slot", required_argument, NULL, 'S'},
     640                 :         {"verbose", no_argument, NULL, 'v'},
     641                 :         {"compress", required_argument, NULL, 'Z'},
     642                 : /* action */
     643                 :         {"create-slot", no_argument, NULL, 1},
     644                 :         {"drop-slot", no_argument, NULL, 2},
     645                 :         {"if-not-exists", no_argument, NULL, 3},
     646                 :         {"synchronous", no_argument, NULL, 4},
     647 ECB             :         {"no-sync", no_argument, NULL, 5},
     648                 :         {NULL, 0, NULL, 0}
     649                 :     };
     650 EUB             : 
     651                 :     int         c;
     652                 :     int         option_index;
     653 ECB             :     char       *db_name;
     654                 :     uint32      hi,
     655                 :                 lo;
     656 EUB             :     pg_compress_specification compression_spec;
     657 GBC          17 :     char       *compression_detail = NULL;
     658              17 :     char       *compression_algorithm_str = "none";
     659              17 :     char       *error_detail = NULL;
     660 EUB             : 
     661 GBC          17 :     pg_logging_init(argv[0]);
     662 CBC          17 :     progname = get_progname(argv[0]);
     663              17 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
     664                 : 
     665              17 :     if (argc > 1)
     666 ECB             :     {
     667 CBC          16 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
     668 ECB             :         {
     669 CBC           1 :             usage();
     670               1 :             exit(0);
     671 ECB             :         }
     672 GBC          15 :         else if (strcmp(argv[1], "-V") == 0 ||
     673              15 :                  strcmp(argv[1], "--version") == 0)
     674 EUB             :         {
     675 CBC           1 :             puts("pg_receivewal (PostgreSQL) " PG_VERSION);
     676               1 :             exit(0);
     677 ECB             :         }
     678                 :     }
     679                 : 
     680 GNC          66 :     while ((c = getopt_long(argc, argv, "d:D:E:h:np:s:S:U:vwWZ:",
     681 CBC          66 :                             long_options, &option_index)) != -1)
     682                 :     {
     683              52 :         switch (c)
     684 ECB             :         {
     685 UNC           0 :             case 'd':
     686               0 :                 connection_string = pg_strdup(optarg);
     687               0 :                 break;
     688 GIC          11 :             case 'D':
     689              11 :                 basedir = pg_strdup(optarg);
     690              11 :                 break;
     691 GNC           7 :             case 'E':
     692               7 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
     693 UNC           0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
     694 GNC           7 :                 endpos = ((uint64) hi) << 32 | lo;
     695 GIC           7 :                 break;
     696 LBC           0 :             case 'h':
     697 UIC           0 :                 dbhost = pg_strdup(optarg);
     698 UBC           0 :                 break;
     699 GNC           7 :             case 'n':
     700               7 :                 noloop = true;
     701               7 :                 break;
     702 UIC           0 :             case 'p':
     703 UBC           0 :                 dbport = pg_strdup(optarg);
     704               0 :                 break;
     705 LBC           0 :             case 's':
     706 UIC           0 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
     707                 :                                       INT_MAX / 1000,
     708 ECB             :                                       &standby_message_timeout))
     709 UIC           0 :                     exit(1);
     710 LBC           0 :                 standby_message_timeout *= 1000;
     711               0 :                 break;
     712 GIC           5 :             case 'S':
     713               5 :                 replication_slot = pg_strdup(optarg);
     714 CBC           5 :                 break;
     715 UNC           0 :             case 'U':
     716               0 :                 dbuser = pg_strdup(optarg);
     717 UIC           0 :                 break;
     718 GIC           7 :             case 'v':
     719 CBC           7 :                 verbose++;
     720 GIC           7 :                 break;
     721 UNC           0 :             case 'w':
     722               0 :                 dbgetpassword = -1;
     723               0 :                 break;
     724               0 :             case 'W':
     725               0 :                 dbgetpassword = 1;
     726               0 :                 break;
     727 CBC           3 :             case 'Z':
     728               3 :                 parse_compress_options(optarg, &compression_algorithm_str,
     729 ECB             :                                        &compression_detail);
     730 GIC           3 :                 break;
     731               3 :             case 1:
     732               3 :                 do_create_slot = true;
     733               3 :                 break;
     734 CBC           2 :             case 2:
     735 GIC           2 :                 do_drop_slot = true;
     736 GBC           2 :                 break;
     737 UIC           0 :             case 3:
     738               0 :                 slot_exists_ok = true;
     739 LBC           0 :                 break;
     740 GIC           2 :             case 4:
     741 CBC           2 :                 synchronous = true;
     742               2 :                 break;
     743               4 :             case 5:
     744 GIC           4 :                 do_sync = false;
     745               4 :                 break;
     746               1 :             default:
     747 ECB             :                 /* getopt_long already emitted a complaint */
     748 GIC           1 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     749 CBC           1 :                 exit(1);
     750 EUB             :         }
     751                 :     }
     752                 : 
     753                 :     /*
     754                 :      * Any non-option arguments?
     755 ECB             :      */
     756 GIC          14 :     if (optind < argc)
     757 ECB             :     {
     758 UIC           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
     759 ECB             :                      argv[optind]);
     760 UIC           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     761               0 :         exit(1);
     762                 :     }
     763                 : 
     764 GIC          14 :     if (do_drop_slot && do_create_slot)
     765 ECB             :     {
     766 CBC           1 :         pg_log_error("cannot use --create-slot together with --drop-slot");
     767 GIC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     768 GBC           1 :         exit(1);
     769 ECB             :     }
     770                 : 
     771 GIC          13 :     if (replication_slot == NULL && (do_drop_slot || do_create_slot))
     772                 :     {
     773                 :         /* translator: second %s is an option name */
     774               1 :         pg_log_error("%s needs a slot to be specified using --slot",
     775                 :                      do_drop_slot ? "--drop-slot" : "--create-slot");
     776 CBC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     777               1 :         exit(1);
     778                 :     }
     779                 : 
     780 GIC          12 :     if (synchronous && !do_sync)
     781                 :     {
     782               1 :         pg_log_error("cannot use --synchronous together with --no-sync");
     783               1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     784               1 :         exit(1);
     785 ECB             :     }
     786 EUB             : 
     787                 :     /*
     788                 :      * Required arguments
     789                 :      */
     790 GIC          11 :     if (basedir == NULL && !do_drop_slot && !do_create_slot)
     791                 :     {
     792 CBC           1 :         pg_log_error("no target directory specified");
     793 GBC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     794 GIC           1 :         exit(1);
     795                 :     }
     796                 : 
     797                 :     /*
     798                 :      * Compression options
     799                 :      */
     800              10 :     if (!parse_compress_algorithm(compression_algorithm_str,
     801                 :                                   &compression_algorithm))
     802 UIC           0 :         pg_fatal("unrecognized compression algorithm: \"%s\"",
     803                 :                  compression_algorithm_str);
     804 ECB             : 
     805 GIC          10 :     parse_compress_specification(compression_algorithm, compression_detail,
     806                 :                                  &compression_spec);
     807              10 :     error_detail = validate_compress_specification(&compression_spec);
     808              10 :     if (error_detail != NULL)
     809 CBC           1 :         pg_fatal("invalid compression specification: %s",
     810                 :                  error_detail);
     811 ECB             : 
     812 EUB             :     /* Extract the compression level */
     813 GIC           9 :     compresslevel = compression_spec.level;
     814 ECB             : 
     815 GBC           9 :     if (compression_algorithm == PG_COMPRESSION_ZSTD)
     816 LBC           0 :         pg_fatal("compression with %s is not yet supported", "ZSTD");
     817                 : 
     818                 :     /*
     819                 :      * Check existence of destination folder.
     820 ECB             :      */
     821 GIC           9 :     if (!do_drop_slot && !do_create_slot)
     822 ECB             :     {
     823 GBC           7 :         DIR        *dir = get_destination_dir(basedir);
     824                 : 
     825 CBC           7 :         close_destination_dir(dir, basedir);
     826                 :     }
     827 EUB             : 
     828 ECB             :     /*
     829                 :      * Obtain a connection before doing anything.
     830                 :      */
     831 GIC           9 :     conn = GetConnection();
     832 CBC           9 :     if (!conn)
     833 EUB             :         /* error message already written in GetConnection() */
     834 UIC           0 :         exit(1);
     835 GIC           9 :     atexit(disconnect_atexit);
     836                 : 
     837                 :     /*
     838                 :      * Trap signals.  (Don't do this until after the initial password prompt,
     839                 :      * if one is needed, in GetConnection.)
     840                 :      */
     841                 : #ifndef WIN32
     842 GNC           9 :     pqsignal(SIGINT, sigexit_handler);
     843               9 :     pqsignal(SIGTERM, sigexit_handler);
     844 ECB             : #endif
     845                 : 
     846                 :     /*
     847                 :      * Run IDENTIFY_SYSTEM to make sure we've successfully have established a
     848                 :      * replication connection and haven't connected using a database specific
     849                 :      * connection.
     850                 :      */
     851 GIC           9 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
     852 LBC           0 :         exit(1);
     853 ECB             : 
     854                 :     /*
     855                 :      * Check that there is a database associated with connection, none should
     856                 :      * be defined in this context.
     857 EUB             :      */
     858 GIC           9 :     if (db_name)
     859 UBC           0 :         pg_fatal("replication connection using slot \"%s\" is unexpectedly database specific",
     860                 :                  replication_slot);
     861                 : 
     862                 :     /*
     863                 :      * Set umask so that directories/files are created with the same
     864                 :      * permissions as directories/files in the source data directory.
     865                 :      *
     866                 :      * pg_mode_mask is set to owner-only by default and then updated in
     867                 :      * GetConnection() where we get the mode from the server-side with
     868                 :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
     869                 :      */
     870 GIC           9 :     umask(pg_mode_mask);
     871                 : 
     872                 :     /*
     873                 :      * Drop a replication slot.
     874                 :      */
     875               9 :     if (do_drop_slot)
     876                 :     {
     877               1 :         if (verbose)
     878 UIC           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
     879                 : 
     880 GIC           1 :         if (!DropReplicationSlot(conn, replication_slot))
     881 UIC           0 :             exit(1);
     882 GIC           1 :         exit(0);
     883                 :     }
     884                 : 
     885                 :     /* Create a replication slot */
     886               8 :     if (do_create_slot)
     887                 :     {
     888               1 :         if (verbose)
     889 UIC           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
     890                 : 
     891 GIC           1 :         if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
     892                 :                                    slot_exists_ok, false))
     893 UIC           0 :             exit(1);
     894 GIC           1 :         exit(0);
     895                 :     }
     896                 : 
     897                 :     /* determine remote server's xlog segment size */
     898               7 :     if (!RetrieveWalSegSize(conn))
     899 UIC           0 :         exit(1);
     900                 : 
     901                 :     /*
     902                 :      * Don't close the connection here so that subsequent StreamLog() can
     903                 :      * reuse it.
     904                 :      */
     905                 : 
     906                 :     while (true)
     907                 :     {
     908 GIC           7 :         StreamLog();
     909               7 :         if (time_to_stop)
     910                 :         {
     911                 :             /*
     912                 :              * We've been Ctrl-C'ed or end of streaming position has been
     913                 :              * willingly reached, so exit without an error code.
     914                 :              */
     915               6 :             exit(0);
     916                 :         }
     917               1 :         else if (noloop)
     918               1 :             pg_fatal("disconnected");
     919                 :         else
     920                 :         {
     921                 :             /* translator: check source for value for %d */
     922 UIC           0 :             pg_log_info("disconnected; waiting %d seconds to try again",
     923                 :                         RECONNECT_SLEEP_TIME);
     924               0 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
     925                 :         }
     926                 :     }
     927                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a