LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - receivelog.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 63.9 % 424 271 8 23 94 28 24 152 28 67 91 167 10 14
Current Date: 2023-04-08 17:13:01 Functions: 94.1 % 17 16 1 16 1 16
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (180,240] days: 77.8 % 36 28 8 28
Legend: Lines: hit not hit (240..) days: 62.6 % 388 243 23 94 28 24 152 67 91 167
Function coverage date bins:
(240..) days: 47.1 % 34 16 1 16 1 16

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * receivelog.c - receive WAL files using the streaming
                                  4                 :  *                replication protocol.
                                  5                 :  *
                                  6                 :  * Author: Magnus Hagander <magnus@hagander.net>
                                  7                 :  *
                                  8                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  9                 :  *
                                 10                 :  * IDENTIFICATION
                                 11                 :  *        src/bin/pg_basebackup/receivelog.c
                                 12                 :  *-------------------------------------------------------------------------
                                 13                 :  */
                                 14                 : 
                                 15                 : #include "postgres_fe.h"
                                 16                 : 
                                 17                 : #include <sys/select.h>
                                 18                 : #include <sys/stat.h>
                                 19                 : #include <unistd.h>
                                 20                 : 
                                 21                 : #include "access/xlog_internal.h"
                                 22                 : #include "common/file_utils.h"
                                 23                 : #include "common/logging.h"
                                 24                 : #include "libpq-fe.h"
                                 25                 : #include "receivelog.h"
                                 26                 : #include "streamutil.h"
                                 27                 : 
                                 28                 : /* currently open WAL file */
                                 29                 : static Walfile *walfile = NULL;
                                 30                 : static bool reportFlushPosition = false;
                                 31                 : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
                                 32                 : 
                                 33                 : static bool still_sending = true;   /* feedback still needs to be sent? */
                                 34                 : 
                                 35                 : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
                                 36                 :                                   XLogRecPtr *stoppos);
                                 37                 : static int  CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
                                 38                 : static int  CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
                                 39                 :                               char **buffer);
                                 40                 : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
                                 41                 :                                 int len, XLogRecPtr blockpos, TimestampTz *last_status);
                                 42                 : static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                                 43                 :                                XLogRecPtr *blockpos);
                                 44                 : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
                                 45                 :                                        XLogRecPtr blockpos, XLogRecPtr *stoppos);
                                 46                 : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
                                 47                 : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
                                 48                 :                                          TimestampTz last_status);
                                 49                 : 
                                 50                 : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 1418 tgl                        51 ECB             :                                      uint32 *timeline);
                                 52                 : 
                                 53                 : static bool
 2359 magnus                     54 GIC          41 : mark_file_as_archived(StreamCtl *stream, const char *fname)
                                 55                 : {
 2359 magnus                     56 ECB             :     Walfile    *f;
                                 57                 :     static char tmppath[MAXPGPATH];
                                 58                 : 
 2359 magnus                     59 CBC          41 :     snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
                                 60                 :              fname);
 3018 andres                     61 ECB             : 
  202 rhaas                      62 GNC          41 :     f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
                                 63                 :                                                NULL, 0);
 2359 magnus                     64 GBC          41 :     if (f == NULL)
                                 65                 :     {
 1469 peter                      66 UBC           0 :         pg_log_error("could not create archive status file \"%s\": %s",
                                 67                 :                      tmppath, GetLastWalMethodError(stream->walmethod));
 3018 andres                     68 UIC           0 :         return false;
 3018 andres                     69 ECB             :     }
                                 70                 : 
  202 rhaas                      71 GNC          41 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
                                 72                 :     {
  508 tgl                        73 UBC           0 :         pg_log_error("could not close archive status file \"%s\": %s",
                                 74                 :                      tmppath, GetLastWalMethodError(stream->walmethod));
  508 tgl                        75 UIC           0 :         return false;
  508 tgl                        76 ECB             :     }
                                 77                 : 
 3018 andres                     78 GIC          41 :     return true;
                                 79                 : }
                                 80                 : 
                                 81                 : /*
                                 82                 :  * Open a new WAL file in the specified directory.
                                 83                 :  *
                                 84                 :  * Returns true if OK; on failure, returns false after printing an error msg.
                                 85                 :  * On success, 'walfile' is set to the opened WAL file.
                                 86                 :  *
 2380 tgl                        87 ECB             :  * The file will be padded to 16Mb with zeroes.
                                 88                 :  */
                                 89                 : static bool
 2585 magnus                     90 GIC         105 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
                                 91                 : {
                                 92                 :     Walfile    *f;
                                 93                 :     char       *fn;
                                 94                 :     ssize_t     size;
 3941 heikki.linnakangas         95 ECB             :     XLogSegNo   segno;
                                 96                 :     char        walfile_name[MAXPGPATH];
 4183 magnus                     97                 : 
 2028 andres                     98 GIC         105 :     XLByteToSeg(startpoint, segno, WalSegSz);
  202 rhaas                      99 GNC         105 :     XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
 4183 magnus                    100 ECB             : 
                                101                 :     /* Note that this considers the compression used if necessary */
  202 rhaas                     102 GNC         105 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
                                103                 :                                                walfile_name,
                                104             105 :                                                stream->partial_suffix);
                                105                 : 
                                106                 :     /*
                                107                 :      * When streaming to files, if an existing file exists we verify that it's
                                108                 :      * either empty (just created), or a complete WalSegSz segment (in which
                                109                 :      * case it has been created and padded). Anything else indicates a corrupt
                                110                 :      * file. Compressed files have no need for padding, so just ignore this
                                111                 :      * case.
                                112                 :      *
                                113                 :      * When streaming to tar, no file with this name will exist before, so we
                                114                 :      * never have to verify a size.
 4175 magnus                    115 ECB             :      */
  202 rhaas                     116 GNC         203 :     if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
                                117              98 :         stream->walmethod->ops->existsfile(stream->walmethod, fn))
 4175 magnus                    118 EUB             :     {
  202 rhaas                     119 UNC           0 :         size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
 2359 magnus                    120 UIC           0 :         if (size < 0)
 2380 tgl                       121 EUB             :         {
 1469 peter                     122 UIC           0 :             pg_log_error("could not get size of write-ahead log file \"%s\": %s",
                                123                 :                          fn, GetLastWalMethodError(stream->walmethod));
  622 michael                   124 UBC           0 :             pg_free(fn);
 2359 magnus                    125 UIC           0 :             return false;
 2359 magnus                    126 EUB             :         }
 2028 andres                    127 UIC           0 :         if (size == WalSegSz)
                                128                 :         {
 2359 magnus                    129 EUB             :             /* Already padded file. Open it for use */
  202 rhaas                     130 UNC           0 :             f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
 2359 magnus                    131 UIC           0 :             if (f == NULL)
 2380 tgl                       132 EUB             :             {
 1469 peter                     133 UIC           0 :                 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
                                134                 :                              fn, GetLastWalMethodError(stream->walmethod));
  622 michael                   135 UBC           0 :                 pg_free(fn);
 2380 tgl                       136 UIC           0 :                 return false;
                                137                 :             }
                                138                 : 
 2359 magnus                    139 EUB             :             /* fsync file in case of a previous crash */
  202 rhaas                     140 UNC           0 :             if (stream->walmethod->ops->sync(f) != 0)
 2359 magnus                    141 EUB             :             {
  366 tgl                       142 UIC           0 :                 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
                                143                 :                              fn, GetLastWalMethodError(stream->walmethod));
  202 rhaas                     144 UNC           0 :                 stream->walmethod->ops->close(f, CLOSE_UNLINK);
 1350 peter                     145 UIC           0 :                 exit(1);
                                146                 :             }
 4175 magnus                    147 EUB             : 
 2359 magnus                    148 UBC           0 :             walfile = f;
  622 michael                   149               0 :             pg_free(fn);
 2359 magnus                    150 UIC           0 :             return true;
 2359 magnus                    151 EUB             :         }
 2359 magnus                    152 UIC           0 :         if (size != 0)
                                153                 :         {
 2380 tgl                       154 EUB             :             /* if write didn't set errno, assume problem is no disk space */
 2380 tgl                       155 UBC           0 :             if (errno == 0)
                                156               0 :                 errno = ENOSPC;
  609 peter                     157 UIC           0 :             pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
                                158                 :                                   "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
                                159                 :                                   size),
  609 peter                     160 EUB             :                          fn, size, WalSegSz);
  622 michael                   161 UBC           0 :             pg_free(fn);
 3734 heikki.linnakangas        162 UIC           0 :             return false;
                                163                 :         }
                                164                 :         /* File existed and was empty, so fall through and open */
                                165                 :     }
                                166                 : 
                                167                 :     /* No file existed, so create one */
 2383 peter_e                   168 ECB             : 
  202 rhaas                     169 GNC         105 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
                                170                 :                                                walfile_name,
                                171             105 :                                                stream->partial_suffix,
                                172                 :                                                WalSegSz);
 2359 magnus                    173 GIC         105 :     if (f == NULL)
 4175 magnus                    174 ECB             :     {
 1469 peter                     175 UIC           0 :         pg_log_error("could not open write-ahead log file \"%s\": %s",
                                176                 :                      fn, GetLastWalMethodError(stream->walmethod));
  622 michael                   177               0 :         pg_free(fn);
 3734 heikki.linnakangas        178 UBC           0 :         return false;
 4175 magnus                    179 EUB             :     }
                                180                 : 
  622 michael                   181 GIC         105 :     pg_free(fn);
 3734 heikki.linnakangas        182 CBC         105 :     walfile = f;
                                183             105 :     return true;
 4183 magnus                    184 ECB             : }
                                185                 : 
                                186                 : /*
                                187                 :  * Close the current WAL file (if open), and rename it to the correct
                                188                 :  * filename if it's complete. On failure, prints an error message to stderr
                                189                 :  * and returns false, otherwise returns true.
                                190                 :  */
                                191                 : static bool
 2585 magnus                    192 GIC         144 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
 4175 magnus                    193 ECB             : {
                                194                 :     char       *fn;
                                195                 :     off_t       currpos;
                                196                 :     int         r;
                                197                 :     char        walfile_name[MAXPGPATH];
                                198                 : 
 2359 magnus                    199 GIC         144 :     if (walfile == NULL)
 3734 heikki.linnakangas        200              39 :         return true;
 4175 magnus                    201 ECB             : 
  202 rhaas                     202 GNC         105 :     strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
                                203             105 :     currpos = walfile->currpos;
                                204                 : 
  569 michael                   205 ECB             :     /* Note that this considers the compression used if necessary */
  202 rhaas                     206 GNC         105 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
                                207                 :                                                walfile_name,
                                208             105 :                                                stream->partial_suffix);
  569 michael                   209 ECB             : 
 2359 magnus                    210 CBC         105 :     if (stream->partial_suffix)
                                211                 :     {
 2028 andres                    212 GIC          12 :         if (currpos == WalSegSz)
  202 rhaas                     213 GNC           6 :             r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
 2359 magnus                    214 ECB             :         else
                                215                 :         {
  569 michael                   216 CBC           6 :             pg_log_info("not renaming \"%s\", segment is not complete", fn);
  202 rhaas                     217 GNC           6 :             r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
 2359 magnus                    218 ECB             :         }
                                219                 :     }
 2359 magnus                    220 EUB             :     else
  202 rhaas                     221 GNC          93 :         r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
                                222                 : 
 2359 magnus                    223 GBC         105 :     walfile = NULL;
 2359 magnus                    224 EUB             : 
 2359 magnus                    225 GIC         105 :     if (r != 0)
                                226                 :     {
 1469 peter                     227 LBC           0 :         pg_log_error("could not close file \"%s\": %s",
                                228                 :                      fn, GetLastWalMethodError(stream->walmethod));
                                229                 : 
  569 michael                   230 UIC           0 :         pg_free(fn);
 4175 magnus                    231               0 :         return false;
                                232                 :     }
                                233                 : 
  569 michael                   234 GIC         105 :     pg_free(fn);
  569 michael                   235 ECB             : 
                                236                 :     /*
                                237                 :      * Mark file as archived if requested by the caller - pg_basebackup needs
 3018 andres                    238                 :      * to do so as files can otherwise get archived again after promotion of a
 3018 andres                    239 EUB             :      * new node. This is in line with walreceiver.c always doing a
                                240                 :      * XLogArchiveForceDone() after a complete segment.
                                241                 :      */
 2028 andres                    242 CBC         105 :     if (currpos == WalSegSz && stream->mark_done)
 3018 andres                    243 ECB             :     {
                                244                 :         /* writes error message if failed */
  202 rhaas                     245 GNC          40 :         if (!mark_file_as_archived(stream, walfile_name))
 3018 andres                    246 UIC           0 :             return false;
                                247                 :     }
                                248                 : 
 3355 rhaas                     249 GIC         105 :     lastFlushPosition = pos;
 4175 magnus                    250             105 :     return true;
 4175 magnus                    251 ECB             : }
                                252                 : 
                                253                 : 
                                254                 : /*
                                255                 :  * Check if a timeline history file exists.
                                256                 :  */
                                257                 : static bool
 2585 magnus                    258 GIC         101 : existsTimeLineHistoryFile(StreamCtl *stream)
 3734 heikki.linnakangas        259 ECB             : {
                                260                 :     char        histfname[MAXFNAMELEN];
                                261                 : 
                                262                 :     /*
                                263                 :      * Timeline 1 never has a history file. We treat that as if it existed,
                                264                 :      * since we never need to stream it.
                                265                 :      */
 2585 magnus                    266 GIC         101 :     if (stream->timeline == 1)
 3734 heikki.linnakangas        267              99 :         return true;
 3734 heikki.linnakangas        268 ECB             : 
 2585 magnus                    269 GIC           2 :     TLHistoryFileName(histfname, stream->timeline);
 3734 heikki.linnakangas        270 ECB             : 
  202 rhaas                     271 GNC           2 :     return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
                                272                 : }
                                273                 : 
                                274                 : static bool
 2585 magnus                    275 GIC           2 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
                                276                 : {
 3734 heikki.linnakangas        277               2 :     int         size = strlen(content);
 3734 heikki.linnakangas        278 ECB             :     char        histfname[MAXFNAMELEN];
 2359 magnus                    279                 :     Walfile    *f;
                                280                 : 
 3734 heikki.linnakangas        281 EUB             :     /*
                                282                 :      * Check that the server's idea of how timeline history files should be
                                283                 :      * named matches ours.
                                284                 :      */
 2585 magnus                    285 GIC           2 :     TLHistoryFileName(histfname, stream->timeline);
 3734 heikki.linnakangas        286 CBC           2 :     if (strcmp(histfname, filename) != 0)
                                287                 :     {
 1469 peter                     288 LBC           0 :         pg_log_error("server reported unexpected history file name for timeline %u: %s",
                                289                 :                      stream->timeline, filename);
 3734 heikki.linnakangas        290 UBC           0 :         return false;
                                291                 :     }
 3734 heikki.linnakangas        292 EUB             : 
  202 rhaas                     293 GNC           2 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
                                294                 :                                                histfname, ".tmp", 0);
 2359 magnus                    295 GIC           2 :     if (f == NULL)
 3734 heikki.linnakangas        296 ECB             :     {
 1469 peter                     297 UIC           0 :         pg_log_error("could not create timeline history file \"%s\": %s",
                                298                 :                      histfname, GetLastWalMethodError(stream->walmethod));
 3734 heikki.linnakangas        299               0 :         return false;
                                300                 :     }
                                301                 : 
  202 rhaas                     302 GNC           2 :     if ((int) stream->walmethod->ops->write(f, content, size) != size)
                                303                 :     {
 1469 peter                     304 UBC           0 :         pg_log_error("could not write timeline history file \"%s\": %s",
                                305                 :                      histfname, GetLastWalMethodError(stream->walmethod));
 3734 heikki.linnakangas        306 EUB             : 
                                307                 :         /*
                                308                 :          * If we fail to make the file, delete it to release disk space
 3734 heikki.linnakangas        309 ECB             :          */
  202 rhaas                     310 UNC           0 :         stream->walmethod->ops->close(f, CLOSE_UNLINK);
 3734 heikki.linnakangas        311 EUB             : 
 3734 heikki.linnakangas        312 UIC           0 :         return false;
 3734 heikki.linnakangas        313 EUB             :     }
                                314                 : 
  202 rhaas                     315 GNC           2 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
                                316                 :     {
 1469 peter                     317 LBC           0 :         pg_log_error("could not close file \"%s\": %s",
                                318                 :                      histfname, GetLastWalMethodError(stream->walmethod));
 3734 heikki.linnakangas        319 UIC           0 :         return false;
 3734 heikki.linnakangas        320 ECB             :     }
 3734 heikki.linnakangas        321 EUB             : 
                                322                 :     /* Maintain archive_status, check close_walfile() for details. */
 2585 magnus                    323 GIC           2 :     if (stream->mark_done)
 3018 andres                    324 ECB             :     {
                                325                 :         /* writes error message if failed */
 2359 magnus                    326 GIC           1 :         if (!mark_file_as_archived(stream, histfname))
 3018 andres                    327 UIC           0 :             return false;
                                328                 :     }
                                329                 : 
 3734 heikki.linnakangas        330 GIC           2 :     return true;
 3734 heikki.linnakangas        331 ECB             : }
                                332                 : 
                                333                 : /*
 3805                           334                 :  * Send a Standby Status Update message to server.
                                335                 :  */
                                336                 : static bool
 2236 tgl                       337 CBC          99 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
 3805 heikki.linnakangas        338 ECB             : {
                                339                 :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
 3602 bruce                     340 CBC          99 :     int         len = 0;
 3805 heikki.linnakangas        341 ECB             : 
 3805 heikki.linnakangas        342 GIC          99 :     replybuf[len] = 'r';
 3805 heikki.linnakangas        343 CBC          99 :     len += 1;
 2118 tgl                       344              99 :     fe_sendint64(blockpos, &replybuf[len]); /* write */
 3805 heikki.linnakangas        345              99 :     len += 8;
 3355 rhaas                     346              99 :     if (reportFlushPosition)
 2118 tgl                       347              95 :         fe_sendint64(lastFlushPosition, &replybuf[len]);    /* flush */
 3355 rhaas                     348 ECB             :     else
 2118 tgl                       349 CBC           4 :         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* flush */
 3805 heikki.linnakangas        350              99 :     len += 8;
 3260 bruce                     351 GIC          99 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
 3805 heikki.linnakangas        352 CBC          99 :     len += 8;
 3260 bruce                     353 GIC          99 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
 3805 heikki.linnakangas        354 GBC          99 :     len += 8;
 2118 tgl                       355 GIC          99 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
 3805 heikki.linnakangas        356 GBC          99 :     len += 1;
                                357                 : 
 3805 heikki.linnakangas        358 GIC          99 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
 3805 heikki.linnakangas        359 ECB             :     {
 1469 peter                     360 UIC           0 :         pg_log_error("could not send feedback packet: %s",
                                361                 :                      PQerrorMessage(conn));
 3805 heikki.linnakangas        362               0 :         return false;
                                363                 :     }
                                364                 : 
 3805 heikki.linnakangas        365 GIC          99 :     return true;
                                366                 : }
                                367                 : 
                                368                 : /*
 3670 heikki.linnakangas        369 ECB             :  * Check that the server version we're connected to is supported by
                                370                 :  * ReceiveXlogStream().
                                371                 :  *
                                372                 :  * If it's not, an error message is printed to stderr, and false is returned.
                                373                 :  */
                                374                 : bool
 3670 heikki.linnakangas        375 GIC         218 : CheckServerVersionForStreaming(PGconn *conn)
                                376                 : {
                                377                 :     int         minServerMajor,
                                378                 :                 maxServerMajor;
                                379                 :     int         serverMajor;
                                380                 : 
 3670 heikki.linnakangas        381 ECB             :     /*
                                382                 :      * The message format used in streaming replication changed in 9.3, so we
                                383                 :      * cannot stream from older servers. And we don't support servers newer
                                384                 :      * than the client; it might work, but we don't know, so err on the safe
                                385                 :      * side.
 3670 heikki.linnakangas        386 EUB             :      */
 3670 heikki.linnakangas        387 GIC         218 :     minServerMajor = 903;
 3670 heikki.linnakangas        388 GBC         218 :     maxServerMajor = PG_VERSION_NUM / 100;
 3670 heikki.linnakangas        389 GIC         218 :     serverMajor = PQserverVersion(conn) / 100;
 3260 simon                     390             218 :     if (serverMajor < minServerMajor)
 3670 heikki.linnakangas        391 EUB             :     {
 3670 heikki.linnakangas        392 UIC           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
 3602 bruce                     393 ECB             : 
 1469 peter                     394 UIC           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
 1418 tgl                       395 EUB             :                      serverver ? serverver : "'unknown'",
                                396                 :                      "9.3");
 3260 simon                     397 UBC           0 :         return false;
                                398                 :     }
 3260 simon                     399 GIC         218 :     else if (serverMajor > maxServerMajor)
 3260 simon                     400 EUB             :     {
 3260 simon                     401 UIC           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
 3260 simon                     402 ECB             : 
 1469 peter                     403 UIC           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
                                404                 :                      serverver ? serverver : "'unknown'",
                                405                 :                      PG_VERSION);
 3670 heikki.linnakangas        406               0 :         return false;
                                407                 :     }
 3670 heikki.linnakangas        408 GIC         218 :     return true;
                                409                 : }
                                410                 : 
                                411                 : /*
                                412                 :  * Receive a log stream starting at the specified position.
                                413                 :  *
                                414                 :  * Individual parameters are passed through the StreamCtl structure.
                                415                 :  *
                                416                 :  * If sysidentifier is specified, validate that both the system
                                417                 :  * identifier and the timeline matches the specified ones
                                418                 :  * (by sending an extra IDENTIFY_SYSTEM command)
                                419                 :  *
                                420                 :  * All received segments will be written to the directory
                                421                 :  * specified by basedir. This will also fetch any missing timeline history
                                422                 :  * files.
                                423                 :  *
                                424                 :  * The stream_stop callback will be called every time data
                                425                 :  * is received, and whenever a segment is completed. If it returns
                                426                 :  * true, the streaming will stop and the function
                                427                 :  * return. As long as it returns false, streaming will continue
                                428                 :  * indefinitely.
                                429                 :  *
                                430                 :  * If stream_stop() checks for external input, stop_socket should be set to
                                431                 :  * the FD it checks.  This will allow such input to be detected promptly
                                432                 :  * rather than after standby_message_timeout (which might be indefinite).
                                433                 :  * Note that signals will interrupt waits for input as well, but that is
                                434                 :  * race-y since a signal received while busy won't interrupt the wait.
                                435                 :  *
                                436                 :  * standby_message_timeout controls how often we send a message
                                437                 :  * back to the primary letting it know our progress, in milliseconds.
                                438                 :  * Zero means no messages are sent.
                                439                 :  * This message will only contain the write location, and never
                                440                 :  * flush or replay.
                                441                 :  *
                                442                 :  * If 'partial_suffix' is not NULL, files are initially created with the
                                443                 :  * given suffix, and the suffix is removed once the file is finished. That
                                444                 :  * allows you to tell the difference between partial and completed files,
                                445                 :  * so that you can continue later where you left.
                                446                 :  *
 3064 fujii                     447 ECB             :  * If 'synchronous' is true, the received WAL is flushed as soon as written,
                                448                 :  * otherwise only when the WAL file is closed.
                                449                 :  *
                                450                 :  * Note: The WAL location *must* be at a log segment start!
                                451                 :  */
                                452                 : bool
 2585 magnus                    453 GIC         100 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
                                454                 : {
                                455                 :     char        query[128];
                                456                 :     char        slotcmd[128];
                                457                 :     PGresult   *res;
 3734 heikki.linnakangas        458 ECB             :     XLogRecPtr  stoppos;
 4183 magnus                    459 EUB             : 
                                460                 :     /*
                                461                 :      * The caller should've checked the server version already, but doesn't do
                                462                 :      * any harm to check it here too.
                                463                 :      */
 3670 heikki.linnakangas        464 GIC         100 :     if (!CheckServerVersionForStreaming(conn))
 3736 heikki.linnakangas        465 UIC           0 :         return false;
                                466                 : 
                                467                 :     /*
                                468                 :      * Decide whether we want to report the flush position. If we report the
                                469                 :      * flush position, the primary will know what WAL we'll possibly
                                470                 :      * re-request, and it can then remove older WAL safely. We must always do
                                471                 :      * that when we are using slots.
 2414 simon                     472 ECB             :      *
                                473                 :      * Reporting the flush position makes one eligible as a synchronous
                                474                 :      * replica. People shouldn't include generic names in
 2153 bruce                     475                 :      * synchronous_standby_names, but we've protected them against it so far,
                                476                 :      * so let's continue to do so unless specifically requested.
                                477                 :      */
 2274 magnus                    478 GIC         100 :     if (stream->replication_slot != NULL)
 3355 rhaas                     479 ECB             :     {
 3355 rhaas                     480 CBC          95 :         reportFlushPosition = true;
 2274 magnus                    481 GIC          95 :         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
 3355 rhaas                     482 ECB             :     }
                                483                 :     else
                                484                 :     {
 2414 simon                     485 GIC           5 :         if (stream->synchronous)
 2414 simon                     486 CBC           1 :             reportFlushPosition = true;
                                487                 :         else
                                488               4 :             reportFlushPosition = false;
 3355 rhaas                     489 GIC           5 :         slotcmd[0] = 0;
                                490                 :     }
                                491                 : 
 2585 magnus                    492             100 :     if (stream->sysidentifier != NULL)
                                493                 :     {
  586 michael                   494 CBC         100 :         char       *sysidentifier = NULL;
                                495                 :         TimeLineID  servertli;
  586 michael                   496 EUB             : 
                                497                 :         /*
                                498                 :          * Get the server system identifier and timeline, and validate them.
                                499                 :          */
  586 michael                   500 CBC         100 :         if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
                                501                 :         {
  586 michael                   502 UBC           0 :             pg_free(sysidentifier);
 4183 magnus                    503               0 :             return false;
 4183 magnus                    504 EUB             :         }
                                505                 : 
  586 michael                   506 CBC         100 :         if (strcmp(stream->sysidentifier, sysidentifier) != 0)
                                507                 :         {
 1469 peter                     508 LBC           0 :             pg_log_error("system identifier does not match between base backup and streaming connection");
  586 michael                   509 UIC           0 :             pg_free(sysidentifier);
 4183 magnus                    510 UBC           0 :             return false;
                                511                 :         }
  586 michael                   512 GBC         100 :         pg_free(sysidentifier);
                                513                 : 
  586 michael                   514 GIC         100 :         if (stream->timeline > servertli)
                                515                 :         {
 1469 peter                     516 UIC           0 :             pg_log_error("starting timeline %u is not present in the server",
                                517                 :                          stream->timeline);
 4183 magnus                    518               0 :             return false;
                                519                 :         }
 4183 magnus                    520 ECB             :     }
                                521                 : 
                                522                 :     /*
                                523                 :      * initialize flush position to starting point, it's the caller's
                                524                 :      * responsibility that that's sane.
                                525                 :      */
 2585 magnus                    526 GIC         100 :     lastFlushPosition = stream->startpos;
                                527                 : 
                                528                 :     while (1)
                                529                 :     {
 3734 heikki.linnakangas        530 ECB             :         /*
                                531                 :          * Fetch the timeline history file for this timeline, if we don't have
 2359 magnus                    532                 :          * it already. When streaming log to tar, this will always return
                                533                 :          * false, as we are never streaming into an existing file and
                                534                 :          * therefore there can be no pre-existing timeline history file.
                                535                 :          */
 2585 magnus                    536 GIC         101 :         if (!existsTimeLineHistoryFile(stream))
 3734 heikki.linnakangas        537 EUB             :         {
 2585 magnus                    538 GIC           2 :             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
 3734 heikki.linnakangas        539 GBC           2 :             res = PQexec(conn, query);
                                540               2 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
                                541                 :             {
                                542                 :                 /* FIXME: we might send it ok, but get an error */
 1469 peter                     543 UIC           0 :                 pg_log_error("could not send replication command \"%s\": %s",
                                544                 :                              "TIMELINE_HISTORY", PQresultErrorMessage(res));
 3734 heikki.linnakangas        545               0 :                 PQclear(res);
                                546               0 :                 return false;
 3734 heikki.linnakangas        547 ECB             :             }
                                548                 : 
 3734 heikki.linnakangas        549 EUB             :             /*
                                550                 :              * The response to TIMELINE_HISTORY is a single row result set
                                551                 :              * with two fields: filename and content
                                552                 :              */
 3734 heikki.linnakangas        553 GIC           2 :             if (PQnfields(res) != 2 || PQntuples(res) != 1)
 3734 heikki.linnakangas        554 ECB             :             {
 1469 peter                     555 UIC           0 :                 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
                                556                 :                                PQntuples(res), PQnfields(res), 1, 2);
                                557                 :             }
 3734 heikki.linnakangas        558 ECB             : 
                                559                 :             /* Write the history file to disk */
 2585 magnus                    560 GIC           2 :             writeTimeLineHistoryFile(stream,
                                561                 :                                      PQgetvalue(res, 0, 0),
                                562                 :                                      PQgetvalue(res, 0, 1));
                                563                 : 
 3734 heikki.linnakangas        564               2 :             PQclear(res);
 3734 heikki.linnakangas        565 ECB             :         }
 3734 heikki.linnakangas        566 EUB             : 
                                567                 :         /*
                                568                 :          * Before we start streaming from the requested location, check if the
 3602 bruce                     569 ECB             :          * callback tells us to stop here.
                                570                 :          */
 2585 magnus                    571 CBC         101 :         if (stream->stream_stop(stream->startpos, stream->timeline, false))
 3734 heikki.linnakangas        572 UIC           0 :             return true;
 3734 heikki.linnakangas        573 ECB             : 
                                574                 :         /* Initiate the replication stream at specified location */
 3355 rhaas                     575 GIC         101 :         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
 3355 rhaas                     576 ECB             :                  slotcmd,
  775 peter                     577 GIC         101 :                  LSN_FORMAT_ARGS(stream->startpos),
 2585 magnus                    578 ECB             :                  stream->timeline);
 3734 heikki.linnakangas        579 CBC         101 :         res = PQexec(conn, query);
 3734 heikki.linnakangas        580 GIC         101 :         if (PQresultStatus(res) != PGRES_COPY_BOTH)
 3734 heikki.linnakangas        581 ECB             :         {
 1469 peter                     582 GIC           2 :             pg_log_error("could not send replication command \"%s\": %s",
                                583                 :                          "START_REPLICATION", PQresultErrorMessage(res));
 3734 heikki.linnakangas        584 CBC           2 :             PQclear(res);
                                585               2 :             return false;
 3734 heikki.linnakangas        586 EUB             :         }
 3923 magnus                    587 GIC          99 :         PQclear(res);
                                588                 : 
                                589                 :         /* Stream the WAL */
 2585                           590              99 :         res = HandleCopyStream(conn, stream, &stoppos);
 3632 rhaas                     591              99 :         if (res == NULL)
 3734 heikki.linnakangas        592 UIC           0 :             goto error;
                                593                 : 
                                594                 :         /*
                                595                 :          * Streaming finished.
                                596                 :          *
                                597                 :          * There are two possible reasons for that: a controlled shutdown, or
 3602 bruce                     598 ECB             :          * we reached the end of the current timeline. In case of
 3734 heikki.linnakangas        599                 :          * end-of-timeline, the server sends a result set after Copy has
                                600                 :          * finished, containing information about the next timeline. Read
                                601                 :          * that, and restart streaming from the next timeline. In case of
                                602                 :          * controlled shutdown, stop here.
                                603                 :          */
 3734 heikki.linnakangas        604 GIC          99 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
                                605               1 :         {
                                606                 :             /*
                                607                 :              * End-of-timeline. Read the next timeline's ID and starting
                                608                 :              * position. Usually, the starting position will match the end of
                                609                 :              * the previous timeline, but there are corner cases like if the
                                610                 :              * server had sent us half of a WAL record, when it was promoted.
                                611                 :              * The new timeline will begin at the end of the last complete
 3623 heikki.linnakangas        612 ECB             :              * record in that case, overlapping the partial WAL record on the
 2133 rhaas                     613                 :              * old timeline.
 3734 heikki.linnakangas        614                 :              */
 3734 heikki.linnakangas        615 EUB             :             uint32      newtimeline;
                                616                 :             bool        parsed;
                                617                 : 
 2585 magnus                    618 CBC           1 :             parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
 3734 heikki.linnakangas        619 GIC           1 :             PQclear(res);
 3623 heikki.linnakangas        620 GBC           1 :             if (!parsed)
 3623 heikki.linnakangas        621 UIC           0 :                 goto error;
 3734 heikki.linnakangas        622 EUB             : 
                                623                 :             /* Sanity check the values the server gave us */
 2585 magnus                    624 CBC           1 :             if (newtimeline <= stream->timeline)
                                625                 :             {
 1469 peter                     626 UBC           0 :                 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
                                627                 :                              newtimeline, stream->timeline);
 3623 heikki.linnakangas        628 UIC           0 :                 goto error;
 3623 heikki.linnakangas        629 EUB             :             }
 2585 magnus                    630 GIC           1 :             if (stream->startpos > stoppos)
                                631                 :             {
 1469 peter                     632 UIC           0 :                 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
  775 peter                     633 ECB             :                              stream->timeline, LSN_FORMAT_ARGS(stoppos),
                                634                 :                              newtimeline, LSN_FORMAT_ARGS(stream->startpos));
 3734 heikki.linnakangas        635 UIC           0 :                 goto error;
 3734 heikki.linnakangas        636 EUB             :             }
                                637                 : 
                                638                 :             /* Read the final result, which should be CommandComplete. */
 3734 heikki.linnakangas        639 GBC           1 :             res = PQgetResult(conn);
 3734 heikki.linnakangas        640 GIC           1 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
 3734 heikki.linnakangas        641 ECB             :             {
 1469 peter                     642 UIC           0 :                 pg_log_error("unexpected termination of replication stream: %s",
                                643                 :                              PQresultErrorMessage(res));
 3172 fujii                     644               0 :                 PQclear(res);
 3734 heikki.linnakangas        645               0 :                 goto error;
                                646                 :             }
 3734 heikki.linnakangas        647 CBC           1 :             PQclear(res);
 3734 heikki.linnakangas        648 ECB             : 
                                649                 :             /*
 3602 bruce                     650                 :              * Loop back to start streaming from the new timeline. Always
                                651                 :              * start streaming at the beginning of a segment.
 3734 heikki.linnakangas        652                 :              */
 2585 magnus                    653 GIC           1 :             stream->timeline = newtimeline;
 2028 andres                    654 CBC           1 :             stream->startpos = stream->startpos -
 2028 andres                    655 GIC           1 :                 XLogSegmentOffset(stream->startpos, WalSegSz);
 3734 heikki.linnakangas        656               1 :             continue;
                                657                 :         }
                                658              98 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
                                659                 :         {
 3172 fujii                     660              97 :             PQclear(res);
                                661                 : 
 3734 heikki.linnakangas        662 ECB             :             /*
                                663                 :              * End of replication (ie. controlled shut down of the server).
                                664                 :              *
                                665                 :              * Check if the callback thinks it's OK to stop here. If not,
 3734 heikki.linnakangas        666 EUB             :              * complain.
                                667                 :              */
 2585 magnus                    668 GIC          97 :             if (stream->stream_stop(stoppos, stream->timeline, false))
 3734 heikki.linnakangas        669              97 :                 return true;
                                670                 :             else
                                671                 :             {
 1469 peter                     672 UIC           0 :                 pg_log_error("replication stream was terminated before stop point");
 3734 heikki.linnakangas        673 LBC           0 :                 goto error;
                                674                 :             }
 3734 heikki.linnakangas        675 ECB             :         }
                                676                 :         else
                                677                 :         {
                                678                 :             /* Server returned an error. */
 1469 peter                     679 GIC           1 :             pg_log_error("unexpected termination of replication stream: %s",
 1469 peter                     680 ECB             :                          PQresultErrorMessage(res));
 3172 fujii                     681 CBC           1 :             PQclear(res);
 3734 heikki.linnakangas        682 GBC           1 :             goto error;
                                683                 :         }
 4183 magnus                    684 ECB             :     }
                                685                 : 
 3734 heikki.linnakangas        686 GIC           1 : error:
  202 rhaas                     687 GNC           1 :     if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
 1469 peter                     688 UIC           0 :         pg_log_error("could not close file \"%s\": %s",
                                689                 :                      walfile->pathname, GetLastWalMethodError(stream->walmethod));
 2359 magnus                    690 GIC           1 :     walfile = NULL;
 3734 heikki.linnakangas        691               1 :     return false;
                                692                 : }
 3734 heikki.linnakangas        693 ECB             : 
                                694                 : /*
                                695                 :  * Helper function to parse the result set returned by server after streaming
                                696                 :  * has finished. On failure, prints an error to stderr and returns false.
                                697                 :  */
                                698                 : static bool
 3623 heikki.linnakangas        699 GIC           1 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
                                700                 : {
                                701                 :     uint32      startpos_xlogid,
                                702                 :                 startpos_xrecoff;
                                703                 : 
                                704                 :     /*----------
                                705                 :      * The result set consists of one row and two columns, e.g:
                                706                 :      *
                                707                 :      *  next_tli | next_tli_startpos
                                708                 :      * ----------+-------------------
                                709                 :      *         4 | 0/9949AE0
 3623 heikki.linnakangas        710 ECB             :      *
                                711                 :      * next_tli is the timeline ID of the next timeline after the one that
 2158 peter_e                   712 EUB             :      * just finished streaming. next_tli_startpos is the WAL location where
                                713                 :      * the server switched to it.
 3623 heikki.linnakangas        714                 :      *----------
                                715                 :      */
 3623 heikki.linnakangas        716 GIC           1 :     if (PQnfields(res) < 2 || PQntuples(res) != 1)
 3623 heikki.linnakangas        717 ECB             :     {
 1469 peter                     718 LBC           0 :         pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
                                719                 :                      PQntuples(res), PQnfields(res), 1, 2);
 3623 heikki.linnakangas        720 UIC           0 :         return false;
 3623 heikki.linnakangas        721 EUB             :     }
                                722                 : 
 3623 heikki.linnakangas        723 GBC           1 :     *timeline = atoi(PQgetvalue(res, 0, 0));
 3623 heikki.linnakangas        724 GIC           1 :     if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
 3623 heikki.linnakangas        725 ECB             :                &startpos_xrecoff) != 2)
                                726                 :     {
 1469 peter                     727 LBC           0 :         pg_log_error("could not parse next timeline's starting point \"%s\"",
                                728                 :                      PQgetvalue(res, 0, 1));
 3623 heikki.linnakangas        729 UIC           0 :         return false;
                                730                 :     }
 3623 heikki.linnakangas        731 GIC           1 :     *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
                                732                 : 
                                733               1 :     return true;
                                734                 : }
                                735                 : 
                                736                 : /*
                                737                 :  * The main loop of ReceiveXlogStream. Handles the COPY stream after
                                738                 :  * initiating streaming with the START_REPLICATION command.
 3734 heikki.linnakangas        739 ECB             :  *
                                740                 :  * If the COPY ends (not necessarily successfully) due a message from the
                                741                 :  * server, returns a PGresult and sets *stoppos to the last byte written.
 3632 rhaas                     742                 :  * On any other sort of error, returns NULL.
 3734 heikki.linnakangas        743                 :  */
 3632 rhaas                     744                 : static PGresult *
 2585 magnus                    745 GIC          99 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
 2585 magnus                    746 ECB             :                  XLogRecPtr *stoppos)
                                747                 : {
 3734 heikki.linnakangas        748 GIC          99 :     char       *copybuf = NULL;
 2236 tgl                       749 CBC          99 :     TimestampTz last_status = -1;
 2585 magnus                    750 GIC          99 :     XLogRecPtr  blockpos = stream->startpos;
                                751                 : 
 3168 fujii                     752              99 :     still_sending = true;
                                753                 : 
                                754                 :     while (1)
 4183 magnus                    755            3532 :     {
                                756                 :         int         r;
 2236 tgl                       757 ECB             :         TimestampTz now;
 3201 fujii                     758 EUB             :         long        sleeptime;
                                759                 : 
 4183 magnus                    760 ECB             :         /*
                                761                 :          * Check if we should continue streaming, or abort at this point.
                                762                 :          */
  941 peter                     763 GIC        3631 :         if (!CheckCopyStreamStop(conn, stream, blockpos))
 3166 fujii                     764 UIC           0 :             goto error;
                                765                 : 
 3166 fujii                     766 CBC        3631 :         now = feGetCurrentTimestamp();
                                767                 : 
 3166 fujii                     768 EUB             :         /*
 2878 bruce                     769                 :          * If synchronous option is true, issue sync command as soon as there
                                770                 :          * are WAL data which has not been flushed yet.
 3166 fujii                     771                 :          */
 2359 magnus                    772 GIC        3631 :         if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
                                773                 :         {
  202 rhaas                     774 UNC           0 :             if (stream->walmethod->ops->sync(walfile) != 0)
  366 tgl                       775 UIC           0 :                 pg_fatal("could not fsync file \"%s\": %s",
                                776                 :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
 3166 fujii                     777 UBC           0 :             lastFlushPosition = blockpos;
 3064 fujii                     778 EUB             : 
                                779                 :             /*
                                780                 :              * Send feedback so that the server sees the latest WAL locations
                                781                 :              * immediately.
                                782                 :              */
 3064 fujii                     783 UIC           0 :             if (!sendFeedback(conn, blockpos, now, false))
                                784               0 :                 goto error;
 3064 fujii                     785 LBC           0 :             last_status = now;
 4183 magnus                    786 ECB             :         }
                                787                 : 
                                788                 :         /*
                                789                 :          * Potentially send a status message to the primary
                                790                 :          */
 2585 magnus                    791 GBC        7179 :         if (still_sending && stream->standby_message_timeout > 0 &&
 3309 rhaas                     792 CBC        3548 :             feTimestampDifferenceExceeds(last_status, now,
                                793                 :                                          stream->standby_message_timeout))
                                794                 :         {
                                795                 :             /* Time to send feedback! */
 3804 heikki.linnakangas        796 GIC          99 :             if (!sendFeedback(conn, blockpos, now, false))
 3923 magnus                    797 UIC           0 :                 goto error;
 4183 magnus                    798 CBC          99 :             last_status = now;
                                799                 :         }
                                800                 : 
 3201 fujii                     801 ECB             :         /*
 3166                           802                 :          * Calculate how long send/receive loops should sleep
                                803                 :          */
 2585 magnus                    804 CBC        3631 :         sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
 3064 fujii                     805 EUB             :                                                  last_status);
 3201 fujii                     806 ECB             : 
 2173 tgl                       807 GIC        3631 :         r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
 3166 fujii                     808 CBC       10379 :         while (r != 0)
                                809                 :         {
                                810            6847 :             if (r == -1)
 3168 fujii                     811 UBC           0 :                 goto error;
 3166 fujii                     812 GIC        6847 :             if (r == -2)
 3166 fujii                     813 ECB             :             {
 2585 magnus                    814 GIC          99 :                 PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
                                815                 : 
 3166 fujii                     816              99 :                 if (res == NULL)
 3166 fujii                     817 LBC           0 :                     goto error;
                                818                 :                 else
 3166 fujii                     819 GBC          99 :                     return res;
                                820                 :             }
 3805 heikki.linnakangas        821 EUB             : 
                                822                 :             /* Check the message type. */
 3166 fujii                     823 CBC        6748 :             if (copybuf[0] == 'k')
                                824                 :             {
 2383 peter_e                   825 LBC           0 :                 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
 3166 fujii                     826 EUB             :                                          &last_status))
 3166 fujii                     827 UIC           0 :                     goto error;
                                828                 :             }
 3166 fujii                     829 GIC        6748 :             else if (copybuf[0] == 'w')
                                830                 :             {
 2585 magnus                    831            6748 :                 if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
 3166 fujii                     832 LBC           0 :                     goto error;
 3166 fujii                     833 EUB             : 
                                834                 :                 /*
                                835                 :                  * Check if we should continue streaming, or abort at this
                                836                 :                  * point.
                                837                 :                  */
  941 peter                     838 GIC        6748 :                 if (!CheckCopyStreamStop(conn, stream, blockpos))
 3166 fujii                     839 UBC           0 :                     goto error;
                                840                 :             }
                                841                 :             else
                                842                 :             {
 1469 peter                     843 UIC           0 :                 pg_log_error("unrecognized streaming header: \"%c\"",
                                844                 :                              copybuf[0]);
 3923 magnus                    845               0 :                 goto error;
 3166 fujii                     846 ECB             :             }
                                847                 : 
                                848                 :             /*
                                849                 :              * Process the received data, and any subsequent data we can read
 2878 bruce                     850 EUB             :              * without blocking.
 3166 fujii                     851                 :              */
 2173 tgl                       852 GBC        6748 :             r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
                                853                 :         }
                                854                 :     }
                                855                 : 
 3923 magnus                    856 UIC           0 : error:
  226 peter                     857 UNC           0 :     PQfreemem(copybuf);
 3632 rhaas                     858 UIC           0 :     return NULL;
                                859                 : }
                                860                 : 
                                861                 : /*
                                862                 :  * Wait until we can read a CopyData message,
 2173 tgl                       863 ECB             :  * or timeout, or occurrence of a signal or input on the stop_socket.
                                864                 :  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
                                865                 :  *
                                866                 :  * Returns 1 if data has become available for reading, 0 if timed out
                                867                 :  * or interrupted by signal or stop_socket input, and -1 on an error.
                                868                 :  */
                                869                 : static int
 2173 tgl                       870 GIC       10257 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
                                871                 : {
 3201 fujii                     872 ECB             :     int         ret;
                                873                 :     fd_set      input_mask;
                                874                 :     int         connsocket;
 2173 tgl                       875 EUB             :     int         maxfd;
 3201 fujii                     876                 :     struct timeval timeout;
                                877                 :     struct timeval *timeoutptr;
                                878                 : 
 2173 tgl                       879 CBC       10257 :     connsocket = PQsocket(conn);
                                880           10257 :     if (connsocket < 0)
 3201 fujii                     881 ECB             :     {
 1469 peter                     882 LBC           0 :         pg_log_error("invalid socket: %s", PQerrorMessage(conn));
 3201 fujii                     883 UIC           0 :         return -1;
 3201 fujii                     884 ECB             :     }
                                885                 : 
 3201 fujii                     886 GIC      174369 :     FD_ZERO(&input_mask);
 2173 tgl                       887           10257 :     FD_SET(connsocket, &input_mask);
 2173 tgl                       888 CBC       10257 :     maxfd = connsocket;
                                889           10257 :     if (stop_socket != PGINVALID_SOCKET)
                                890                 :     {
 2173 tgl                       891 GIC       10091 :         FD_SET(stop_socket, &input_mask);
 2173 tgl                       892 CBC       10091 :         maxfd = Max(maxfd, stop_socket);
 2173 tgl                       893 ECB             :     }
 3201 fujii                     894                 : 
 3201 fujii                     895 GIC       10257 :     if (timeout_ms < 0)
                                896              83 :         timeoutptr = NULL;
 3201 fujii                     897 ECB             :     else
                                898                 :     {
 3201 fujii                     899 CBC       10174 :         timeout.tv_sec = timeout_ms / 1000L;
 3201 fujii                     900 GIC       10174 :         timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
 3201 fujii                     901 GBC       10174 :         timeoutptr = &timeout;
 3201 fujii                     902 EUB             :     }
                                903                 : 
 2173 tgl                       904 GBC       10257 :     ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
                                905                 : 
 2173 tgl                       906 CBC       10257 :     if (ret < 0)
 3201 fujii                     907 ECB             :     {
 2173 tgl                       908 UIC           0 :         if (errno == EINTR)
 2173 tgl                       909 LBC           0 :             return 0;           /* Got a signal, so not an error */
  716 peter                     910 UIC           0 :         pg_log_error("%s() failed: %m", "select");
 3201 fujii                     911               0 :         return -1;
                                912                 :     }
 2173 tgl                       913 GIC       10257 :     if (ret > 0 && FD_ISSET(connsocket, &input_mask))
                                914            6987 :         return 1;               /* Got input on connection socket */
                                915                 : 
                                916            3270 :     return 0;                   /* Got timeout or input on stop_socket */
                                917                 : }
                                918                 : 
                                919                 : /*
                                920                 :  * Receive CopyData message available from XLOG stream, blocking for
                                921                 :  * maximum of 'timeout' ms.
                                922                 :  *
                                923                 :  * If data was received, returns the length of the data. *buffer is set to
                                924                 :  * point to a buffer holding the received message. The buffer is only valid
 3201 fujii                     925 ECB             :  * until the next CopyStreamReceive call.
                                926                 :  *
                                927                 :  * Returns 0 if no data was available within timeout, or if wait was
 2173 tgl                       928                 :  * interrupted by signal or stop_socket input.
                                929                 :  * -1 on error. -2 if the server ended the COPY.
                                930                 :  */
 3201 fujii                     931                 : static int
 2173 tgl                       932 CBC       10379 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
                                933                 :                   char **buffer)
                                934                 : {
 3201 fujii                     935           10379 :     char       *copybuf = NULL;
 3201 fujii                     936 ECB             :     int         rawlen;
                                937                 : 
  226 peter                     938 GNC       10379 :     PQfreemem(*buffer);
 3201 fujii                     939 GIC       10379 :     *buffer = NULL;
                                940                 : 
                                941                 :     /* Try to receive a CopyData message */
                                942           10379 :     rawlen = PQgetCopyData(conn, &copybuf, 1);
                                943           10379 :     if (rawlen == 0)
 3201 fujii                     944 ECB             :     {
 2173 tgl                       945                 :         int         ret;
                                946                 : 
                                947                 :         /*
                                948                 :          * No data available.  Wait for some to appear, but not longer than
                                949                 :          * the specified timeout, so that we can ping the server.  Also stop
                                950                 :          * waiting if input appears on stop_socket.
 3201 fujii                     951 EUB             :          */
 2173 tgl                       952 GIC       10257 :         ret = CopyStreamPoll(conn, timeout, stop_socket);
 2173 tgl                       953 GBC       10257 :         if (ret <= 0)
 2173 tgl                       954 GIC        3270 :             return ret;
                                955                 : 
                                956                 :         /* Now there is actually data on the socket */
 3201 fujii                     957 CBC        6987 :         if (PQconsumeInput(conn) == 0)
 3201 fujii                     958 ECB             :         {
 1469 peter                     959 LBC           0 :             pg_log_error("could not receive data from WAL stream: %s",
                                960                 :                          PQerrorMessage(conn));
 3201 fujii                     961               0 :             return -1;
 3201 fujii                     962 ECB             :         }
                                963                 : 
                                964                 :         /* Now that we've consumed some input, try again */
 3201 fujii                     965 GBC        6987 :         rawlen = PQgetCopyData(conn, &copybuf, 1);
                                966            6987 :         if (rawlen == 0)
 3201 fujii                     967 GIC         262 :             return 0;
                                968                 :     }
                                969            6847 :     if (rawlen == -1)           /* end-of-streaming or error */
 3201 fujii                     970 CBC          99 :         return -2;
                                971            6748 :     if (rawlen == -2)
                                972                 :     {
 1469 peter                     973 UIC           0 :         pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
 3201 fujii                     974               0 :         return -1;
                                975                 :     }
                                976                 : 
                                977                 :     /* Return received messages to caller */
 3201 fujii                     978 GBC        6748 :     *buffer = copybuf;
 3201 fujii                     979 GIC        6748 :     return rawlen;
                                980                 : }
                                981                 : 
                                982                 : /*
                                983                 :  * Process the keepalive message.
                                984                 :  */
                                985                 : static bool
 2383 peter_e                   986 UIC           0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                                987                 :                     XLogRecPtr blockpos, TimestampTz *last_status)
                                988                 : {
 3168 fujii                     989 EUB             :     int         pos;
                                990                 :     bool        replyRequested;
 2236 tgl                       991                 :     TimestampTz now;
                                992                 : 
 3168 fujii                     993                 :     /*
                                994                 :      * Parse the keepalive message, enclosed in the CopyData message. We just
 2878 bruce                     995                 :      * check if the server requested a reply, and ignore the rest.
 3168 fujii                     996                 :      */
 2878 bruce                     997 UIC           0 :     pos = 1;                    /* skip msgtype 'k' */
 2878 bruce                     998 UBC           0 :     pos += 8;                   /* skip walEnd */
 2878 bruce                     999 UIC           0 :     pos += 8;                   /* skip sendTime */
                               1000                 : 
 3168 fujii                    1001 UBC           0 :     if (len < pos + 1)
                               1002                 :     {
 1469 peter                    1003               0 :         pg_log_error("streaming header too small: %d", len);
 3168 fujii                    1004               0 :         return false;
                               1005                 :     }
 3168 fujii                    1006 UIC           0 :     replyRequested = copybuf[pos];
                               1007                 : 
                               1008                 :     /* If the server requested an immediate reply, send one. */
                               1009               0 :     if (replyRequested && still_sending)
                               1010                 :     {
 3063                          1011               0 :         if (reportFlushPosition && lastFlushPosition < blockpos &&
 2359 magnus                   1012               0 :             walfile != NULL)
 3063 fujii                    1013 EUB             :         {
                               1014                 :             /*
                               1015                 :              * If a valid flush location needs to be reported, flush the
 2878 bruce                    1016                 :              * current WAL file so that the latest flush location is sent back
                               1017                 :              * to the server. This is necessary to see whether the last WAL
                               1018                 :              * data has been successfully replicated or not, at the normal
                               1019                 :              * shutdown of the server.
 3063 fujii                    1020                 :              */
  202 rhaas                    1021 UNC           0 :             if (stream->walmethod->ops->sync(walfile) != 0)
  366 tgl                      1022 UBC           0 :                 pg_fatal("could not fsync file \"%s\": %s",
                               1023                 :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
 3063 fujii                    1024 UIC           0 :             lastFlushPosition = blockpos;
 3063 fujii                    1025 EUB             :         }
                               1026                 : 
 3168 fujii                    1027 UIC           0 :         now = feGetCurrentTimestamp();
                               1028               0 :         if (!sendFeedback(conn, blockpos, now, false))
                               1029               0 :             return false;
                               1030               0 :         *last_status = now;
                               1031                 :     }
 3168 fujii                    1032 ECB             : 
 3168 fujii                    1033 UIC           0 :     return true;
                               1034                 : }
                               1035                 : 
                               1036                 : /*
                               1037                 :  * Process XLogData message.
                               1038                 :  */
                               1039                 : static bool
 2585 magnus                   1040 GIC        6748 : ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                               1041                 :                    XLogRecPtr *blockpos)
                               1042                 : {
                               1043                 :     int         xlogoff;
 3168 fujii                    1044 ECB             :     int         bytes_left;
                               1045                 :     int         bytes_written;
                               1046                 :     int         hdr_len;
                               1047                 : 
                               1048                 :     /*
                               1049                 :      * Once we've decided we don't want to receive any more, just ignore any
                               1050                 :      * subsequent XLogData messages.
                               1051                 :      */
 3168 fujii                    1052 CBC        6748 :     if (!(still_sending))
                               1053              86 :         return true;
 3168 fujii                    1054 ECB             : 
                               1055                 :     /*
 2878 bruce                    1056                 :      * Read the header of the XLogData message, enclosed in the CopyData
                               1057                 :      * message. We only need the WAL location field (dataStart), the rest of
 2878 bruce                    1058 EUB             :      * the header is ignored.
 3168 fujii                    1059                 :      */
 2878 bruce                    1060 GIC        6662 :     hdr_len = 1;                /* msgtype 'w' */
 2878 bruce                    1061 CBC        6662 :     hdr_len += 8;               /* dataStart */
 2878 bruce                    1062 GIC        6662 :     hdr_len += 8;               /* walEnd */
                               1063            6662 :     hdr_len += 8;               /* sendTime */
 3168 fujii                    1064 CBC        6662 :     if (len < hdr_len)
                               1065                 :     {
 1469 peter                    1066 UIC           0 :         pg_log_error("streaming header too small: %d", len);
 3168 fujii                    1067               0 :         return false;
                               1068                 :     }
 3168 fujii                    1069 GIC        6662 :     *blockpos = fe_recvint64(&copybuf[1]);
 3168 fujii                    1070 ECB             : 
                               1071                 :     /* Extract WAL location for this block */
 2028 andres                   1072 GIC        6662 :     xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
 3168 fujii                    1073 ECB             : 
                               1074                 :     /*
 2878 bruce                    1075 EUB             :      * Verify that the initial location in the stream matches where we think
                               1076                 :      * we are.
 3168 fujii                    1077                 :      */
 2359 magnus                   1078 GIC        6662 :     if (walfile == NULL)
                               1079                 :     {
                               1080                 :         /* No file open yet */
 3168 fujii                    1081             105 :         if (xlogoff != 0)
                               1082                 :         {
 1469 peter                    1083 LBC           0 :             pg_log_error("received write-ahead log record for offset %u with no file open",
                               1084                 :                          xlogoff);
 3168 fujii                    1085 UBC           0 :             return false;
                               1086                 :         }
 3168 fujii                    1087 EUB             :     }
                               1088                 :     else
                               1089                 :     {
                               1090                 :         /* More data in existing segment */
  202 rhaas                    1091 GNC        6557 :         if (walfile->currpos != xlogoff)
 3168 fujii                    1092 ECB             :         {
 1469 peter                    1093 UIC           0 :             pg_log_error("got WAL data offset %08x, expected %08x",
                               1094                 :                          xlogoff, (int) walfile->currpos);
 3168 fujii                    1095               0 :             return false;
                               1096                 :         }
                               1097                 :     }
                               1098                 : 
 3168 fujii                    1099 GIC        6662 :     bytes_left = len - hdr_len;
                               1100            6662 :     bytes_written = 0;
                               1101                 : 
 3168 fujii                    1102 CBC       13323 :     while (bytes_left)
 3168 fujii                    1103 EUB             :     {
                               1104                 :         int         bytes_to_write;
 3168 fujii                    1105 ECB             : 
                               1106                 :         /*
 2028 andres                   1107                 :          * If crossing a WAL boundary, only write up until we reach wal
                               1108                 :          * segment size.
 3168 fujii                    1109                 :          */
 2028 andres                   1110 GIC        6662 :         if (xlogoff + bytes_left > WalSegSz)
 2028 andres                   1111 UIC           0 :             bytes_to_write = WalSegSz - xlogoff;
 3168 fujii                    1112 EUB             :         else
 3168 fujii                    1113 GIC        6662 :             bytes_to_write = bytes_left;
                               1114                 : 
 2359 magnus                   1115            6662 :         if (walfile == NULL)
 3168 fujii                    1116 ECB             :         {
 2585 magnus                   1117 CBC         105 :             if (!open_walfile(stream, *blockpos))
 3168 fujii                    1118 ECB             :             {
                               1119                 :                 /* Error logged by open_walfile */
 3168 fujii                    1120 UBC           0 :                 return false;
                               1121                 :             }
                               1122                 :         }
 3168 fujii                    1123 EUB             : 
  202 rhaas                    1124 GNC       13324 :         if (stream->walmethod->ops->write(walfile,
                               1125            6662 :                                           copybuf + hdr_len + bytes_written,
                               1126            6662 :                                           bytes_to_write) != bytes_to_write)
                               1127                 :         {
  508 peter                    1128 LBC           0 :             pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
                               1129                 :                          bytes_to_write, walfile->pathname,
                               1130                 :                          GetLastWalMethodError(stream->walmethod));
 3168 fujii                    1131               0 :             return false;
                               1132                 :         }
                               1133                 : 
 3168 fujii                    1134 ECB             :         /* Write was successful, advance our position */
 3168 fujii                    1135 GIC        6662 :         bytes_written += bytes_to_write;
 3168 fujii                    1136 CBC        6662 :         bytes_left -= bytes_to_write;
 3168 fujii                    1137 GIC        6662 :         *blockpos += bytes_to_write;
 3168 fujii                    1138 GBC        6662 :         xlogoff += bytes_to_write;
                               1139                 : 
 3168 fujii                    1140 ECB             :         /* Did we reach the end of a WAL segment? */
 2028 andres                   1141 GIC        6662 :         if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
 3168 fujii                    1142 ECB             :         {
 2585 magnus                   1143 GIC          46 :             if (!close_walfile(stream, *blockpos))
 3168 fujii                    1144 ECB             :                 /* Error message written in close_walfile() */
 3168 fujii                    1145 UIC           0 :                 return false;
 3168 fujii                    1146 EUB             : 
 3168 fujii                    1147 GIC          46 :             xlogoff = 0;
 3168 fujii                    1148 EUB             : 
 2585 magnus                   1149 GIC          46 :             if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
 3168 fujii                    1150 ECB             :             {
 3168 fujii                    1151 CBC           1 :                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
                               1152                 :                 {
 1469 peter                    1153 UIC           0 :                     pg_log_error("could not send copy-end packet: %s",
                               1154                 :                                  PQerrorMessage(conn));
 3168 fujii                    1155               0 :                     return false;
                               1156                 :                 }
 3168 fujii                    1157 CBC           1 :                 still_sending = false;
 3168 fujii                    1158 GIC           1 :                 return true;    /* ignore the rest of this XLogData packet */
                               1159                 :             }
                               1160                 :         }
                               1161                 :     }
                               1162                 :     /* No more data left to write, receive next copy packet */
                               1163                 : 
 3168 fujii                    1164 CBC        6661 :     return true;
                               1165                 : }
                               1166                 : 
 3168 fujii                    1167 ECB             : /*
                               1168                 :  * Handle end of the copy stream.
                               1169                 :  */
                               1170                 : static PGresult *
 2585 magnus                   1171 GIC          99 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
                               1172                 :                       XLogRecPtr blockpos, XLogRecPtr *stoppos)
                               1173                 : {
 3168 fujii                    1174 CBC          99 :     PGresult   *res = PQgetResult(conn);
                               1175                 : 
 3168 fujii                    1176 ECB             :     /*
                               1177                 :      * The server closed its end of the copy stream.  If we haven't closed
                               1178                 :      * ours already, we need to do so now, unless the server threw an error,
 2878 bruce                    1179 EUB             :      * in which case we don't.
 3168 fujii                    1180                 :      */
 3168 fujii                    1181 GIC          99 :     if (still_sending)
 3168 fujii                    1182 ECB             :     {
 2585 magnus                   1183 GIC           2 :         if (!close_walfile(stream, blockpos))
 3168 fujii                    1184 ECB             :         {
                               1185                 :             /* Error message written in close_walfile() */
 3168 fujii                    1186 UBC           0 :             PQclear(res);
 3168 fujii                    1187 UIC           0 :             return NULL;
 3168 fujii                    1188 EUB             :         }
 3168 fujii                    1189 GBC           2 :         if (PQresultStatus(res) == PGRES_COPY_IN)
                               1190                 :         {
 3168 fujii                    1191 CBC           1 :             if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
                               1192                 :             {
 1469 peter                    1193 LBC           0 :                 pg_log_error("could not send copy-end packet: %s",
                               1194                 :                              PQerrorMessage(conn));
 3168 fujii                    1195               0 :                 PQclear(res);
                               1196               0 :                 return NULL;
 3168 fujii                    1197 ECB             :             }
 3168 fujii                    1198 GIC           1 :             res = PQgetResult(conn);
                               1199                 :         }
                               1200               2 :         still_sending = false;
                               1201                 :     }
  226 peter                    1202 GNC          99 :     PQfreemem(copybuf);
 3168 fujii                    1203 CBC          99 :     *stoppos = blockpos;
 3168 fujii                    1204 GIC          99 :     return res;
 3168 fujii                    1205 ECB             : }
                               1206                 : 
 3166                          1207                 : /*
                               1208                 :  * Check if we should continue streaming, or abort at this point.
                               1209                 :  */
 3166 fujii                    1210 EUB             : static bool
  941 peter                    1211 GIC       10379 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
 3166 fujii                    1212 ECB             : {
 2585 magnus                   1213 GIC       10379 :     if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
 3166 fujii                    1214 EUB             :     {
 2585 magnus                   1215 GIC          96 :         if (!close_walfile(stream, blockpos))
 3166 fujii                    1216 EUB             :         {
                               1217                 :             /* Potential error message is written by close_walfile */
 3166 fujii                    1218 LBC           0 :             return false;
                               1219                 :         }
 3166 fujii                    1220 GIC          96 :         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
 3166 fujii                    1221 ECB             :         {
 1469 peter                    1222 UIC           0 :             pg_log_error("could not send copy-end packet: %s",
                               1223                 :                          PQerrorMessage(conn));
 3166 fujii                    1224               0 :             return false;
                               1225                 :         }
 3166 fujii                    1226 GIC          96 :         still_sending = false;
                               1227                 :     }
 3166 fujii                    1228 ECB             : 
 3166 fujii                    1229 GIC       10379 :     return true;
                               1230                 : }
 3166 fujii                    1231 ECB             : 
                               1232                 : /*
                               1233                 :  * Calculate how long send/receive loops should sleep
                               1234                 :  */
                               1235                 : static long
 2236 tgl                      1236 CBC        3631 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
                               1237                 :                              TimestampTz last_status)
 3166 fujii                    1238 ECB             : {
 2236 tgl                      1239 GIC        3631 :     TimestampTz status_targettime = 0;
                               1240                 :     long        sleeptime;
                               1241                 : 
 3166 fujii                    1242            3631 :     if (standby_message_timeout && still_sending)
 3166 fujii                    1243 CBC        3548 :         status_targettime = last_status +
 3166 fujii                    1244 GIC        3548 :             (standby_message_timeout - 1) * ((int64) 1000);
                               1245                 : 
 3064                          1246            3631 :     if (status_targettime > 0)
                               1247                 :     {
 3166 fujii                    1248 ECB             :         long        secs;
                               1249                 :         int         usecs;
 3166 fujii                    1250 EUB             : 
 3166 fujii                    1251 GBC        3548 :         feTimestampDifference(now,
                               1252                 :                               status_targettime,
                               1253                 :                               &secs,
 3166 fujii                    1254 ECB             :                               &usecs);
                               1255                 :         /* Always sleep at least 1 sec */
 3166 fujii                    1256 GIC        3548 :         if (secs <= 0)
 3166 fujii                    1257 ECB             :         {
 3166 fujii                    1258 UIC           0 :             secs = 1;
 3166 fujii                    1259 LBC           0 :             usecs = 0;
                               1260                 :         }
                               1261                 : 
 3166 fujii                    1262 GIC        3548 :         sleeptime = secs * 1000 + usecs / 1000;
                               1263                 :     }
                               1264                 :     else
                               1265              83 :         sleeptime = -1;
                               1266                 : 
                               1267            3631 :     return sleeptime;
                               1268                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a