LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 73.6 % 462 340 4 23 56 39 40 154 4 142 40 172 3 3
Current Date: 2023-04-08 17:13:01 Functions: 90.0 % 10 9 1 8 1 1 9
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (60,120] days: 25.0 % 4 1 3 1
Legend: Lines: hit not hit (180,240] days: 80.0 % 5 4 1 3 1
(240..) days: 74.0 % 453 335 23 56 39 40 154 141 40 172
Function coverage date bins:
(180,240] days: 50.0 % 2 1 1 1
(240..) days: 44.4 % 18 8 8 1 9

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
                                  4                 :  *                    fashion and write it to a local file.
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  7                 :  *
                                  8                 :  * IDENTIFICATION
                                  9                 :  *        src/bin/pg_basebackup/pg_recvlogical.c
                                 10                 :  *-------------------------------------------------------------------------
                                 11                 :  */
                                 12                 : 
                                 13                 : #include "postgres_fe.h"
                                 14                 : 
                                 15                 : #include <dirent.h>
                                 16                 : #include <limits.h>
                                 17                 : #include <sys/select.h>
                                 18                 : #include <sys/stat.h>
                                 19                 : #include <unistd.h>
                                 20                 : 
                                 21                 : #include "access/xlog_internal.h"
                                 22                 : #include "common/fe_memutils.h"
                                 23                 : #include "common/file_perm.h"
                                 24                 : #include "common/logging.h"
                                 25                 : #include "fe_utils/option_utils.h"
                                 26                 : #include "getopt_long.h"
                                 27                 : #include "libpq-fe.h"
                                 28                 : #include "libpq/pqsignal.h"
                                 29                 : #include "pqexpbuffer.h"
                                 30                 : #include "streamutil.h"
                                 31                 : 
                                 32                 : /* Time to sleep between reconnection attempts */
                                 33                 : #define RECONNECT_SLEEP_TIME 5
                                 34                 : 
                                 35                 : /* Global Options */
                                 36                 : static char *outfile = NULL;
                                 37                 : static int  verbose = 0;
                                 38                 : static bool two_phase = false;
                                 39                 : static int  noloop = 0;
                                 40                 : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
                                 41                 : static int  fsync_interval = 10 * 1000; /* 10 sec = default */
                                 42                 : static XLogRecPtr startpos = InvalidXLogRecPtr;
                                 43                 : static XLogRecPtr endpos = InvalidXLogRecPtr;
                                 44                 : static bool do_create_slot = false;
                                 45                 : static bool slot_exists_ok = false;
                                 46                 : static bool do_start_slot = false;
                                 47                 : static bool do_drop_slot = false;
                                 48                 : static char *replication_slot = NULL;
                                 49                 : 
                                 50                 : /* filled pairwise with option, value. value may be NULL */
                                 51                 : static char **options;
                                 52                 : static size_t noptions = 0;
                                 53                 : static const char *plugin = "test_decoding";
                                 54                 : 
                                 55                 : /* Global State */
                                 56                 : static int  outfd = -1;
                                 57                 : static volatile sig_atomic_t time_to_abort = false;
                                 58                 : static volatile sig_atomic_t output_reopen = false;
                                 59                 : static bool output_isfile;
                                 60                 : static TimestampTz output_last_fsync = -1;
                                 61                 : static bool output_needs_fsync = false;
                                 62                 : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
                                 63                 : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
                                 64                 : 
                                 65                 : static void usage(void);
                                 66                 : static void StreamLogicalLog(void);
                                 67                 : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
                                 68                 : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
                                 69                 :                                bool keepalive, XLogRecPtr lsn);
 3309 rhaas                      70 ECB             : 
                                 71                 : static void
 3309 rhaas                      72 CBC           1 : usage(void)
                                 73                 : {
 2942 bruce                      74               1 :     printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
 3309 rhaas                      75 ECB             :            progname);
 3309 rhaas                      76 CBC           1 :     printf(_("Usage:\n"));
                                 77               1 :     printf(_("  %s [OPTION]...\n"), progname);
 3101 peter_e                    78               1 :     printf(_("\nAction to be performed:\n"));
                                 79               1 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
                                 80               1 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
                                 81               1 :     printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
 3309 rhaas                      82               1 :     printf(_("\nOptions:\n"));
 2131 peter_e                    83               1 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
 3101 peter_e                    84 GIC           1 :     printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
 3241 andres                     85 CBC           1 :     printf(_("  -F  --fsync-interval=SECS\n"
 3101 peter_e                    86 ECB             :              "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
 2762 peter_e                    87 CBC           1 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
 3101                            88               1 :     printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
 3309 rhaas                      89 GIC           1 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
 3101 peter_e                    90               1 :     printf(_("  -o, --option=NAME[=VALUE]\n"
 3101 peter_e                    91 ECB             :              "                         pass option NAME with optional value VALUE to the\n"
                                 92                 :              "                         output plugin\n"));
 3101 peter_e                    93 GIC           1 :     printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
 3101 peter_e                    94 CBC           1 :     printf(_("  -s, --status-interval=SECS\n"
 3101 peter_e                    95 ECB             :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
 3101 peter_e                    96 CBC           1 :     printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
  198 peter                      97               1 :     printf(_("  -t, --two-phase        enable decoding of prepared transactions when creating a slot\n"));
 3309 rhaas                      98               1 :     printf(_("  -v, --verbose          output verbose messages\n"));
                                 99               1 :     printf(_("  -V, --version          output version information, then exit\n"));
                                100               1 :     printf(_("  -?, --help             show this help, then exit\n"));
                                101               1 :     printf(_("\nConnection options:\n"));
                                102               1 :     printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
                                103               1 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
                                104               1 :     printf(_("  -p, --port=PORT        database server port number\n"));
                                105               1 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
                                106               1 :     printf(_("  -w, --no-password      never prompt for password\n"));
                                107               1 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
 1136 peter                     108               1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
 1136 peter                     109 GIC           1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
 3309 rhaas                     110               1 : }
                                111                 : 
                                112                 : /*
                                113                 :  * Send a Standby Status Update message to server.
 3309 rhaas                     114 ECB             :  */
                                115                 : static bool
 2236 tgl                       116 GIC          25 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
                                117                 : {
                                118                 :     static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
                                119                 :     static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
 3309 rhaas                     120 ECB             : 
                                121                 :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
 3309 rhaas                     122 GIC          25 :     int         len = 0;
                                123                 : 
                                124                 :     /*
                                125                 :      * we normally don't want to send superfluous feedback, but if it's
                                126                 :      * because of a timeout we need to, otherwise wal_sender_timeout will kill
 3260 bruce                     127 ECB             :      * us.
 3309 rhaas                     128 EUB             :      */
 3309 rhaas                     129 GBC          25 :     if (!force &&
 3309 rhaas                     130 UBC           0 :         last_written_lsn == output_written_lsn &&
 1061 noah                      131 UIC           0 :         last_fsync_lsn == output_fsync_lsn)
 3309 rhaas                     132 LBC           0 :         return true;
 3309 rhaas                     133 EUB             : 
 3309 rhaas                     134 GIC          25 :     if (verbose)
 1469 peter                     135 UIC           0 :         pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
                                136                 :                     LSN_FORMAT_ARGS(output_written_lsn),
                                137                 :                     LSN_FORMAT_ARGS(output_fsync_lsn),
 1418 tgl                       138 ECB             :                     replication_slot);
 3309 rhaas                     139                 : 
 3309 rhaas                     140 CBC          25 :     replybuf[len] = 'r';
                                141              25 :     len += 1;
 3260 bruce                     142              25 :     fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
 3309 rhaas                     143              25 :     len += 8;
 2118 tgl                       144              25 :     fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
 3309 rhaas                     145              25 :     len += 8;
 3260 bruce                     146              25 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
 3309 rhaas                     147              25 :     len += 8;
 3260 bruce                     148              25 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
 3309 rhaas                     149              25 :     len += 8;
 2118 tgl                       150 GIC          25 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
 3309 rhaas                     151 CBC          25 :     len += 1;
 3309 rhaas                     152 ECB             : 
 3309 rhaas                     153 CBC          25 :     startpos = output_written_lsn;
 3309 rhaas                     154 GIC          25 :     last_written_lsn = output_written_lsn;
 3309 rhaas                     155 CBC          25 :     last_fsync_lsn = output_fsync_lsn;
                                156                 : 
 3309 rhaas                     157 GBC          25 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
                                158                 :     {
 1469 peter                     159 UBC           0 :         pg_log_error("could not send feedback packet: %s",
                                160                 :                      PQerrorMessage(conn));
 3309 rhaas                     161 UIC           0 :         return false;
 3309 rhaas                     162 ECB             :     }
                                163                 : 
 3309 rhaas                     164 GIC          25 :     return true;
                                165                 : }
 3309 rhaas                     166 ECB             : 
                                167                 : static void
 1562 peter                     168 CBC          47 : disconnect_atexit(void)
 3309 rhaas                     169 ECB             : {
 3309 rhaas                     170 CBC          47 :     if (conn != NULL)
 3309 rhaas                     171 GIC          24 :         PQfinish(conn);
                                172              47 : }
 3309 rhaas                     173 ECB             : 
                                174                 : static bool
 2236 tgl                       175 CBC          25 : OutputFsync(TimestampTz now)
                                176                 : {
 3309 rhaas                     177              25 :     output_last_fsync = now;
                                178                 : 
                                179              25 :     output_fsync_lsn = output_written_lsn;
 3309 rhaas                     180 EUB             : 
 3309 rhaas                     181 GIC          25 :     if (fsync_interval <= 0)
 3309 rhaas                     182 LBC           0 :         return true;
 3309 rhaas                     183 ECB             : 
 3251 heikki.linnakangas        184 GIC          25 :     if (!output_needs_fsync)
 3309 rhaas                     185 CBC          19 :         return true;
                                186                 : 
 3251 heikki.linnakangas        187 GIC           6 :     output_needs_fsync = false;
 3309 rhaas                     188 ECB             : 
 2833 andres                    189                 :     /* can only fsync if it's a regular file */
 2833 andres                    190 GIC           6 :     if (!output_isfile)
 2833 andres                    191 CBC           4 :         return true;
 2833 andres                    192 EUB             : 
 2833 andres                    193 GIC           2 :     if (fsync(outfd) != 0)
  366 tgl                       194 LBC           0 :         pg_fatal("could not fsync file \"%s\": %m", outfile);
                                195                 : 
 3309 rhaas                     196 GIC           2 :     return true;
                                197                 : }
                                198                 : 
                                199                 : /*
                                200                 :  * Start the log streaming
 3309 rhaas                     201 ECB             :  */
                                202                 : static void
 3114 andres                    203 GIC          23 : StreamLogicalLog(void)
 3309 rhaas                     204 ECB             : {
                                205                 :     PGresult   *res;
 3309 rhaas                     206 GIC          23 :     char       *copybuf = NULL;
 2236 tgl                       207              23 :     TimestampTz last_status = -1;
                                208                 :     int         i;
 3309 rhaas                     209 ECB             :     PQExpBuffer query;
                                210                 : 
 3309 rhaas                     211 GIC          23 :     output_written_lsn = InvalidXLogRecPtr;
                                212              23 :     output_fsync_lsn = InvalidXLogRecPtr;
                                213                 : 
                                214                 :     /*
 3309 rhaas                     215 ECB             :      * Connect in replication mode to the server
 3309 rhaas                     216 EUB             :      */
 3309 rhaas                     217 CBC          23 :     if (!conn)
 3309 rhaas                     218 UIC           0 :         conn = GetConnection();
 3309 rhaas                     219 GBC          23 :     if (!conn)
                                220                 :         /* Error message already written in GetConnection() */
 3309 rhaas                     221 UIC           0 :         return;
                                222                 : 
                                223                 :     /*
 3309 rhaas                     224 ECB             :      * Start the replication
 3309 rhaas                     225 EUB             :      */
 3309 rhaas                     226 GIC          23 :     if (verbose)
 1469 peter                     227 UIC           0 :         pg_log_info("starting log streaming at %X/%X (slot %s)",
                                228                 :                     LSN_FORMAT_ARGS(startpos),
                                229                 :                     replication_slot);
 3309 rhaas                     230 ECB             : 
                                231                 :     /* Initiate the replication stream at specified location */
  504 tgl                       232 CBC          23 :     query = createPQExpBuffer();
 3309 rhaas                     233 GIC          23 :     appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
  775 peter                     234              23 :                       replication_slot, LSN_FORMAT_ARGS(startpos));
 3309 rhaas                     235 ECB             : 
                                236                 :     /* print options if there are any */
 3309 rhaas                     237 GIC          23 :     if (noptions)
 3309 rhaas                     238 CBC          20 :         appendPQExpBufferStr(query, " (");
                                239                 : 
 3309 rhaas                     240 GIC          63 :     for (i = 0; i < noptions; i++)
 3309 rhaas                     241 ECB             :     {
                                242                 :         /* separator */
 3309 rhaas                     243 GIC          40 :         if (i > 0)
                                244              20 :             appendPQExpBufferStr(query, ", ");
 3309 rhaas                     245 ECB             : 
                                246                 :         /* write option name */
 3309 rhaas                     247 GIC          40 :         appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
 3309 rhaas                     248 ECB             : 
                                249                 :         /* write option value if specified */
 3309 rhaas                     250 GIC          40 :         if (options[(i * 2) + 1] != NULL)
                                251              40 :             appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
 3309 rhaas                     252 ECB             :     }
                                253                 : 
 3309 rhaas                     254 GIC          23 :     if (noptions)
 3309 rhaas                     255 CBC          20 :         appendPQExpBufferChar(query, ')');
 3309 rhaas                     256 ECB             : 
 3309 rhaas                     257 GIC          23 :     res = PQexec(conn, query->data);
 3309 rhaas                     258 CBC          23 :     if (PQresultStatus(res) != PGRES_COPY_BOTH)
                                259                 :     {
 1469 peter                     260               6 :         pg_log_error("could not send replication command \"%s\": %s",
 1469 peter                     261 ECB             :                      query->data, PQresultErrorMessage(res));
 3309 rhaas                     262 GIC           6 :         PQclear(res);
 3309 rhaas                     263 CBC           6 :         goto error;
 3309 rhaas                     264 ECB             :     }
 3309 rhaas                     265 GIC          17 :     PQclear(res);
 3309 rhaas                     266 CBC          17 :     resetPQExpBuffer(query);
 3309 rhaas                     267 EUB             : 
 3309 rhaas                     268 GIC          17 :     if (verbose)
 1469 peter                     269 LBC           0 :         pg_log_info("streaming initiated");
                                270                 : 
 3309 rhaas                     271 GIC         640 :     while (!time_to_abort)
                                272                 :     {
                                273                 :         int         r;
                                274                 :         int         bytes_left;
                                275                 :         int         bytes_written;
 2236 tgl                       276 ECB             :         TimestampTz now;
                                277                 :         int         hdr_len;
 2286 simon                     278 CBC         639 :         XLogRecPtr  cur_record_lsn = InvalidXLogRecPtr;
                                279                 : 
 3309 rhaas                     280             639 :         if (copybuf != NULL)
 3309 rhaas                     281 ECB             :         {
 3309 rhaas                     282 GIC         380 :             PQfreemem(copybuf);
                                283             380 :             copybuf = NULL;
                                284                 :         }
                                285                 : 
                                286                 :         /*
 1029 andres                    287 ECB             :          * Potentially send a status message to the primary.
                                288                 :          */
 3309 rhaas                     289 CBC         639 :         now = feGetCurrentTimestamp();
 3309 rhaas                     290 ECB             : 
 3309 rhaas                     291 GIC        1261 :         if (outfd != -1 &&
                                292             622 :             feTimestampDifferenceExceeds(output_last_fsync, now,
 3309 rhaas                     293 ECB             :                                          fsync_interval))
                                294                 :         {
 3309 rhaas                     295 GIC          17 :             if (!OutputFsync(now))
                                296               2 :                 goto error;
 3309 rhaas                     297 ECB             :         }
                                298                 : 
 3309 rhaas                     299 GIC        1278 :         if (standby_message_timeout > 0 &&
                                300             639 :             feTimestampDifferenceExceeds(last_status, now,
                                301                 :                                          standby_message_timeout))
 3309 rhaas                     302 ECB             :         {
 3309 rhaas                     303 EUB             :             /* Time to send feedback! */
 3309 rhaas                     304 GIC          17 :             if (!sendFeedback(conn, now, true, false))
 3309 rhaas                     305 LBC           0 :                 goto error;
                                306                 : 
 3309 rhaas                     307 GIC          17 :             last_status = now;
                                308                 :         }
 3309 rhaas                     309 ECB             : 
                                310                 :         /* got SIGHUP, close output file */
 3251 heikki.linnakangas        311 GBC         639 :         if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
 3251 heikki.linnakangas        312 EUB             :         {
 3251 heikki.linnakangas        313 UBC           0 :             now = feGetCurrentTimestamp();
                                314               0 :             if (!OutputFsync(now))
                                315               0 :                 goto error;
 3251 heikki.linnakangas        316 UIC           0 :             close(outfd);
 3251 heikki.linnakangas        317 LBC           0 :             outfd = -1;
                                318                 :         }
 3251 heikki.linnakangas        319 GIC         639 :         output_reopen = false;
 3251 heikki.linnakangas        320 ECB             : 
                                321                 :         /* open the output file, if not open yet */
 3250 heikki.linnakangas        322 GIC         639 :         if (outfd == -1)
                                323                 :         {
 2833 andres                    324 ECB             :             struct stat statbuf;
                                325                 : 
 3250 heikki.linnakangas        326 GIC          17 :             if (strcmp(outfile, "-") == 0)
 3250 heikki.linnakangas        327 GBC          17 :                 outfd = fileno(stdout);
                                328                 :             else
 3250 heikki.linnakangas        329 LBC           0 :                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
                                330                 :                              S_IRUSR | S_IWUSR);
 3250 heikki.linnakangas        331 GBC          17 :             if (outfd == -1)
 3250 heikki.linnakangas        332 EUB             :             {
 1469 peter                     333 UIC           0 :                 pg_log_error("could not open log file \"%s\": %m", outfile);
 3250 heikki.linnakangas        334               0 :                 goto error;
 3250 heikki.linnakangas        335 ECB             :             }
                                336                 : 
 2833 andres                    337 GBC          17 :             if (fstat(outfd, &statbuf) != 0)
  619 michael                   338 EUB             :             {
 1469 peter                     339 UIC           0 :                 pg_log_error("could not stat file \"%s\": %m", outfile);
  619 michael                   340               0 :                 goto error;
  619 michael                   341 ECB             :             }
                                342                 : 
 2833 andres                    343 GIC          17 :             output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
 3250 heikki.linnakangas        344 ECB             :         }
                                345                 : 
 3309 rhaas                     346 CBC         639 :         r = PQgetCopyData(conn, &copybuf, 1);
 3309 rhaas                     347 GIC         639 :         if (r == 0)
                                348             242 :         {
                                349                 :             /*
                                350                 :              * In async mode, and no data available. We block on reading but
                                351                 :              * not more than the specified timeout, so that we can send a
                                352                 :              * response back to the client.
 3309 rhaas                     353 ECB             :              */
                                354                 :             fd_set      input_mask;
 2236 tgl                       355 GIC         245 :             TimestampTz message_target = 0;
 2236 tgl                       356 CBC         245 :             TimestampTz fsync_target = 0;
                                357                 :             struct timeval timeout;
 3309 rhaas                     358             245 :             struct timeval *timeoutptr = NULL;
                                359                 : 
 2588 peter_e                   360 GBC         245 :             if (PQsocket(conn) < 0)
 2588 peter_e                   361 ECB             :             {
 1469 peter                     362 UIC           0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
 2588 peter_e                   363 GIC           2 :                 goto error;
 2588 peter_e                   364 ECB             :             }
                                365                 : 
 3309 rhaas                     366 GIC        4165 :             FD_ZERO(&input_mask);
                                367             245 :             FD_SET(PQsocket(conn), &input_mask);
 3309 rhaas                     368 ECB             : 
                                369                 :             /* Compute when we need to wakeup to send a keepalive message. */
 3309 rhaas                     370 GIC         245 :             if (standby_message_timeout)
                                371             245 :                 message_target = last_status + (standby_message_timeout - 1) *
                                372                 :                     ((int64) 1000);
 3309 rhaas                     373 ECB             : 
                                374                 :             /* Compute when we need to wakeup to fsync the output file. */
 3251 heikki.linnakangas        375 GIC         245 :             if (fsync_interval > 0 && output_needs_fsync)
 3309 rhaas                     376              91 :                 fsync_target = output_last_fsync + (fsync_interval - 1) *
                                377                 :                     ((int64) 1000);
 3309 rhaas                     378 ECB             : 
                                379                 :             /* Now compute when to wakeup. */
 3309 rhaas                     380 GIC         245 :             if (message_target > 0 || fsync_target > 0)
                                381                 :             {
                                382                 :                 TimestampTz targettime;
                                383                 :                 long        secs;
 3309 rhaas                     384 ECB             :                 int         usecs;
                                385                 : 
 3309 rhaas                     386 CBC         245 :                 targettime = message_target;
 3309 rhaas                     387 EUB             : 
 3309 rhaas                     388 GIC         245 :                 if (fsync_target > 0 && fsync_target < targettime)
 3309 rhaas                     389 LBC           0 :                     targettime = fsync_target;
                                390                 : 
 3309 rhaas                     391 GIC         245 :                 feTimestampDifference(now,
                                392                 :                                       targettime,
 3309 rhaas                     393 ECB             :                                       &secs,
 3309 rhaas                     394 EUB             :                                       &usecs);
 3309 rhaas                     395 GIC         245 :                 if (secs <= 0)
 3309 rhaas                     396 LBC           0 :                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
 3309 rhaas                     397 ECB             :                 else
 3309 rhaas                     398 CBC         245 :                     timeout.tv_sec = secs;
 3309 rhaas                     399 GIC         245 :                 timeout.tv_usec = usecs;
                                400             245 :                 timeoutptr = &timeout;
 3309 rhaas                     401 ECB             :             }
                                402                 : 
 3309 rhaas                     403 GIC         245 :             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
                                404             245 :             if (r == 0 || (r < 0 && errno == EINTR))
                                405                 :             {
                                406                 :                 /*
                                407                 :                  * Got a timeout or signal. Continue the loop and either
                                408                 :                  * deliver a status packet to the server or just go back into
 3309 rhaas                     409 ECB             :                  * blocking.
                                410                 :                  */
 3309 rhaas                     411 CBC         243 :                 continue;
                                412                 :             }
 3309 rhaas                     413 GBC         244 :             else if (r < 0)
 3309 rhaas                     414 EUB             :             {
  716 peter                     415 UIC           0 :                 pg_log_error("%s() failed: %m", "select");
 3309 rhaas                     416               0 :                 goto error;
                                417                 :             }
 3309 rhaas                     418 ECB             : 
                                419                 :             /* Else there is actually data on the socket */
 3309 rhaas                     420 CBC         244 :             if (PQconsumeInput(conn) == 0)
                                421                 :             {
 1469 peter                     422               2 :                 pg_log_error("could not receive data from WAL stream: %s",
                                423                 :                              PQerrorMessage(conn));
 3309 rhaas                     424               2 :                 goto error;
                                425                 :             }
 3309 rhaas                     426 GIC         242 :             continue;
                                427                 :         }
 3309 rhaas                     428 ECB             : 
                                429                 :         /* End of copy stream */
 3309 rhaas                     430 GIC         394 :         if (r == -1)
                                431              14 :             break;
 3309 rhaas                     432 ECB             : 
                                433                 :         /* Failure while reading the copy stream */
 3309 rhaas                     434 GBC         387 :         if (r == -2)
                                435                 :         {
 1469 peter                     436 UBC           0 :             pg_log_error("could not read COPY data: %s",
                                437                 :                          PQerrorMessage(conn));
 3309 rhaas                     438 UIC           0 :             goto error;
                                439                 :         }
 3309 rhaas                     440 ECB             : 
                                441                 :         /* Check the message type. */
 3309 rhaas                     442 GIC         387 :         if (copybuf[0] == 'k')
                                443             297 :         {
                                444                 :             int         pos;
 3309 rhaas                     445 ECB             :             bool        replyRequested;
                                446                 :             XLogRecPtr  walEnd;
 2286 simon                     447 GIC         299 :             bool        endposReached = false;
                                448                 : 
                                449                 :             /*
                                450                 :              * Parse the keepalive message, enclosed in the CopyData message.
                                451                 :              * We just check if the server requested a reply, and ignore the
 3309 rhaas                     452 ECB             :              * rest.
                                453                 :              */
 3309 rhaas                     454 CBC         299 :             pos = 1;            /* skip msgtype 'k' */
 3309 rhaas                     455 GIC         299 :             walEnd = fe_recvint64(&copybuf[pos]);
 3309 rhaas                     456 CBC         299 :             output_written_lsn = Max(walEnd, output_written_lsn);
                                457                 : 
                                458             299 :             pos += 8;           /* read walEnd */
                                459                 : 
                                460             299 :             pos += 8;           /* skip sendTime */
                                461                 : 
 3309 rhaas                     462 GBC         299 :             if (r < pos + 1)
 3309 rhaas                     463 EUB             :             {
 1469 peter                     464 UIC           0 :                 pg_log_error("streaming header too small: %d", r);
 3309 rhaas                     465 LBC           0 :                 goto error;
                                466                 :             }
 3309 rhaas                     467 CBC         299 :             replyRequested = copybuf[pos];
                                468                 : 
 2286 simon                     469 GIC         299 :             if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
                                470                 :             {
                                471                 :                 /*
                                472                 :                  * If there's nothing to read on the socket until a keepalive
                                473                 :                  * we know that the server has nothing to send us; and if
                                474                 :                  * walEnd has passed endpos, we know nothing else can have
 2286 simon                     475 ECB             :                  * committed before endpos.  So we can bail out now.
                                476                 :                  */
 2286 simon                     477 GIC           2 :                 endposReached = true;
                                478                 :             }
 3309 rhaas                     479 ECB             : 
                                480                 :             /* Send a reply, if necessary */
 2286 simon                     481 CBC         299 :             if (replyRequested || endposReached)
 2286 simon                     482 EUB             :             {
 2286 simon                     483 CBC           3 :                 if (!flushAndSendFeedback(conn, &now))
 3309 rhaas                     484 UIC           0 :                     goto error;
 3309 rhaas                     485 GIC           3 :                 last_status = now;
 3309 rhaas                     486 ECB             :             }
                                487                 : 
 2286 simon                     488 CBC         299 :             if (endposReached)
 2286 simon                     489 ECB             :             {
 2286 simon                     490 CBC           2 :                 prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
 2286 simon                     491 GIC           2 :                 time_to_abort = true;
                                492               2 :                 break;
 2286 simon                     493 ECB             :             }
                                494                 : 
 3309 rhaas                     495 CBC         297 :             continue;
                                496                 :         }
 3309 rhaas                     497 GBC          88 :         else if (copybuf[0] != 'w')
                                498                 :         {
 1469 peter                     499 UBC           0 :             pg_log_error("unrecognized streaming header: \"%c\"",
                                500                 :                          copybuf[0]);
 3309 rhaas                     501 UIC           0 :             goto error;
                                502                 :         }
                                503                 : 
                                504                 :         /*
                                505                 :          * Read the header of the XLogData message, enclosed in the CopyData
                                506                 :          * message. We only need the WAL location field (dataStart), the rest
 3309 rhaas                     507 ECB             :          * of the header is ignored.
                                508                 :          */
 3309 rhaas                     509 CBC          88 :         hdr_len = 1;            /* msgtype 'w' */
                                510              88 :         hdr_len += 8;           /* dataStart */
                                511              88 :         hdr_len += 8;           /* walEnd */
 3309 rhaas                     512 GIC          88 :         hdr_len += 8;           /* sendTime */
 3309 rhaas                     513 GBC          88 :         if (r < hdr_len + 1)
 3309 rhaas                     514 EUB             :         {
 1469 peter                     515 UIC           0 :             pg_log_error("streaming header too small: %d", r);
 3309 rhaas                     516               0 :             goto error;
                                517                 :         }
 3309 rhaas                     518 ECB             : 
                                519                 :         /* Extract WAL location for this block */
 2286 simon                     520 CBC          88 :         cur_record_lsn = fe_recvint64(&copybuf[1]);
                                521                 : 
 2286 simon                     522 GIC          88 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
                                523                 :         {
                                524                 :             /*
                                525                 :              * We've read past our endpoint, so prepare to go away being
 2286 simon                     526 EUB             :              * cautious about what happens to our output data.
                                527                 :              */
 2286 simon                     528 UBC           0 :             if (!flushAndSendFeedback(conn, &now))
                                529               0 :                 goto error;
                                530               0 :             prepareToTerminate(conn, endpos, false, cur_record_lsn);
 2286 simon                     531 UIC           0 :             time_to_abort = true;
                                532               0 :             break;
 3309 rhaas                     533 ECB             :         }
                                534                 : 
 2286 simon                     535 CBC          88 :         output_written_lsn = Max(cur_record_lsn, output_written_lsn);
 2286 simon                     536 ECB             : 
 3309 rhaas                     537 GIC          88 :         bytes_left = r - hdr_len;
                                538              88 :         bytes_written = 0;
 3309 rhaas                     539 ECB             : 
                                540                 :         /* signal that a fsync is needed */
 3251 heikki.linnakangas        541 CBC          88 :         output_needs_fsync = true;
                                542                 : 
 3309 rhaas                     543 GIC         176 :         while (bytes_left)
                                544                 :         {
 3309 rhaas                     545 ECB             :             int         ret;
                                546                 : 
 3309 rhaas                     547 GIC         176 :             ret = write(outfd,
                                548              88 :                         copybuf + hdr_len + bytes_written,
 3309 rhaas                     549 ECB             :                         bytes_left);
                                550                 : 
 3309 rhaas                     551 GBC          88 :             if (ret < 0)
                                552                 :             {
  508 peter                     553 UBC           0 :                 pg_log_error("could not write %d bytes to log file \"%s\": %m",
                                554                 :                              bytes_left, outfile);
 3309 rhaas                     555 UIC           0 :                 goto error;
                                556                 :             }
 3309 rhaas                     557 ECB             : 
                                558                 :             /* Write was successful, advance our position */
 3309 rhaas                     559 GIC          88 :             bytes_written += ret;
                                560              88 :             bytes_left -= ret;
 3309 rhaas                     561 ECB             :         }
                                562                 : 
 3309 rhaas                     563 GBC          88 :         if (write(outfd, "\n", 1) != 1)
                                564                 :         {
  508 peter                     565 UBC           0 :             pg_log_error("could not write %d bytes to log file \"%s\": %m",
                                566                 :                          1, outfile);
 3309 rhaas                     567 UIC           0 :             goto error;
 3309 rhaas                     568 ECB             :         }
                                569                 : 
 2286 simon                     570 GIC          88 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
 2286 simon                     571 ECB             :         {
 2286 simon                     572 EUB             :             /* endpos was exactly the record we just processed, we're done */
 2286 simon                     573 CBC           5 :             if (!flushAndSendFeedback(conn, &now))
 2286 simon                     574 LBC           0 :                 goto error;
 2286 simon                     575 CBC           5 :             prepareToTerminate(conn, endpos, false, cur_record_lsn);
 2286 simon                     576 GIC           5 :             time_to_abort = true;
                                577               5 :             break;
                                578                 :         }
 3309 rhaas                     579 ECB             :     }
                                580                 : 
 3309 rhaas                     581 GIC          15 :     res = PQgetResult(conn);
 2286 simon                     582 CBC          15 :     if (PQresultStatus(res) == PGRES_COPY_OUT)
                                583                 :     {
 1061 noah                      584 GIC           7 :         PQclear(res);
                                585                 : 
                                586                 :         /*
                                587                 :          * We're doing a client-initiated clean exit and have sent CopyDone to
                                588                 :          * the server. Drain any messages, so we don't miss a last-minute
                                589                 :          * ErrorResponse. The walsender stops generating XLogData records once
                                590                 :          * it sees CopyDone, so expect this to finish quickly. After CopyDone,
                                591                 :          * it's too late for sendFeedback(), even if this were to take a long
                                592                 :          * time. Hence, use synchronous-mode PQgetCopyData().
 2286 simon                     593 ECB             :          */
                                594                 :         while (1)
 1061 noah                      595 GIC           3 :         {
 1061 noah                      596 ECB             :             int         r;
                                597                 : 
 1061 noah                      598 CBC          10 :             if (copybuf != NULL)
 1061 noah                      599 ECB             :             {
 1061 noah                      600 GIC          10 :                 PQfreemem(copybuf);
 1061 noah                      601 CBC          10 :                 copybuf = NULL;
 1061 noah                      602 ECB             :             }
 1061 noah                      603 CBC          10 :             r = PQgetCopyData(conn, &copybuf, 0);
                                604              10 :             if (r == -1)
 1061 noah                      605 GIC           7 :                 break;
 1061 noah                      606 GBC           3 :             if (r == -2)
                                607                 :             {
 1061 noah                      608 UBC           0 :                 pg_log_error("could not read COPY data: %s",
 1061 noah                      609 EUB             :                              PQerrorMessage(conn));
 1061 noah                      610 UIC           0 :                 time_to_abort = false;  /* unclean exit */
                                611               0 :                 goto error;
                                612                 :             }
 1061 noah                      613 ECB             :         }
                                614                 : 
 1061 noah                      615 CBC           7 :         res = PQgetResult(conn);
                                616                 :     }
                                617              15 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                618                 :     {
 1469 peter                     619               7 :         pg_log_error("unexpected termination of replication stream: %s",
                                620                 :                      PQresultErrorMessage(res));
 3309 rhaas                     621               7 :         goto error;
                                622                 :     }
                                623               8 :     PQclear(res);
                                624                 : 
 3309 rhaas                     625 GBC           8 :     if (outfd != -1 && strcmp(outfile, "-") != 0)
                                626                 :     {
 2236 tgl                       627 UIC           0 :         TimestampTz t = feGetCurrentTimestamp();
 3309 rhaas                     628 EUB             : 
                                629                 :         /* no need to jump to error on failure here, we're finishing anyway */
 3309 rhaas                     630 UBC           0 :         OutputFsync(t);
 3309 rhaas                     631 EUB             : 
 3309 rhaas                     632 UIC           0 :         if (close(outfd) != 0)
 1469 peter                     633 LBC           0 :             pg_log_error("could not close file \"%s\": %m", outfile);
 3309 rhaas                     634 ECB             :     }
 3309 rhaas                     635 CBC           8 :     outfd = -1;
 3309 rhaas                     636 GIC          23 : error:
 3261 heikki.linnakangas        637 GBC          23 :     if (copybuf != NULL)
 3261 heikki.linnakangas        638 EUB             :     {
 3261 heikki.linnakangas        639 UIC           0 :         PQfreemem(copybuf);
 3261 heikki.linnakangas        640 LBC           0 :         copybuf = NULL;
 3261 heikki.linnakangas        641 ECB             :     }
 3309 rhaas                     642 CBC          23 :     destroyPQExpBuffer(query);
 3309 rhaas                     643 GIC          23 :     PQfinish(conn);
                                644              23 :     conn = NULL;
                                645                 : }
                                646                 : 
                                647                 : /*
                                648                 :  * Unfortunately we can't do sensible signal handling on windows...
                                649                 :  */
                                650                 : #ifndef WIN32
                                651                 : 
                                652                 : /*
                                653                 :  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
                                654                 :  * possible moment.
 3309 rhaas                     655 ECB             :  */
                                656                 : static void
  207 tgl                       657 GNC           1 : sigexit_handler(SIGNAL_ARGS)
 3309 rhaas                     658 ECB             : {
 3309 rhaas                     659 GIC           1 :     time_to_abort = true;
                                660               1 : }
                                661                 : 
                                662                 : /*
                                663                 :  * Trigger the output file to be reopened.
 3309 rhaas                     664 EUB             :  */
                                665                 : static void
  207 tgl                       666 UNC           0 : sighup_handler(SIGNAL_ARGS)
 3309 rhaas                     667 EUB             : {
 3309 rhaas                     668 UIC           0 :     output_reopen = true;
                                669               0 : }
                                670                 : #endif
                                671                 : 
 3309 rhaas                     672 ECB             : 
                                673                 : int
 3309 rhaas                     674 GIC          55 : main(int argc, char **argv)
                                675                 : {
                                676                 :     static struct option long_options[] = {
                                677                 : /* general options */
                                678                 :         {"file", required_argument, NULL, 'f'},
                                679                 :         {"fsync-interval", required_argument, NULL, 'F'},
                                680                 :         {"no-loop", no_argument, NULL, 'n'},
                                681                 :         {"verbose", no_argument, NULL, 'v'},
                                682                 :         {"two-phase", no_argument, NULL, 't'},
                                683                 :         {"version", no_argument, NULL, 'V'},
                                684                 :         {"help", no_argument, NULL, '?'},
                                685                 : /* connection options */
                                686                 :         {"dbname", required_argument, NULL, 'd'},
                                687                 :         {"host", required_argument, NULL, 'h'},
                                688                 :         {"port", required_argument, NULL, 'p'},
                                689                 :         {"username", required_argument, NULL, 'U'},
                                690                 :         {"no-password", no_argument, NULL, 'w'},
                                691                 :         {"password", no_argument, NULL, 'W'},
                                692                 : /* replication options */
                                693                 :         {"startpos", required_argument, NULL, 'I'},
                                694                 :         {"endpos", required_argument, NULL, 'E'},
                                695                 :         {"option", required_argument, NULL, 'o'},
                                696                 :         {"plugin", required_argument, NULL, 'P'},
                                697                 :         {"status-interval", required_argument, NULL, 's'},
                                698                 :         {"slot", required_argument, NULL, 'S'},
                                699                 : /* action */
                                700                 :         {"create-slot", no_argument, NULL, 1},
                                701                 :         {"start", no_argument, NULL, 2},
                                702                 :         {"drop-slot", no_argument, NULL, 3},
                                703                 :         {"if-not-exists", no_argument, NULL, 4},
                                704                 :         {NULL, 0, NULL, 0}
                                705                 :     };
                                706                 :     int         c;
                                707                 :     int         option_index;
                                708                 :     uint32      hi,
                                709                 :                 lo;
 3112 andres                    710 ECB             :     char       *db_name;
 3309 rhaas                     711                 : 
 1469 peter                     712 CBC          55 :     pg_logging_init(argv[0]);
 3309 rhaas                     713 GIC          55 :     progname = get_progname(argv[0]);
 2659 alvherre                  714 CBC          55 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
                                715                 : 
 3309 rhaas                     716              55 :     if (argc > 1)
                                717                 :     {
                                718              54 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
 3309 rhaas                     719 ECB             :         {
 3309 rhaas                     720 GIC           1 :             usage();
 3309 rhaas                     721 CBC           1 :             exit(0);
 3309 rhaas                     722 ECB             :         }
 3309 rhaas                     723 GIC          53 :         else if (strcmp(argv[1], "-V") == 0 ||
 3309 rhaas                     724 CBC          53 :                  strcmp(argv[1], "--version") == 0)
 3309 rhaas                     725 ECB             :         {
 3309 rhaas                     726 GIC           1 :             puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
                                727               1 :             exit(0);
                                728                 :         }
 3309 rhaas                     729 ECB             :     }
                                730                 : 
  118 peter                     731 GNC         321 :     while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
 3309 rhaas                     732 CBC         321 :                             long_options, &option_index)) != -1)
                                733                 :     {
 3309 rhaas                     734 GIC         269 :         switch (c)
 3309 rhaas                     735 ECB             :         {
                                736                 : /* general options */
 3309 rhaas                     737 CBC          24 :             case 'f':
 3309 rhaas                     738 GBC          24 :                 outfile = pg_strdup(optarg);
                                739              24 :                 break;
 3241 andres                    740 UIC           0 :             case 'F':
  624 michael                   741               0 :                 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
  624 michael                   742 EUB             :                                       INT_MAX / 1000,
                                743                 :                                       &fsync_interval))
 3241 andres                    744 UBC           0 :                     exit(1);
  624 michael                   745 LBC           0 :                 fsync_interval *= 1000;
 3241 andres                    746               0 :                 break;
 3309 rhaas                     747 CBC          23 :             case 'n':
                                748              23 :                 noloop = 1;
                                749              23 :                 break;
  648 akapila                   750 GBC           2 :             case 't':
  648 akapila                   751 GIC           2 :                 two_phase = true;
  648 akapila                   752 CBC           2 :                 break;
  118 peter                     753 UNC           0 :             case 'v':
                                754               0 :                 verbose++;
                                755               0 :                 break;
 2905 andres                    756 ECB             : /* connection options */
 3309 rhaas                     757 CBC          50 :             case 'd':
 3309 rhaas                     758 GBC          50 :                 dbname = pg_strdup(optarg);
                                759              50 :                 break;
 3309 rhaas                     760 UBC           0 :             case 'h':
                                761               0 :                 dbhost = pg_strdup(optarg);
                                762               0 :                 break;
                                763               0 :             case 'p':
                                764               0 :                 dbport = pg_strdup(optarg);
                                765               0 :                 break;
                                766               0 :             case 'U':
                                767               0 :                 dbuser = pg_strdup(optarg);
                                768               0 :                 break;
                                769               0 :             case 'w':
                                770               0 :                 dbgetpassword = -1;
                                771               0 :                 break;
                                772               0 :             case 'W':
 3309 rhaas                     773 UIC           0 :                 dbgetpassword = 1;
 3309 rhaas                     774 UBC           0 :                 break;
 3309 rhaas                     775 EUB             : /* replication options */
 3241 andres                    776 UBC           0 :             case 'I':
                                777               0 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
  366 tgl                       778               0 :                     pg_fatal("could not parse start position \"%s\"", optarg);
 3241 andres                    779 LBC           0 :                 startpos = ((uint64) hi) << 32 | lo;
                                780               0 :                 break;
 2286 simon                     781 GBC           8 :             case 'E':
 2286 simon                     782 CBC           8 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
  366 tgl                       783 LBC           0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
 2286 simon                     784 CBC           8 :                 endpos = ((uint64) hi) << 32 | lo;
 2286 simon                     785 GIC           8 :                 break;
 3309 rhaas                     786 CBC          40 :             case 'o':
 3309 rhaas                     787 ECB             :                 {
 3260 bruce                     788 GIC          40 :                     char       *data = pg_strdup(optarg);
 3260 bruce                     789 CBC          40 :                     char       *val = strchr(data, '=');
                                790                 : 
 3309 rhaas                     791 GIC          40 :                     if (val != NULL)
 3309 rhaas                     792 ECB             :                     {
                                793                 :                         /* remove =; separate data from val */
 3309 rhaas                     794 GIC          40 :                         *val = '\0';
                                795              40 :                         val++;
 3309 rhaas                     796 ECB             :                     }
                                797                 : 
 3309 rhaas                     798 GIC          40 :                     noptions += 1;
 3260 bruce                     799 CBC          40 :                     options = pg_realloc(options, sizeof(char *) * noptions * 2);
 3309 rhaas                     800 ECB             : 
 3309 rhaas                     801 GIC          40 :                     options[(noptions - 1) * 2] = data;
                                802              40 :                     options[(noptions - 1) * 2 + 1] = val;
 3309 rhaas                     803 ECB             :                 }
                                804                 : 
 3309 rhaas                     805 CBC          40 :                 break;
                                806              21 :             case 'P':
 3309 rhaas                     807 GBC          21 :                 plugin = pg_strdup(optarg);
                                808              21 :                 break;
 3309 rhaas                     809 UIC           0 :             case 's':
  624 michael                   810               0 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
  624 michael                   811 EUB             :                                       INT_MAX / 1000,
                                812                 :                                       &standby_message_timeout))
 3309 rhaas                     813 UBC           0 :                     exit(1);
  624 michael                   814 LBC           0 :                 standby_message_timeout *= 1000;
 3309 rhaas                     815               0 :                 break;
 3309 rhaas                     816 CBC          51 :             case 'S':
 3309 rhaas                     817 GIC          51 :                 replication_slot = pg_strdup(optarg);
 3309 rhaas                     818 CBC          51 :                 break;
 3309 rhaas                     819 ECB             : /* action */
 3309 rhaas                     820 CBC          23 :             case 1:
                                821              23 :                 do_create_slot = true;
                                822              23 :                 break;
                                823              25 :             case 2:
                                824              25 :                 do_start_slot = true;
                                825              25 :                 break;
                                826               1 :             case 3:
 3309 rhaas                     827 GBC           1 :                 do_drop_slot = true;
                                828               1 :                 break;
 2828 andres                    829 UBC           0 :             case 4:
 2828 andres                    830 UIC           0 :                 slot_exists_ok = true;
 2828 andres                    831 LBC           0 :                 break;
                                832                 : 
 3309 rhaas                     833 CBC           1 :             default:
  366 tgl                       834 ECB             :                 /* getopt_long already emitted a complaint */
  366 tgl                       835 GIC           1 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3309 rhaas                     836               1 :                 exit(1);
                                837                 :         }
                                838                 :     }
                                839                 : 
                                840                 :     /*
 3309 rhaas                     841 ECB             :      * Any non-option arguments?
                                842                 :      */
 3309 rhaas                     843 GBC          52 :     if (optind < argc)
                                844                 :     {
 1469 peter                     845 UBC           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
 1469 peter                     846 EUB             :                      argv[optind]);
  366 tgl                       847 UIC           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3309 rhaas                     848               0 :         exit(1);
                                849                 :     }
                                850                 : 
                                851                 :     /*
 3309 rhaas                     852 ECB             :      * Required arguments
                                853                 :      */
 3309 rhaas                     854 CBC          52 :     if (replication_slot == NULL)
 3309 rhaas                     855 ECB             :     {
 1469 peter                     856 CBC           1 :         pg_log_error("no slot specified");
  366 tgl                       857 GIC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3309 rhaas                     858               1 :         exit(1);
 3309 rhaas                     859 ECB             :     }
                                860                 : 
 3309 rhaas                     861 CBC          51 :     if (do_start_slot && outfile == NULL)
 3309 rhaas                     862 ECB             :     {
 1469 peter                     863 CBC           1 :         pg_log_error("no target file specified");
  366 tgl                       864 GIC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3309 rhaas                     865               1 :         exit(1);
 3309 rhaas                     866 ECB             :     }
                                867                 : 
 3309 rhaas                     868 CBC          50 :     if (!do_drop_slot && dbname == NULL)
 3309 rhaas                     869 ECB             :     {
 1469 peter                     870 CBC           1 :         pg_log_error("no database specified");
  366 tgl                       871 GIC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3309 rhaas                     872               1 :         exit(1);
 3309 rhaas                     873 ECB             :     }
                                874                 : 
 3309 rhaas                     875 CBC          49 :     if (!do_drop_slot && !do_create_slot && !do_start_slot)
 3309 rhaas                     876 ECB             :     {
 1469 peter                     877 CBC           1 :         pg_log_error("at least one action needs to be specified");
  366 tgl                       878 GIC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3309 rhaas                     879               1 :         exit(1);
 3309 rhaas                     880 ECB             :     }
                                881                 : 
 3309 rhaas                     882 GBC          48 :     if (do_drop_slot && (do_create_slot || do_start_slot))
 3309 rhaas                     883 EUB             :     {
 1469 peter                     884 UBC           0 :         pg_log_error("cannot use --create-slot or --start together with --drop-slot");
  366 tgl                       885 UIC           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3309 rhaas                     886               0 :         exit(1);
 3309 rhaas                     887 ECB             :     }
                                888                 : 
 3162 andres                    889 GBC          48 :     if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
 3309 rhaas                     890 EUB             :     {
 1469 peter                     891 UBC           0 :         pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
  366 tgl                       892 UIC           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3309 rhaas                     893               0 :         exit(1);
 3309 rhaas                     894 ECB             :     }
                                895                 : 
 2286 simon                     896 GBC          48 :     if (endpos != InvalidXLogRecPtr && !do_start_slot)
 2286 simon                     897 EUB             :     {
 1469 peter                     898 UBC           0 :         pg_log_error("--endpos may only be specified with --start");
  366 tgl                       899 UIC           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 2286 simon                     900               0 :         exit(1);
 2286 simon                     901 ECB             :     }
                                902                 : 
  648 akapila                   903 CBC          48 :     if (two_phase && !do_create_slot)
  648 akapila                   904 ECB             :     {
  648 akapila                   905 CBC           1 :         pg_log_error("--two-phase may only be specified with --create-slot");
  366 tgl                       906 GIC           1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
  648 akapila                   907               1 :         exit(1);
                                908                 :     }
                                909                 : 
                                910                 :     /*
                                911                 :      * Obtain a connection to server.  Notably, if we need a password, we want
  504 tgl                       912 ECB             :      * to collect it from the user immediately.
 3309 rhaas                     913                 :      */
 3112 andres                    914 GIC          47 :     conn = GetConnection();
 3112 andres                    915 GBC          47 :     if (!conn)
 3112 andres                    916 ECB             :         /* Error message already written in GetConnection() */
 3112 andres                    917 UIC           0 :         exit(1);
 1562 peter                     918 GIC          47 :     atexit(disconnect_atexit);
                                919                 : 
                                920                 :     /*
                                921                 :      * Trap signals.  (Don't do this until after the initial password prompt,
                                922                 :      * if one is needed, in GetConnection.)
  504 tgl                       923 ECB             :      */
                                924                 : #ifndef WIN32
  207 dgustafsson               925 GNC          47 :     pqsignal(SIGINT, sigexit_handler);
                                926              47 :     pqsignal(SIGTERM, sigexit_handler);
  504 tgl                       927 GIC          47 :     pqsignal(SIGHUP, sighup_handler);
                                928                 : #endif
                                929                 : 
                                930                 :     /*
                                931                 :      * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
                                932                 :      * replication connection.
 3112 andres                    933 ECB             :      */
 3112 andres                    934 GBC          47 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
 1562 peter                     935 UIC           0 :         exit(1);
 3309 rhaas                     936 ECB             : 
 3112 andres                    937 GBC          47 :     if (db_name == NULL)
  366 tgl                       938 UIC           0 :         pg_fatal("could not establish database-specific replication connection");
                                939                 : 
                                940                 :     /*
                                941                 :      * Set umask so that directories/files are created with the same
                                942                 :      * permissions as directories/files in the source data directory.
                                943                 :      *
                                944                 :      * pg_mode_mask is set to owner-only by default and then updated in
                                945                 :      * GetConnection() where we get the mode from the server-side with
                                946                 :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
 1828 sfrost                    947 ECB             :      */
 1828 sfrost                    948 GIC          47 :     umask(pg_mode_mask);
                                949                 : 
 3112 andres                    950 ECB             :     /* Drop a replication slot. */
 3309 rhaas                     951 GIC          47 :     if (do_drop_slot)
 3309 rhaas                     952 ECB             :     {
 3309 rhaas                     953 GBC           1 :         if (verbose)
 1469 peter                     954 UIC           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
 3309 rhaas                     955 ECB             : 
 3112 andres                    956 GBC           1 :         if (!DropReplicationSlot(conn, replication_slot))
 1562 peter                     957 UIC           0 :             exit(1);
                                958                 :     }
                                959                 : 
 3112 andres                    960 ECB             :     /* Create a replication slot. */
 3309 rhaas                     961 GIC          47 :     if (do_create_slot)
 3309 rhaas                     962 ECB             :     {
 3309 rhaas                     963 GBC          23 :         if (verbose)
 1469 peter                     964 UIC           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
 3309 rhaas                     965 ECB             : 
 2021 peter_e                   966 GIC          23 :         if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
  648 akapila                   967 EUB             :                                    false, false, slot_exists_ok, two_phase))
 1562 peter                     968 LBC           0 :             exit(1);
 2828 andres                    969 GIC          23 :         startpos = InvalidXLogRecPtr;
                                970                 :     }
 3309 rhaas                     971 ECB             : 
 3309 rhaas                     972 CBC          47 :     if (!do_start_slot)
 1562 peter                     973 GIC          24 :         exit(0);
                                974                 : 
                                975                 :     /* Stream loop */
                                976                 :     while (true)
 3309 rhaas                     977 ECB             :     {
 3112 andres                    978 CBC          23 :         StreamLogicalLog();
 3309 rhaas                     979 GIC          23 :         if (time_to_abort)
                                980                 :         {
                                981                 :             /*
                                982                 :              * We've been Ctrl-C'ed or reached an exit limit condition. That's
                                983                 :              * not an error, so exit without an errorcode.
 3309 rhaas                     984 ECB             :              */
 1562 peter                     985 GIC           8 :             exit(0);
 3309 rhaas                     986 ECB             :         }
 3309 rhaas                     987 CBC          15 :         else if (noloop)
  366 tgl                       988 GIC          15 :             pg_fatal("disconnected");
                                989                 :         else
                                990                 :         {
 3309 rhaas                     991 EUB             :             /* translator: check source for value for %d */
 1469 peter                     992 UIC           0 :             pg_log_info("disconnected; waiting %d seconds to try again",
 1469 peter                     993 EUB             :                         RECONNECT_SLEEP_TIME);
 3309 rhaas                     994 UIC           0 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
                                995                 :         }
                                996                 :     }
                                997                 : }
                                998                 : 
                                999                 : /*
                               1000                 :  * Fsync our output data, and send a feedback message to the server.  Returns
                               1001                 :  * true if successful, false otherwise.
                               1002                 :  *
                               1003                 :  * If successful, *now is updated to the current timestamp just before sending
                               1004                 :  * feedback.
                               1005                 :  */
 2286 simon                    1006 ECB             : static bool
 2286 simon                    1007 GIC           8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
                               1008                 : {
 2286 simon                    1009 ECB             :     /* flush data to disk, so that we send a recent flush pointer */
 2286 simon                    1010 GBC           8 :     if (!OutputFsync(*now))
 2286 simon                    1011 LBC           0 :         return false;
 2286 simon                    1012 CBC           8 :     *now = feGetCurrentTimestamp();
 2286 simon                    1013 GBC           8 :     if (!sendFeedback(conn, *now, true, false))
 2286 simon                    1014 UIC           0 :         return false;
 2286 simon                    1015 ECB             : 
 2286 simon                    1016 GIC           8 :     return true;
                               1017                 : }
                               1018                 : 
                               1019                 : /*
                               1020                 :  * Try to inform the server about our upcoming demise, but don't wait around or
                               1021                 :  * retry on failure.
                               1022                 :  */
 2286 simon                    1023 ECB             : static void
 2286 simon                    1024 GIC           7 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
 2286 simon                    1025 ECB             : {
 2286 simon                    1026 CBC           7 :     (void) PQputCopyEnd(conn, NULL);
 2286 simon                    1027 GIC           7 :     (void) PQflush(conn);
 2286 simon                    1028 ECB             : 
 2286 simon                    1029 GIC           7 :     if (verbose)
 2286 simon                    1030 EUB             :     {
 2286 simon                    1031 UBC           0 :         if (keepalive)
 1370 peter                    1032 UIC           0 :             pg_log_info("end position %X/%X reached by keepalive",
                               1033                 :                         LSN_FORMAT_ARGS(endpos));
 2286 simon                    1034 EUB             :         else
 1370 peter                    1035 UIC           0 :             pg_log_info("end position %X/%X reached by WAL record at %X/%X",
                               1036                 :                         LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
 2286 simon                    1037 ECB             :     }
 2286 simon                    1038 GIC           7 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a