LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - pg_receivewal.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 78.1 % 375 293 13 21 39 9 26 161 12 94 44 139 3 39
Current Date: 2023-04-08 17:13:01 Functions: 90.0 % 10 9 1 9 1 8 1
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (60,120] days: 36.8 % 19 7 12 7
Legend: Lines: hit not hit (180,240] days: 66.7 % 9 6 1 1 1 1 1 4 1 1
(240..) days: 80.7 % 347 280 20 38 9 25 160 1 94 43 138
Function coverage date bins:
(180,240] days: 0.0 % 1 0 1
(240..) days: 50.0 % 18 9 9 1 8

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * pg_receivewal.c - receive streaming WAL data and write it
                                  4                 :  *                    to a local file.
                                  5                 :  *
                                  6                 :  * Author: Magnus Hagander <magnus@hagander.net>
                                  7                 :  *
                                  8                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  9                 :  *
                                 10                 :  * IDENTIFICATION
                                 11                 :  *        src/bin/pg_basebackup/pg_receivewal.c
                                 12                 :  *-------------------------------------------------------------------------
                                 13                 :  */
                                 14                 : 
                                 15                 : #include "postgres_fe.h"
                                 16                 : 
                                 17                 : #include <dirent.h>
                                 18                 : #include <limits.h>
                                 19                 : #include <signal.h>
                                 20                 : #include <sys/stat.h>
                                 21                 : #include <unistd.h>
                                 22                 : 
                                 23                 : #ifdef USE_LZ4
                                 24                 : #include <lz4frame.h>
                                 25                 : #endif
                                 26                 : #ifdef HAVE_LIBZ
                                 27                 : #include <zlib.h>
                                 28                 : #endif
                                 29                 : 
                                 30                 : #include "access/xlog_internal.h"
                                 31                 : #include "common/file_perm.h"
                                 32                 : #include "common/logging.h"
                                 33                 : #include "fe_utils/option_utils.h"
                                 34                 : #include "getopt_long.h"
                                 35                 : #include "libpq-fe.h"
                                 36                 : #include "receivelog.h"
                                 37                 : #include "streamutil.h"
                                 38                 : 
                                 39                 : /* Time to sleep between reconnection attempts */
                                 40                 : #define RECONNECT_SLEEP_TIME 5
                                 41                 : 
                                 42                 : /* Global options */
                                 43                 : static char *basedir = NULL;
                                 44                 : static int  verbose = 0;
                                 45                 : static int  compresslevel = 0;
                                 46                 : static bool noloop = false;
                                 47                 : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
                                 48                 : static volatile sig_atomic_t time_to_stop = false;
                                 49                 : static bool do_create_slot = false;
                                 50                 : static bool slot_exists_ok = false;
                                 51                 : static bool do_drop_slot = false;
                                 52                 : static bool do_sync = true;
                                 53                 : static bool synchronous = false;
                                 54                 : static char *replication_slot = NULL;
                                 55                 : static pg_compress_algorithm compression_algorithm = PG_COMPRESSION_NONE;
                                 56                 : static XLogRecPtr endpos = InvalidXLogRecPtr;
                                 57                 : 
                                 58                 : 
                                 59                 : static void usage(void);
                                 60                 : static DIR *get_destination_dir(char *dest_folder);
                                 61                 : static void close_destination_dir(DIR *dest_dir, char *dest_folder);
                                 62                 : static XLogRecPtr FindStreamingStart(uint32 *tli);
                                 63                 : static void StreamLog(void);
                                 64                 : static bool stop_streaming(XLogRecPtr xlogpos, uint32 timeline,
                                 65                 :                            bool segment_finished);
 4183 magnus                     66 ECB             : 
                                 67                 : static void
 1562 peter                      68 CBC           9 : disconnect_atexit(void)
 1562 peter                      69 ECB             : {
 1562 peter                      70 CBC           9 :     if (conn != NULL)
 1562 peter                      71 GIC           3 :         PQfinish(conn);
                                 72               9 : }
 3346 magnus                     73 ECB             : 
                                 74                 : static void
 4183 magnus                     75 CBC           1 : usage(void)
                                 76                 : {
 2158 peter_e                    77               1 :     printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n"),
 4183 magnus                     78 ECB             :            progname);
 4183 magnus                     79 CBC           1 :     printf(_("Usage:\n"));
                                 80               1 :     printf(_("  %s [OPTION]...\n"), progname);
 3953 peter_e                    81               1 :     printf(_("\nOptions:\n"));
 2158                            82               1 :     printf(_("  -D, --directory=DIR    receive write-ahead log files into this directory\n"));
 2036                            83               1 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
 2762                            84               1 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
 3904 alvherre                   85               1 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
 1988 rhaas                      86 GIC           1 :     printf(_("      --no-sync          do not wait for changes to be written safely to disk\n"));
 3101 peter_e                    87 CBC           1 :     printf(_("  -s, --status-interval=SECS\n"
 3101 peter_e                    88 ECB             :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
 3101 peter_e                    89 CBC           1 :     printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
 2158                            90               1 :     printf(_("      --synchronous      flush write-ahead log immediately after writing\n"));
 3904 alvherre                   91               1 :     printf(_("  -v, --verbose          output verbose messages\n"));
 3904 alvherre                   92 GIC           1 :     printf(_("  -V, --version          output version information, then exit\n"));
  361 michael                    93 CBC           1 :     printf(_("  -Z, --compress=METHOD[:DETAIL]\n"
  361 michael                    94 ECB             :              "                         compress as specified\n"));
 3904 alvherre                   95 CBC           1 :     printf(_("  -?, --help             show this help, then exit\n"));
 4183 magnus                     96               1 :     printf(_("\nConnection options:\n"));
 3695 heikki.linnakangas         97               1 :     printf(_("  -d, --dbname=CONNSTR   connection string\n"));
 3904 alvherre                   98               1 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
                                 99               1 :     printf(_("  -p, --port=PORT        database server port number\n"));
                                100               1 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
                                101               1 :     printf(_("  -w, --no-password      never prompt for password\n"));
                                102               1 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
 3107 andres                    103               1 :     printf(_("\nOptional actions:\n"));
                                104               1 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
                                105               1 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
 1136 peter                     106               1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
 1136 peter                     107 GIC           1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
 4183 magnus                    108               1 : }
                                109                 : 
  361 michael                   110 ECB             : 
  521                           111                 : /*
                                112                 :  * Check if the filename looks like a WAL file, letting caller know if this
                                113                 :  * WAL segment is partial and/or compressed.
                                114                 :  */
                                115                 : static bool
  521 michael                   116 GIC          23 : is_xlogfilename(const char *filename, bool *ispartial,
                                117                 :                 pg_compress_algorithm *wal_compression_algorithm)
                                118                 : {
  521 michael                   119 GBC          23 :     size_t      fname_len = strlen(filename);
  521 michael                   120 GIC          23 :     size_t      xlog_pattern_len = strspn(filename, "0123456789ABCDEF");
                                121                 : 
                                122                 :     /* File does not look like a WAL file */
  521 michael                   123 CBC          23 :     if (xlog_pattern_len != XLOG_FNAME_LEN)
  521 michael                   124 GIC          14 :         return false;
                                125                 : 
                                126                 :     /* File looks like a completed uncompressed WAL file */
                                127               9 :     if (fname_len == XLOG_FNAME_LEN)
                                128                 :     {
  521 michael                   129 LBC           0 :         *ispartial = false;
  362                           130               0 :         *wal_compression_algorithm = PG_COMPRESSION_NONE;
  521 michael                   131 UIC           0 :         return true;
                                132                 :     }
                                133                 : 
  521 michael                   134 ECB             :     /* File looks like a completed gzip-compressed WAL file */
  521 michael                   135 GIC           9 :     if (fname_len == XLOG_FNAME_LEN + strlen(".gz") &&
  521 michael                   136 CBC           2 :         strcmp(filename + XLOG_FNAME_LEN, ".gz") == 0)
  521 michael                   137 ECB             :     {
  521 michael                   138 GIC           2 :         *ispartial = false;
  362                           139               2 :         *wal_compression_algorithm = PG_COMPRESSION_GZIP;
  521 michael                   140 CBC           2 :         return true;
  521 michael                   141 ECB             :     }
                                142                 : 
                                143                 :     /* File looks like a completed LZ4-compressed WAL file */
  520 michael                   144 GIC           7 :     if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") &&
                                145               1 :         strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0)
                                146                 :     {
                                147               1 :         *ispartial = false;
  362                           148               1 :         *wal_compression_algorithm = PG_COMPRESSION_LZ4;
  520                           149               1 :         return true;
                                150                 :     }
                                151                 : 
  521 michael                   152 ECB             :     /* File looks like a partial uncompressed WAL file */
  521 michael                   153 CBC           6 :     if (fname_len == XLOG_FNAME_LEN + strlen(".partial") &&
  521 michael                   154 GIC           3 :         strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0)
                                155                 :     {
                                156               3 :         *ispartial = true;
  362 michael                   157 CBC           3 :         *wal_compression_algorithm = PG_COMPRESSION_NONE;
  521                           158               3 :         return true;
                                159                 :     }
  521 michael                   160 ECB             : 
                                161                 :     /* File looks like a partial gzip-compressed WAL file */
  521 michael                   162 GBC           3 :     if (fname_len == XLOG_FNAME_LEN + strlen(".gz.partial") &&
                                163               2 :         strcmp(filename + XLOG_FNAME_LEN, ".gz.partial") == 0)
  521 michael                   164 EUB             :     {
  521 michael                   165 GIC           2 :         *ispartial = true;
  362 michael                   166 CBC           2 :         *wal_compression_algorithm = PG_COMPRESSION_GZIP;
  521 michael                   167 GIC           2 :         return true;
                                168                 :     }
                                169                 : 
                                170                 :     /* File looks like a partial LZ4-compressed WAL file */
  520                           171               1 :     if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") &&
                                172               1 :         strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0)
                                173                 :     {
  520 michael                   174 CBC           1 :         *ispartial = true;
  362 michael                   175 GIC           1 :         *wal_compression_algorithm = PG_COMPRESSION_LZ4;
  520                           176               1 :         return true;
                                177                 :     }
  520 michael                   178 ECB             : 
  521                           179                 :     /* File does not look like something we know */
  521 michael                   180 LBC           0 :     return false;
  521 michael                   181 EUB             : }
                                182                 : 
 4183 magnus                    183 ECB             : static bool
 3734 heikki.linnakangas        184 GIC         182 : stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
                                185                 : {
                                186                 :     static uint32 prevtimeline = 0;
                                187                 :     static XLogRecPtr prevpos = InvalidXLogRecPtr;
                                188                 : 
                                189                 :     /* we assume that we get called once at the end of each segment */
 3971 magnus                    190             182 :     if (verbose && segment_finished)
 1469 peter                     191 CBC           6 :         pg_log_info("finished segment at %X/%X (timeline %u)",
                                192                 :                     LSN_FORMAT_ARGS(xlogpos),
 1469 peter                     193 ECB             :                     timeline);
 4183 magnus                    194                 : 
 2036 peter_e                   195 GBC         182 :     if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos)
 2036 peter_e                   196 ECB             :     {
 2036 peter_e                   197 GIC          12 :         if (verbose)
 1469 peter                     198              12 :             pg_log_info("stopped log streaming at %X/%X (timeline %u)",
                                199                 :                         LSN_FORMAT_ARGS(xlogpos),
                                200                 :                         timeline);
 2036 peter_e                   201              12 :         time_to_stop = true;
                                202              12 :         return true;
                                203                 :     }
                                204                 : 
                                205                 :     /*
                                206                 :      * Note that we report the previous, not current, position here. After a
 3623 heikki.linnakangas        207 ECB             :      * timeline switch, xlogpos points to the beginning of the segment because
                                208                 :      * that's where we always begin streaming. Reporting the end of previous
                                209                 :      * timeline isn't totally accurate, because the next timeline can begin
                                210                 :      * slightly before the end of the WAL that we received on the previous
                                211                 :      * timeline, but it's close enough for reporting purposes.
 3734                           212                 :      */
 2063 peter_e                   213 CBC         170 :     if (verbose && prevtimeline != 0 && prevtimeline != timeline)
 1469 peter                     214 GIC           1 :         pg_log_info("switched to timeline %u at %X/%X",
 1469 peter                     215 ECB             :                     timeline,
                                216                 :                     LSN_FORMAT_ARGS(prevpos));
 3734 heikki.linnakangas        217                 : 
 3734 heikki.linnakangas        218 GIC         170 :     prevtimeline = timeline;
                                219             170 :     prevpos = xlogpos;
                                220                 : 
 2036 peter_e                   221             170 :     if (time_to_stop)
                                222                 :     {
 2063 peter_e                   223 UIC           0 :         if (verbose)
 1469 peter                     224 LBC           0 :             pg_log_info("received interrupt signal, exiting");
 4183 magnus                    225 UIC           0 :         return true;
 4183 magnus                    226 ECB             :     }
 4183 magnus                    227 GIC         170 :     return false;
                                228                 : }
                                229                 : 
                                230                 : 
 /*
  * Open the directory named by dest_folder and return its stream.
  *
  * dest_folder must not be NULL.  If opendir() fails, the failure is
  * reported through pg_fatal().
  */
 static DIR *
 get_destination_dir(char *dest_folder)
 {
     DIR        *handle;

     Assert(dest_folder != NULL);

     handle = opendir(dest_folder);
     if (!handle)
         pg_fatal("could not open directory \"%s\": %m", dest_folder);

     return handle;
 }
                                246                 : 
                                247                 : 
                                /*
                                 * Close a directory stream.
                                 *
                                 * Neither argument may be NULL.  dest_folder is used only to name the
                                 * directory in the pg_fatal() message issued when closedir() fails.
                                 */
                                static void
                                close_destination_dir(DIR *dest_dir, char *dest_folder)
                                {
                                    Assert(dest_dir != NULL && dest_folder != NULL);

                                    /* closedir() returns non-zero on failure */
                                    if (closedir(dest_dir) != 0)
                                        pg_fatal("could not close directory \"%s\": %m", dest_folder);
                                }
 3107 andres                    258 EUB             : 
                                259                 : 
 4183 magnus                    260                 : /*
                                261                 :  * Determine starting location for streaming, based on any existing xlog
 3734 heikki.linnakangas        262                 :  * segments in the directory. We start at the end of the last one that is
                                263                 :  * complete (size matches wal segment size), on the timeline with highest ID.
                                264                 :  *
                                265                 :  * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
                                266                 :  */
 4183 magnus                    267 ECB             : static XLogRecPtr
 3734 heikki.linnakangas        268 CBC           7 : FindStreamingStart(uint32 *tli)
                                269                 : {
                                270                 :     DIR        *dir;
                                271                 :     struct dirent *dirent;
 3941 heikki.linnakangas        272 GIC           7 :     XLogSegNo   high_segno = 0;
 3734                           273               7 :     uint32      high_tli = 0;
 3485                           274               7 :     bool        high_ispartial = false;
 4183 magnus                    275 ECB             : 
 3107 andres                    276 GIC           7 :     dir = get_destination_dir(basedir);
 4183 magnus                    277 ECB             : 
 3306 bruce                     278 CBC          30 :     while (errno = 0, (dirent = readdir(dir)) != NULL)
 4183 magnus                    279 EUB             :     {
                                280                 :         uint32      tli;
 3941 heikki.linnakangas        281 ECB             :         XLogSegNo   segno;
  362 michael                   282 EUB             :         pg_compress_algorithm wal_compression_algorithm;
                                283                 :         bool        ispartial;
 4183 magnus                    284 ECB             : 
  521 michael                   285 CBC          23 :         if (!is_xlogfilename(dirent->d_name,
                                286                 :                              &ispartial, &wal_compression_algorithm))
 4183 magnus                    287 GBC          14 :             continue;
 4183 magnus                    288 EUB             : 
                                289                 :         /*
                                290                 :          * Looks like an xlog file. Parse its position.
                                291                 :          */
 2028 andres                    292 GIC           9 :         XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
                                293                 : 
                                294                 :         /*
 3485 heikki.linnakangas        295 ECB             :          * Check that the segment has the right size, if it's supposed to be
 2273 magnus                    296                 :          * completed.  For non-compressed segments just check the on-disk size
  520 michael                   297                 :          * and see if it matches a completed segment.  For gzip-compressed
                                298                 :          * segments, look at the last 4 bytes of the compressed file, which is
  521                           299                 :          * where the uncompressed size is located for files with a size lower
                                300                 :          * than 4GB, and then compare it to the size of a completed segment.
  521 michael                   301 EUB             :          * The 4 last bytes correspond to the ISIZE member according to
                                302                 :          * http://www.zlib.org/rfc-gzip.html.
  520                           303                 :          *
                                304                 :          * For LZ4-compressed segments, uncompress the file in a throw-away
                                305                 :          * buffer keeping track of the uncompressed size, then compare it to
  520 michael                   306 ECB             :          * the size of a completed segment.  Per its protocol, LZ4 does not
                                307                 :          * store the uncompressed size of an object by default.  contentSize
                                308                 :          * is one possible way to do that, but we need to rely on a method
                                309                 :          * where WAL segments could have been compressed by a different source
                                310                 :          * than pg_receivewal, like an archive_command with lz4.
                                311                 :          */
  362 michael                   312 CBC           9 :         if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_NONE)
 4183 magnus                    313 UIC           0 :         {
                                314                 :             struct stat statbuf;
                                315                 :             char        fullpath[MAXPGPATH * 2];
 4183 magnus                    316 ECB             : 
 3485 heikki.linnakangas        317 UIC           0 :             snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
                                318               0 :             if (stat(fullpath, &statbuf) != 0)
  366 tgl                       319               0 :                 pg_fatal("could not stat file \"%s\": %m", fullpath);
 3485 heikki.linnakangas        320 ECB             : 
 2028 andres                    321 LBC           0 :             if (statbuf.st_size != WalSegSz)
                                322                 :             {
  927 peter                     323               0 :                 pg_log_warning("segment file \"%s\" has incorrect size %lld, skipping",
  927 peter                     324 ECB             :                                dirent->d_name, (long long int) statbuf.st_size);
 4183 magnus                    325 UBC           0 :                 continue;
                                326                 :             }
 4183 magnus                    327 ECB             :         }
  362 michael                   328 CBC           9 :         else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_GZIP)
 2273 magnus                    329 GBC           2 :         {
                                330                 :             int         fd;
                                331                 :             char        buf[4];
 2153 bruce                     332 ECB             :             int         bytes_out;
                                333                 :             char        fullpath[MAXPGPATH * 2];
                                334                 :             int         r;
                                335                 : 
 2273 magnus                    336 GIC           2 :             snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
                                337                 : 
 1668 michael                   338               2 :             fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
 2273 magnus                    339 CBC           2 :             if (fd < 0)
  366 tgl                       340 LBC           0 :                 pg_fatal("could not open compressed file \"%s\": %m",
  366 tgl                       341 EUB             :                          fullpath);
 2153 bruce                     342 GIC           2 :             if (lseek(fd, (off_t) (-4), SEEK_END) < 0)
  366 tgl                       343 UIC           0 :                 pg_fatal("could not seek in compressed file \"%s\": %m",
  366 tgl                       344 ECB             :                          fullpath);
 1726 michael                   345 CBC           2 :             r = read(fd, (char *) buf, sizeof(buf));
 1726 michael                   346 GIC           2 :             if (r != sizeof(buf))
                                347                 :             {
 1726 michael                   348 LBC           0 :                 if (r < 0)
  366 tgl                       349               0 :                     pg_fatal("could not read compressed file \"%s\": %m",
  366 tgl                       350 ECB             :                              fullpath);
                                351                 :                 else
  366 tgl                       352 LBC           0 :                     pg_fatal("could not read compressed file \"%s\": read %d of %zu",
  366 tgl                       353 ECB             :                              fullpath, r, sizeof(buf));
                                354                 :             }
 2273 magnus                    355                 : 
 2273 magnus                    356 CBC           2 :             close(fd);
 2273 magnus                    357 GIC           2 :             bytes_out = (buf[3] << 24) | (buf[2] << 16) |
 2153 bruce                     358 CBC           2 :                 (buf[1] << 8) | buf[0];
 2273 magnus                    359 EUB             : 
 2028 andres                    360 GIC           2 :             if (bytes_out != WalSegSz)
                                361                 :             {
 1469 peter                     362 UIC           0 :                 pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %d, skipping",
 1469 peter                     363 ECB             :                                dirent->d_name, bytes_out);
 2273 magnus                    364 LBC           0 :                 continue;
                                365                 :             }
                                366                 :         }
  362 michael                   367 GIC           7 :         else if (!ispartial && wal_compression_algorithm == PG_COMPRESSION_LZ4)
                                368                 :         {
                                369                 : #ifdef USE_LZ4
                                370                 : #define LZ4_CHUNK_SZ    64 * 1024   /* 64kB as maximum chunk size read */
                                371                 :             int         fd;
                                372                 :             ssize_t     r;
  520                           373               1 :             size_t      uncompressed_size = 0;
  520 michael                   374 ECB             :             char        fullpath[MAXPGPATH * 2];
                                375                 :             char       *outbuf;
                                376                 :             char       *readbuf;
  520 michael                   377 CBC           1 :             LZ4F_decompressionContext_t ctx = NULL;
  520 michael                   378 ECB             :             LZ4F_decompressOptions_t dec_opt;
                                379                 :             LZ4F_errorCode_t status;
                                380                 : 
  520 michael                   381 CBC           1 :             memset(&dec_opt, 0, sizeof(dec_opt));
  520 michael                   382 GBC           1 :             snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
                                383                 : 
  520 michael                   384 GIC           1 :             fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
  520 michael                   385 CBC           1 :             if (fd < 0)
  366 tgl                       386 UIC           0 :                 pg_fatal("could not open file \"%s\": %m", fullpath);
  520 michael                   387 EUB             : 
  520 michael                   388 GIC           1 :             status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
  520 michael                   389 GBC           1 :             if (LZ4F_isError(status))
  366 tgl                       390 UIC           0 :                 pg_fatal("could not create LZ4 decompression context: %s",
                                391                 :                          LZ4F_getErrorName(status));
                                392                 : 
  520 michael                   393 GIC           1 :             outbuf = pg_malloc0(LZ4_CHUNK_SZ);
                                394               1 :             readbuf = pg_malloc0(LZ4_CHUNK_SZ);
                                395                 :             do
                                396                 :             {
                                397                 :                 char       *readp;
                                398                 :                 char       *readend;
  520 michael                   399 ECB             : 
  520 michael                   400 CBC           2 :                 r = read(fd, readbuf, LZ4_CHUNK_SZ);
                                401               2 :                 if (r < 0)
  366 tgl                       402 UIC           0 :                     pg_fatal("could not read file \"%s\": %m", fullpath);
  520 michael                   403 ECB             : 
                                404                 :                 /* Done reading the file */
  520 michael                   405 CBC           2 :                 if (r == 0)
  520 michael                   406 GIC           1 :                     break;
                                407                 : 
                                408                 :                 /* Process one chunk */
  520 michael                   409 CBC           1 :                 readp = readbuf;
  520 michael                   410 GBC           1 :                 readend = readbuf + r;
  520 michael                   411 GIC          17 :                 while (readp < readend)
  520 michael                   412 ECB             :                 {
  520 michael                   413 GIC          16 :                     size_t      out_size = LZ4_CHUNK_SZ;
  520 michael                   414 CBC          16 :                     size_t      read_size = readend - readp;
                                415                 : 
  520 michael                   416 GIC          16 :                     memset(outbuf, 0, LZ4_CHUNK_SZ);
                                417              16 :                     status = LZ4F_decompress(ctx, outbuf, &out_size,
                                418                 :                                              readp, &read_size, &dec_opt);
                                419              16 :                     if (LZ4F_isError(status))
  366 tgl                       420 UIC           0 :                         pg_fatal("could not decompress file \"%s\": %s",
                                421                 :                                  fullpath,
                                422                 :                                  LZ4F_getErrorName(status));
  520 michael                   423 ECB             : 
  520 michael                   424 GBC          16 :                     readp += read_size;
  520 michael                   425 GIC          16 :                     uncompressed_size += out_size;
  520 michael                   426 ECB             :                 }
                                427                 : 
                                428                 :                 /*
                                429                 :                  * No need to continue reading the file when the
                                430                 :                  * uncompressed_size exceeds WalSegSz, even if there are still
                                431                 :                  * data left to read. However, if uncompressed_size is equal
                                432                 :                  * to WalSegSz, it should verify that there is no more data to
                                433                 :                  * read.
                                434                 :                  */
  520 michael                   435 GIC           1 :             } while (uncompressed_size <= WalSegSz && r > 0);
                                436                 : 
                                437               1 :             close(fd);
                                438               1 :             pg_free(outbuf);
  520 michael                   439 CBC           1 :             pg_free(readbuf);
                                440                 : 
  520 michael                   441 GIC           1 :             status = LZ4F_freeDecompressionContext(ctx);
                                442               1 :             if (LZ4F_isError(status))
  366 tgl                       443 LBC           0 :                 pg_fatal("could not free LZ4 decompression context: %s",
                                444                 :                          LZ4F_getErrorName(status));
                                445                 : 
  520 michael                   446 GIC           1 :             if (uncompressed_size != WalSegSz)
                                447                 :             {
  517 tgl                       448 UIC           0 :                 pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %zu, skipping",
  520 michael                   449 ECB             :                                dirent->d_name, uncompressed_size);
  520 michael                   450 UBC           0 :                 continue;
  520 michael                   451 ECB             :             }
                                452                 : #else
  197 peter                     453                 :             pg_log_error("cannot check file \"%s\": compression with %s not supported by this build",
                                454                 :                          dirent->d_name, "LZ4");
  520 michael                   455                 :             exit(1);
                                456                 : #endif
                                457                 :         }
                                458                 : 
                                459                 :         /* Looks like a valid segment. Remember that we saw it. */
 3485 heikki.linnakangas        460 GIC           9 :         if ((segno > high_segno) ||
                                461               4 :             (segno == high_segno && tli > high_tli) ||
 3485 heikki.linnakangas        462 GBC           4 :             (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
                                463                 :         {
 3485 heikki.linnakangas        464 GIC           5 :             high_segno = segno;
                                465               5 :             high_tli = tli;
                                466               5 :             high_ispartial = ispartial;
                                467                 :         }
                                468                 :     }
                                469                 : 
 3306 bruce                     470 CBC           7 :     if (errno)
  366 tgl                       471 UBC           0 :         pg_fatal("could not read directory \"%s\": %m", basedir);
                                472                 : 
 3107 andres                    473 GIC           7 :     close_destination_dir(dir, basedir);
                                474                 : 
 3941 heikki.linnakangas        475               7 :     if (high_segno > 0)
 4183 magnus                    476 ECB             :     {
                                477                 :         XLogRecPtr  high_ptr;
                                478                 : 
                                479                 :         /*
                                480                 :          * Move the starting pointer to the start of the next segment, if the
                                481                 :          * highest one we saw was completed. Otherwise start streaming from
                                482                 :          * the beginning of the .partial segment.
 4175                           483                 :          */
 3485 heikki.linnakangas        484 CBC           3 :         if (!high_ispartial)
 3485 heikki.linnakangas        485 UIC           0 :             high_segno++;
 4183 magnus                    486 ECB             : 
 1735 alvherre                  487 GIC           3 :         XLogSegNoOffsetToRecPtr(high_segno, 0, WalSegSz, high_ptr);
                                488                 : 
 3734 heikki.linnakangas        489               3 :         *tli = high_tli;
 4183 magnus                    490 CBC           3 :         return high_ptr;
                                491                 :     }
                                492                 :     else
 3734 heikki.linnakangas        493 GIC           4 :         return InvalidXLogRecPtr;
                                494                 : }
                                495                 : 
                                496                 : /*
                                497                 :  * Start the log streaming: connect if needed, choose the start position and timeline, then call ReceiveXlogStream().
 4183 magnus                    498 ECB             :  */
                                499                 : static void
 4183 magnus                    500 CBC           7 : StreamLog(void)
 4183 magnus                    501 ECB             : {
                                502                 :     XLogRecPtr  serverpos;
                                503                 :     TimeLineID  servertli;
  267 peter                     504 GNC           7 :     StreamCtl   stream = {0};
  568 michael                   505 ECB             :     char       *sysidentifier;
                                506                 : 
                                507                 :     /*
                                508                 :      * Connect in replication mode to the server, unless conn is already set
 4183 magnus                    509                 :      */
 3095 fujii                     510 GIC           7 :     if (conn == NULL)
 3095 fujii                     511 UIC           0 :         conn = GetConnection();
 3969 magnus                    512 GIC           7 :     if (!conn)
                                513                 :         /* Error message already written in GetConnection() */
 3969 magnus                    514 CBC           1 :         return;
 4183 magnus                    515 ECB             : 
 3670 heikki.linnakangas        516 GIC           7 :     if (!CheckServerVersionForStreaming(conn))
                                517                 :     {
                                518                 :         /*
 3670 heikki.linnakangas        519 ECB             :          * Error message already written in CheckServerVersionForStreaming().
                                520                 :          * There's no hope of recovering from a version mismatch, so don't
                                521                 :          * retry.
                                522                 :          */
 1562 peter                     523 LBC           0 :         exit(1);
 3670 heikki.linnakangas        524 ECB             :     }
                                525                 : 
                                526                 :     /*
                                527                 :      * Identify server, obtaining start LSN position and current timeline ID
 3112 andres                    528                 :      * at the same time, necessary if no valid data can be found in the
                                529                 :      * existing output directory.
 4183 magnus                    530                 :      */
  568 michael                   531 CBC           7 :     if (!RunIdentifySystem(conn, &sysidentifier, &servertli, &serverpos, NULL))
 1562 peter                     532 UIC           0 :         exit(1);
 4183 magnus                    533 ECB             : 
                                534                 :     /*
  530 michael                   535                 :      * Figure out where to start streaming.  First scan the local directory.
                                536                 :      */
 2585 magnus                    537 GBC           7 :     stream.startpos = FindStreamingStart(&stream.timeline);
                                538               7 :     if (stream.startpos == InvalidXLogRecPtr)
                                539                 :     {
                                540                 :         /*
  530 michael                   541 ECB             :          * Try to get the starting point from the slot if any.  This is
                                542                 :          * supported in PostgreSQL 15 and newer.
                                543                 :          */
  530 michael                   544 CBC           7 :         if (replication_slot != NULL &&
  530 michael                   545 GIC           3 :             PQserverVersion(conn) >= 150000)
                                546                 :         {
                                547               3 :             if (!GetSlotInformation(conn, replication_slot, &stream.startpos,
                                548                 :                                     &stream.timeline))
                                549                 :             {
                                550                 :                 /* Error is logged by GetSlotInformation() */
                                551               1 :                 return;
                                552                 :             }
                                553                 :         }
  530 michael                   554 EUB             : 
                                555                 :         /*
                                556                 :          * If the starting point is still not known, use the current WAL
                                557                 :          * flush value as last resort.
                                558                 :          */
  530 michael                   559 GIC           3 :         if (stream.startpos == InvalidXLogRecPtr)
                                560                 :         {
  530 michael                   561 CBC           1 :             stream.startpos = serverpos;
  530 michael                   562 GIC           1 :             stream.timeline = servertli;
                                563                 :         }
                                564                 :     }
                                565                 : 
                                566               6 :     Assert(stream.startpos != InvalidXLogRecPtr &&
                                567                 :            stream.timeline != 0);
                                568                 : 
                                569                 :     /*
                                570                 :      * Always start streaming at the beginning of a segment
                                571                 :      */
 2028 andres                    572               6 :     stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
                                573                 : 
                                574                 :     /*
                                575                 :      * Start the replication
                                576                 :      */
 4183 magnus                    577               6 :     if (verbose)
 1469 peter                     578               6 :         pg_log_info("starting log streaming at %X/%X (timeline %u)",
                                579                 :                     LSN_FORMAT_ARGS(stream.startpos),
                                580                 :                     stream.timeline);
                                581                 : 
 2585 magnus                    582               6 :     stream.stream_stop = stop_streaming;
 2173 tgl                       583               6 :     stream.stop_socket = PGINVALID_SOCKET;
 2585 magnus                    584               6 :     stream.standby_message_timeout = standby_message_timeout;
                                585               6 :     stream.synchronous = synchronous;
 1988 rhaas                     586               6 :     stream.do_sync = do_sync;
 2585 magnus                    587               6 :     stream.mark_done = false;
  521 michael                   588              12 :     stream.walmethod = CreateWalDirectoryMethod(basedir,
                                589                 :                                                 compression_algorithm,
                                590                 :                                                 compresslevel,
 2273 magnus                    591               6 :                                                 stream.do_sync);
 2585                           592               6 :     stream.partial_suffix = ".partial";
 2274                           593               6 :     stream.replication_slot = replication_slot;
  568 michael                   594 CBC           6 :     stream.sysidentifier = sysidentifier;
 4183 magnus                    595 ECB             : 
 2585 magnus                    596 CBC           6 :     ReceiveXlogStream(conn, &stream);
                                597                 : 
  202 rhaas                     598 GNC           6 :     if (!stream.walmethod->ops->finish(stream.walmethod))
 2359 magnus                    599 ECB             :     {
 1469 peter                     600 LBC           0 :         pg_log_info("could not finish writing WAL files: %m");
 2359 magnus                    601 UIC           0 :         return;
 2359 magnus                    602 ECB             :     }
                                603                 : 
 4175 magnus                    604 CBC           6 :     PQfinish(conn);
 1562 peter                     605 GIC           6 :     conn = NULL;
 2357 magnus                    606 ECB             : 
  202 rhaas                     607 GNC           6 :     stream.walmethod->ops->free(stream.walmethod);
 4183 magnus                    608 ECB             : }
                                609                 : 
                                610                 : /*
                                611                 :  * When SIGINT/SIGTERM are caught, just set time_to_stop so that the program
                                612                 :  * can exit at the next possible moment.
                                613                 :  */
                                614                 : #ifndef WIN32
 4138 andrew                    615                 : 
 4183 magnus                    616                 : static void
  207 tgl                       617 UNC           0 : sigexit_handler(SIGNAL_ARGS)
 4183 magnus                    618 ECB             : {
 2036 peter_e                   619 UIC           0 :     time_to_stop = true;
 4183 magnus                    620 UBC           0 : }
 4138 andrew                    621                 : #endif
                                622                 : 
 4183 magnus                    623 ECB             : int
 4183 magnus                    624 CBC          17 : main(int argc, char **argv)
 4183 magnus                    625 ECB             : {
                                626                 :     static struct option long_options[] = {
                                627                 :         {"help", no_argument, NULL, '?'},
 4183 magnus                    628 EUB             :         {"version", no_argument, NULL, 'V'},
 3953 peter_e                   629 ECB             :         {"directory", required_argument, NULL, 'D'},
 3695 heikki.linnakangas        630                 :         {"dbname", required_argument, NULL, 'd'},
 2036 peter_e                   631 EUB             :         {"endpos", required_argument, NULL, 'E'},
 4183 magnus                    632                 :         {"host", required_argument, NULL, 'h'},
                                633                 :         {"port", required_argument, NULL, 'p'},
 4183 magnus                    634 ECB             :         {"username", required_argument, NULL, 'U'},
 3904 alvherre                  635                 :         {"no-loop", no_argument, NULL, 'n'},
 4183 magnus                    636                 :         {"no-password", no_argument, NULL, 'w'},
 4183 magnus                    637 EUB             :         {"password", no_argument, NULL, 'W'},
 3904 alvherre                  638                 :         {"status-interval", required_argument, NULL, 's'},
 3355 rhaas                     639                 :         {"slot", required_argument, NULL, 'S'},
 4183 magnus                    640                 :         {"verbose", no_argument, NULL, 'v'},
 2273                           641                 :         {"compress", required_argument, NULL, 'Z'},
                                642                 : /* action */
                                643                 :         {"create-slot", no_argument, NULL, 1},
 3107 andres                    644                 :         {"drop-slot", no_argument, NULL, 2},
 2828                           645                 :         {"if-not-exists", no_argument, NULL, 3},
                                646                 :         {"synchronous", no_argument, NULL, 4},
 1988 rhaas                     647 ECB             :         {"no-sync", no_argument, NULL, 5},
 4183 magnus                    648                 :         {NULL, 0, NULL, 0}
                                649                 :     };
 3782 bruce                     650 EUB             : 
 4183 magnus                    651                 :     int         c;
                                652                 :     int         option_index;
 3107 andres                    653 ECB             :     char       *db_name;
 1957 rhaas                     654                 :     uint32      hi,
                                655                 :                 lo;
  361 michael                   656 EUB             :     pg_compress_specification compression_spec;
  361 michael                   657 GBC          17 :     char       *compression_detail = NULL;
                                658              17 :     char       *compression_algorithm_str = "none";
                                659              17 :     char       *error_detail = NULL;
 4183 magnus                    660 EUB             : 
 1469 peter                     661 GBC          17 :     pg_logging_init(argv[0]);
 4183 magnus                    662 CBC          17 :     progname = get_progname(argv[0]);
 2659 alvherre                  663              17 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
                                664                 : 
 4183 magnus                    665              17 :     if (argc > 1)
 4183 magnus                    666 ECB             :     {
 4183 magnus                    667 CBC          16 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
 4183 magnus                    668 ECB             :         {
 4183 magnus                    669 CBC           1 :             usage();
                                670               1 :             exit(0);
 4183 magnus                    671 ECB             :         }
 3904 alvherre                  672 GBC          15 :         else if (strcmp(argv[1], "-V") == 0 ||
                                673              15 :                  strcmp(argv[1], "--version") == 0)
 4183 magnus                    674 EUB             :         {
 2250 rhaas                     675 CBC           1 :             puts("pg_receivewal (PostgreSQL) " PG_VERSION);
 4183 magnus                    676               1 :             exit(0);
 4183 magnus                    677 ECB             :         }
                                678                 :     }
                                679                 : 
  118 peter                     680 GNC          66 :     while ((c = getopt_long(argc, argv, "d:D:E:h:np:s:S:U:vwWZ:",
 4183 magnus                    681 CBC          66 :                             long_options, &option_index)) != -1)
                                682                 :     {
                                683              52 :         switch (c)
 4183 magnus                    684 ECB             :         {
  118 peter                     685 UNC           0 :             case 'd':
                                686               0 :                 connection_string = pg_strdup(optarg);
                                687               0 :                 break;
 4183 magnus                    688 GIC          11 :             case 'D':
 3841 tgl                       689              11 :                 basedir = pg_strdup(optarg);
 4183 magnus                    690              11 :                 break;
  118 peter                     691 GNC           7 :             case 'E':
                                692               7 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
  118 peter                     693 UNC           0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
  118 peter                     694 GNC           7 :                 endpos = ((uint64) hi) << 32 | lo;
 3695 heikki.linnakangas        695 GIC           7 :                 break;
 4183 magnus                    696 LBC           0 :             case 'h':
 3841 tgl                       697 UIC           0 :                 dbhost = pg_strdup(optarg);
 4183 magnus                    698 UBC           0 :                 break;
  118 peter                     699 GNC           7 :             case 'n':
                                700               7 :                 noloop = true;
                                701               7 :                 break;
 4183 magnus                    702 UIC           0 :             case 'p':
 3841 tgl                       703 UBC           0 :                 dbport = pg_strdup(optarg);
 4183 magnus                    704               0 :                 break;
 4183 magnus                    705 LBC           0 :             case 's':
  624 michael                   706 UIC           0 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
                                707                 :                                       INT_MAX / 1000,
  624 michael                   708 ECB             :                                       &standby_message_timeout))
 4183 magnus                    709 UIC           0 :                     exit(1);
  624 michael                   710 LBC           0 :                 standby_message_timeout *= 1000;
 4183 magnus                    711               0 :                 break;
 3355 rhaas                     712 GIC           5 :             case 'S':
                                713               5 :                 replication_slot = pg_strdup(optarg);
 3355 rhaas                     714 CBC           5 :                 break;
  118 peter                     715 UNC           0 :             case 'U':
                                716               0 :                 dbuser = pg_strdup(optarg);
 3969 magnus                    717 UIC           0 :                 break;
 4183 magnus                    718 GIC           7 :             case 'v':
 4183 magnus                    719 CBC           7 :                 verbose++;
 4183 magnus                    720 GIC           7 :                 break;
  118 peter                     721 UNC           0 :             case 'w':
                                722               0 :                 dbgetpassword = -1;
                                723               0 :                 break;
                                724               0 :             case 'W':
                                725               0 :                 dbgetpassword = 1;
                                726               0 :                 break;
 2273 magnus                    727 CBC           3 :             case 'Z':
  361 michael                   728               3 :                 parse_compress_options(optarg, &compression_algorithm_str,
  361 michael                   729 ECB             :                                        &compression_detail);
 2273 magnus                    730 GIC           3 :                 break;
 3107 andres                    731               3 :             case 1:
                                732               3 :                 do_create_slot = true;
                                733               3 :                 break;
 3107 andres                    734 CBC           2 :             case 2:
 3107 andres                    735 GIC           2 :                 do_drop_slot = true;
 3107 andres                    736 GBC           2 :                 break;
 3064 fujii                     737 UIC           0 :             case 3:
 2828 andres                    738               0 :                 slot_exists_ok = true;
 2828 andres                    739 LBC           0 :                 break;
 2828 andres                    740 GIC           2 :             case 4:
 3064 fujii                     741 CBC           2 :                 synchronous = true;
                                742               2 :                 break;
 1988 rhaas                     743               4 :             case 5:
 1988 rhaas                     744 GIC           4 :                 do_sync = false;
                                745               4 :                 break;
 4183 magnus                    746               1 :             default:
  366 tgl                       747 ECB             :                 /* getopt_long already emitted a complaint */
  366 tgl                       748 GIC           1 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4183 magnus                    749 CBC           1 :                 exit(1);
 4183 magnus                    750 EUB             :         }
                                751                 :     }
                                752                 : 
                                753                 :     /*
                                754                 :      * Any non-option arguments?
 4183 magnus                    755 ECB             :      */
 4183 magnus                    756 GIC          14 :     if (optind < argc)
 4183 magnus                    757 ECB             :     {
 1469 peter                     758 UIC           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
 1469 peter                     759 ECB             :                      argv[optind]);
  366 tgl                       760 UIC           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4183 magnus                    761               0 :         exit(1);
                                762                 :     }
                                763                 : 
 2883 peter_e                   764 GIC          14 :     if (do_drop_slot && do_create_slot)
 3107 andres                    765 ECB             :     {
 1469 peter                     766 CBC           1 :         pg_log_error("cannot use --create-slot together with --drop-slot");
  366 tgl                       767 GIC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3107 andres                    768 GBC           1 :         exit(1);
 3107 andres                    769 ECB             :     }
                                770                 : 
 2883 peter_e                   771 GIC          13 :     if (replication_slot == NULL && (do_drop_slot || do_create_slot))
                                772                 :     {
                                773                 :         /* translator: second %s is an option name */
 1469 peter                     774               1 :         pg_log_error("%s needs a slot to be specified using --slot",
                                775                 :                      do_drop_slot ? "--drop-slot" : "--create-slot");
  366 tgl                       776 CBC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3107 andres                    777               1 :         exit(1);
                                778                 :     }
                                779                 : 
 1988 rhaas                     780 GIC          12 :     if (synchronous && !do_sync)
                                781                 :     {
 1469 peter                     782               1 :         pg_log_error("cannot use --synchronous together with --no-sync");
  366 tgl                       783               1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 1988 rhaas                     784               1 :         exit(1);
 1988 rhaas                     785 ECB             :     }
 1988 rhaas                     786 EUB             : 
                                787                 :     /*
                                788                 :      * Required arguments
                                789                 :      */
 2799 andres                    790 GIC          11 :     if (basedir == NULL && !do_drop_slot && !do_create_slot)
                                791                 :     {
 1469 peter                     792 CBC           1 :         pg_log_error("no target directory specified");
  366 tgl                       793 GBC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4183 magnus                    794 GIC           1 :         exit(1);
                                795                 :     }
                                796                 : 
                                797                 :     /*
                                798                 :      * Compression options
                                799                 :      */
  361 michael                   800              10 :     if (!parse_compress_algorithm(compression_algorithm_str,
                                801                 :                                   &compression_algorithm))
  197 peter                     802 UIC           0 :         pg_fatal("unrecognized compression algorithm: \"%s\"",
                                803                 :                  compression_algorithm_str);
  361 michael                   804 ECB             : 
  361 michael                   805 GIC          10 :     parse_compress_specification(compression_algorithm, compression_detail,
                                806                 :                                  &compression_spec);
                                807              10 :     error_detail = validate_compress_specification(&compression_spec);
                                808              10 :     if (error_detail != NULL)
  361 michael                   809 CBC           1 :         pg_fatal("invalid compression specification: %s",
                                810                 :                  error_detail);
  361 michael                   811 ECB             : 
  207 michael                   812 EUB             :     /* Extract the compression level */
  207 michael                   813 GIC           9 :     compresslevel = compression_spec.level;
  521 michael                   814 ECB             : 
  207 michael                   815 GBC           9 :     if (compression_algorithm == PG_COMPRESSION_ZSTD)
  207 michael                   816 LBC           0 :         pg_fatal("compression with %s is not yet supported", "ZSTD");
                                817                 : 
                                818                 :     /*
                                819                 :      * Check existence of destination folder.
 3107 andres                    820 ECB             :      */
 2799 andres                    821 GIC           9 :     if (!do_drop_slot && !do_create_slot)
 3107 andres                    822 ECB             :     {
 2878 bruce                     823 GBC           7 :         DIR        *dir = get_destination_dir(basedir);
                                824                 : 
 3107 andres                    825 CBC           7 :         close_destination_dir(dir, basedir);
                                826                 :     }
 3107 andres                    827 EUB             : 
 3107 andres                    828 ECB             :     /*
                                829                 :      * Obtain a connection before doing anything.
                                830                 :      */
 3107 andres                    831 GIC           9 :     conn = GetConnection();
 3107 andres                    832 CBC           9 :     if (!conn)
 3107 andres                    833 EUB             :         /* error message already written in GetConnection() */
 3107 andres                    834 UIC           0 :         exit(1);
 1562 peter                     835 GIC           9 :     atexit(disconnect_atexit);
                                836                 : 
                                837                 :     /*
                                838                 :      * Trap signals.  (Don't do this until after the initial password prompt,
                                839                 :      * if one is needed, in GetConnection.)
                                840                 :      */
                                841                 : #ifndef WIN32
  207 dgustafsson               842 GNC           9 :     pqsignal(SIGINT, sigexit_handler);
                                843               9 :     pqsignal(SIGTERM, sigexit_handler);
  504 tgl                       844 ECB             : #endif
                                845                 : 
                                846                 :     /*
                                847                 :      * Run IDENTIFY_SYSTEM to make sure we've successfully established a
                                848                 :      * replication connection and haven't connected using a database specific
                                849                 :      * connection.
 3107 andres                    850                 :      */
 3107 andres                    851 GIC           9 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
 1562 peter                     852 LBC           0 :         exit(1);
 3107 andres                    853 ECB             : 
                                854                 :     /*
                                855                 :      * Check that there is a database associated with connection, none should
                                856                 :      * be defined in this context.
 3107 andres                    857 EUB             :      */
 3107 andres                    858 GIC           9 :     if (db_name)
  366 tgl                       859 UBC           0 :         pg_fatal("replication connection using slot \"%s\" is unexpectedly database specific",
                                860                 :                  replication_slot);
                                861                 : 
                                862                 :     /*
                                863                 :      * Set umask so that directories/files are created with the same
                                864                 :      * permissions as directories/files in the source data directory.
                                865                 :      *
                                866                 :      * pg_mode_mask is set to owner-only by default and then updated in
                                867                 :      * GetConnection() where we get the mode from the server-side with
                                868                 :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
                                869                 :      */
  568 michael                   870 GIC           9 :     umask(pg_mode_mask);
                                871                 : 
                                872                 :     /*
                                873                 :      * Drop a replication slot.
                                874                 :      */
 3107 andres                    875               9 :     if (do_drop_slot)
                                876                 :     {
                                877               1 :         if (verbose)
 1418 tgl                       878 UIC           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
                                879                 : 
 3107 andres                    880 GIC           1 :         if (!DropReplicationSlot(conn, replication_slot))
 1562 peter                     881 UIC           0 :             exit(1);
 1562 peter                     882 GIC           1 :         exit(0);
                                883                 :     }
                                884                 : 
                                885                 :     /* Create a replication slot */
 3107 andres                    886               8 :     if (do_create_slot)
                                887                 :     {
                                888               1 :         if (verbose)
 1469 peter                     889 UIC           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
                                890                 : 
 2021 peter_e                   891 GIC           1 :         if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
                                892                 :                                    slot_exists_ok, false))
 1562 peter                     893 UIC           0 :             exit(1);
 1562 peter                     894 GIC           1 :         exit(0);
                                895                 :     }
                                896                 : 
                                897                 :     /* determine remote server's xlog segment size */
  568 michael                   898               7 :     if (!RetrieveWalSegSize(conn))
  568 michael                   899 UIC           0 :         exit(1);
                                900                 : 
                                901                 :     /*
                                902                 :      * Don't close the connection here so that subsequent StreamLog() can
                                903                 :      * reuse it.
                                904                 :      */
                                905                 : 
                                906                 :     while (true)
                                907                 :     {
 3969 magnus                    908 GIC           7 :         StreamLog();
 2036 peter_e                   909               7 :         if (time_to_stop)
                                910                 :         {
                                911                 :             /*
                                912                 :              * We've been Ctrl-C'ed or end of streaming position has been
                                913                 :              * willingly reached, so exit without an error code.
                                914                 :              */
 3969 magnus                    915               6 :             exit(0);
                                916                 :         }
                                917               1 :         else if (noloop)
  366 tgl                       918               1 :             pg_fatal("disconnected");
                                919                 :         else
                                920                 :         {
                                921                 :             /* translator: check source for value for %d */
 1469 peter                     922 UIC           0 :             pg_log_info("disconnected; waiting %d seconds to try again",
                                923                 :                         RECONNECT_SLEEP_TIME);
 3969 magnus                    924               0 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
                                925                 :         }
                                926                 :     }
                                927                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a