LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Coverage Total Hit UNC UBC GBC GNC CBC DUB DCB
Current: Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128 Lines: 71.6 % 476 341 14 121 8 333 4 4
Current Date: 2024-04-14 14:21:10 Functions: 90.0 % 10 9 1 3 6 1
Baseline: 16@8cea358b128 Branches: 64.9 % 291 189 7 95 1 2 186
Baseline Date: 2024-04-14 14:21:09 Line coverage date bins:
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed (240..) days: 71.6 % 476 341 14 121 8 333
Function coverage date bins:
(240..) days: 90.0 % 10 9 1 3 6
Branch coverage date bins:
(240..) days: 64.9 % 291 189 7 95 1 2 186

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
                                  4                 :                :  *                    fashion and write it to a local file.
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
                                  7                 :                :  *
                                  8                 :                :  * IDENTIFICATION
                                  9                 :                :  *        src/bin/pg_basebackup/pg_recvlogical.c
                                 10                 :                :  *-------------------------------------------------------------------------
                                 11                 :                :  */
                                 12                 :                : 
                                 13                 :                : #include "postgres_fe.h"
                                 14                 :                : 
                                 15                 :                : #include <dirent.h>
                                 16                 :                : #include <limits.h>
                                 17                 :                : #include <sys/select.h>
                                 18                 :                : #include <sys/stat.h>
                                 19                 :                : #include <unistd.h>
                                 20                 :                : 
                                 21                 :                : #include "access/xlog_internal.h"
                                 22                 :                : #include "common/fe_memutils.h"
                                 23                 :                : #include "common/file_perm.h"
                                 24                 :                : #include "common/logging.h"
                                 25                 :                : #include "fe_utils/option_utils.h"
                                 26                 :                : #include "getopt_long.h"
                                 27                 :                : #include "libpq-fe.h"
                                 28                 :                : #include "libpq/pqsignal.h"
                                 29                 :                : #include "pqexpbuffer.h"
                                 30                 :                : #include "streamutil.h"
                                 31                 :                : 
                                 32                 :                : /* Time to sleep between reconnection attempts */
                                 33                 :                : #define RECONNECT_SLEEP_TIME 5
                                 34                 :                : 
                                 35                 :                : typedef enum
                                 36                 :                : {
                                 37                 :                :     STREAM_STOP_NONE,
                                 38                 :                :     STREAM_STOP_END_OF_WAL,
                                 39                 :                :     STREAM_STOP_KEEPALIVE,
                                 40                 :                :     STREAM_STOP_SIGNAL
                                 41                 :                : } StreamStopReason;
                                 42                 :                : 
                                 43                 :                : /* Global Options */
                                 44                 :                : static char *outfile = NULL;
                                 45                 :                : static int  verbose = 0;
                                 46                 :                : static bool two_phase = false;
                                 47                 :                : static int  noloop = 0;
                                 48                 :                : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
                                 49                 :                : static int  fsync_interval = 10 * 1000; /* 10 sec = default */
                                 50                 :                : static XLogRecPtr startpos = InvalidXLogRecPtr;
                                 51                 :                : static XLogRecPtr endpos = InvalidXLogRecPtr;
                                 52                 :                : static bool do_create_slot = false;
                                 53                 :                : static bool slot_exists_ok = false;
                                 54                 :                : static bool do_start_slot = false;
                                 55                 :                : static bool do_drop_slot = false;
                                 56                 :                : static char *replication_slot = NULL;
                                 57                 :                : 
                                 58                 :                : /* filled pairwise with option, value. value may be NULL */
                                 59                 :                : static char **options;
                                 60                 :                : static size_t noptions = 0;
                                 61                 :                : static const char *plugin = "test_decoding";
                                 62                 :                : 
                                 63                 :                : /* Global State */
                                 64                 :                : static int  outfd = -1;
                                 65                 :                : static volatile sig_atomic_t time_to_abort = false;
                                 66                 :                : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
                                 67                 :                : static volatile sig_atomic_t output_reopen = false;
                                 68                 :                : static bool output_isfile;
                                 69                 :                : static TimestampTz output_last_fsync = -1;
                                 70                 :                : static bool output_needs_fsync = false;
                                 71                 :                : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
                                 72                 :                : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
                                 73                 :                : 
                                 74                 :                : static void usage(void);
                                 75                 :                : static void StreamLogicalLog(void);
                                 76                 :                : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
                                 77                 :                : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
                                 78                 :                :                                StreamStopReason reason,
                                 79                 :                :                                XLogRecPtr lsn);
                                 80                 :                : 
                                 81                 :                : static void
 3680 rhaas@postgresql.org       82                 :CBC           1 : usage(void)
                                 83                 :                : {
 3313 bruce@momjian.us           84                 :              1 :     printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
                                 85                 :                :            progname);
 3680 rhaas@postgresql.org       86                 :              1 :     printf(_("Usage:\n"));
                                 87                 :              1 :     printf(_("  %s [OPTION]...\n"), progname);
 3472 peter_e@gmx.net            88                 :              1 :     printf(_("\nAction to be performed:\n"));
                                 89                 :              1 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
                                 90                 :              1 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
                                 91                 :              1 :     printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
 3680 rhaas@postgresql.org       92                 :              1 :     printf(_("\nOptions:\n"));
 2502 peter_e@gmx.net            93                 :              1 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
 3472                            94                 :              1 :     printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
 3612 andres@anarazel.de         95                 :              1 :     printf(_("  -F  --fsync-interval=SECS\n"
                                 96                 :                :              "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
 3133 peter_e@gmx.net            97                 :              1 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
 3472                            98                 :              1 :     printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
 3680 rhaas@postgresql.org       99                 :              1 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
 3472 peter_e@gmx.net           100                 :              1 :     printf(_("  -o, --option=NAME[=VALUE]\n"
                                101                 :                :              "                         pass option NAME with optional value VALUE to the\n"
                                102                 :                :              "                         output plugin\n"));
                                103                 :              1 :     printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
                                104                 :              1 :     printf(_("  -s, --status-interval=SECS\n"
                                105                 :                :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
                                106                 :              1 :     printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
  569 peter@eisentraut.org      107                 :              1 :     printf(_("  -t, --two-phase        enable decoding of prepared transactions when creating a slot\n"));
 3680 rhaas@postgresql.org      108                 :              1 :     printf(_("  -v, --verbose          output verbose messages\n"));
                                109                 :              1 :     printf(_("  -V, --version          output version information, then exit\n"));
                                110                 :              1 :     printf(_("  -?, --help             show this help, then exit\n"));
                                111                 :              1 :     printf(_("\nConnection options:\n"));
                                112                 :              1 :     printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
                                113                 :              1 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
                                114                 :              1 :     printf(_("  -p, --port=PORT        database server port number\n"));
                                115                 :              1 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
                                116                 :              1 :     printf(_("  -w, --no-password      never prompt for password\n"));
                                117                 :              1 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
 1507 peter@eisentraut.org      118                 :              1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
                                119                 :              1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
 3680 rhaas@postgresql.org      120                 :              1 : }
                                121                 :                : 
                                122                 :                : /*
                                123                 :                :  * Send a Standby Status Update message to server.
                                124                 :                :  */
                                125                 :                : static bool
 2607 tgl@sss.pgh.pa.us         126                 :             25 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
                                127                 :                : {
                                128                 :                :     static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
                                129                 :                :     static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
                                130                 :                : 
                                131                 :                :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
 3680 rhaas@postgresql.org      132                 :             25 :     int         len = 0;
                                133                 :                : 
                                134                 :                :     /*
                                135                 :                :      * we normally don't want to send superfluous feedback, but if it's
                                136                 :                :      * because of a timeout we need to, otherwise wal_sender_timeout will kill
                                137                 :                :      * us.
                                138                 :                :      */
                                139         [ -  + ]:             25 :     if (!force &&
 3680 rhaas@postgresql.org      140         [ #  # ]:UBC           0 :         last_written_lsn == output_written_lsn &&
 1432 noah@leadboat.com         141         [ #  # ]:              0 :         last_fsync_lsn == output_fsync_lsn)
 3680 rhaas@postgresql.org      142                 :              0 :         return true;
                                143                 :                : 
 3680 rhaas@postgresql.org      144         [ -  + ]:CBC          25 :     if (verbose)
 1840 peter@eisentraut.org      145                 :UBC           0 :         pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
                                146                 :                :                     LSN_FORMAT_ARGS(output_written_lsn),
                                147                 :                :                     LSN_FORMAT_ARGS(output_fsync_lsn),
                                148                 :                :                     replication_slot);
                                149                 :                : 
 3680 rhaas@postgresql.org      150                 :CBC          25 :     replybuf[len] = 'r';
                                151                 :             25 :     len += 1;
 3631 bruce@momjian.us          152                 :             25 :     fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
 3680 rhaas@postgresql.org      153                 :             25 :     len += 8;
 2489 tgl@sss.pgh.pa.us         154                 :             25 :     fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
 3680 rhaas@postgresql.org      155                 :             25 :     len += 8;
 3631 bruce@momjian.us          156                 :             25 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
 3680 rhaas@postgresql.org      157                 :             25 :     len += 8;
 3631 bruce@momjian.us          158                 :             25 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
 3680 rhaas@postgresql.org      159                 :             25 :     len += 8;
 2489 tgl@sss.pgh.pa.us         160                 :             25 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
 3680 rhaas@postgresql.org      161                 :             25 :     len += 1;
                                162                 :                : 
                                163                 :             25 :     startpos = output_written_lsn;
                                164                 :             25 :     last_written_lsn = output_written_lsn;
                                165                 :             25 :     last_fsync_lsn = output_fsync_lsn;
                                166                 :                : 
                                167   [ +  -  -  + ]:             25 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
                                168                 :                :     {
 1840 peter@eisentraut.org      169                 :UBC           0 :         pg_log_error("could not send feedback packet: %s",
                                170                 :                :                      PQerrorMessage(conn));
 3680 rhaas@postgresql.org      171                 :              0 :         return false;
                                172                 :                :     }
                                173                 :                : 
 3680 rhaas@postgresql.org      174                 :CBC          25 :     return true;
                                175                 :                : }
                                176                 :                : 
                                177                 :                : static void
 1933 peter@eisentraut.org      178                 :             47 : disconnect_atexit(void)
                                179                 :                : {
 3680 rhaas@postgresql.org      180         [ +  + ]:             47 :     if (conn != NULL)
                                181                 :             24 :         PQfinish(conn);
                                182                 :             47 : }
                                183                 :                : 
                                184                 :                : static bool
 2607 tgl@sss.pgh.pa.us         185                 :             24 : OutputFsync(TimestampTz now)
                                186                 :                : {
 3680 rhaas@postgresql.org      187                 :             24 :     output_last_fsync = now;
                                188                 :                : 
                                189                 :             24 :     output_fsync_lsn = output_written_lsn;
                                190                 :                : 
                                191         [ -  + ]:             24 :     if (fsync_interval <= 0)
 3680 rhaas@postgresql.org      192                 :UBC           0 :         return true;
                                193                 :                : 
 3622 heikki.linnakangas@i      194         [ +  + ]:CBC          24 :     if (!output_needs_fsync)
 3680 rhaas@postgresql.org      195                 :             18 :         return true;
                                196                 :                : 
 3622 heikki.linnakangas@i      197                 :              6 :     output_needs_fsync = false;
                                198                 :                : 
                                199                 :                :     /* can only fsync if it's a regular file */
 3204 andres@anarazel.de        200         [ +  + ]:              6 :     if (!output_isfile)
                                201                 :              4 :         return true;
                                202                 :                : 
                                203         [ -  + ]:              2 :     if (fsync(outfd) != 0)
  737 tgl@sss.pgh.pa.us         204                 :UBC           0 :         pg_fatal("could not fsync file \"%s\": %m", outfile);
                                205                 :                : 
 3680 rhaas@postgresql.org      206                 :CBC           2 :     return true;
                                207                 :                : }
                                208                 :                : 
                                209                 :                : /*
                                210                 :                :  * Start the log streaming
                                211                 :                :  */
                                212                 :                : static void
 3485 andres@anarazel.de        213                 :             23 : StreamLogicalLog(void)
                                214                 :                : {
                                215                 :                :     PGresult   *res;
 3680 rhaas@postgresql.org      216                 :             23 :     char       *copybuf = NULL;
 2607 tgl@sss.pgh.pa.us         217                 :             23 :     TimestampTz last_status = -1;
                                218                 :                :     int         i;
                                219                 :                :     PQExpBuffer query;
                                220                 :                :     XLogRecPtr  cur_record_lsn;
                                221                 :                : 
 3680 rhaas@postgresql.org      222                 :             23 :     output_written_lsn = InvalidXLogRecPtr;
                                223                 :             23 :     output_fsync_lsn = InvalidXLogRecPtr;
  269 michael@paquier.xyz       224                 :GNC          23 :     cur_record_lsn = InvalidXLogRecPtr;
                                225                 :                : 
                                226                 :                :     /*
                                227                 :                :      * Connect in replication mode to the server
                                228                 :                :      */
 3680 rhaas@postgresql.org      229         [ -  + ]:CBC          23 :     if (!conn)
 3680 rhaas@postgresql.org      230                 :UBC           0 :         conn = GetConnection();
 3680 rhaas@postgresql.org      231         [ -  + ]:CBC          23 :     if (!conn)
                                232                 :                :         /* Error message already written in GetConnection() */
 3680 rhaas@postgresql.org      233                 :UBC           0 :         return;
                                234                 :                : 
                                235                 :                :     /*
                                236                 :                :      * Start the replication
                                237                 :                :      */
 3680 rhaas@postgresql.org      238         [ -  + ]:CBC          23 :     if (verbose)
 1840 peter@eisentraut.org      239                 :UBC           0 :         pg_log_info("starting log streaming at %X/%X (slot %s)",
                                240                 :                :                     LSN_FORMAT_ARGS(startpos),
                                241                 :                :                     replication_slot);
                                242                 :                : 
                                243                 :                :     /* Initiate the replication stream at specified location */
  875 tgl@sss.pgh.pa.us         244                 :CBC          23 :     query = createPQExpBuffer();
 3680 rhaas@postgresql.org      245                 :             23 :     appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
 1146 peter@eisentraut.org      246                 :             23 :                       replication_slot, LSN_FORMAT_ARGS(startpos));
                                247                 :                : 
                                248                 :                :     /* print options if there are any */
 3680 rhaas@postgresql.org      249         [ +  + ]:             23 :     if (noptions)
                                250                 :             20 :         appendPQExpBufferStr(query, " (");
                                251                 :                : 
                                252         [ +  + ]:             63 :     for (i = 0; i < noptions; i++)
                                253                 :                :     {
                                254                 :                :         /* separator */
                                255         [ +  + ]:             40 :         if (i > 0)
                                256                 :             20 :             appendPQExpBufferStr(query, ", ");
                                257                 :                : 
                                258                 :                :         /* write option name */
                                259                 :             40 :         appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
                                260                 :                : 
                                261                 :                :         /* write option value if specified */
                                262         [ +  - ]:             40 :         if (options[(i * 2) + 1] != NULL)
                                263                 :             40 :             appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
                                264                 :                :     }
                                265                 :                : 
                                266         [ +  + ]:             23 :     if (noptions)
                                267                 :             20 :         appendPQExpBufferChar(query, ')');
                                268                 :                : 
                                269                 :             23 :     res = PQexec(conn, query->data);
                                270         [ +  + ]:             23 :     if (PQresultStatus(res) != PGRES_COPY_BOTH)
                                271                 :                :     {
 1840 peter@eisentraut.org      272                 :              6 :         pg_log_error("could not send replication command \"%s\": %s",
                                273                 :                :                      query->data, PQresultErrorMessage(res));
 3680 rhaas@postgresql.org      274                 :              6 :         PQclear(res);
                                275                 :              6 :         goto error;
                                276                 :                :     }
                                277                 :             17 :     PQclear(res);
                                278                 :             17 :     resetPQExpBuffer(query);
                                279                 :                : 
                                280         [ -  + ]:             17 :     if (verbose)
 1840 peter@eisentraut.org      281                 :UBC           0 :         pg_log_info("streaming initiated");
                                282                 :                : 
 3680 rhaas@postgresql.org      283         [ +  + ]:CBC         857 :     while (!time_to_abort)
                                284                 :                :     {
                                285                 :                :         int         r;
                                286                 :                :         int         bytes_left;
                                287                 :                :         int         bytes_written;
                                288                 :                :         TimestampTz now;
                                289                 :                :         int         hdr_len;
                                290                 :                : 
  269 michael@paquier.xyz       291                 :GNC         856 :         cur_record_lsn = InvalidXLogRecPtr;
                                292                 :                : 
 3680 rhaas@postgresql.org      293         [ +  + ]:CBC         856 :         if (copybuf != NULL)
                                294                 :                :         {
                                295                 :            496 :             PQfreemem(copybuf);
                                296                 :            496 :             copybuf = NULL;
                                297                 :                :         }
                                298                 :                : 
                                299                 :                :         /*
                                300                 :                :          * Potentially send a status message to the primary.
                                301                 :                :          */
                                302                 :            856 :         now = feGetCurrentTimestamp();
                                303                 :                : 
                                304   [ +  +  +  + ]:           1695 :         if (outfd != -1 &&
                                305                 :            839 :             feTimestampDifferenceExceeds(output_last_fsync, now,
                                306                 :                :                                          fsync_interval))
                                307                 :                :         {
                                308         [ -  + ]:             16 :             if (!OutputFsync(now))
 3680 rhaas@postgresql.org      309                 :UBC           0 :                 goto error;
                                310                 :                :         }
                                311                 :                : 
 3680 rhaas@postgresql.org      312   [ +  -  +  + ]:CBC        1712 :         if (standby_message_timeout > 0 &&
                                313                 :            856 :             feTimestampDifferenceExceeds(last_status, now,
                                314                 :                :                                          standby_message_timeout))
                                315                 :                :         {
                                316                 :                :             /* Time to send feedback! */
                                317         [ -  + ]:             17 :             if (!sendFeedback(conn, now, true, false))
 3680 rhaas@postgresql.org      318                 :UBC           0 :                 goto error;
                                319                 :                : 
 3680 rhaas@postgresql.org      320                 :CBC          17 :             last_status = now;
                                321                 :                :         }
                                322                 :                : 
                                323                 :                :         /* got SIGHUP, close output file */
 3622 heikki.linnakangas@i      324   [ +  +  -  +  :            856 :         if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
                                              -  - ]
                                325                 :                :         {
 3622 heikki.linnakangas@i      326                 :UBC           0 :             now = feGetCurrentTimestamp();
                                327         [ #  # ]:              0 :             if (!OutputFsync(now))
                                328                 :              0 :                 goto error;
                                329                 :              0 :             close(outfd);
                                330                 :              0 :             outfd = -1;
                                331                 :                :         }
 3622 heikki.linnakangas@i      332                 :CBC         856 :         output_reopen = false;
                                333                 :                : 
                                334                 :                :         /* open the output file, if not open yet */
 3621                           335         [ +  + ]:            856 :         if (outfd == -1)
                                336                 :                :         {
                                337                 :                :             struct stat statbuf;
                                338                 :                : 
                                339         [ +  - ]:             17 :             if (strcmp(outfile, "-") == 0)
                                340                 :             17 :                 outfd = fileno(stdout);
                                341                 :                :             else
 3621 heikki.linnakangas@i      342                 :UBC           0 :                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
                                343                 :                :                              S_IRUSR | S_IWUSR);
 3621 heikki.linnakangas@i      344         [ -  + ]:CBC          17 :             if (outfd == -1)
                                345                 :                :             {
 1840 peter@eisentraut.org      346                 :UBC           0 :                 pg_log_error("could not open log file \"%s\": %m", outfile);
 3621 heikki.linnakangas@i      347                 :              0 :                 goto error;
                                348                 :                :             }
                                349                 :                : 
 3204 andres@anarazel.de        350         [ -  + ]:CBC          17 :             if (fstat(outfd, &statbuf) != 0)
                                351                 :                :             {
 1840 peter@eisentraut.org      352                 :UBC           0 :                 pg_log_error("could not stat file \"%s\": %m", outfile);
  990 michael@paquier.xyz       353                 :              0 :                 goto error;
                                354                 :                :             }
                                355                 :                : 
 3204 andres@anarazel.de        356   [ +  +  +  - ]:CBC          17 :             output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
                                357                 :                :         }
                                358                 :                : 
 3680 rhaas@postgresql.org      359                 :            856 :         r = PQgetCopyData(conn, &copybuf, 1);
                                360         [ +  + ]:            856 :         if (r == 0)
                                361                 :            343 :         {
                                362                 :                :             /*
                                363                 :                :              * In async mode, and no data available. We block on reading but
                                364                 :                :              * not more than the specified timeout, so that we can send a
                                365                 :                :              * response back to the client.
                                366                 :                :              */
                                367                 :                :             fd_set      input_mask;
 2607 tgl@sss.pgh.pa.us         368                 :            346 :             TimestampTz message_target = 0;
                                369                 :            346 :             TimestampTz fsync_target = 0;
                                370                 :                :             struct timeval timeout;
 3680 rhaas@postgresql.org      371                 :            346 :             struct timeval *timeoutptr = NULL;
                                372                 :                : 
 2959 peter_e@gmx.net           373         [ -  + ]:            346 :             if (PQsocket(conn) < 0)
                                374                 :                :             {
 1840 peter@eisentraut.org      375                 :UBC           0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
 2959 peter_e@gmx.net           376                 :              0 :                 goto error;
                                377                 :                :             }
                                378                 :                : 
 3680 rhaas@postgresql.org      379         [ +  + ]:CBC        5882 :             FD_ZERO(&input_mask);
                                380                 :            346 :             FD_SET(PQsocket(conn), &input_mask);
                                381                 :                : 
                                382                 :                :             /* Compute when we need to wakeup to send a keepalive message. */
                                383         [ +  - ]:            346 :             if (standby_message_timeout)
                                384                 :            346 :                 message_target = last_status + (standby_message_timeout - 1) *
                                385                 :                :                     ((int64) 1000);
                                386                 :                : 
                                387                 :                :             /* Compute when we need to wakeup to fsync the output file. */
 3622 heikki.linnakangas@i      388   [ +  -  +  + ]:            346 :             if (fsync_interval > 0 && output_needs_fsync)
 3680 rhaas@postgresql.org      389                 :            136 :                 fsync_target = output_last_fsync + (fsync_interval - 1) *
                                390                 :                :                     ((int64) 1000);
                                391                 :                : 
                                392                 :                :             /* Now compute when to wakeup. */
                                393   [ -  +  -  - ]:            346 :             if (message_target > 0 || fsync_target > 0)
                                394                 :                :             {
                                395                 :                :                 TimestampTz targettime;
                                396                 :                :                 long        secs;
                                397                 :                :                 int         usecs;
                                398                 :                : 
                                399                 :            346 :                 targettime = message_target;
                                400                 :                : 
                                401   [ +  +  -  + ]:            346 :                 if (fsync_target > 0 && fsync_target < targettime)
 3680 rhaas@postgresql.org      402                 :UBC           0 :                     targettime = fsync_target;
                                403                 :                : 
 3680 rhaas@postgresql.org      404                 :CBC         346 :                 feTimestampDifference(now,
                                405                 :                :                                       targettime,
                                406                 :                :                                       &secs,
                                407                 :                :                                       &usecs);
                                408         [ -  + ]:            346 :                 if (secs <= 0)
 3680 rhaas@postgresql.org      409                 :UBC           0 :                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
                                410                 :                :                 else
 3680 rhaas@postgresql.org      411                 :CBC         346 :                     timeout.tv_sec = secs;
                                412                 :            346 :                 timeout.tv_usec = usecs;
                                413                 :            346 :                 timeoutptr = &timeout;
                                414                 :                :             }
                                415                 :                : 
                                416                 :            346 :             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
                                417   [ +  -  +  +  :            346 :             if (r == 0 || (r < 0 && errno == EINTR))
                                              +  - ]
                                418                 :                :             {
                                419                 :                :                 /*
                                420                 :                :                  * Got a timeout or signal. Continue the loop and either
                                421                 :                :                  * deliver a status packet to the server or just go back into
                                422                 :                :                  * blocking.
                                423                 :                :                  */
                                424                 :            344 :                 continue;
                                425                 :                :             }
                                426         [ -  + ]:            345 :             else if (r < 0)
                                427                 :                :             {
 1087 peter@eisentraut.org      428                 :UBC           0 :                 pg_log_error("%s() failed: %m", "select");
 3680 rhaas@postgresql.org      429                 :              0 :                 goto error;
                                430                 :                :             }
                                431                 :                : 
                                432                 :                :             /* Else there is actually data on the socket */
 3680 rhaas@postgresql.org      433         [ +  + ]:CBC         345 :             if (PQconsumeInput(conn) == 0)
                                434                 :                :             {
 1840 peter@eisentraut.org      435                 :              2 :                 pg_log_error("could not receive data from WAL stream: %s",
                                436                 :                :                              PQerrorMessage(conn));
 3680 rhaas@postgresql.org      437                 :              2 :                 goto error;
                                438                 :                :             }
                                439                 :            343 :             continue;
                                440                 :                :         }
                                441                 :                : 
                                442                 :                :         /* End of copy stream */
                                443         [ +  + ]:            510 :         if (r == -1)
                                444                 :             14 :             break;
                                445                 :                : 
                                446                 :                :         /* Failure while reading the copy stream */
                                447         [ -  + ]:            503 :         if (r == -2)
                                448                 :                :         {
 1840 peter@eisentraut.org      449                 :UBC           0 :             pg_log_error("could not read COPY data: %s",
                                450                 :                :                          PQerrorMessage(conn));
 3680 rhaas@postgresql.org      451                 :              0 :             goto error;
                                452                 :                :         }
                                453                 :                : 
                                454                 :                :         /* Check the message type. */
 3680 rhaas@postgresql.org      455         [ +  + ]:CBC         503 :         if (copybuf[0] == 'k')
                                456                 :            410 :         {
                                457                 :                :             int         pos;
                                458                 :                :             bool        replyRequested;
                                459                 :                :             XLogRecPtr  walEnd;
 2657 simon@2ndQuadrant.co      460                 :            412 :             bool        endposReached = false;
                                461                 :                : 
                                462                 :                :             /*
                                463                 :                :              * Parse the keepalive message, enclosed in the CopyData message.
                                464                 :                :              * We just check if the server requested a reply, and ignore the
                                465                 :                :              * rest.
                                466                 :                :              */
 3680 rhaas@postgresql.org      467                 :            412 :             pos = 1;            /* skip msgtype 'k' */
                                468                 :            412 :             walEnd = fe_recvint64(&copybuf[pos]);
                                469                 :            412 :             output_written_lsn = Max(walEnd, output_written_lsn);
                                470                 :                : 
                                471                 :            412 :             pos += 8;           /* read walEnd */
                                472                 :                : 
                                473                 :            412 :             pos += 8;           /* skip sendTime */
                                474                 :                : 
                                475         [ -  + ]:            412 :             if (r < pos + 1)
                                476                 :                :             {
 1840 peter@eisentraut.org      477                 :UBC           0 :                 pg_log_error("streaming header too small: %d", r);
 3680 rhaas@postgresql.org      478                 :              0 :                 goto error;
                                479                 :                :             }
 3680 rhaas@postgresql.org      480                 :CBC         412 :             replyRequested = copybuf[pos];
                                481                 :                : 
 2657 simon@2ndQuadrant.co      482   [ +  +  +  + ]:            412 :             if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
                                483                 :                :             {
                                484                 :                :                 /*
                                485                 :                :                  * If there's nothing to read on the socket until a keepalive
                                486                 :                :                  * we know that the server has nothing to send us; and if
                                487                 :                :                  * walEnd has passed endpos, we know nothing else can have
                                488                 :                :                  * committed before endpos.  So we can bail out now.
                                489                 :                :                  */
                                490                 :              2 :                 endposReached = true;
                                491                 :                :             }
                                492                 :                : 
                                493                 :                :             /* Send a reply, if necessary */
                                494   [ +  +  +  + ]:            412 :             if (replyRequested || endposReached)
                                495                 :                :             {
                                496         [ -  + ]:              3 :                 if (!flushAndSendFeedback(conn, &now))
 3680 rhaas@postgresql.org      497                 :UBC           0 :                     goto error;
 3680 rhaas@postgresql.org      498                 :CBC           3 :                 last_status = now;
                                499                 :                :             }
                                500                 :                : 
 2657 simon@2ndQuadrant.co      501         [ +  + ]:            412 :             if (endposReached)
                                502                 :                :             {
  269 michael@paquier.xyz       503                 :GNC           2 :                 stop_reason = STREAM_STOP_KEEPALIVE;
 2657 simon@2ndQuadrant.co      504                 :CBC           2 :                 time_to_abort = true;
                                505                 :              2 :                 break;
                                506                 :                :             }
                                507                 :                : 
 3680 rhaas@postgresql.org      508                 :            410 :             continue;
                                509                 :                :         }
                                510         [ -  + ]:             91 :         else if (copybuf[0] != 'w')
                                511                 :                :         {
 1840 peter@eisentraut.org      512                 :UBC           0 :             pg_log_error("unrecognized streaming header: \"%c\"",
                                513                 :                :                          copybuf[0]);
 3680 rhaas@postgresql.org      514                 :              0 :             goto error;
                                515                 :                :         }
                                516                 :                : 
                                517                 :                :         /*
                                518                 :                :          * Read the header of the XLogData message, enclosed in the CopyData
                                519                 :                :          * message. We only need the WAL location field (dataStart), the rest
                                520                 :                :          * of the header is ignored.
                                521                 :                :          */
 3680 rhaas@postgresql.org      522                 :CBC          91 :         hdr_len = 1;            /* msgtype 'w' */
                                523                 :             91 :         hdr_len += 8;           /* dataStart */
                                524                 :             91 :         hdr_len += 8;           /* walEnd */
                                525                 :             91 :         hdr_len += 8;           /* sendTime */
                                526         [ -  + ]:             91 :         if (r < hdr_len + 1)
                                527                 :                :         {
 1840 peter@eisentraut.org      528                 :UBC           0 :             pg_log_error("streaming header too small: %d", r);
 3680 rhaas@postgresql.org      529                 :              0 :             goto error;
                                530                 :                :         }
                                531                 :                : 
                                532                 :                :         /* Extract WAL location for this block */
 2657 simon@2ndQuadrant.co      533                 :CBC          91 :         cur_record_lsn = fe_recvint64(&copybuf[1]);
                                534                 :                : 
                                535   [ +  +  -  + ]:             91 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
                                536                 :                :         {
                                537                 :                :             /*
                                538                 :                :              * We've read past our endpoint, so prepare to go away being
                                539                 :                :              * cautious about what happens to our output data.
                                540                 :                :              */
 2657 simon@2ndQuadrant.co      541         [ #  # ]:UBC           0 :             if (!flushAndSendFeedback(conn, &now))
                                542                 :              0 :                 goto error;
  269 michael@paquier.xyz       543                 :UNC           0 :             stop_reason = STREAM_STOP_END_OF_WAL;
 2657 simon@2ndQuadrant.co      544                 :UBC           0 :             time_to_abort = true;
                                545                 :              0 :             break;
                                546                 :                :         }
                                547                 :                : 
 2657 simon@2ndQuadrant.co      548                 :CBC          91 :         output_written_lsn = Max(cur_record_lsn, output_written_lsn);
                                549                 :                : 
 3680 rhaas@postgresql.org      550                 :             91 :         bytes_left = r - hdr_len;
                                551                 :             91 :         bytes_written = 0;
                                552                 :                : 
                                553                 :                :         /* signal that a fsync is needed */
 3622 heikki.linnakangas@i      554                 :             91 :         output_needs_fsync = true;
                                555                 :                : 
 3680 rhaas@postgresql.org      556         [ +  + ]:            182 :         while (bytes_left)
                                557                 :                :         {
                                558                 :                :             int         ret;
                                559                 :                : 
                                560                 :            182 :             ret = write(outfd,
                                561                 :             91 :                         copybuf + hdr_len + bytes_written,
                                562                 :                :                         bytes_left);
                                563                 :                : 
                                564         [ -  + ]:             91 :             if (ret < 0)
                                565                 :                :             {
  879 peter@eisentraut.org      566                 :UBC           0 :                 pg_log_error("could not write %d bytes to log file \"%s\": %m",
                                567                 :                :                              bytes_left, outfile);
 3680 rhaas@postgresql.org      568                 :              0 :                 goto error;
                                569                 :                :             }
                                570                 :                : 
                                571                 :                :             /* Write was successful, advance our position */
 3680 rhaas@postgresql.org      572                 :CBC          91 :             bytes_written += ret;
                                573                 :             91 :             bytes_left -= ret;
                                574                 :                :         }
                                575                 :                : 
                                576         [ -  + ]:             91 :         if (write(outfd, "\n", 1) != 1)
                                577                 :                :         {
  879 peter@eisentraut.org      578                 :UBC           0 :             pg_log_error("could not write %d bytes to log file \"%s\": %m",
                                579                 :                :                          1, outfile);
 3680 rhaas@postgresql.org      580                 :              0 :             goto error;
                                581                 :                :         }
                                582                 :                : 
 2657 simon@2ndQuadrant.co      583   [ +  +  +  + ]:CBC          91 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
                                584                 :                :         {
                                585                 :                :             /* endpos was exactly the record we just processed, we're done */
                                586         [ -  + ]:              5 :             if (!flushAndSendFeedback(conn, &now))
 2657 simon@2ndQuadrant.co      587                 :UBC           0 :                 goto error;
  269 michael@paquier.xyz       588                 :GNC           5 :             stop_reason = STREAM_STOP_END_OF_WAL;
 2657 simon@2ndQuadrant.co      589                 :CBC           5 :             time_to_abort = true;
                                590                 :              5 :             break;
                                591                 :                :         }
                                592                 :                :     }
                                593                 :                : 
                                594                 :                :     /* Clean up connection state if stream has been aborted */
  269 michael@paquier.xyz       595         [ +  + ]:GNC          15 :     if (time_to_abort)
                                596                 :              8 :         prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
                                597                 :                : 
 3680 rhaas@postgresql.org      598                 :CBC          15 :     res = PQgetResult(conn);
 2657 simon@2ndQuadrant.co      599         [ +  + ]:             15 :     if (PQresultStatus(res) == PGRES_COPY_OUT)
                                600                 :                :     {
 1432 noah@leadboat.com         601                 :              8 :         PQclear(res);
                                602                 :                : 
                                603                 :                :         /*
                                604                 :                :          * We're doing a client-initiated clean exit and have sent CopyDone to
                                605                 :                :          * the server. Drain any messages, so we don't miss a last-minute
                                606                 :                :          * ErrorResponse. The walsender stops generating XLogData records once
                                607                 :                :          * it sees CopyDone, so expect this to finish quickly. After CopyDone,
                                608                 :                :          * it's too late for sendFeedback(), even if this were to take a long
                                609                 :                :          * time. Hence, use synchronous-mode PQgetCopyData().
                                610                 :                :          */
                                611                 :                :         while (1)
                                612                 :             52 :         {
                                613                 :                :             int         r;
                                614                 :                : 
                                615         [ +  + ]:             60 :             if (copybuf != NULL)
                                616                 :                :             {
                                617                 :             59 :                 PQfreemem(copybuf);
                                618                 :             59 :                 copybuf = NULL;
                                619                 :                :             }
                                620                 :             60 :             r = PQgetCopyData(conn, &copybuf, 0);
                                621         [ +  + ]:             60 :             if (r == -1)
                                622                 :              8 :                 break;
                                623         [ -  + ]:             52 :             if (r == -2)
                                624                 :                :             {
 1432 noah@leadboat.com         625                 :UBC           0 :                 pg_log_error("could not read COPY data: %s",
                                626                 :                :                              PQerrorMessage(conn));
                                627                 :              0 :                 time_to_abort = false;  /* unclean exit */
                                628                 :              0 :                 goto error;
                                629                 :                :             }
                                630                 :                :         }
                                631                 :                : 
 1432 noah@leadboat.com         632                 :CBC           8 :         res = PQgetResult(conn);
                                633                 :                :     }
                                634         [ +  + ]:             15 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                635                 :                :     {
 1840 peter@eisentraut.org      636                 :              6 :         pg_log_error("unexpected termination of replication stream: %s",
                                637                 :                :                      PQresultErrorMessage(res));
 3680 rhaas@postgresql.org      638                 :              6 :         goto error;
                                639                 :                :     }
                                640                 :              9 :     PQclear(res);
                                641                 :                : 
                                642   [ +  -  -  + ]:              9 :     if (outfd != -1 && strcmp(outfile, "-") != 0)
                                643                 :                :     {
 2607 tgl@sss.pgh.pa.us         644                 :UBC           0 :         TimestampTz t = feGetCurrentTimestamp();
                                645                 :                : 
                                646                 :                :         /* no need to jump to error on failure here, we're finishing anyway */
 3680 rhaas@postgresql.org      647                 :              0 :         OutputFsync(t);
                                648                 :                : 
                                649         [ #  # ]:              0 :         if (close(outfd) != 0)
 1840 peter@eisentraut.org      650                 :              0 :             pg_log_error("could not close file \"%s\": %m", outfile);
                                651                 :                :     }
 3680 rhaas@postgresql.org      652                 :CBC           9 :     outfd = -1;
                                653                 :             23 : error:
 3632 heikki.linnakangas@i      654         [ -  + ]:             23 :     if (copybuf != NULL)
                                655                 :                :     {
 3632 heikki.linnakangas@i      656                 :UBC           0 :         PQfreemem(copybuf);
                                657                 :              0 :         copybuf = NULL;
                                658                 :                :     }
 3680 rhaas@postgresql.org      659                 :CBC          23 :     destroyPQExpBuffer(query);
                                660                 :             23 :     PQfinish(conn);
                                661                 :             23 :     conn = NULL;
                                662                 :                : }
                                663                 :                : 
                                664                 :                : /*
                                665                 :                :  * Unfortunately we can't do sensible signal handling on windows...
                                666                 :                :  */
                                667                 :                : #ifndef WIN32
                                668                 :                : 
                                669                 :                : /*
                                670                 :                :  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
                                671                 :                :  * possible moment.
                                672                 :                :  */
                                673                 :                : static void
  578 tgl@sss.pgh.pa.us         674                 :              1 : sigexit_handler(SIGNAL_ARGS)
                                675                 :                : {
  269 michael@paquier.xyz       676                 :GNC           1 :     stop_reason = STREAM_STOP_SIGNAL;
 3680 rhaas@postgresql.org      677                 :CBC           1 :     time_to_abort = true;
                                678                 :              1 : }
                                679                 :                : 
                                680                 :                : /*
                                681                 :                :  * Trigger the output file to be reopened.
                                682                 :                :  */
                                683                 :                : static void
  578 tgl@sss.pgh.pa.us         684                 :UBC           0 : sighup_handler(SIGNAL_ARGS)
                                685                 :                : {
 3680 rhaas@postgresql.org      686                 :              0 :     output_reopen = true;
                                687                 :              0 : }
                                688                 :                : #endif
                                689                 :                : 
                                690                 :                : 
                                691                 :                : int
 3680 rhaas@postgresql.org      692                 :CBC          55 : main(int argc, char **argv)
                                693                 :                : {
                                694                 :                :     static struct option long_options[] = {
                                695                 :                : /* general options */
                                696                 :                :         {"file", required_argument, NULL, 'f'},
                                697                 :                :         {"fsync-interval", required_argument, NULL, 'F'},
                                698                 :                :         {"no-loop", no_argument, NULL, 'n'},
                                699                 :                :         {"verbose", no_argument, NULL, 'v'},
                                700                 :                :         {"two-phase", no_argument, NULL, 't'},
                                701                 :                :         {"version", no_argument, NULL, 'V'},
                                702                 :                :         {"help", no_argument, NULL, '?'},
                                703                 :                : /* connection options */
                                704                 :                :         {"dbname", required_argument, NULL, 'd'},
                                705                 :                :         {"host", required_argument, NULL, 'h'},
                                706                 :                :         {"port", required_argument, NULL, 'p'},
                                707                 :                :         {"username", required_argument, NULL, 'U'},
                                708                 :                :         {"no-password", no_argument, NULL, 'w'},
                                709                 :                :         {"password", no_argument, NULL, 'W'},
                                710                 :                : /* replication options */
                                711                 :                :         {"startpos", required_argument, NULL, 'I'},
                                712                 :                :         {"endpos", required_argument, NULL, 'E'},
                                713                 :                :         {"option", required_argument, NULL, 'o'},
                                714                 :                :         {"plugin", required_argument, NULL, 'P'},
                                715                 :                :         {"status-interval", required_argument, NULL, 's'},
                                716                 :                :         {"slot", required_argument, NULL, 'S'},
                                717                 :                : /* action */
                                718                 :                :         {"create-slot", no_argument, NULL, 1},
                                719                 :                :         {"start", no_argument, NULL, 2},
                                720                 :                :         {"drop-slot", no_argument, NULL, 3},
                                721                 :                :         {"if-not-exists", no_argument, NULL, 4},
                                722                 :                :         {NULL, 0, NULL, 0}
                                723                 :                :     };
                                724                 :                :     int         c;
                                725                 :                :     int         option_index;
                                726                 :                :     uint32      hi,
                                727                 :                :                 lo;
                                728                 :                :     char       *db_name;
                                729                 :                : 
 1840 peter@eisentraut.org      730                 :             55 :     pg_logging_init(argv[0]);
 3680 rhaas@postgresql.org      731                 :             55 :     progname = get_progname(argv[0]);
 3030 alvherre@alvh.no-ip.      732                 :             55 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
                                733                 :                : 
 3680 rhaas@postgresql.org      734         [ +  + ]:             55 :     if (argc > 1)
                                735                 :                :     {
                                736   [ +  +  -  + ]:             54 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
                                737                 :                :         {
                                738                 :              1 :             usage();
                                739                 :              1 :             exit(0);
                                740                 :                :         }
                                741         [ +  - ]:             53 :         else if (strcmp(argv[1], "-V") == 0 ||
                                742         [ +  + ]:             53 :                  strcmp(argv[1], "--version") == 0)
                                743                 :                :         {
                                744                 :              1 :             puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
                                745                 :              1 :             exit(0);
                                746                 :                :         }
                                747                 :                :     }
                                748                 :                : 
  489 peter@eisentraut.org      749                 :            321 :     while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
 3680 rhaas@postgresql.org      750         [ +  + ]:            321 :                             long_options, &option_index)) != -1)
                                751                 :                :     {
                                752   [ +  -  +  +  :            269 :         switch (c)
                                     -  +  -  -  -  
                                     -  -  -  +  +  
                                     +  -  +  +  +  
                                           +  -  + ]
                                753                 :                :         {
                                754                 :                : /* general options */
                                755                 :             24 :             case 'f':
                                756                 :             24 :                 outfile = pg_strdup(optarg);
                                757                 :             24 :                 break;
 3612 andres@anarazel.de        758                 :UBC           0 :             case 'F':
  995 michael@paquier.xyz       759         [ #  # ]:              0 :                 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
                                760                 :                :                                       INT_MAX / 1000,
                                761                 :                :                                       &fsync_interval))
 3612 andres@anarazel.de        762                 :              0 :                     exit(1);
  995 michael@paquier.xyz       763                 :              0 :                 fsync_interval *= 1000;
 3612 andres@anarazel.de        764                 :              0 :                 break;
 3680 rhaas@postgresql.org      765                 :CBC          23 :             case 'n':
                                766                 :             23 :                 noloop = 1;
                                767                 :             23 :                 break;
 1019 akapila@postgresql.o      768                 :              2 :             case 't':
                                769                 :              2 :                 two_phase = true;
                                770                 :              2 :                 break;
  489 peter@eisentraut.org      771                 :UBC           0 :             case 'v':
                                772                 :              0 :                 verbose++;
                                773                 :              0 :                 break;
                                774                 :                : /* connection options */
 3680 rhaas@postgresql.org      775                 :CBC          50 :             case 'd':
                                776                 :             50 :                 dbname = pg_strdup(optarg);
                                777                 :             50 :                 break;
 3680 rhaas@postgresql.org      778                 :UBC           0 :             case 'h':
                                779                 :              0 :                 dbhost = pg_strdup(optarg);
                                780                 :              0 :                 break;
                                781                 :              0 :             case 'p':
                                782                 :              0 :                 dbport = pg_strdup(optarg);
                                783                 :              0 :                 break;
                                784                 :              0 :             case 'U':
                                785                 :              0 :                 dbuser = pg_strdup(optarg);
                                786                 :              0 :                 break;
                                787                 :              0 :             case 'w':
                                788                 :              0 :                 dbgetpassword = -1;
                                789                 :              0 :                 break;
                                790                 :              0 :             case 'W':
                                791                 :              0 :                 dbgetpassword = 1;
                                792                 :              0 :                 break;
                                793                 :                : /* replication options */
 3612 andres@anarazel.de        794                 :              0 :             case 'I':
                                795         [ #  # ]:              0 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
  737 tgl@sss.pgh.pa.us         796                 :              0 :                     pg_fatal("could not parse start position \"%s\"", optarg);
 3612 andres@anarazel.de        797                 :              0 :                 startpos = ((uint64) hi) << 32 | lo;
                                798                 :              0 :                 break;
 2657 simon@2ndQuadrant.co      799                 :CBC           8 :             case 'E':
                                800         [ -  + ]:              8 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
  737 tgl@sss.pgh.pa.us         801                 :UBC           0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
 2657 simon@2ndQuadrant.co      802                 :CBC           8 :                 endpos = ((uint64) hi) << 32 | lo;
                                803                 :              8 :                 break;
 3680 rhaas@postgresql.org      804                 :             40 :             case 'o':
                                805                 :                :                 {
 3631 bruce@momjian.us          806                 :             40 :                     char       *data = pg_strdup(optarg);
                                807                 :             40 :                     char       *val = strchr(data, '=');
                                808                 :                : 
 3680 rhaas@postgresql.org      809         [ +  - ]:             40 :                     if (val != NULL)
                                810                 :                :                     {
                                811                 :                :                         /* remove =; separate data from val */
                                812                 :             40 :                         *val = '\0';
                                813                 :             40 :                         val++;
                                814                 :                :                     }
                                815                 :                : 
                                816                 :             40 :                     noptions += 1;
 3631 bruce@momjian.us          817                 :             40 :                     options = pg_realloc(options, sizeof(char *) * noptions * 2);
                                818                 :                : 
 3680 rhaas@postgresql.org      819                 :             40 :                     options[(noptions - 1) * 2] = data;
                                820                 :             40 :                     options[(noptions - 1) * 2 + 1] = val;
                                821                 :                :                 }
                                822                 :                : 
                                823                 :             40 :                 break;
                                824                 :             21 :             case 'P':
                                825                 :             21 :                 plugin = pg_strdup(optarg);
                                826                 :             21 :                 break;
 3680 rhaas@postgresql.org      827                 :UBC           0 :             case 's':
  995 michael@paquier.xyz       828         [ #  # ]:              0 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
                                829                 :                :                                       INT_MAX / 1000,
                                830                 :                :                                       &standby_message_timeout))
 3680 rhaas@postgresql.org      831                 :              0 :                     exit(1);
  995 michael@paquier.xyz       832                 :              0 :                 standby_message_timeout *= 1000;
 3680 rhaas@postgresql.org      833                 :              0 :                 break;
 3680 rhaas@postgresql.org      834                 :CBC          51 :             case 'S':
                                835                 :             51 :                 replication_slot = pg_strdup(optarg);
                                836                 :             51 :                 break;
                                837                 :                : /* action */
                                838                 :             23 :             case 1:
                                839                 :             23 :                 do_create_slot = true;
                                840                 :             23 :                 break;
                                841                 :             25 :             case 2:
                                842                 :             25 :                 do_start_slot = true;
                                843                 :             25 :                 break;
                                844                 :              1 :             case 3:
                                845                 :              1 :                 do_drop_slot = true;
                                846                 :              1 :                 break;
 3199 andres@anarazel.de        847                 :UBC           0 :             case 4:
                                848                 :              0 :                 slot_exists_ok = true;
                                849                 :              0 :                 break;
                                850                 :                : 
 3680 rhaas@postgresql.org      851                 :CBC           1 :             default:
                                852                 :                :                 /* getopt_long already emitted a complaint */
  737 tgl@sss.pgh.pa.us         853                 :              1 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3680 rhaas@postgresql.org      854                 :              1 :                 exit(1);
                                855                 :                :         }
                                856                 :                :     }
                                857                 :                : 
                                858                 :                :     /*
                                859                 :                :      * Any non-option arguments?
                                860                 :                :      */
                                861         [ -  + ]:             52 :     if (optind < argc)
                                862                 :                :     {
 1840 peter@eisentraut.org      863                 :UBC           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
                                864                 :                :                      argv[optind]);
  737 tgl@sss.pgh.pa.us         865                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3680 rhaas@postgresql.org      866                 :              0 :         exit(1);
                                867                 :                :     }
                                868                 :                : 
                                869                 :                :     /*
                                870                 :                :      * Required arguments
                                871                 :                :      */
 3680 rhaas@postgresql.org      872         [ +  + ]:CBC          52 :     if (replication_slot == NULL)
                                873                 :                :     {
 1840 peter@eisentraut.org      874                 :              1 :         pg_log_error("no slot specified");
  737 tgl@sss.pgh.pa.us         875                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3680 rhaas@postgresql.org      876                 :              1 :         exit(1);
                                877                 :                :     }
                                878                 :                : 
                                879   [ +  +  +  + ]:             51 :     if (do_start_slot && outfile == NULL)
                                880                 :                :     {
 1840 peter@eisentraut.org      881                 :              1 :         pg_log_error("no target file specified");
  737 tgl@sss.pgh.pa.us         882                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3680 rhaas@postgresql.org      883                 :              1 :         exit(1);
                                884                 :                :     }
                                885                 :                : 
                                886   [ +  +  +  + ]:             50 :     if (!do_drop_slot && dbname == NULL)
                                887                 :                :     {
 1840 peter@eisentraut.org      888                 :              1 :         pg_log_error("no database specified");
  737 tgl@sss.pgh.pa.us         889                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3680 rhaas@postgresql.org      890                 :              1 :         exit(1);
                                891                 :                :     }
                                892                 :                : 
                                893   [ +  +  +  +  :             49 :     if (!do_drop_slot && !do_create_slot && !do_start_slot)
                                              +  + ]
                                894                 :                :     {
 1840 peter@eisentraut.org      895                 :              1 :         pg_log_error("at least one action needs to be specified");
  737 tgl@sss.pgh.pa.us         896                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3680 rhaas@postgresql.org      897                 :              1 :         exit(1);
                                898                 :                :     }
                                899                 :                : 
                                900   [ +  +  +  -  :             48 :     if (do_drop_slot && (do_create_slot || do_start_slot))
                                              -  + ]
                                901                 :                :     {
 1840 peter@eisentraut.org      902                 :UBC           0 :         pg_log_error("cannot use --create-slot or --start together with --drop-slot");
  737 tgl@sss.pgh.pa.us         903                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3680 rhaas@postgresql.org      904                 :              0 :         exit(1);
                                905                 :                :     }
                                906                 :                : 
 3533 andres@anarazel.de        907   [ -  +  -  -  :CBC          48 :     if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
                                              -  - ]
                                908                 :                :     {
 1840 peter@eisentraut.org      909                 :UBC           0 :         pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
  737 tgl@sss.pgh.pa.us         910                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3680 rhaas@postgresql.org      911                 :              0 :         exit(1);
                                912                 :                :     }
                                913                 :                : 
 2657 simon@2ndQuadrant.co      914   [ +  +  -  + ]:CBC          48 :     if (endpos != InvalidXLogRecPtr && !do_start_slot)
                                915                 :                :     {
 1840 peter@eisentraut.org      916                 :UBC           0 :         pg_log_error("--endpos may only be specified with --start");
  737 tgl@sss.pgh.pa.us         917                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 2657 simon@2ndQuadrant.co      918                 :              0 :         exit(1);
                                919                 :                :     }
                                920                 :                : 
 1019 akapila@postgresql.o      921   [ +  +  +  + ]:CBC          48 :     if (two_phase && !do_create_slot)
                                922                 :                :     {
                                923                 :              1 :         pg_log_error("--two-phase may only be specified with --create-slot");
  737 tgl@sss.pgh.pa.us         924                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 1019 akapila@postgresql.o      925                 :              1 :         exit(1);
                                926                 :                :     }
                                927                 :                : 
                                928                 :                :     /*
                                929                 :                :      * Obtain a connection to server.  Notably, if we need a password, we want
                                930                 :                :      * to collect it from the user immediately.
                                931                 :                :      */
 3483 andres@anarazel.de        932                 :             47 :     conn = GetConnection();
                                933         [ -  + ]:             47 :     if (!conn)
                                934                 :                :         /* Error message already written in GetConnection() */
 3483 andres@anarazel.de        935                 :UBC           0 :         exit(1);
 1933 peter@eisentraut.org      936                 :CBC          47 :     atexit(disconnect_atexit);
                                937                 :                : 
                                938                 :                :     /*
                                939                 :                :      * Trap signals.  (Don't do this until after the initial password prompt,
                                940                 :                :      * if one is needed, in GetConnection.)
                                941                 :                :      */
                                942                 :                : #ifndef WIN32
  578 dgustafsson@postgres      943                 :             47 :     pqsignal(SIGINT, sigexit_handler);
                                944                 :             47 :     pqsignal(SIGTERM, sigexit_handler);
  875 tgl@sss.pgh.pa.us         945                 :             47 :     pqsignal(SIGHUP, sighup_handler);
                                946                 :                : #endif
                                947                 :                : 
                                948                 :                :     /*
                                949                 :                :      * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
                                950                 :                :      * replication connection.
                                951                 :                :      */
 3483 andres@anarazel.de        952         [ -  + ]:             47 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
 1933 peter@eisentraut.org      953                 :UBC           0 :         exit(1);
                                954                 :                : 
 3483 andres@anarazel.de        955         [ -  + ]:CBC          47 :     if (db_name == NULL)
  737 tgl@sss.pgh.pa.us         956                 :UBC           0 :         pg_fatal("could not establish database-specific replication connection");
                                957                 :                : 
                                958                 :                :     /*
                                959                 :                :      * Set umask so that directories/files are created with the same
                                960                 :                :      * permissions as directories/files in the source data directory.
                                961                 :                :      *
                                962                 :                :      * pg_mode_mask is set to owner-only by default and then updated in
                                963                 :                :      * GetConnection() where we get the mode from the server-side with
                                964                 :                :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
                                965                 :                :      */
 2199 sfrost@snowman.net        966                 :CBC          47 :     umask(pg_mode_mask);
                                967                 :                : 
                                968                 :                :     /* Drop a replication slot. */
 3680 rhaas@postgresql.org      969         [ +  + ]:             47 :     if (do_drop_slot)
                                970                 :                :     {
                                971         [ -  + ]:              1 :         if (verbose)
 1840 peter@eisentraut.org      972                 :UBC           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
                                973                 :                : 
 3483 andres@anarazel.de        974         [ -  + ]:CBC           1 :         if (!DropReplicationSlot(conn, replication_slot))
 1933 peter@eisentraut.org      975                 :UBC           0 :             exit(1);
                                976                 :                :     }
                                977                 :                : 
                                978                 :                :     /* Create a replication slot. */
 3680 rhaas@postgresql.org      979         [ +  + ]:CBC          47 :     if (do_create_slot)
                                980                 :                :     {
                                981         [ -  + ]:             23 :         if (verbose)
 1840 peter@eisentraut.org      982                 :UBC           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
                                983                 :                : 
 2392 peter_e@gmx.net           984         [ -  + ]:CBC          23 :         if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
                                985                 :                :                                    false, false, slot_exists_ok, two_phase))
 1933 peter@eisentraut.org      986                 :UBC           0 :             exit(1);
 3199 andres@anarazel.de        987                 :CBC          23 :         startpos = InvalidXLogRecPtr;
                                988                 :                :     }
                                989                 :                : 
 3680 rhaas@postgresql.org      990         [ +  + ]:             47 :     if (!do_start_slot)
 1933 peter@eisentraut.org      991                 :             24 :         exit(0);
                                992                 :                : 
                                993                 :                :     /* Stream loop */
                                994                 :                :     while (true)
                                995                 :                :     {
 3483 andres@anarazel.de        996                 :UBC           0 :         StreamLogicalLog();
 3680 rhaas@postgresql.org      997         [ +  + ]:CBC          23 :         if (time_to_abort)
                                998                 :                :         {
                                999                 :                :             /*
                               1000                 :                :              * We've been Ctrl-C'ed or reached an exit limit condition. That's
                               1001                 :                :              * not an error, so exit without an errorcode.
                               1002                 :                :              */
 1933 peter@eisentraut.org     1003                 :              8 :             exit(0);
                               1004                 :                :         }
 3680 rhaas@postgresql.org     1005         [ +  - ]:             15 :         else if (noloop)
  737 tgl@sss.pgh.pa.us        1006                 :             15 :             pg_fatal("disconnected");
                               1007                 :                :         else
                               1008                 :                :         {
                               1009                 :                :             /* translator: check source for value for %d */
 1840 peter@eisentraut.org     1010                 :UBC           0 :             pg_log_info("disconnected; waiting %d seconds to try again",
                               1011                 :                :                         RECONNECT_SLEEP_TIME);
 3680 rhaas@postgresql.org     1012                 :              0 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
                               1013                 :                :         }
                               1014                 :                :     }
                               1015                 :                : }
                               1016                 :                : 
                               1017                 :                : /*
                               1018                 :                :  * Fsync our output data, and send a feedback message to the server.  Returns
                               1019                 :                :  * true if successful, false otherwise.
                               1020                 :                :  *
                               1021                 :                :  * If successful, *now is updated to the current timestamp just before sending
                               1022                 :                :  * feedback.
                               1023                 :                :  */
                               1024                 :                : static bool
 2657 simon@2ndQuadrant.co     1025                 :CBC           8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
                               1026                 :                : {
                               1027                 :                :     /* flush data to disk, so that we send a recent flush pointer */
                               1028         [ -  + ]:              8 :     if (!OutputFsync(*now))
 2657 simon@2ndQuadrant.co     1029                 :UBC           0 :         return false;
 2657 simon@2ndQuadrant.co     1030                 :CBC           8 :     *now = feGetCurrentTimestamp();
                               1031         [ -  + ]:              8 :     if (!sendFeedback(conn, *now, true, false))
 2657 simon@2ndQuadrant.co     1032                 :UBC           0 :         return false;
                               1033                 :                : 
 2657 simon@2ndQuadrant.co     1034                 :CBC           8 :     return true;
                               1035                 :                : }
                               1036                 :                : 
                               1037                 :                : /*
                               1038                 :                :  * Try to inform the server about our upcoming demise, but don't wait around or
                               1039                 :                :  * retry on failure.
                               1040                 :                :  */
                               1041                 :                : static void
  269 michael@paquier.xyz      1042                 :GNC           8 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
                               1043                 :                :                    XLogRecPtr lsn)
                               1044                 :                : {
 2657 simon@2ndQuadrant.co     1045                 :CBC           8 :     (void) PQputCopyEnd(conn, NULL);
                               1046                 :              8 :     (void) PQflush(conn);
                               1047                 :                : 
                               1048         [ -  + ]:              8 :     if (verbose)
                               1049                 :                :     {
  269 michael@paquier.xyz      1050   [ #  #  #  #  :UNC           0 :         switch (reason)
                                                 # ]
                               1051                 :                :         {
                               1052                 :              0 :             case STREAM_STOP_SIGNAL:
                               1053                 :              0 :                 pg_log_info("received interrupt signal, exiting");
                               1054                 :              0 :                 break;
                               1055                 :              0 :             case STREAM_STOP_KEEPALIVE:
                               1056                 :              0 :                 pg_log_info("end position %X/%X reached by keepalive",
                               1057                 :                :                             LSN_FORMAT_ARGS(endpos));
                               1058                 :              0 :                 break;
                               1059                 :              0 :             case STREAM_STOP_END_OF_WAL:
                               1060         [ #  # ]:              0 :                 Assert(!XLogRecPtrIsInvalid(lsn));
                               1061                 :              0 :                 pg_log_info("end position %X/%X reached by WAL record at %X/%X",
                               1062                 :                :                             LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
                               1063                 :              0 :                 break;
                               1064                 :              0 :             case STREAM_STOP_NONE:
                               1065                 :              0 :                 Assert(false);
                               1066                 :                :                 break;
                               1067                 :                :         }
                               1068                 :                :     }
 2657 simon@2ndQuadrant.co     1069                 :CBC           8 : }
        

Generated by: LCOV version 2.1-beta2-3-g6141622