Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * receivelog.c - receive WAL files using the streaming
4 : : * replication protocol.
5 : : *
6 : : * Author: Magnus Hagander <magnus@hagander.net>
7 : : *
8 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
9 : : *
10 : : * IDENTIFICATION
11 : : * src/bin/pg_basebackup/receivelog.c
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres_fe.h"
16 : :
17 : : #include <sys/select.h>
18 : : #include <sys/stat.h>
19 : : #include <unistd.h>
20 : :
21 : : #include "access/xlog_internal.h"
22 : : #include "common/file_utils.h"
23 : : #include "common/logging.h"
24 : : #include "libpq-fe.h"
25 : : #include "receivelog.h"
26 : : #include "streamutil.h"
27 : :
28 : : /* currently open WAL file */
29 : : static Walfile *walfile = NULL;
30 : : static bool reportFlushPosition = false;
31 : : static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
32 : :
33 : : static bool still_sending = true; /* feedback still needs to be sent? */
34 : :
35 : : static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
36 : : XLogRecPtr *stoppos);
37 : : static int CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
38 : : static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
39 : : char **buffer);
40 : : static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
41 : : int len, XLogRecPtr blockpos, TimestampTz *last_status);
42 : : static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
43 : : XLogRecPtr *blockpos);
44 : : static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
45 : : XLogRecPtr blockpos, XLogRecPtr *stoppos);
46 : : static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos);
47 : : static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
48 : : TimestampTz last_status);
49 : :
50 : : static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
51 : : uint32 *timeline);
52 : :
53 : : static bool
2730 magnus@hagander.net 54 :CBC 30 : mark_file_as_archived(StreamCtl *stream, const char *fname)
55 : : {
56 : : Walfile *f;
57 : : static char tmppath[MAXPGPATH];
58 : :
59 : 30 : snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
60 : : fname);
61 : :
573 rhaas@postgresql.org 62 : 30 : f = stream->walmethod->ops->open_for_write(stream->walmethod, tmppath,
63 : : NULL, 0);
2730 magnus@hagander.net 64 [ - + ]: 30 : if (f == NULL)
65 : : {
1840 peter@eisentraut.org 66 :UBC 0 : pg_log_error("could not create archive status file \"%s\": %s",
67 : : tmppath, GetLastWalMethodError(stream->walmethod));
3389 andres@anarazel.de 68 : 0 : return false;
69 : : }
70 : :
573 rhaas@postgresql.org 71 [ - + ]:CBC 30 : if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
72 : : {
879 tgl@sss.pgh.pa.us 73 :UBC 0 : pg_log_error("could not close archive status file \"%s\": %s",
74 : : tmppath, GetLastWalMethodError(stream->walmethod));
75 : 0 : return false;
76 : : }
77 : :
3389 andres@anarazel.de 78 :CBC 30 : return true;
79 : : }
80 : :
81 : : /*
82 : : * Open a new WAL file in the specified directory.
83 : : *
84 : : * Returns true if OK; on failure, returns false after printing an error msg.
85 : : * On success, 'walfile' is set to the opened WAL file.
86 : : *
87 : : * The file will be padded to 16Mb with zeroes.
88 : : */
89 : : static bool
2956 magnus@hagander.net 90 : 130 : open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
91 : : {
92 : : Walfile *f;
93 : : char *fn;
94 : : ssize_t size;
95 : : XLogSegNo segno;
96 : : char walfile_name[MAXPGPATH];
97 : :
2399 andres@anarazel.de 98 : 130 : XLByteToSeg(startpoint, segno, WalSegSz);
573 rhaas@postgresql.org 99 : 130 : XLogFileName(walfile_name, stream->timeline, segno, WalSegSz);
100 : :
101 : : /* Note that this considers the compression used if necessary */
102 : 130 : fn = stream->walmethod->ops->get_file_name(stream->walmethod,
103 : : walfile_name,
104 : 130 : stream->partial_suffix);
105 : :
106 : : /*
107 : : * When streaming to files, if an existing file exists we verify that it's
108 : : * either empty (just created), or a complete WalSegSz segment (in which
109 : : * case it has been created and padded). Anything else indicates a corrupt
110 : : * file. Compressed files have no need for padding, so just ignore this
111 : : * case.
112 : : *
113 : : * When streaming to tar, no file with this name will exist before, so we
114 : : * never have to verify a size.
115 : : */
116 [ + + - + ]: 253 : if (stream->walmethod->compression_algorithm == PG_COMPRESSION_NONE &&
117 : 123 : stream->walmethod->ops->existsfile(stream->walmethod, fn))
118 : : {
573 rhaas@postgresql.org 119 :UBC 0 : size = stream->walmethod->ops->get_file_size(stream->walmethod, fn);
2730 magnus@hagander.net 120 [ # # ]: 0 : if (size < 0)
121 : : {
1840 peter@eisentraut.org 122 : 0 : pg_log_error("could not get size of write-ahead log file \"%s\": %s",
123 : : fn, GetLastWalMethodError(stream->walmethod));
993 michael@paquier.xyz 124 : 0 : pg_free(fn);
2730 magnus@hagander.net 125 : 0 : return false;
126 : : }
2399 andres@anarazel.de 127 [ # # ]: 0 : if (size == WalSegSz)
128 : : {
129 : : /* Already padded file. Open it for use */
573 rhaas@postgresql.org 130 : 0 : f = stream->walmethod->ops->open_for_write(stream->walmethod, walfile_name, stream->partial_suffix, 0);
2730 magnus@hagander.net 131 [ # # ]: 0 : if (f == NULL)
132 : : {
1840 peter@eisentraut.org 133 : 0 : pg_log_error("could not open existing write-ahead log file \"%s\": %s",
134 : : fn, GetLastWalMethodError(stream->walmethod));
993 michael@paquier.xyz 135 : 0 : pg_free(fn);
2751 tgl@sss.pgh.pa.us 136 : 0 : return false;
137 : : }
138 : :
139 : : /* fsync file in case of a previous crash */
573 rhaas@postgresql.org 140 [ # # ]: 0 : if (stream->walmethod->ops->sync(f) != 0)
141 : : {
737 tgl@sss.pgh.pa.us 142 : 0 : pg_log_error("could not fsync existing write-ahead log file \"%s\": %s",
143 : : fn, GetLastWalMethodError(stream->walmethod));
573 rhaas@postgresql.org 144 : 0 : stream->walmethod->ops->close(f, CLOSE_UNLINK);
1721 peter@eisentraut.org 145 : 0 : exit(1);
146 : : }
147 : :
2730 magnus@hagander.net 148 : 0 : walfile = f;
993 michael@paquier.xyz 149 : 0 : pg_free(fn);
2730 magnus@hagander.net 150 : 0 : return true;
151 : : }
152 [ # # ]: 0 : if (size != 0)
153 : : {
154 : : /* if write didn't set errno, assume problem is no disk space */
2751 tgl@sss.pgh.pa.us 155 [ # # ]: 0 : if (errno == 0)
156 : 0 : errno = ENOSPC;
980 peter@eisentraut.org 157 : 0 : pg_log_error(ngettext("write-ahead log file \"%s\" has %zd byte, should be 0 or %d",
158 : : "write-ahead log file \"%s\" has %zd bytes, should be 0 or %d",
159 : : size),
160 : : fn, size, WalSegSz);
993 michael@paquier.xyz 161 : 0 : pg_free(fn);
4105 heikki.linnakangas@i 162 : 0 : return false;
163 : : }
164 : : /* File existed and was empty, so fall through and open */
165 : : }
166 : :
167 : : /* No file existed, so create one */
168 : :
573 rhaas@postgresql.org 169 :CBC 130 : f = stream->walmethod->ops->open_for_write(stream->walmethod,
170 : : walfile_name,
171 : 130 : stream->partial_suffix,
172 : : WalSegSz);
2730 magnus@hagander.net 173 [ - + ]: 130 : if (f == NULL)
174 : : {
1840 peter@eisentraut.org 175 :UBC 0 : pg_log_error("could not open write-ahead log file \"%s\": %s",
176 : : fn, GetLastWalMethodError(stream->walmethod));
993 michael@paquier.xyz 177 : 0 : pg_free(fn);
4105 heikki.linnakangas@i 178 : 0 : return false;
179 : : }
180 : :
993 michael@paquier.xyz 181 :CBC 130 : pg_free(fn);
4105 heikki.linnakangas@i 182 : 130 : walfile = f;
183 : 130 : return true;
184 : : }
185 : :
186 : : /*
187 : : * Close the current WAL file (if open), and rename it to the correct
188 : : * filename if it's complete. On failure, prints an error message to stderr
189 : : * and returns false, otherwise returns true.
190 : : */
191 : : static bool
2956 magnus@hagander.net 192 : 155 : close_walfile(StreamCtl *stream, XLogRecPtr pos)
193 : : {
194 : : char *fn;
195 : : off_t currpos;
196 : : int r;
197 : : char walfile_name[MAXPGPATH];
198 : :
2730 199 [ + + ]: 155 : if (walfile == NULL)
4105 heikki.linnakangas@i 200 : 25 : return true;
201 : :
573 rhaas@postgresql.org 202 : 130 : strlcpy(walfile_name, walfile->pathname, MAXPGPATH);
203 : 130 : currpos = walfile->currpos;
204 : :
205 : : /* Note that this considers the compression used if necessary */
206 : 130 : fn = stream->walmethod->ops->get_file_name(stream->walmethod,
207 : : walfile_name,
208 : 130 : stream->partial_suffix);
209 : :
2730 magnus@hagander.net 210 [ + + ]: 130 : if (stream->partial_suffix)
211 : : {
2399 andres@anarazel.de 212 [ + + ]: 12 : if (currpos == WalSegSz)
573 rhaas@postgresql.org 213 : 6 : r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
214 : : else
215 : : {
940 michael@paquier.xyz 216 : 6 : pg_log_info("not renaming \"%s\", segment is not complete", fn);
573 rhaas@postgresql.org 217 : 6 : r = stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME);
218 : : }
219 : : }
220 : : else
221 : 118 : r = stream->walmethod->ops->close(walfile, CLOSE_NORMAL);
222 : :
2730 magnus@hagander.net 223 : 130 : walfile = NULL;
224 : :
225 [ - + ]: 130 : if (r != 0)
226 : : {
1840 peter@eisentraut.org 227 :UBC 0 : pg_log_error("could not close file \"%s\": %s",
228 : : fn, GetLastWalMethodError(stream->walmethod));
229 : :
940 michael@paquier.xyz 230 : 0 : pg_free(fn);
4546 magnus@hagander.net 231 : 0 : return false;
232 : : }
233 : :
940 michael@paquier.xyz 234 :CBC 130 : pg_free(fn);
235 : :
236 : : /*
237 : : * Mark file as archived if requested by the caller - pg_basebackup needs
238 : : * to do so as files can otherwise get archived again after promotion of a
239 : : * new node. This is in line with walreceiver.c always doing a
240 : : * XLogArchiveForceDone() after a complete segment.
241 : : */
2399 andres@anarazel.de 242 [ + + + + ]: 130 : if (currpos == WalSegSz && stream->mark_done)
243 : : {
244 : : /* writes error message if failed */
573 rhaas@postgresql.org 245 [ - + ]: 29 : if (!mark_file_as_archived(stream, walfile_name))
3389 andres@anarazel.de 246 :UBC 0 : return false;
247 : : }
248 : :
3726 rhaas@postgresql.org 249 :CBC 130 : lastFlushPosition = pos;
4546 magnus@hagander.net 250 : 130 : return true;
251 : : }
252 : :
253 : :
254 : : /*
255 : : * Check if a timeline history file exists.
256 : : */
257 : : static bool
2956 258 : 125 : existsTimeLineHistoryFile(StreamCtl *stream)
259 : : {
260 : : char histfname[MAXFNAMELEN];
261 : :
262 : : /*
263 : : * Timeline 1 never has a history file. We treat that as if it existed,
264 : : * since we never need to stream it.
265 : : */
266 [ + + ]: 125 : if (stream->timeline == 1)
4105 heikki.linnakangas@i 267 : 123 : return true;
268 : :
2956 magnus@hagander.net 269 : 2 : TLHistoryFileName(histfname, stream->timeline);
270 : :
573 rhaas@postgresql.org 271 : 2 : return stream->walmethod->ops->existsfile(stream->walmethod, histfname);
272 : : }
273 : :
274 : : static bool
2956 magnus@hagander.net 275 : 2 : writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
276 : : {
4105 heikki.linnakangas@i 277 : 2 : int size = strlen(content);
278 : : char histfname[MAXFNAMELEN];
279 : : Walfile *f;
280 : :
281 : : /*
282 : : * Check that the server's idea of how timeline history files should be
283 : : * named matches ours.
284 : : */
2956 magnus@hagander.net 285 : 2 : TLHistoryFileName(histfname, stream->timeline);
4105 heikki.linnakangas@i 286 [ - + ]: 2 : if (strcmp(histfname, filename) != 0)
287 : : {
1840 peter@eisentraut.org 288 :UBC 0 : pg_log_error("server reported unexpected history file name for timeline %u: %s",
289 : : stream->timeline, filename);
4105 heikki.linnakangas@i 290 : 0 : return false;
291 : : }
292 : :
573 rhaas@postgresql.org 293 :CBC 2 : f = stream->walmethod->ops->open_for_write(stream->walmethod,
294 : : histfname, ".tmp", 0);
2730 magnus@hagander.net 295 [ - + ]: 2 : if (f == NULL)
296 : : {
1840 peter@eisentraut.org 297 :UBC 0 : pg_log_error("could not create timeline history file \"%s\": %s",
298 : : histfname, GetLastWalMethodError(stream->walmethod));
4105 heikki.linnakangas@i 299 : 0 : return false;
300 : : }
301 : :
573 rhaas@postgresql.org 302 [ - + ]:CBC 2 : if ((int) stream->walmethod->ops->write(f, content, size) != size)
303 : : {
1840 peter@eisentraut.org 304 :UBC 0 : pg_log_error("could not write timeline history file \"%s\": %s",
305 : : histfname, GetLastWalMethodError(stream->walmethod));
306 : :
307 : : /*
308 : : * If we fail to make the file, delete it to release disk space
309 : : */
573 rhaas@postgresql.org 310 : 0 : stream->walmethod->ops->close(f, CLOSE_UNLINK);
311 : :
4105 heikki.linnakangas@i 312 : 0 : return false;
313 : : }
314 : :
573 rhaas@postgresql.org 315 [ - + ]:CBC 2 : if (stream->walmethod->ops->close(f, CLOSE_NORMAL) != 0)
316 : : {
1840 peter@eisentraut.org 317 :UBC 0 : pg_log_error("could not close file \"%s\": %s",
318 : : histfname, GetLastWalMethodError(stream->walmethod));
4105 heikki.linnakangas@i 319 : 0 : return false;
320 : : }
321 : :
322 : : /* Maintain archive_status, check close_walfile() for details. */
2956 magnus@hagander.net 323 [ + + ]:CBC 2 : if (stream->mark_done)
324 : : {
325 : : /* writes error message if failed */
2730 326 [ - + ]: 1 : if (!mark_file_as_archived(stream, histfname))
3389 andres@anarazel.de 327 :UBC 0 : return false;
328 : : }
329 : :
4105 heikki.linnakangas@i 330 :CBC 2 : return true;
331 : : }
332 : :
333 : : /*
334 : : * Send a Standby Status Update message to server.
335 : : */
336 : : static bool
2607 tgl@sss.pgh.pa.us 337 : 123 : sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
338 : : {
339 : : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
3973 bruce@momjian.us 340 : 123 : int len = 0;
341 : :
4176 heikki.linnakangas@i 342 : 123 : replybuf[len] = 'r';
343 : 123 : len += 1;
2489 tgl@sss.pgh.pa.us 344 : 123 : fe_sendint64(blockpos, &replybuf[len]); /* write */
4176 heikki.linnakangas@i 345 : 123 : len += 8;
3726 rhaas@postgresql.org 346 [ + + ]: 123 : if (reportFlushPosition)
2489 tgl@sss.pgh.pa.us 347 : 119 : fe_sendint64(lastFlushPosition, &replybuf[len]); /* flush */
348 : : else
349 : 4 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* flush */
4176 heikki.linnakangas@i 350 : 123 : len += 8;
3631 bruce@momjian.us 351 : 123 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
4176 heikki.linnakangas@i 352 : 123 : len += 8;
3631 bruce@momjian.us 353 : 123 : fe_sendint64(now, &replybuf[len]); /* sendTime */
4176 heikki.linnakangas@i 354 : 123 : len += 8;
2489 tgl@sss.pgh.pa.us 355 : 123 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
4176 heikki.linnakangas@i 356 : 123 : len += 1;
357 : :
358 [ + - - + ]: 123 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
359 : : {
1840 peter@eisentraut.org 360 :UBC 0 : pg_log_error("could not send feedback packet: %s",
361 : : PQerrorMessage(conn));
4176 heikki.linnakangas@i 362 : 0 : return false;
363 : : }
364 : :
4176 heikki.linnakangas@i 365 :CBC 123 : return true;
366 : : }
367 : :
368 : : /*
369 : : * Check that the server version we're connected to is supported by
370 : : * ReceiveXlogStream().
371 : : *
372 : : * If it's not, an error message is printed to stderr, and false is returned.
373 : : */
374 : : bool
4041 375 : 267 : CheckServerVersionForStreaming(PGconn *conn)
376 : : {
377 : : int minServerMajor,
378 : : maxServerMajor;
379 : : int serverMajor;
380 : :
381 : : /*
382 : : * The message format used in streaming replication changed in 9.3, so we
383 : : * cannot stream from older servers. And we don't support servers newer
384 : : * than the client; it might work, but we don't know, so err on the safe
385 : : * side.
386 : : */
387 : 267 : minServerMajor = 903;
388 : 267 : maxServerMajor = PG_VERSION_NUM / 100;
389 : 267 : serverMajor = PQserverVersion(conn) / 100;
3631 simon@2ndQuadrant.co 390 [ - + ]: 267 : if (serverMajor < minServerMajor)
391 : : {
4041 heikki.linnakangas@i 392 :UBC 0 : const char *serverver = PQparameterStatus(conn, "server_version");
393 : :
1840 peter@eisentraut.org 394 [ # # ]: 0 : pg_log_error("incompatible server version %s; client does not support streaming from server versions older than %s",
395 : : serverver ? serverver : "'unknown'",
396 : : "9.3");
3631 simon@2ndQuadrant.co 397 : 0 : return false;
398 : : }
3631 simon@2ndQuadrant.co 399 [ - + ]:CBC 267 : else if (serverMajor > maxServerMajor)
400 : : {
3631 simon@2ndQuadrant.co 401 :UBC 0 : const char *serverver = PQparameterStatus(conn, "server_version");
402 : :
1840 peter@eisentraut.org 403 [ # # ]: 0 : pg_log_error("incompatible server version %s; client does not support streaming from server versions newer than %s",
404 : : serverver ? serverver : "'unknown'",
405 : : PG_VERSION);
4041 heikki.linnakangas@i 406 : 0 : return false;
407 : : }
4041 heikki.linnakangas@i 408 :CBC 267 : return true;
409 : : }
410 : :
411 : : /*
412 : : * Receive a log stream starting at the specified position.
413 : : *
414 : : * Individual parameters are passed through the StreamCtl structure.
415 : : *
416 : : * If sysidentifier is specified, validate that both the system
417 : : * identifier and the timeline matches the specified ones
418 : : * (by sending an extra IDENTIFY_SYSTEM command)
419 : : *
420 : : * All received segments will be written to the directory
421 : : * specified by basedir. This will also fetch any missing timeline history
422 : : * files.
423 : : *
424 : : * The stream_stop callback will be called every time data
425 : : * is received, and whenever a segment is completed. If it returns
426 : : * true, the streaming will stop and the function
427 : : * return. As long as it returns false, streaming will continue
428 : : * indefinitely.
429 : : *
430 : : * If stream_stop() checks for external input, stop_socket should be set to
431 : : * the FD it checks. This will allow such input to be detected promptly
432 : : * rather than after standby_message_timeout (which might be indefinite).
433 : : * Note that signals will interrupt waits for input as well, but that is
434 : : * race-y since a signal received while busy won't interrupt the wait.
435 : : *
436 : : * standby_message_timeout controls how often we send a message
437 : : * back to the primary letting it know our progress, in milliseconds.
438 : : * Zero means no messages are sent.
439 : : * This message will only contain the write location, and never
440 : : * flush or replay.
441 : : *
442 : : * If 'partial_suffix' is not NULL, files are initially created with the
443 : : * given suffix, and the suffix is removed once the file is finished. That
444 : : * allows you to tell the difference between partial and completed files,
445 : : * so that you can continue later where you left.
446 : : *
447 : : * If 'synchronous' is true, the received WAL is flushed as soon as written,
448 : : * otherwise only when the WAL file is closed.
449 : : *
450 : : * Note: The WAL location *must* be at a log segment start!
451 : : */
452 : : bool
2956 magnus@hagander.net 453 : 124 : ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
454 : : {
455 : : char query[128];
456 : : char slotcmd[128];
457 : : PGresult *res;
458 : : XLogRecPtr stoppos;
459 : :
460 : : /*
461 : : * The caller should've checked the server version already, but doesn't do
462 : : * any harm to check it here too.
463 : : */
4041 heikki.linnakangas@i 464 [ - + ]: 124 : if (!CheckServerVersionForStreaming(conn))
4107 heikki.linnakangas@i 465 :UBC 0 : return false;
466 : :
467 : : /*
468 : : * Decide whether we want to report the flush position. If we report the
469 : : * flush position, the primary will know what WAL we'll possibly
470 : : * re-request, and it can then remove older WAL safely. We must always do
471 : : * that when we are using slots.
472 : : *
473 : : * Reporting the flush position makes one eligible as a synchronous
474 : : * replica. People shouldn't include generic names in
475 : : * synchronous_standby_names, but we've protected them against it so far,
476 : : * so let's continue to do so unless specifically requested.
477 : : */
2645 magnus@hagander.net 478 [ + + ]:CBC 124 : if (stream->replication_slot != NULL)
479 : : {
3726 rhaas@postgresql.org 480 : 119 : reportFlushPosition = true;
2645 magnus@hagander.net 481 : 119 : sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
482 : : }
483 : : else
484 : : {
2785 simon@2ndQuadrant.co 485 [ + + ]: 5 : if (stream->synchronous)
486 : 1 : reportFlushPosition = true;
487 : : else
488 : 4 : reportFlushPosition = false;
3726 rhaas@postgresql.org 489 : 5 : slotcmd[0] = 0;
490 : : }
491 : :
2956 magnus@hagander.net 492 [ + - ]: 124 : if (stream->sysidentifier != NULL)
493 : : {
957 michael@paquier.xyz 494 : 124 : char *sysidentifier = NULL;
495 : : TimeLineID servertli;
496 : :
497 : : /*
498 : : * Get the server system identifier and timeline, and validate them.
499 : : */
500 [ - + ]: 124 : if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
501 : : {
957 michael@paquier.xyz 502 :UBC 0 : pg_free(sysidentifier);
4554 magnus@hagander.net 503 : 0 : return false;
504 : : }
505 : :
957 michael@paquier.xyz 506 [ - + ]:CBC 124 : if (strcmp(stream->sysidentifier, sysidentifier) != 0)
507 : : {
1840 peter@eisentraut.org 508 :UBC 0 : pg_log_error("system identifier does not match between base backup and streaming connection");
957 michael@paquier.xyz 509 : 0 : pg_free(sysidentifier);
4554 magnus@hagander.net 510 : 0 : return false;
511 : : }
957 michael@paquier.xyz 512 :CBC 124 : pg_free(sysidentifier);
513 : :
514 [ - + ]: 124 : if (stream->timeline > servertli)
515 : : {
1840 peter@eisentraut.org 516 :UBC 0 : pg_log_error("starting timeline %u is not present in the server",
517 : : stream->timeline);
4554 magnus@hagander.net 518 : 0 : return false;
519 : : }
520 : : }
521 : :
522 : : /*
523 : : * initialize flush position to starting point, it's the caller's
524 : : * responsibility that that's sane.
525 : : */
2956 magnus@hagander.net 526 :CBC 124 : lastFlushPosition = stream->startpos;
527 : :
528 : : while (1)
529 : : {
530 : : /*
531 : : * Fetch the timeline history file for this timeline, if we don't have
532 : : * it already. When streaming log to tar, this will always return
533 : : * false, as we are never streaming into an existing file and
534 : : * therefore there can be no pre-existing timeline history file.
535 : : */
536 [ + + ]: 125 : if (!existsTimeLineHistoryFile(stream))
537 : : {
538 : 2 : snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
4105 heikki.linnakangas@i 539 : 2 : res = PQexec(conn, query);
540 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
541 : : {
542 : : /* FIXME: we might send it ok, but get an error */
1840 peter@eisentraut.org 543 :UBC 0 : pg_log_error("could not send replication command \"%s\": %s",
544 : : "TIMELINE_HISTORY", PQresultErrorMessage(res));
4105 heikki.linnakangas@i 545 : 0 : PQclear(res);
546 : 0 : return false;
547 : : }
548 : :
549 : : /*
550 : : * The response to TIMELINE_HISTORY is a single row result set
551 : : * with two fields: filename and content
552 : : */
4105 heikki.linnakangas@i 553 [ + - - + ]:CBC 2 : if (PQnfields(res) != 2 || PQntuples(res) != 1)
554 : : {
1840 peter@eisentraut.org 555 :UBC 0 : pg_log_warning("unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
556 : : PQntuples(res), PQnfields(res), 1, 2);
557 : : }
558 : :
559 : : /* Write the history file to disk */
2956 magnus@hagander.net 560 :CBC 2 : writeTimeLineHistoryFile(stream,
561 : : PQgetvalue(res, 0, 0),
562 : : PQgetvalue(res, 0, 1));
563 : :
4105 heikki.linnakangas@i 564 : 2 : PQclear(res);
565 : : }
566 : :
567 : : /*
568 : : * Before we start streaming from the requested location, check if the
569 : : * callback tells us to stop here.
570 : : */
2956 magnus@hagander.net 571 [ - + ]: 125 : if (stream->stream_stop(stream->startpos, stream->timeline, false))
4105 heikki.linnakangas@i 572 :UBC 0 : return true;
573 : :
574 : : /* Initiate the replication stream at specified location */
3726 rhaas@postgresql.org 575 :CBC 125 : snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
576 : : slotcmd,
1146 peter@eisentraut.org 577 : 125 : LSN_FORMAT_ARGS(stream->startpos),
578 : : stream->timeline);
4105 heikki.linnakangas@i 579 : 125 : res = PQexec(conn, query);
580 [ + + ]: 125 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
581 : : {
1840 peter@eisentraut.org 582 : 2 : pg_log_error("could not send replication command \"%s\": %s",
583 : : "START_REPLICATION", PQresultErrorMessage(res));
4105 heikki.linnakangas@i 584 : 2 : PQclear(res);
585 : 2 : return false;
586 : : }
4294 magnus@hagander.net 587 : 123 : PQclear(res);
588 : :
589 : : /* Stream the WAL */
2956 590 : 123 : res = HandleCopyStream(conn, stream, &stoppos);
4003 rhaas@postgresql.org 591 [ - + ]: 123 : if (res == NULL)
4105 heikki.linnakangas@i 592 :UBC 0 : goto error;
593 : :
594 : : /*
595 : : * Streaming finished.
596 : : *
597 : : * There are two possible reasons for that: a controlled shutdown, or
598 : : * we reached the end of the current timeline. In case of
599 : : * end-of-timeline, the server sends a result set after Copy has
600 : : * finished, containing information about the next timeline. Read
601 : : * that, and restart streaming from the next timeline. In case of
602 : : * controlled shutdown, stop here.
603 : : */
4105 heikki.linnakangas@i 604 [ + + ]:CBC 123 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
605 : 1 : {
606 : : /*
607 : : * End-of-timeline. Read the next timeline's ID and starting
608 : : * position. Usually, the starting position will match the end of
609 : : * the previous timeline, but there are corner cases like if the
610 : : * server had sent us half of a WAL record, when it was promoted.
611 : : * The new timeline will begin at the end of the last complete
612 : : * record in that case, overlapping the partial WAL record on the
613 : : * old timeline.
614 : : */
615 : : uint32 newtimeline;
616 : : bool parsed;
617 : :
2956 magnus@hagander.net 618 : 1 : parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
4105 heikki.linnakangas@i 619 : 1 : PQclear(res);
3994 620 [ - + ]: 1 : if (!parsed)
3994 heikki.linnakangas@i 621 :UBC 0 : goto error;
622 : :
623 : : /* Sanity check the values the server gave us */
2956 magnus@hagander.net 624 [ - + ]:CBC 1 : if (newtimeline <= stream->timeline)
625 : : {
1840 peter@eisentraut.org 626 :UBC 0 : pg_log_error("server reported unexpected next timeline %u, following timeline %u",
627 : : newtimeline, stream->timeline);
3994 heikki.linnakangas@i 628 : 0 : goto error;
629 : : }
2956 magnus@hagander.net 630 [ - + ]:CBC 1 : if (stream->startpos > stoppos)
631 : : {
1840 peter@eisentraut.org 632 :UBC 0 : pg_log_error("server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
633 : : stream->timeline, LSN_FORMAT_ARGS(stoppos),
634 : : newtimeline, LSN_FORMAT_ARGS(stream->startpos));
4105 heikki.linnakangas@i 635 : 0 : goto error;
636 : : }
637 : :
638 : : /* Read the final result, which should be CommandComplete. */
4105 heikki.linnakangas@i 639 :CBC 1 : res = PQgetResult(conn);
640 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
641 : : {
1840 peter@eisentraut.org 642 :UBC 0 : pg_log_error("unexpected termination of replication stream: %s",
643 : : PQresultErrorMessage(res));
3543 fujii@postgresql.org 644 : 0 : PQclear(res);
4105 heikki.linnakangas@i 645 : 0 : goto error;
646 : : }
4105 heikki.linnakangas@i 647 :CBC 1 : PQclear(res);
648 : :
649 : : /*
650 : : * Loop back to start streaming from the new timeline. Always
651 : : * start streaming at the beginning of a segment.
652 : : */
2956 magnus@hagander.net 653 : 1 : stream->timeline = newtimeline;
2399 andres@anarazel.de 654 : 1 : stream->startpos = stream->startpos -
655 : 1 : XLogSegmentOffset(stream->startpos, WalSegSz);
4105 heikki.linnakangas@i 656 : 1 : continue;
657 : : }
658 [ + + ]: 122 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
659 : : {
3543 fujii@postgresql.org 660 : 121 : PQclear(res);
661 : :
662 : : /*
663 : : * End of replication (ie. controlled shut down of the server).
664 : : *
665 : : * Check if the callback thinks it's OK to stop here. If not,
666 : : * complain.
667 : : */
2956 magnus@hagander.net 668 [ + - ]: 121 : if (stream->stream_stop(stoppos, stream->timeline, false))
4105 heikki.linnakangas@i 669 : 121 : return true;
670 : : else
671 : : {
1840 peter@eisentraut.org 672 :UBC 0 : pg_log_error("replication stream was terminated before stop point");
4105 heikki.linnakangas@i 673 : 0 : goto error;
674 : : }
675 : : }
676 : : else
677 : : {
678 : : /* Server returned an error. */
1840 peter@eisentraut.org 679 :CBC 1 : pg_log_error("unexpected termination of replication stream: %s",
680 : : PQresultErrorMessage(res));
3543 fujii@postgresql.org 681 : 1 : PQclear(res);
4105 heikki.linnakangas@i 682 : 1 : goto error;
683 : : }
684 : : }
685 : :
686 : 1 : error:
573 rhaas@postgresql.org 687 [ - + - - ]: 1 : if (walfile != NULL && stream->walmethod->ops->close(walfile, CLOSE_NO_RENAME) != 0)
1840 peter@eisentraut.org 688 :UBC 0 : pg_log_error("could not close file \"%s\": %s",
689 : : walfile->pathname, GetLastWalMethodError(stream->walmethod));
2730 magnus@hagander.net 690 :CBC 1 : walfile = NULL;
4105 heikki.linnakangas@i 691 : 1 : return false;
692 : : }
693 : :
694 : : /*
695 : : * Helper function to parse the result set returned by server after streaming
696 : : * has finished. On failure, prints an error to stderr and returns false.
697 : : */
698 : : static bool
3994 699 : 1 : ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
700 : : {
701 : : uint32 startpos_xlogid,
702 : : startpos_xrecoff;
703 : :
704 : : /*----------
705 : : * The result set consists of one row and two columns, e.g:
706 : : *
707 : : * next_tli | next_tli_startpos
708 : : * ----------+-------------------
709 : : * 4 | 0/9949AE0
710 : : *
711 : : * next_tli is the timeline ID of the next timeline after the one that
712 : : * just finished streaming. next_tli_startpos is the WAL location where
713 : : * the server switched to it.
714 : : *----------
715 : : */
716 [ + - - + ]: 1 : if (PQnfields(res) < 2 || PQntuples(res) != 1)
717 : : {
1840 peter@eisentraut.org 718 :UBC 0 : pg_log_error("unexpected result set after end-of-timeline: got %d rows and %d fields, expected %d rows and %d fields",
719 : : PQntuples(res), PQnfields(res), 1, 2);
3994 heikki.linnakangas@i 720 : 0 : return false;
721 : : }
722 : :
3994 heikki.linnakangas@i 723 :CBC 1 : *timeline = atoi(PQgetvalue(res, 0, 0));
724 [ - + ]: 1 : if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &startpos_xlogid,
725 : : &startpos_xrecoff) != 2)
726 : : {
1840 peter@eisentraut.org 727 :UBC 0 : pg_log_error("could not parse next timeline's starting point \"%s\"",
728 : : PQgetvalue(res, 0, 1));
3994 heikki.linnakangas@i 729 : 0 : return false;
730 : : }
3994 heikki.linnakangas@i 731 :CBC 1 : *startpos = ((uint64) startpos_xlogid << 32) | startpos_xrecoff;
732 : :
733 : 1 : return true;
734 : : }
735 : :
736 : : /*
737 : : * The main loop of ReceiveXlogStream. Handles the COPY stream after
738 : : * initiating streaming with the START_REPLICATION command.
739 : : *
740 : : * If the COPY ends (not necessarily successfully) due a message from the
741 : : * server, returns a PGresult and sets *stoppos to the last byte written.
742 : : * On any other sort of error, returns NULL.
743 : : */
744 : : static PGresult *
2956 magnus@hagander.net 745 : 123 : HandleCopyStream(PGconn *conn, StreamCtl *stream,
746 : : XLogRecPtr *stoppos)
747 : : {
4105 heikki.linnakangas@i 748 : 123 : char *copybuf = NULL;
2607 tgl@sss.pgh.pa.us 749 : 123 : TimestampTz last_status = -1;
2956 magnus@hagander.net 750 : 123 : XLogRecPtr blockpos = stream->startpos;
751 : :
3539 fujii@postgresql.org 752 : 123 : still_sending = true;
753 : :
754 : : while (1)
4554 magnus@hagander.net 755 : 2235 : {
756 : : int r;
757 : : TimestampTz now;
758 : : long sleeptime;
759 : :
760 : : /*
761 : : * Check if we should continue streaming, or abort at this point.
762 : : */
1312 peter@eisentraut.org 763 [ - + ]: 2358 : if (!CheckCopyStreamStop(conn, stream, blockpos))
3537 fujii@postgresql.org 764 :UBC 0 : goto error;
765 : :
3537 fujii@postgresql.org 766 :CBC 2358 : now = feGetCurrentTimestamp();
767 : :
768 : : /*
769 : : * If synchronous option is true, issue sync command as soon as there
770 : : * are WAL data which has not been flushed yet.
771 : : */
2730 magnus@hagander.net 772 [ + + - + : 2358 : if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
- - ]
773 : : {
573 rhaas@postgresql.org 774 [ # # ]:UBC 0 : if (stream->walmethod->ops->sync(walfile) != 0)
737 tgl@sss.pgh.pa.us 775 : 0 : pg_fatal("could not fsync file \"%s\": %s",
776 : : walfile->pathname, GetLastWalMethodError(stream->walmethod));
3537 fujii@postgresql.org 777 : 0 : lastFlushPosition = blockpos;
778 : :
779 : : /*
780 : : * Send feedback so that the server sees the latest WAL locations
781 : : * immediately.
782 : : */
3435 783 [ # # ]: 0 : if (!sendFeedback(conn, blockpos, now, false))
784 : 0 : goto error;
785 : 0 : last_status = now;
786 : : }
787 : :
788 : : /*
789 : : * Potentially send a status message to the primary
790 : : */
2956 magnus@hagander.net 791 [ + + + - :CBC 4620 : if (still_sending && stream->standby_message_timeout > 0 &&
+ + ]
3680 rhaas@postgresql.org 792 : 2262 : feTimestampDifferenceExceeds(last_status, now,
793 : : stream->standby_message_timeout))
794 : : {
795 : : /* Time to send feedback! */
4175 heikki.linnakangas@i 796 [ - + ]: 123 : if (!sendFeedback(conn, blockpos, now, false))
4294 magnus@hagander.net 797 :UBC 0 : goto error;
4554 magnus@hagander.net 798 :CBC 123 : last_status = now;
799 : : }
800 : :
801 : : /*
802 : : * Calculate how long send/receive loops should sleep
803 : : */
2956 804 : 2358 : sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
805 : : last_status);
806 : :
2544 tgl@sss.pgh.pa.us 807 : 2358 : r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ©buf);
3537 fujii@postgresql.org 808 [ + + ]: 8278 : while (r != 0)
809 : : {
810 [ - + ]: 6043 : if (r == -1)
3539 fujii@postgresql.org 811 :UBC 0 : goto error;
3537 fujii@postgresql.org 812 [ + + ]:CBC 6043 : if (r == -2)
813 : : {
2956 magnus@hagander.net 814 : 123 : PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
815 : :
3537 fujii@postgresql.org 816 [ - + ]: 123 : if (res == NULL)
3537 fujii@postgresql.org 817 :UBC 0 : goto error;
818 : : else
3537 fujii@postgresql.org 819 :CBC 123 : return res;
820 : : }
821 : :
822 : : /* Check the message type. */
823 [ - + ]: 5920 : if (copybuf[0] == 'k')
824 : : {
2754 peter_e@gmx.net 825 [ # # ]:UBC 0 : if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
826 : : &last_status))
3537 fujii@postgresql.org 827 : 0 : goto error;
828 : : }
3537 fujii@postgresql.org 829 [ + - ]:CBC 5920 : else if (copybuf[0] == 'w')
830 : : {
2956 magnus@hagander.net 831 [ - + ]: 5920 : if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
3537 fujii@postgresql.org 832 :UBC 0 : goto error;
833 : :
834 : : /*
835 : : * Check if we should continue streaming, or abort at this
836 : : * point.
837 : : */
1312 peter@eisentraut.org 838 [ - + ]:CBC 5920 : if (!CheckCopyStreamStop(conn, stream, blockpos))
3537 fujii@postgresql.org 839 :UBC 0 : goto error;
840 : : }
841 : : else
842 : : {
1840 peter@eisentraut.org 843 : 0 : pg_log_error("unrecognized streaming header: \"%c\"",
844 : : copybuf[0]);
4294 magnus@hagander.net 845 : 0 : goto error;
846 : : }
847 : :
848 : : /*
849 : : * Process the received data, and any subsequent data we can read
850 : : * without blocking.
851 : : */
2544 tgl@sss.pgh.pa.us 852 :CBC 5920 : r = CopyStreamReceive(conn, 0, stream->stop_socket, ©buf);
853 : : }
854 : : }
855 : :
4294 magnus@hagander.net 856 :UBC 0 : error:
597 peter@eisentraut.org 857 : 0 : PQfreemem(copybuf);
4003 rhaas@postgresql.org 858 : 0 : return NULL;
859 : : }
860 : :
861 : : /*
862 : : * Wait until we can read a CopyData message,
863 : : * or timeout, or occurrence of a signal or input on the stop_socket.
864 : : * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
865 : : *
866 : : * Returns 1 if data has become available for reading, 0 if timed out
867 : : * or interrupted by signal or stop_socket input, and -1 on an error.
868 : : */
869 : : static int
2544 tgl@sss.pgh.pa.us 870 :CBC 8152 : CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
871 : : {
872 : : int ret;
873 : : fd_set input_mask;
874 : : int connsocket;
875 : : int maxfd;
876 : : struct timeval timeout;
877 : : struct timeval *timeoutptr;
878 : :
879 : 8152 : connsocket = PQsocket(conn);
880 [ - + ]: 8152 : if (connsocket < 0)
881 : : {
1840 peter@eisentraut.org 882 :UBC 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
3572 fujii@postgresql.org 883 : 0 : return -1;
884 : : }
885 : :
3572 fujii@postgresql.org 886 [ + + ]:CBC 138584 : FD_ZERO(&input_mask);
2544 tgl@sss.pgh.pa.us 887 : 8152 : FD_SET(connsocket, &input_mask);
888 : 8152 : maxfd = connsocket;
889 [ + + ]: 8152 : if (stop_socket != PGINVALID_SOCKET)
890 : : {
891 : 8075 : FD_SET(stop_socket, &input_mask);
892 : 8075 : maxfd = Max(maxfd, stop_socket);
893 : : }
894 : :
3572 fujii@postgresql.org 895 [ + + ]: 8152 : if (timeout_ms < 0)
896 : 96 : timeoutptr = NULL;
897 : : else
898 : : {
899 : 8056 : timeout.tv_sec = timeout_ms / 1000L;
900 : 8056 : timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
901 : 8056 : timeoutptr = &timeout;
902 : : }
903 : :
2544 tgl@sss.pgh.pa.us 904 : 8152 : ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
905 : :
906 [ - + ]: 8152 : if (ret < 0)
907 : : {
2544 tgl@sss.pgh.pa.us 908 [ # # ]:UBC 0 : if (errno == EINTR)
909 : 0 : return 0; /* Got a signal, so not an error */
1087 peter@eisentraut.org 910 : 0 : pg_log_error("%s() failed: %m", "select");
3572 fujii@postgresql.org 911 : 0 : return -1;
912 : : }
2544 tgl@sss.pgh.pa.us 913 [ + + + + ]:CBC 8152 : if (ret > 0 && FD_ISSET(connsocket, &input_mask))
914 : 6877 : return 1; /* Got input on connection socket */
915 : :
916 : 1275 : return 0; /* Got timeout or input on stop_socket */
917 : : }
918 : :
919 : : /*
920 : : * Receive CopyData message available from XLOG stream, blocking for
921 : : * maximum of 'timeout' ms.
922 : : *
923 : : * If data was received, returns the length of the data. *buffer is set to
924 : : * point to a buffer holding the received message. The buffer is only valid
925 : : * until the next CopyStreamReceive call.
926 : : *
927 : : * Returns 0 if no data was available within timeout, or if wait was
928 : : * interrupted by signal or stop_socket input.
929 : : * -1 on error. -2 if the server ended the COPY.
930 : : */
931 : : static int
932 : 8278 : CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
933 : : char **buffer)
934 : : {
3572 fujii@postgresql.org 935 : 8278 : char *copybuf = NULL;
936 : : int rawlen;
937 : :
597 peter@eisentraut.org 938 : 8278 : PQfreemem(*buffer);
3572 fujii@postgresql.org 939 : 8278 : *buffer = NULL;
940 : :
941 : : /* Try to receive a CopyData message */
942 : 8278 : rawlen = PQgetCopyData(conn, ©buf, 1);
943 [ + + ]: 8278 : if (rawlen == 0)
944 : : {
945 : : int ret;
946 : :
947 : : /*
948 : : * No data available. Wait for some to appear, but not longer than
949 : : * the specified timeout, so that we can ping the server. Also stop
950 : : * waiting if input appears on stop_socket.
951 : : */
2544 tgl@sss.pgh.pa.us 952 : 8152 : ret = CopyStreamPoll(conn, timeout, stop_socket);
953 [ + + ]: 8152 : if (ret <= 0)
954 : 1275 : return ret;
955 : :
956 : : /* Now there is actually data on the socket */
3572 fujii@postgresql.org 957 [ - + ]: 6877 : if (PQconsumeInput(conn) == 0)
958 : : {
1840 peter@eisentraut.org 959 :UBC 0 : pg_log_error("could not receive data from WAL stream: %s",
960 : : PQerrorMessage(conn));
3572 fujii@postgresql.org 961 : 0 : return -1;
962 : : }
963 : :
964 : : /* Now that we've consumed some input, try again */
3572 fujii@postgresql.org 965 :CBC 6877 : rawlen = PQgetCopyData(conn, ©buf, 1);
966 [ + + ]: 6877 : if (rawlen == 0)
967 : 960 : return 0;
968 : : }
969 [ + + ]: 6043 : if (rawlen == -1) /* end-of-streaming or error */
970 : 123 : return -2;
971 [ - + ]: 5920 : if (rawlen == -2)
972 : : {
1840 peter@eisentraut.org 973 :UBC 0 : pg_log_error("could not read COPY data: %s", PQerrorMessage(conn));
3572 fujii@postgresql.org 974 : 0 : return -1;
975 : : }
976 : :
977 : : /* Return received messages to caller */
3572 fujii@postgresql.org 978 :CBC 5920 : *buffer = copybuf;
979 : 5920 : return rawlen;
980 : : }
981 : :
982 : : /*
983 : : * Process the keepalive message.
984 : : */
985 : : static bool
2754 peter_e@gmx.net 986 :UBC 0 : ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
987 : : XLogRecPtr blockpos, TimestampTz *last_status)
988 : : {
989 : : int pos;
990 : : bool replyRequested;
991 : : TimestampTz now;
992 : :
993 : : /*
994 : : * Parse the keepalive message, enclosed in the CopyData message. We just
995 : : * check if the server requested a reply, and ignore the rest.
996 : : */
3249 bruce@momjian.us 997 : 0 : pos = 1; /* skip msgtype 'k' */
998 : 0 : pos += 8; /* skip walEnd */
999 : 0 : pos += 8; /* skip sendTime */
1000 : :
3539 fujii@postgresql.org 1001 [ # # ]: 0 : if (len < pos + 1)
1002 : : {
1840 peter@eisentraut.org 1003 : 0 : pg_log_error("streaming header too small: %d", len);
3539 fujii@postgresql.org 1004 : 0 : return false;
1005 : : }
1006 : 0 : replyRequested = copybuf[pos];
1007 : :
1008 : : /* If the server requested an immediate reply, send one. */
1009 [ # # # # ]: 0 : if (replyRequested && still_sending)
1010 : : {
3434 1011 [ # # # # ]: 0 : if (reportFlushPosition && lastFlushPosition < blockpos &&
2730 magnus@hagander.net 1012 [ # # ]: 0 : walfile != NULL)
1013 : : {
1014 : : /*
1015 : : * If a valid flush location needs to be reported, flush the
1016 : : * current WAL file so that the latest flush location is sent back
1017 : : * to the server. This is necessary to see whether the last WAL
1018 : : * data has been successfully replicated or not, at the normal
1019 : : * shutdown of the server.
1020 : : */
573 rhaas@postgresql.org 1021 [ # # ]: 0 : if (stream->walmethod->ops->sync(walfile) != 0)
737 tgl@sss.pgh.pa.us 1022 : 0 : pg_fatal("could not fsync file \"%s\": %s",
1023 : : walfile->pathname, GetLastWalMethodError(stream->walmethod));
3434 fujii@postgresql.org 1024 : 0 : lastFlushPosition = blockpos;
1025 : : }
1026 : :
3539 1027 : 0 : now = feGetCurrentTimestamp();
1028 [ # # ]: 0 : if (!sendFeedback(conn, blockpos, now, false))
1029 : 0 : return false;
1030 : 0 : *last_status = now;
1031 : : }
1032 : :
1033 : 0 : return true;
1034 : : }
1035 : :
1036 : : /*
1037 : : * Process XLogData message.
1038 : : */
1039 : : static bool
2956 magnus@hagander.net 1040 :CBC 5920 : ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
1041 : : XLogRecPtr *blockpos)
1042 : : {
1043 : : int xlogoff;
1044 : : int bytes_left;
1045 : : int bytes_written;
1046 : : int hdr_len;
1047 : :
1048 : : /*
1049 : : * Once we've decided we don't want to receive any more, just ignore any
1050 : : * subsequent XLogData messages.
1051 : : */
3539 fujii@postgresql.org 1052 [ + + ]: 5920 : if (!(still_sending))
1053 : 141 : return true;
1054 : :
1055 : : /*
1056 : : * Read the header of the XLogData message, enclosed in the CopyData
1057 : : * message. We only need the WAL location field (dataStart), the rest of
1058 : : * the header is ignored.
1059 : : */
3249 bruce@momjian.us 1060 : 5779 : hdr_len = 1; /* msgtype 'w' */
1061 : 5779 : hdr_len += 8; /* dataStart */
1062 : 5779 : hdr_len += 8; /* walEnd */
1063 : 5779 : hdr_len += 8; /* sendTime */
3539 fujii@postgresql.org 1064 [ - + ]: 5779 : if (len < hdr_len)
1065 : : {
1840 peter@eisentraut.org 1066 :UBC 0 : pg_log_error("streaming header too small: %d", len);
3539 fujii@postgresql.org 1067 : 0 : return false;
1068 : : }
3539 fujii@postgresql.org 1069 :CBC 5779 : *blockpos = fe_recvint64(©buf[1]);
1070 : :
1071 : : /* Extract WAL location for this block */
2399 andres@anarazel.de 1072 : 5779 : xlogoff = XLogSegmentOffset(*blockpos, WalSegSz);
1073 : :
1074 : : /*
1075 : : * Verify that the initial location in the stream matches where we think
1076 : : * we are.
1077 : : */
2730 magnus@hagander.net 1078 [ + + ]: 5779 : if (walfile == NULL)
1079 : : {
1080 : : /* No file open yet */
3539 fujii@postgresql.org 1081 [ - + ]: 130 : if (xlogoff != 0)
1082 : : {
1840 peter@eisentraut.org 1083 :UBC 0 : pg_log_error("received write-ahead log record for offset %u with no file open",
1084 : : xlogoff);
3539 fujii@postgresql.org 1085 : 0 : return false;
1086 : : }
1087 : : }
1088 : : else
1089 : : {
1090 : : /* More data in existing segment */
573 rhaas@postgresql.org 1091 [ - + ]:CBC 5649 : if (walfile->currpos != xlogoff)
1092 : : {
1840 peter@eisentraut.org 1093 :UBC 0 : pg_log_error("got WAL data offset %08x, expected %08x",
1094 : : xlogoff, (int) walfile->currpos);
3539 fujii@postgresql.org 1095 : 0 : return false;
1096 : : }
1097 : : }
1098 : :
3539 fujii@postgresql.org 1099 :CBC 5779 : bytes_left = len - hdr_len;
1100 : 5779 : bytes_written = 0;
1101 : :
1102 [ + + ]: 11555 : while (bytes_left)
1103 : : {
1104 : : int bytes_to_write;
1105 : :
1106 : : /*
1107 : : * If crossing a WAL boundary, only write up until we reach wal
1108 : : * segment size.
1109 : : */
2399 andres@anarazel.de 1110 [ - + ]: 5779 : if (xlogoff + bytes_left > WalSegSz)
2399 andres@anarazel.de 1111 :UBC 0 : bytes_to_write = WalSegSz - xlogoff;
1112 : : else
3539 fujii@postgresql.org 1113 :CBC 5779 : bytes_to_write = bytes_left;
1114 : :
2730 magnus@hagander.net 1115 [ + + ]: 5779 : if (walfile == NULL)
1116 : : {
2956 1117 [ - + ]: 130 : if (!open_walfile(stream, *blockpos))
1118 : : {
1119 : : /* Error logged by open_walfile */
3539 fujii@postgresql.org 1120 :UBC 0 : return false;
1121 : : }
1122 : : }
1123 : :
573 rhaas@postgresql.org 1124 :CBC 11558 : if (stream->walmethod->ops->write(walfile,
1125 : 5779 : copybuf + hdr_len + bytes_written,
1126 [ - + ]: 5779 : bytes_to_write) != bytes_to_write)
1127 : : {
879 peter@eisentraut.org 1128 :UBC 0 : pg_log_error("could not write %d bytes to WAL file \"%s\": %s",
1129 : : bytes_to_write, walfile->pathname,
1130 : : GetLastWalMethodError(stream->walmethod));
3539 fujii@postgresql.org 1131 : 0 : return false;
1132 : : }
1133 : :
1134 : : /* Write was successful, advance our position */
3539 fujii@postgresql.org 1135 :CBC 5779 : bytes_written += bytes_to_write;
1136 : 5779 : bytes_left -= bytes_to_write;
1137 : 5779 : *blockpos += bytes_to_write;
1138 : 5779 : xlogoff += bytes_to_write;
1139 : :
1140 : : /* Did we reach the end of a WAL segment? */
2399 andres@anarazel.de 1141 [ + + ]: 5779 : if (XLogSegmentOffset(*blockpos, WalSegSz) == 0)
1142 : : {
2956 magnus@hagander.net 1143 [ - + ]: 35 : if (!close_walfile(stream, *blockpos))
1144 : : /* Error message written in close_walfile() */
3539 fujii@postgresql.org 1145 :UBC 0 : return false;
1146 : :
3539 fujii@postgresql.org 1147 :CBC 35 : xlogoff = 0;
1148 : :
2956 magnus@hagander.net 1149 [ + - + + ]: 35 : if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
1150 : : {
3539 fujii@postgresql.org 1151 [ + - - + ]:GBC 3 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1152 : : {
1840 peter@eisentraut.org 1153 :UBC 0 : pg_log_error("could not send copy-end packet: %s",
1154 : : PQerrorMessage(conn));
3539 fujii@postgresql.org 1155 : 0 : return false;
1156 : : }
3539 fujii@postgresql.org 1157 :GBC 3 : still_sending = false;
1158 : 3 : return true; /* ignore the rest of this XLogData packet */
1159 : : }
1160 : : }
1161 : : }
1162 : : /* No more data left to write, receive next copy packet */
1163 : :
3539 fujii@postgresql.org 1164 :CBC 5776 : return true;
1165 : : }
1166 : :
1167 : : /*
1168 : : * Handle end of the copy stream.
1169 : : */
1170 : : static PGresult *
2956 magnus@hagander.net 1171 : 123 : HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
1172 : : XLogRecPtr blockpos, XLogRecPtr *stoppos)
1173 : : {
3539 fujii@postgresql.org 1174 : 123 : PGresult *res = PQgetResult(conn);
1175 : :
1176 : : /*
1177 : : * The server closed its end of the copy stream. If we haven't closed
1178 : : * ours already, we need to do so now, unless the server threw an error,
1179 : : * in which case we don't.
1180 : : */
1181 [ + + ]: 123 : if (still_sending)
1182 : : {
2956 magnus@hagander.net 1183 [ - + ]: 2 : if (!close_walfile(stream, blockpos))
1184 : : {
1185 : : /* Error message written in close_walfile() */
3539 fujii@postgresql.org 1186 :UBC 0 : PQclear(res);
1187 : 0 : return NULL;
1188 : : }
3539 fujii@postgresql.org 1189 [ + + ]:CBC 2 : if (PQresultStatus(res) == PGRES_COPY_IN)
1190 : : {
1191 [ + - - + ]: 1 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1192 : : {
1840 peter@eisentraut.org 1193 :UBC 0 : pg_log_error("could not send copy-end packet: %s",
1194 : : PQerrorMessage(conn));
3539 fujii@postgresql.org 1195 : 0 : PQclear(res);
1196 : 0 : return NULL;
1197 : : }
3539 fujii@postgresql.org 1198 :CBC 1 : res = PQgetResult(conn);
1199 : : }
1200 : 2 : still_sending = false;
1201 : : }
597 peter@eisentraut.org 1202 : 123 : PQfreemem(copybuf);
3539 fujii@postgresql.org 1203 : 123 : *stoppos = blockpos;
1204 : 123 : return res;
1205 : : }
1206 : :
1207 : : /*
1208 : : * Check if we should continue streaming, or abort at this point.
1209 : : */
1210 : : static bool
1312 peter@eisentraut.org 1211 : 8278 : CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos)
1212 : : {
2956 magnus@hagander.net 1213 [ + + + + ]: 8278 : if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
1214 : : {
1215 [ - + ]: 118 : if (!close_walfile(stream, blockpos))
1216 : : {
1217 : : /* Potential error message is written by close_walfile */
3537 fujii@postgresql.org 1218 :UBC 0 : return false;
1219 : : }
3537 fujii@postgresql.org 1220 [ + - - + ]:CBC 118 : if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
1221 : : {
1840 peter@eisentraut.org 1222 :UBC 0 : pg_log_error("could not send copy-end packet: %s",
1223 : : PQerrorMessage(conn));
3537 fujii@postgresql.org 1224 : 0 : return false;
1225 : : }
3537 fujii@postgresql.org 1226 :CBC 118 : still_sending = false;
1227 : : }
1228 : :
1229 : 8278 : return true;
1230 : : }
1231 : :
1232 : : /*
1233 : : * Calculate how long send/receive loops should sleep
1234 : : */
1235 : : static long
2607 tgl@sss.pgh.pa.us 1236 : 2358 : CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
1237 : : TimestampTz last_status)
1238 : : {
1239 : 2358 : TimestampTz status_targettime = 0;
1240 : : long sleeptime;
1241 : :
3537 fujii@postgresql.org 1242 [ + - + + ]: 2358 : if (standby_message_timeout && still_sending)
1243 : 2262 : status_targettime = last_status +
1244 : 2262 : (standby_message_timeout - 1) * ((int64) 1000);
1245 : :
3435 1246 [ + + ]: 2358 : if (status_targettime > 0)
1247 : : {
1248 : : long secs;
1249 : : int usecs;
1250 : :
3537 1251 : 2262 : feTimestampDifference(now,
1252 : : status_targettime,
1253 : : &secs,
1254 : : &usecs);
1255 : : /* Always sleep at least 1 sec */
1256 [ - + ]: 2262 : if (secs <= 0)
1257 : : {
3537 fujii@postgresql.org 1258 :UBC 0 : secs = 1;
1259 : 0 : usecs = 0;
1260 : : }
1261 : :
3537 fujii@postgresql.org 1262 :CBC 2262 : sleeptime = secs * 1000 + usecs / 1000;
1263 : : }
1264 : : else
1265 : 96 : sleeptime = -1;
1266 : :
1267 : 2358 : return sleeptime;
1268 : : }
|