LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - receivelog.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 63.9 % 424 271 8 23 94 28 24 152 28 67 91 167 10 14
Current Date: 2023-04-08 15:15:32 Functions: 94.1 % 17 16 1 16 1 16
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * receivelog.c - receive WAL files using the streaming
       4                 :  *                replication protocol.
       5                 :  *
       6                 :  * Author: Magnus Hagander <magnus@hagander.net>
       7                 :  *
       8                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *        src/bin/pg_basebackup/receivelog.c
      12                 :  *-------------------------------------------------------------------------
      13                 :  */
      14                 : 
      15                 : #include "postgres_fe.h"
      16                 : 
      17                 : #include <sys/select.h>
      18                 : #include <sys/stat.h>
      19                 : #include <unistd.h>
      20                 : 
      21                 : #include "access/xlog_internal.h"
      22                 : #include "common/file_utils.h"
      23                 : #include "common/logging.h"
      24                 : #include "libpq-fe.h"
      25                 : #include "receivelog.h"
      26                 : #include "streamutil.h"
      27                 : 
      28                 : /* currently open WAL file */
      29                 : static Walfile *walfile = NULL;
      30                 : static bool reportFlushPosition = false;
      31                 : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
      32                 : 
      33                 : static bool still_sending = true;   /* feedback still needs to be sent? */
      34                 : 
      35                 : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
      36                 :                                   XLogRecPtr *stoppos);
      37                 : static int  CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
      38                 : static int  CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
      39                 :                               char **buffer);
      40                 : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
      41                 :                                 int len, XLogRecPtr blockpos, TimestampTz *last_status);
      42                 : static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
      43                 :                                XLogRecPtr *blockpos);
      44                 : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
      45                 :                                        XLogRecPtr blockpos, XLogRecPtr *stoppos);
      46                 : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
      47                 : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
      48                 :                                          TimestampTz last_status);
      49                 : 
      50                 : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
      51 ECB             :                                      uint32 *timeline);
      52                 : 
      53                 : static bool
      54 GIC          41 : mark_file_as_archived(StreamCtl *stream, const char *fname)
      55                 : {
      56 ECB             :     Walfile    *f;
      57                 :     static char tmppath[MAXPGPATH];
      58                 : 
      59 CBC          41 :     snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
      60                 :              fname);
      61 ECB             : 
      62 GNC          41 :     f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
      63                 :                                                NULL, 0);
      64 GBC          41 :     if (f == NULL)
      65                 :     {
      66 UBC           0 :         pg_log_error("could not create archive status file \"%s\": %s",
      67                 :                      tmppath, GetLastWalMethodError(stream->walmethod));
      68 UIC           0 :         return false;
      69 ECB             :     }
      70                 : 
      71 GNC          41 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
      72                 :     {
      73 UBC           0 :         pg_log_error("could not close archive status file \"%s\": %s",
      74                 :                      tmppath, GetLastWalMethodError(stream->walmethod));
      75 UIC           0 :         return false;
      76 ECB             :     }
      77                 : 
      78 GIC          41 :     return true;
      79                 : }
      80                 : 
      81                 : /*
      82                 :  * Open a new WAL file in the specified directory.
      83                 :  *
      84                 :  * Returns true if OK; on failure, returns false after printing an error msg.
      85                 :  * On success, 'walfile' is set to the opened WAL file.
      86                 :  *
      87 ECB             :  * The file will be padded to 16Mb with zeroes.
      88                 :  */
      89                 : static bool
      90 GIC         105 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
      91                 : {
      92                 :     Walfile    *f;
      93                 :     char       *fn;
      94                 :     ssize_t     size;
      95 ECB             :     XLogSegNo   segno;
      96                 :     char        walfile_name[MAXPGPATH];
      97                 : 
      98 GIC         105 :     XLByteToSeg(startpoint, segno, WalSegSz);
      99 GNC         105 :     XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
     100 ECB             : 
     101                 :     /* Note that this considers the compression used if necessary */
     102 GNC         105 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
     103                 :                                                walfile_name,
     104             105 :                                                stream->partial_suffix);
     105                 : 
     106                 :     /*
     107                 :      * When streaming to files, if an existing file exists we verify that it's
     108                 :      * either empty (just created), or a complete WalSegSz segment (in which
     109                 :      * case it has been created and padded). Anything else indicates a corrupt
     110                 :      * file. Compressed files have no need for padding, so just ignore this
     111                 :      * case.
     112                 :      *
     113                 :      * When streaming to tar, no file with this name will exist before, so we
     114                 :      * never have to verify a size.
     115 ECB             :      */
     116 GNC         203 :     if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
     117              98 :         stream->walmethod->ops->existsfile(stream->walmethod, fn))
     118 EUB             :     {
     119 UNC           0 :         size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
     120 UIC           0 :         if (size < 0)
     121 EUB             :         {
     122 UIC           0 :             pg_log_error("could not get size of write-ahead log file \"%s\": %s",
     123                 :                          fn, GetLastWalMethodError(stream->walmethod));
     124 UBC           0 :             pg_free(fn);
     125 UIC           0 :             return false;
     126 EUB             :         }
     127 UIC           0 :         if (size == WalSegSz)
     128                 :         {
     129 EUB             :             /* Already padded file. Open it for use */
     130 UNC           0 :             f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
     131 UIC           0 :             if (f == NULL)
     132 EUB             :             {
     133 UIC           0 :                 pg_log_error("could not open existing write-ahead log file \"%s\": %s",
     134                 :                              fn, GetLastWalMethodError(stream->walmethod));
     135 UBC           0 :                 pg_free(fn);
     136 UIC           0 :                 return false;
     137                 :             }
     138                 : 
     139 EUB             :             /* fsync file in case of a previous crash */
     140 UNC           0 :             if (stream->walmethod->ops->sync(f) != 0)
     141 EUB             :             {
     142 UIC           0 :                 pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
     143                 :                              fn, GetLastWalMethodError(stream->walmethod));
     144 UNC           0 :                 stream->walmethod->ops->close(f, CLOSE_UNLINK);
     145 UIC           0 :                 exit(1);
     146                 :             }
     147 EUB             : 
     148 UBC           0 :             walfile = f;
     149               0 :             pg_free(fn);
     150 UIC           0 :             return true;
     151 EUB             :         }
     152 UIC           0 :         if (size != 0)
     153                 :         {
     154 EUB             :             /* if write didn't set errno, assume problem is no disk space */
     155 UBC           0 :             if (errno == 0)
     156               0 :                 errno = ENOSPC;
     157 UIC           0 :             pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
     158                 :                                   "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
     159                 :                                   size),
     160 EUB             :                          fn, size, WalSegSz);
     161 UBC           0 :             pg_free(fn);
     162 UIC           0 :             return false;
     163                 :         }
     164                 :         /* File existed and was empty, so fall through and open */
     165                 :     }
     166                 : 
     167                 :     /* No file existed, so create one */
     168 ECB             : 
     169 GNC         105 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
     170                 :                                                walfile_name,
     171             105 :                                                stream->partial_suffix,
     172                 :                                                WalSegSz);
     173 GIC         105 :     if (f == NULL)
     174 ECB             :     {
     175 UIC           0 :         pg_log_error("could not open write-ahead log file \"%s\": %s",
     176                 :                      fn, GetLastWalMethodError(stream->walmethod));
     177               0 :         pg_free(fn);
     178 UBC           0 :         return false;
     179 EUB             :     }
     180                 : 
     181 GIC         105 :     pg_free(fn);
     182 CBC         105 :     walfile = f;
     183             105 :     return true;
     184 ECB             : }
     185                 : 
     186                 : /*
     187                 :  * Close the current WAL file (if open), and rename it to the correct
     188                 :  * filename if it's complete. On failure, prints an error message to stderr
     189                 :  * and returns false, otherwise returns true.
     190                 :  */
     191                 : static bool
     192 GIC         144 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
     193 ECB             : {
     194                 :     char       *fn;
     195                 :     off_t       currpos;
     196                 :     int         r;
     197                 :     char        walfile_name[MAXPGPATH];
     198                 : 
     199 GIC         144 :     if (walfile == NULL)
     200              39 :         return true;
     201 ECB             : 
     202 GNC         105 :     strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
     203             105 :     currpos = walfile->currpos;
     204                 : 
     205 ECB             :     /* Note that this considers the compression used if necessary */
     206 GNC         105 :     fn = stream->walmethod->ops->get_file_name(stream->walmethod,
     207                 :                                                walfile_name,
     208             105 :                                                stream->partial_suffix);
     209 ECB             : 
     210 CBC         105 :     if (stream->partial_suffix)
     211                 :     {
     212 GIC          12 :         if (currpos == WalSegSz)
     213 GNC           6 :             r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
     214 ECB             :         else
     215                 :         {
     216 CBC           6 :             pg_log_info("not renaming \"%s\", segment is not complete", fn);
     217 GNC           6 :             r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
     218 ECB             :         }
     219                 :     }
     220 EUB             :     else
     221 GNC          93 :         r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
     222                 : 
     223 GBC         105 :     walfile = NULL;
     224 EUB             : 
     225 GIC         105 :     if (r != 0)
     226                 :     {
     227 LBC           0 :         pg_log_error("could not close file \"%s\": %s",
     228                 :                      fn, GetLastWalMethodError(stream->walmethod));
     229                 : 
     230 UIC           0 :         pg_free(fn);
     231               0 :         return false;
     232                 :     }
     233                 : 
     234 GIC         105 :     pg_free(fn);
     235 ECB             : 
     236                 :     /*
     237                 :      * Mark file as archived if requested by the caller - pg_basebackup needs
     238                 :      * to do so as files can otherwise get archived again after promotion of a
     239 EUB             :      * new node. This is in line with walreceiver.c always doing a
     240                 :      * XLogArchiveForceDone() after a complete segment.
     241                 :      */
     242 CBC         105 :     if (currpos == WalSegSz && stream->mark_done)
     243 ECB             :     {
     244                 :         /* writes error message if failed */
     245 GNC          40 :         if (!mark_file_as_archived(stream, walfile_name))
     246 UIC           0 :             return false;
     247                 :     }
     248                 : 
     249 GIC         105 :     lastFlushPosition = pos;
     250             105 :     return true;
     251 ECB             : }
     252                 : 
     253                 : 
     254                 : /*
     255                 :  * Check if a timeline history file exists.
     256                 :  */
     257                 : static bool
     258 GIC         101 : existsTimeLineHistoryFile(StreamCtl *stream)
     259 ECB             : {
     260                 :     char        histfname[MAXFNAMELEN];
     261                 : 
     262                 :     /*
     263                 :      * Timeline 1 never has a history file. We treat that as if it existed,
     264                 :      * since we never need to stream it.
     265                 :      */
     266 GIC         101 :     if (stream->timeline == 1)
     267              99 :         return true;
     268 ECB             : 
     269 GIC           2 :     TLHistoryFileName(histfname, stream->timeline);
     270 ECB             : 
     271 GNC           2 :     return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
     272                 : }
     273                 : 
     274                 : static bool
     275 GIC           2 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
     276                 : {
     277               2 :     int         size = strlen(content);
     278 ECB             :     char        histfname[MAXFNAMELEN];
     279                 :     Walfile    *f;
     280                 : 
     281 EUB             :     /*
     282                 :      * Check that the server's idea of how timeline history files should be
     283                 :      * named matches ours.
     284                 :      */
     285 GIC           2 :     TLHistoryFileName(histfname, stream->timeline);
     286 CBC           2 :     if (strcmp(histfname, filename) != 0)
     287                 :     {
     288 LBC           0 :         pg_log_error("server reported unexpected history file name for timeline %u: %s",
     289                 :                      stream->timeline, filename);
     290 UBC           0 :         return false;
     291                 :     }
     292 EUB             : 
     293 GNC           2 :     f = stream->walmethod->ops->open_for_write(stream->walmethod,
     294                 :                                                histfname, ".tmp", 0);
     295 GIC           2 :     if (f == NULL)
     296 ECB             :     {
     297 UIC           0 :         pg_log_error("could not create timeline history file \"%s\": %s",
     298                 :                      histfname, GetLastWalMethodError(stream->walmethod));
     299               0 :         return false;
     300                 :     }
     301                 : 
     302 GNC           2 :     if ((int) stream->walmethod->ops->write(f, content, size) != size)
     303                 :     {
     304 UBC           0 :         pg_log_error("could not write timeline history file \"%s\": %s",
     305                 :                      histfname, GetLastWalMethodError(stream->walmethod));
     306 EUB             : 
     307                 :         /*
     308                 :          * If we fail to make the file, delete it to release disk space
     309 ECB             :          */
     310 UNC           0 :         stream->walmethod->ops->close(f, CLOSE_UNLINK);
     311 EUB             : 
     312 UIC           0 :         return false;
     313 EUB             :     }
     314                 : 
     315 GNC           2 :     if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
     316                 :     {
     317 LBC           0 :         pg_log_error("could not close file \"%s\": %s",
     318                 :                      histfname, GetLastWalMethodError(stream->walmethod));
     319 UIC           0 :         return false;
     320 ECB             :     }
     321 EUB             : 
     322                 :     /* Maintain archive_status, check close_walfile() for details. */
     323 GIC           2 :     if (stream->mark_done)
     324 ECB             :     {
     325                 :         /* writes error message if failed */
     326 GIC           1 :         if (!mark_file_as_archived(stream, histfname))
     327 UIC           0 :             return false;
     328                 :     }
     329                 : 
     330 GIC           2 :     return true;
     331 ECB             : }
     332                 : 
     333                 : /*
     334                 :  * Send a Standby Status Update message to server.
     335                 :  */
     336                 : static bool
     337 CBC          99 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
     338 ECB             : {
     339                 :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
     340 CBC          99 :     int         len = 0;
     341 ECB             : 
     342 GIC          99 :     replybuf[len] = 'r';
     343 CBC          99 :     len += 1;
     344              99 :     fe_sendint64(blockpos, &replybuf[len]); /* write */
     345              99 :     len += 8;
     346              99 :     if (reportFlushPosition)
     347              95 :         fe_sendint64(lastFlushPosition, &replybuf[len]);    /* flush */
     348 ECB             :     else
     349 CBC           4 :         fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* flush */
     350              99 :     len += 8;
     351 GIC          99 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     352 CBC          99 :     len += 8;
     353 GIC          99 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     354 GBC          99 :     len += 8;
     355 GIC          99 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     356 GBC          99 :     len += 1;
     357                 : 
     358 GIC          99 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     359 ECB             :     {
     360 UIC           0 :         pg_log_error("could not send feedback packet: %s",
     361                 :                      PQerrorMessage(conn));
     362               0 :         return false;
     363                 :     }
     364                 : 
     365 GIC          99 :     return true;
     366                 : }
     367                 : 
     368                 : /*
     369 ECB             :  * Check that the server version we're connected to is supported by
     370                 :  * ReceiveXlogStream().
     371                 :  *
     372                 :  * If it's not, an error message is printed to stderr, and false is returned.
     373                 :  */
     374                 : bool
     375 GIC         218 : CheckServerVersionForStreaming(PGconn *conn)
     376                 : {
     377                 :     int         minServerMajor,
     378                 :                 maxServerMajor;
     379                 :     int         serverMajor;
     380                 : 
     381 ECB             :     /*
     382                 :      * The message format used in streaming replication changed in 9.3, so we
     383                 :      * cannot stream from older servers. And we don't support servers newer
     384                 :      * than the client; it might work, but we don't know, so err on the safe
     385                 :      * side.
     386 EUB             :      */
     387 GIC         218 :     minServerMajor = 903;
     388 GBC         218 :     maxServerMajor = PG_VERSION_NUM / 100;
     389 GIC         218 :     serverMajor = PQserverVersion(conn) / 100;
     390             218 :     if (serverMajor < minServerMajor)
     391 EUB             :     {
     392 UIC           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     393 ECB             : 
     394 UIC           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
     395 EUB             :                      serverver ? serverver : "'unknown'",
     396                 :                      "9.3");
     397 UBC           0 :         return false;
     398                 :     }
     399 GIC         218 :     else if (serverMajor > maxServerMajor)
     400 EUB             :     {
     401 UIC           0 :         const char *serverver = PQparameterStatus(conn, "server_version");
     402 ECB             : 
     403 UIC           0 :         pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
     404                 :                      serverver ? serverver : "'unknown'",
     405                 :                      PG_VERSION);
     406               0 :         return false;
     407                 :     }
     408 GIC         218 :     return true;
     409                 : }
     410                 : 
     411                 : /*
     412                 :  * Receive a log stream starting at the specified position.
     413                 :  *
     414                 :  * Individual parameters are passed through the StreamCtl structure.
     415                 :  *
     416                 :  * If sysidentifier is specified, validate that both the system
     417                 :  * identifier and the timeline matches the specified ones
     418                 :  * (by sending an extra IDENTIFY_SYSTEM command)
     419                 :  *
     420                 :  * All received segments will be written to the directory
     421                 :  * specified by basedir. This will also fetch any missing timeline history
     422                 :  * files.
     423                 :  *
     424                 :  * The stream_stop callback will be called every time data
     425                 :  * is received, and whenever a segment is completed. If it returns
     426                 :  * true, the streaming will stop and the function
     427                 :  * return. As long as it returns false, streaming will continue
     428                 :  * indefinitely.
     429                 :  *
     430                 :  * If stream_stop() checks for external input, stop_socket should be set to
     431                 :  * the FD it checks.  This will allow such input to be detected promptly
     432                 :  * rather than after standby_message_timeout (which might be indefinite).
     433                 :  * Note that signals will interrupt waits for input as well, but that is
     434                 :  * race-y since a signal received while busy won't interrupt the wait.
     435                 :  *
     436                 :  * standby_message_timeout controls how often we send a message
     437                 :  * back to the primary letting it know our progress, in milliseconds.
     438                 :  * Zero means no messages are sent.
     439                 :  * This message will only contain the write location, and never
     440                 :  * flush or replay.
     441                 :  *
     442                 :  * If 'partial_suffix' is not NULL, files are initially created with the
     443                 :  * given suffix, and the suffix is removed once the file is finished. That
     444                 :  * allows you to tell the difference between partial and completed files,
     445                 :  * so that you can continue later where you left.
     446                 :  *
     447 ECB             :  * If 'synchronous' is true, the received WAL is flushed as soon as written,
     448                 :  * otherwise only when the WAL file is closed.
     449                 :  *
     450                 :  * Note: The WAL location *must* be at a log segment start!
     451                 :  */
     452                 : bool
     453 GIC         100 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
     454                 : {
     455                 :     char        query[128];
     456                 :     char        slotcmd[128];
     457                 :     PGresult   *res;
     458 ECB             :     XLogRecPtr  stoppos;
     459 EUB             : 
     460                 :     /*
     461                 :      * The caller should've checked the server version already, but doesn't do
     462                 :      * any harm to check it here too.
     463                 :      */
     464 GIC         100 :     if (!CheckServerVersionForStreaming(conn))
     465 UIC           0 :         return false;
     466                 : 
     467                 :     /*
     468                 :      * Decide whether we want to report the flush position. If we report the
     469                 :      * flush position, the primary will know what WAL we'll possibly
     470                 :      * re-request, and it can then remove older WAL safely. We must always do
     471                 :      * that when we are using slots.
     472 ECB             :      *
     473                 :      * Reporting the flush position makes one eligible as a synchronous
     474                 :      * replica. People shouldn't include generic names in
     475                 :      * synchronous_standby_names, but we've protected them against it so far,
     476                 :      * so let's continue to do so unless specifically requested.
     477                 :      */
     478 GIC         100 :     if (stream->replication_slot != NULL)
     479 ECB             :     {
     480 CBC          95 :         reportFlushPosition = true;
     481 GIC          95 :         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
     482 ECB             :     }
     483                 :     else
     484                 :     {
     485 GIC           5 :         if (stream->synchronous)
     486 CBC           1 :             reportFlushPosition = true;
     487                 :         else
     488               4 :             reportFlushPosition = false;
     489 GIC           5 :         slotcmd[0] = 0;
     490                 :     }
     491                 : 
     492             100 :     if (stream->sysidentifier != NULL)
     493                 :     {
     494 CBC         100 :         char       *sysidentifier = NULL;
     495                 :         TimeLineID  servertli;
     496 EUB             : 
     497                 :         /*
     498                 :          * Get the server system identifier and timeline, and validate them.
     499                 :          */
     500 CBC         100 :         if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
     501                 :         {
     502 UBC           0 :             pg_free(sysidentifier);
     503               0 :             return false;
     504 EUB             :         }
     505                 : 
     506 CBC         100 :         if (strcmp(stream->sysidentifier, sysidentifier) != 0)
     507                 :         {
     508 LBC           0 :             pg_log_error("system identifier does not match between base backup and streaming connection");
     509 UIC           0 :             pg_free(sysidentifier);
     510 UBC           0 :             return false;
     511                 :         }
     512 GBC         100 :         pg_free(sysidentifier);
     513                 : 
     514 GIC         100 :         if (stream->timeline > servertli)
     515                 :         {
     516 UIC           0 :             pg_log_error("starting timeline %u is not present in the server",
     517                 :                          stream->timeline);
     518               0 :             return false;
     519                 :         }
     520 ECB             :     }
     521                 : 
     522                 :     /*
     523                 :      * Initialize the flush position to the starting point; it is the caller's
     524                 :      * responsibility to ensure that it is sane.
     525                 :      */
     526 GIC         100 :     lastFlushPosition = stream->startpos;
     527                 : 
     528                 :     while (1)
     529                 :     {
     530 ECB             :         /*
     531                 :          * Fetch the timeline history file for this timeline, if we don't have
     532                 :          * it already. When streaming log to tar, this will always return
     533                 :          * false, as we are never streaming into an existing file and
     534                 :          * therefore there can be no pre-existing timeline history file.
     535                 :          */
     536 GIC         101 :         if (!existsTimeLineHistoryFile(stream))
     537 EUB             :         {
     538 GIC           2 :             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
     539 GBC           2 :             res = PQexec(conn, query);
     540               2 :             if (PQresultStatus(res) != PGRES_TUPLES_OK)
     541                 :             {
     542                 :                 /* FIXME: we might send it ok, but get an error */
     543 UIC           0 :                 pg_log_error("could not send replication command \"%s\": %s",
     544                 :                              "TIMELINE_HISTORY", PQresultErrorMessage(res));
     545               0 :                 PQclear(res);
     546               0 :                 return false;
     547 ECB             :             }
     548                 : 
     549 EUB             :             /*
     550                 :              * The response to TIMELINE_HISTORY is a single row result set
     551                 :              * with two fields: filename and content
     552                 :              */
     553 GIC           2 :             if (PQnfields(res) != 2 || PQntuples(res) != 1)
     554 ECB             :             {
     555 UIC           0 :                 pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
     556                 :                                PQntuples(res), PQnfields(res), 1, 2);
     557                 :             }
     558 ECB             : 
     559                 :             /* Write the history file to disk */
     560 GIC           2 :             writeTimeLineHistoryFile(stream,
     561                 :                                      PQgetvalue(res, 0, 0),
     562                 :                                      PQgetvalue(res, 0, 1));
     563                 : 
     564               2 :             PQclear(res);
     565 ECB             :         }
     566 EUB             : 
     567                 :         /*
     568                 :          * Before we start streaming from the requested location, check if the
     569 ECB             :          * callback tells us to stop here.
     570                 :          */
     571 CBC         101 :         if (stream->stream_stop(stream->startpos, stream->timeline, false))
     572 UIC           0 :             return true;
     573 ECB             : 
     574                 :         /* Initiate the replication stream at specified location */
     575 GIC         101 :         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
     576 ECB             :                  slotcmd,
     577 GIC         101 :                  LSN_FORMAT_ARGS(stream->startpos),
     578 ECB             :                  stream->timeline);
     579 CBC         101 :         res = PQexec(conn, query);
     580 GIC         101 :         if (PQresultStatus(res) != PGRES_COPY_BOTH)
     581 ECB             :         {
     582 GIC           2 :             pg_log_error("could not send replication command \"%s\": %s",
     583                 :                          "START_REPLICATION", PQresultErrorMessage(res));
     584 CBC           2 :             PQclear(res);
     585               2 :             return false;
     586 EUB             :         }
     587 GIC          99 :         PQclear(res);
     588                 : 
     589                 :         /* Stream the WAL */
     590              99 :         res = HandleCopyStream(conn, stream, &stoppos);
     591              99 :         if (res == NULL)
     592 UIC           0 :             goto error;
     593                 : 
     594                 :         /*
     595                 :          * Streaming finished.
     596                 :          *
     597                 :          * There are two possible reasons for that: a controlled shutdown, or
     598 ECB             :          * we reached the end of the current timeline. In case of
     599                 :          * end-of-timeline, the server sends a result set after Copy has
     600                 :          * finished, containing information about the next timeline. Read
     601                 :          * that, and restart streaming from the next timeline. In case of
     602                 :          * controlled shutdown, stop here.
     603                 :          */
     604 GIC          99 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     605               1 :         {
     606                 :             /*
     607                 :              * End-of-timeline. Read the next timeline's ID and starting
     608                 :              * position. Usually, the starting position will match the end of
     609                 :              * the previous timeline, but there are corner cases like if the
     610                 :              * server had sent us half of a WAL record, when it was promoted.
     611                 :              * The new timeline will begin at the end of the last complete
     612 ECB             :              * record in that case, overlapping the partial WAL record on the
     613                 :              * old timeline.
     614                 :              */
     615 EUB             :             uint32      newtimeline;
     616                 :             bool        parsed;
     617                 : 
     618 CBC           1 :             parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
     619 GIC           1 :             PQclear(res);
     620 GBC           1 :             if (!parsed)
     621 UIC           0 :                 goto error;
     622 EUB             : 
     623                 :             /* Sanity check the values the server gave us */
     624 CBC           1 :             if (newtimeline <= stream->timeline)
     625                 :             {
     626 UBC           0 :                 pg_log_error("server reported unexpected next timeline %u, following timeline %u",
     627                 :                              newtimeline, stream->timeline);
     628 UIC           0 :                 goto error;
     629 EUB             :             }
     630 GIC           1 :             if (stream->startpos > stoppos)
     631                 :             {
     632 UIC           0 :                 pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
     633 ECB             :                              stream->timeline, LSN_FORMAT_ARGS(stoppos),
     634                 :                              newtimeline, LSN_FORMAT_ARGS(stream->startpos));
     635 UIC           0 :                 goto error;
     636 EUB             :             }
     637                 : 
     638                 :             /* Read the final result, which should be CommandComplete. */
     639 GBC           1 :             res = PQgetResult(conn);
     640 GIC           1 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     641 ECB             :             {
     642 UIC           0 :                 pg_log_error("unexpected termination of replication stream: %s",
     643                 :                              PQresultErrorMessage(res));
     644               0 :                 PQclear(res);
     645               0 :                 goto error;
     646                 :             }
     647 CBC           1 :             PQclear(res);
     648 ECB             : 
     649                 :             /*
     650                 :              * Loop back to start streaming from the new timeline. Always
     651                 :              * start streaming at the beginning of a segment.
     652                 :              */
     653 GIC           1 :             stream->timeline = newtimeline;
     654 CBC           1 :             stream->startpos = stream->startpos -
     655 GIC           1 :                 XLogSegmentOffset(stream->startpos, WalSegSz);
     656               1 :             continue;
     657                 :         }
     658              98 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     659                 :         {
     660              97 :             PQclear(res);
     661                 : 
     662 ECB             :             /*
     663                 :              * End of replication (ie. controlled shut down of the server).
     664                 :              *
     665                 :              * Check if the callback thinks it's OK to stop here. If not,
     666 EUB             :              * complain.
     667                 :              */
     668 GIC          97 :             if (stream->stream_stop(stoppos, stream->timeline, false))
     669              97 :                 return true;
     670                 :             else
     671                 :             {
     672 UIC           0 :                 pg_log_error("replication stream was terminated before stop point");
     673 LBC           0 :                 goto error;
     674                 :             }
     675 ECB             :         }
     676                 :         else
     677                 :         {
     678                 :             /* Server returned an error. */
     679 GIC           1 :             pg_log_error("unexpected termination of replication stream: %s",
     680 ECB             :                          PQresultErrorMessage(res));
     681 CBC           1 :             PQclear(res);
     682 GBC           1 :             goto error;
     683                 :         }
     684 ECB             :     }
     685                 : 
     686 GIC           1 : error:
     687 GNC           1 :     if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
     688 UIC           0 :         pg_log_error("could not close file \"%s\": %s",
     689                 :                      walfile->pathname, GetLastWalMethodError(stream->walmethod));
     690 GIC           1 :     walfile = NULL;
     691               1 :     return false;
     692                 : }
     693 ECB             : 
     694                 : /*
     695                 :  * Helper function to parse the result set returned by server after streaming
     696                 :  * has finished. On failure, prints an error to stderr and returns false.
     697                 :  */
     698                 : static bool
     699 GIC           1 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
     700                 : {
     701                 :     uint32      startpos_xlogid,
     702                 :                 startpos_xrecoff;
     703                 : 
     704                 :     /*----------
     705                 :      * The result set consists of one row and two columns, e.g:
     706                 :      *
     707                 :      *  next_tli | next_tli_startpos
     708                 :      * ----------+-------------------
     709                 :      *         4 | 0/9949AE0
     710 ECB             :      *
     711                 :      * next_tli is the timeline ID of the next timeline after the one that
     712 EUB             :      * just finished streaming. next_tli_startpos is the WAL location where
     713                 :      * the server switched to it.
     714                 :      *----------
     715                 :      */
     716 GIC           1 :     if (PQnfields(res) < 2 || PQntuples(res) != 1)
     717 ECB             :     {
     718 LBC           0 :         pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
     719                 :                      PQntuples(res), PQnfields(res), 1, 2);
     720 UIC           0 :         return false;
     721 EUB             :     }
     722                 : 
     723 GBC           1 :     *timeline = atoi(PQgetvalue(res, 0, 0));
     724 GIC           1 :     if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
     725 ECB             :                &startpos_xrecoff) != 2)
     726                 :     {
     727 LBC           0 :         pg_log_error("could not parse next timeline's starting point \"%s\"",
     728                 :                      PQgetvalue(res, 0, 1));
     729 UIC           0 :         return false;
     730                 :     }
     731 GIC           1 :     *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
     732                 : 
     733               1 :     return true;
     734                 : }
     735                 : 
     736                 : /*
     737                 :  * The main loop of ReceiveXlogStream. Handles the COPY stream after
     738                 :  * initiating streaming with the START_REPLICATION command.
     739 ECB             :  *
     740                 :  * If the COPY ends (not necessarily successfully) due to a message from the
     741                 :  * server, returns a PGresult and sets *stoppos to the last byte written.
     742                 :  * On any other sort of error, returns NULL.
     743                 :  */
     744                 : static PGresult *
     745 GIC          99 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
     746 ECB             :                  XLogRecPtr *stoppos)
     747                 : {
     748 GIC          99 :     char       *copybuf = NULL;
     749 CBC          99 :     TimestampTz last_status = -1;
     750 GIC          99 :     XLogRecPtr  blockpos = stream->startpos;
     751                 : 
     752              99 :     still_sending = true;
     753                 : 
     754                 :     while (1)
     755            3532 :     {
     756                 :         int         r;
     757 ECB             :         TimestampTz now;
     758 EUB             :         long        sleeptime;
     759                 : 
     760 ECB             :         /*
     761                 :          * Check if we should continue streaming, or abort at this point.
     762                 :          */
     763 GIC        3631 :         if (!CheckCopyStreamStop(conn, stream, blockpos))
     764 UIC           0 :             goto error;
     765                 : 
     766 CBC        3631 :         now = feGetCurrentTimestamp();
     767                 : 
     768 EUB             :         /*
     769                 :          * If synchronous option is true, issue sync command as soon as there
     770                 :          * is WAL data which has not been flushed yet.
     771                 :          */
     772 GIC        3631 :         if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
     773                 :         {
     774 UNC           0 :             if (stream->walmethod->ops->sync(walfile) != 0)
     775 UIC           0 :                 pg_fatal("could not fsync file \"%s\": %s",
     776                 :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
     777 UBC           0 :             lastFlushPosition = blockpos;
     778 EUB             : 
     779                 :             /*
     780                 :              * Send feedback so that the server sees the latest WAL locations
     781                 :              * immediately.
     782                 :              */
     783 UIC           0 :             if (!sendFeedback(conn, blockpos, now, false))
     784               0 :                 goto error;
     785 LBC           0 :             last_status = now;
     786 ECB             :         }
     787                 : 
     788                 :         /*
     789                 :          * Potentially send a status message to the primary
     790                 :          */
     791 GBC        7179 :         if (still_sending && stream->standby_message_timeout > 0 &&
     792 CBC        3548 :             feTimestampDifferenceExceeds(last_status, now,
     793                 :                                          stream->standby_message_timeout))
     794                 :         {
     795                 :             /* Time to send feedback! */
     796 GIC          99 :             if (!sendFeedback(conn, blockpos, now, false))
     797 UIC           0 :                 goto error;
     798 CBC          99 :             last_status = now;
     799                 :         }
     800                 : 
     801 ECB             :         /*
     802                 :          * Calculate how long send/receive loops should sleep
     803                 :          */
     804 CBC        3631 :         sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
     805 EUB             :                                                  last_status);
     806 ECB             : 
     807 GIC        3631 :         r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
     808 CBC       10379 :         while (r != 0)
     809                 :         {
     810            6847 :             if (r == -1)
     811 UBC           0 :                 goto error;
     812 GIC        6847 :             if (r == -2)
     813 ECB             :             {
     814 GIC          99 :                 PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
     815                 : 
     816              99 :                 if (res == NULL)
     817 LBC           0 :                     goto error;
     818                 :                 else
     819 GBC          99 :                     return res;
     820                 :             }
     821 EUB             : 
     822                 :             /* Check the message type. */
     823 CBC        6748 :             if (copybuf[0] == 'k')
     824                 :             {
     825 LBC           0 :                 if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
     826 EUB             :                                          &last_status))
     827 UIC           0 :                     goto error;
     828                 :             }
     829 GIC        6748 :             else if (copybuf[0] == 'w')
     830                 :             {
     831            6748 :                 if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
     832 LBC           0 :                     goto error;
     833 EUB             : 
     834                 :                 /*
     835                 :                  * Check if we should continue streaming, or abort at this
     836                 :                  * point.
     837                 :                  */
     838 GIC        6748 :                 if (!CheckCopyStreamStop(conn, stream, blockpos))
     839 UBC           0 :                     goto error;
     840                 :             }
     841                 :             else
     842                 :             {
     843 UIC           0 :                 pg_log_error("unrecognized streaming header: \"%c\"",
     844                 :                              copybuf[0]);
     845               0 :                 goto error;
     846 ECB             :             }
     847                 : 
     848                 :             /*
     849                 :              * Process the received data, and any subsequent data we can read
     850 EUB             :              * without blocking.
     851                 :              */
     852 GBC        6748 :             r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
     853                 :         }
     854                 :     }
     855                 : 
     856 UIC           0 : error:
     857 UNC           0 :     PQfreemem(copybuf);
     858 UIC           0 :     return NULL;
     859                 : }
     860                 : 
     861                 : /*
     862                 :  * Wait until we can read a CopyData message,
     863 ECB             :  * or timeout, or occurrence of a signal or input on the stop_socket.
     864                 :  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
     865                 :  *
     866                 :  * Returns 1 if data has become available for reading, 0 if timed out
     867                 :  * or interrupted by signal or stop_socket input, and -1 on an error.
     868                 :  */
     869                 : static int
     870 GIC       10257 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
     871                 : {
     872 ECB             :     int         ret;
     873                 :     fd_set      input_mask;
     874                 :     int         connsocket;
     875 EUB             :     int         maxfd;
     876                 :     struct timeval timeout;
     877                 :     struct timeval *timeoutptr;
     878                 : 
     879 CBC       10257 :     connsocket = PQsocket(conn);
     880           10257 :     if (connsocket < 0)
     881 ECB             :     {
     882 LBC           0 :         pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     883 UIC           0 :         return -1;
     884 ECB             :     }
     885                 : 
     886 GIC      174369 :     FD_ZERO(&input_mask);
     887           10257 :     FD_SET(connsocket, &input_mask);
     888 CBC       10257 :     maxfd = connsocket;
     889           10257 :     if (stop_socket != PGINVALID_SOCKET)
     890                 :     {
     891 GIC       10091 :         FD_SET(stop_socket, &input_mask);
     892 CBC       10091 :         maxfd = Max(maxfd, stop_socket);
     893 ECB             :     }
     894                 : 
     895 GIC       10257 :     if (timeout_ms < 0)
     896              83 :         timeoutptr = NULL;
     897 ECB             :     else
     898                 :     {
     899 CBC       10174 :         timeout.tv_sec = timeout_ms / 1000L;
     900 GIC       10174 :         timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
     901 GBC       10174 :         timeoutptr = &timeout;
     902 EUB             :     }
     903                 : 
     904 GBC       10257 :     ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
     905                 : 
     906 CBC       10257 :     if (ret < 0)
     907 ECB             :     {
     908 UIC           0 :         if (errno == EINTR)
     909 LBC           0 :             return 0;           /* Got a signal, so not an error */
     910 UIC           0 :         pg_log_error("%s() failed: %m", "select");
     911               0 :         return -1;
     912                 :     }
     913 GIC       10257 :     if (ret > 0 && FD_ISSET(connsocket, &input_mask))
     914            6987 :         return 1;               /* Got input on connection socket */
     915                 : 
     916            3270 :     return 0;                   /* Got timeout or input on stop_socket */
     917                 : }
     918                 : 
     919                 : /*
     920                 :  * Receive CopyData message available from XLOG stream, blocking for
     921                 :  * a maximum of 'timeout' ms.
     922                 :  *
     923                 :  * If data was received, returns the length of the data. *buffer is set to
     924                 :  * point to a buffer holding the received message. The buffer is only valid
     925 ECB             :  * until the next CopyStreamReceive call.
     926                 :  *
     927                 :  * Returns 0 if no data was available within timeout, or if wait was
     928                 :  * interrupted by signal or stop_socket input.
     929                 :  * -1 on error. -2 if the server ended the COPY.
     930                 :  */
     931                 : static int
     932 CBC       10379 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
     933                 :                   char **buffer)
     934                 : {
     935           10379 :     char       *copybuf = NULL;
     936 ECB             :     int         rawlen;
     937                 : 
     938 GNC       10379 :     PQfreemem(*buffer);
     939 GIC       10379 :     *buffer = NULL;
     940                 : 
     941                 :     /* Try to receive a CopyData message */
     942           10379 :     rawlen = PQgetCopyData(conn, &copybuf, 1);
     943           10379 :     if (rawlen == 0)
     944 ECB             :     {
     945                 :         int         ret;
     946                 : 
     947                 :         /*
     948                 :          * No data available.  Wait for some to appear, but not longer than
     949                 :          * the specified timeout, so that we can ping the server.  Also stop
     950                 :          * waiting if input appears on stop_socket.
     951 EUB             :          */
     952 GIC       10257 :         ret = CopyStreamPoll(conn, timeout, stop_socket);
     953 GBC       10257 :         if (ret <= 0)
     954 GIC        3270 :             return ret;
     955                 : 
     956                 :         /* Now there is actually data on the socket */
     957 CBC        6987 :         if (PQconsumeInput(conn) == 0)
     958 ECB             :         {
     959 LBC           0 :             pg_log_error("could not receive data from WAL stream: %s",
     960                 :                          PQerrorMessage(conn));
     961               0 :             return -1;
     962 ECB             :         }
     963                 : 
     964                 :         /* Now that we've consumed some input, try again */
     965 GBC        6987 :         rawlen = PQgetCopyData(conn, &copybuf, 1);
     966            6987 :         if (rawlen == 0)
     967 GIC         262 :             return 0;
     968                 :     }
     969            6847 :     if (rawlen == -1)           /* end-of-streaming or error */
     970 CBC          99 :         return -2;
     971            6748 :     if (rawlen == -2)
     972                 :     {
     973 UIC           0 :         pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
     974               0 :         return -1;
     975                 :     }
     976                 : 
     977                 :     /* Return received messages to caller */
     978 GBC        6748 :     *buffer = copybuf;
     979 GIC        6748 :     return rawlen;
     980                 : }
     981                 : 
     982                 : /*
     983                 :  * Process the keepalive message.
     984                 :  */
     985                 : static bool
     986 UIC           0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
     987                 :                     XLogRecPtr blockpos, TimestampTz *last_status)
     988                 : {
     989 EUB             :     int         pos;
     990                 :     bool        replyRequested;
     991                 :     TimestampTz now;
     992                 : 
     993                 :     /*
     994                 :      * Parse the keepalive message, enclosed in the CopyData message. We just
     995                 :      * check if the server requested a reply, and ignore the rest.
     996                 :      */
     997 UIC           0 :     pos = 1;                    /* skip msgtype 'k' */
     998 UBC           0 :     pos += 8;                   /* skip walEnd */
     999 UIC           0 :     pos += 8;                   /* skip sendTime */
    1000                 : 
    1001 UBC           0 :     if (len < pos + 1)
    1002                 :     {
    1003               0 :         pg_log_error("streaming header too small: %d", len);
    1004               0 :         return false;
    1005                 :     }
    1006 UIC           0 :     replyRequested = copybuf[pos];
    1007                 : 
    1008                 :     /* If the server requested an immediate reply, send one. */
    1009               0 :     if (replyRequested && still_sending)
    1010                 :     {
    1011               0 :         if (reportFlushPosition && lastFlushPosition < blockpos &&
    1012               0 :             walfile != NULL)
    1013 EUB             :         {
    1014                 :             /*
    1015                 :              * If a valid flush location needs to be reported, flush the
    1016                 :              * current WAL file so that the latest flush location is sent back
    1017                 :              * to the server. This is necessary to see whether the last WAL
    1018                 :              * data has been successfully replicated or not, at the normal
    1019                 :              * shutdown of the server.
    1020                 :              */
    1021 UNC           0 :             if (stream->walmethod->ops->sync(walfile) != 0)
    1022 UBC           0 :                 pg_fatal("could not fsync file \"%s\": %s",
    1023                 :                          walfile->pathname, GetLastWalMethodError(stream->walmethod));
    1024 UIC           0 :             lastFlushPosition = blockpos;
    1025 EUB             :         }
    1026                 : 
    1027 UIC           0 :         now = feGetCurrentTimestamp();
    1028               0 :         if (!sendFeedback(conn, blockpos, now, false))
    1029               0 :             return false;
    1030               0 :         *last_status = now;
    1031                 :     }
    1032 ECB             : 
    1033 UIC           0 :     return true;
    1034                 : }
    1035                 : 
    1036                 : /*
    1037                 :  * Process XLogData message.
    1038                 :  */
    1039                 : static bool
    1040 GIC        6748 : ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
    1041                 :                    XLogRecPtr *blockpos)
    1042                 : {
    1043                 :     int         xlogoff;
    1044 ECB             :     int         bytes_left;
    1045                 :     int         bytes_written;
    1046                 :     int         hdr_len;
    1047                 : 
    1048                 :     /*
    1049                 :      * Once we've decided we don't want to receive any more, just ignore any
    1050                 :      * subsequent XLogData messages.
    1051                 :      */
    1052 CBC        6748 :     if (!(still_sending))
    1053              86 :         return true;
    1054 ECB             : 
    1055                 :     /*
    1056                 :      * Read the header of the XLogData message, enclosed in the CopyData
    1057                 :      * message. We only need the WAL location field (dataStart), the rest of
    1058 EUB             :      * the header is ignored.
    1059                 :      */
    1060 GIC        6662 :     hdr_len = 1;                /* msgtype 'w' */
    1061 CBC        6662 :     hdr_len += 8;               /* dataStart */
    1062 GIC        6662 :     hdr_len += 8;               /* walEnd */
    1063            6662 :     hdr_len += 8;               /* sendTime */
    1064 CBC        6662 :     if (len < hdr_len)
    1065                 :     {
    1066 UIC           0 :         pg_log_error("streaming header too small: %d", len);
    1067               0 :         return false;
    1068                 :     }
    1069 GIC        6662 :     *blockpos = fe_recvint64(&copybuf[1]);
    1070 ECB             : 
    1071                 :     /* Extract WAL location for this block */
    1072 GIC        6662 :     xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
    1073 ECB             : 
    1074                 :     /*
    1075 EUB             :      * Verify that the initial location in the stream matches where we think
    1076                 :      * we are.
    1077                 :      */
    1078 GIC        6662 :     if (walfile == NULL)
    1079                 :     {
    1080                 :         /* No file open yet */
    1081             105 :         if (xlogoff != 0)
    1082                 :         {
    1083 LBC           0 :             pg_log_error("received write-ahead log record for offset %u with no file open",
    1084                 :                          xlogoff);
    1085 UBC           0 :             return false;
    1086                 :         }
    1087 EUB             :     }
    1088                 :     else
    1089                 :     {
    1090                 :         /* More data in existing segment */
    1091 GNC        6557 :         if (walfile->currpos != xlogoff)
    1092 ECB             :         {
    1093 UIC           0 :             pg_log_error("got WAL data offset %08x, expected %08x",
    1094                 :                          xlogoff, (int) walfile->currpos);
    1095               0 :             return false;
    1096                 :         }
    1097                 :     }
    1098                 : 
    1099 GIC        6662 :     bytes_left = len - hdr_len;
    1100            6662 :     bytes_written = 0;
    1101                 : 
    1102 CBC       13323 :     while (bytes_left)
    1103 EUB             :     {
    1104                 :         int         bytes_to_write;
    1105 ECB             : 
    1106                 :         /*
    1107                 :          * If crossing a WAL boundary, only write up until we reach wal
    1108                 :          * segment size.
    1109                 :          */
    1110 GIC        6662 :         if (xlogoff + bytes_left > WalSegSz)
    1111 UIC           0 :             bytes_to_write = WalSegSz - xlogoff;
    1112 EUB             :         else
    1113 GIC        6662 :             bytes_to_write = bytes_left;
    1114                 : 
    1115            6662 :         if (walfile == NULL)
    1116 ECB             :         {
    1117 CBC         105 :             if (!open_walfile(stream, *blockpos))
    1118 ECB             :             {
    1119                 :                 /* Error logged by open_walfile */
    1120 UBC           0 :                 return false;
    1121                 :             }
    1122                 :         }
    1123 EUB             : 
    1124 GNC       13324 :         if (stream->walmethod->ops->write(walfile,
    1125            6662 :                                           copybuf + hdr_len + bytes_written,
    1126            6662 :                                           bytes_to_write) != bytes_to_write)
    1127                 :         {
    1128 LBC           0 :             pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
    1129                 :                          bytes_to_write, walfile->pathname,
    1130                 :                          GetLastWalMethodError(stream->walmethod));
    1131               0 :             return false;
    1132                 :         }
    1133                 : 
    1134 ECB             :         /* Write was successful, advance our position */
    1135 GIC        6662 :         bytes_written += bytes_to_write;
    1136 CBC        6662 :         bytes_left -= bytes_to_write;
    1137 GIC        6662 :         *blockpos += bytes_to_write;
    1138 GBC        6662 :         xlogoff += bytes_to_write;
    1139                 : 
    1140 ECB             :         /* Did we reach the end of a WAL segment? */
    1141 GIC        6662 :         if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
    1142 ECB             :         {
    1143 GIC          46 :             if (!close_walfile(stream, *blockpos))
    1144 ECB             :                 /* Error message written in close_walfile() */
    1145 UIC           0 :                 return false;
    1146 EUB             : 
    1147 GIC          46 :             xlogoff = 0;
    1148 EUB             : 
    1149 GIC          46 :             if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
    1150 ECB             :             {
    1151 CBC           1 :                 if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1152                 :                 {
    1153 UIC           0 :                     pg_log_error("could not send copy-end packet: %s",
    1154                 :                                  PQerrorMessage(conn));
    1155               0 :                     return false;
    1156                 :                 }
    1157 CBC           1 :                 still_sending = false;
    1158 GIC           1 :                 return true;    /* ignore the rest of this XLogData packet */
    1159                 :             }
    1160                 :         }
    1161                 :     }
    1162                 :     /* No more data left to write, receive next copy packet */
    1163                 : 
    1164 CBC        6661 :     return true;
    1165                 : }
    1166                 : 
    1167 ECB             : /*
    1168                 :  * Handle end of the copy stream.
    1169                 :  */
    1170                 : static PGresult *
    1171 GIC          99 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
    1172                 :                       XLogRecPtr blockpos, XLogRecPtr *stoppos)
    1173                 : {
    1174 CBC          99 :     PGresult   *res = PQgetResult(conn);
    1175                 : 
    1176 ECB             :     /*
    1177                 :      * The server closed its end of the copy stream.  If we haven't closed
    1178                 :      * ours already, we need to do so now, unless the server threw an error,
    1179 EUB             :      * in which case we don't.
    1180                 :      */
    1181 GIC          99 :     if (still_sending)
    1182 ECB             :     {
    1183 GIC           2 :         if (!close_walfile(stream, blockpos))
    1184 ECB             :         {
    1185                 :             /* Error message written in close_walfile() */
    1186 UBC           0 :             PQclear(res);
    1187 UIC           0 :             return NULL;
    1188 EUB             :         }
    1189 GBC           2 :         if (PQresultStatus(res) == PGRES_COPY_IN)
    1190                 :         {
    1191 CBC           1 :             if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1192                 :             {
    1193 LBC           0 :                 pg_log_error("could not send copy-end packet: %s",
    1194                 :                              PQerrorMessage(conn));
    1195               0 :                 PQclear(res);
    1196               0 :                 return NULL;
    1197 ECB             :             }
    1198 GIC           1 :             res = PQgetResult(conn);
    1199                 :         }
    1200               2 :         still_sending = false;
    1201                 :     }
    1202 GNC          99 :     PQfreemem(copybuf);
    1203 CBC          99 :     *stoppos = blockpos;
    1204 GIC          99 :     return res;
    1205 ECB             : }
    1206                 : 
    1207                 : /*
    1208                 :  * Check if we should continue streaming, or abort at this point.
    1209                 :  */
    1210 EUB             : static bool
    1211 GIC       10379 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
    1212 ECB             : {
    1213 GIC       10379 :     if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
    1214 EUB             :     {
    1215 GIC          96 :         if (!close_walfile(stream, blockpos))
    1216 EUB             :         {
    1217                 :             /* Potential error message is written by close_walfile */
    1218 LBC           0 :             return false;
    1219                 :         }
    1220 GIC          96 :         if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
    1221 ECB             :         {
    1222 UIC           0 :             pg_log_error("could not send copy-end packet: %s",
    1223                 :                          PQerrorMessage(conn));
    1224               0 :             return false;
    1225                 :         }
    1226 GIC          96 :         still_sending = false;
    1227                 :     }
    1228 ECB             : 
    1229 GIC       10379 :     return true;
    1230                 : }
    1231 ECB             : 
    1232                 : /*
    1233                 :  * Calculate how long send/receive loops should sleep
    1234                 :  */
    1235                 : static long
    1236 CBC        3631 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
    1237                 :                              TimestampTz last_status)
    1238 ECB             : {
    1239 GIC        3631 :     TimestampTz status_targettime = 0;
    1240                 :     long        sleeptime;
    1241                 : 
    1242            3631 :     if (standby_message_timeout && still_sending)
    1243 CBC        3548 :         status_targettime = last_status +
    1244 GIC        3548 :             (standby_message_timeout - 1) * ((int64) 1000);
    1245                 : 
    1246            3631 :     if (status_targettime > 0)
    1247                 :     {
    1248 ECB             :         long        secs;
    1249                 :         int         usecs;
    1250 EUB             : 
    1251 GBC        3548 :         feTimestampDifference(now,
    1252                 :                               status_targettime,
    1253                 :                               &secs,
    1254 ECB             :                               &usecs);
    1255                 :         /* Always sleep at least 1 sec */
    1256 GIC        3548 :         if (secs <= 0)
    1257 ECB             :         {
    1258 UIC           0 :             secs = 1;
    1259 LBC           0 :             usecs = 0;
    1260                 :         }
    1261                 : 
    1262 GIC        3548 :         sleeptime = secs * 1000 + usecs / 1000;
    1263                 :     }
    1264                 :     else
    1265              83 :         sleeptime = -1;
    1266                 : 
    1267            3631 :     return sleeptime;
    1268                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a