Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : * fashion and write it to a local file.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_recvlogical.c
10 : *-------------------------------------------------------------------------
11 : */
12 :
13 : #include "postgres_fe.h"
14 :
15 : #include <dirent.h>
16 : #include <limits.h>
17 : #include <sys/select.h>
18 : #include <sys/stat.h>
19 : #include <unistd.h>
20 :
21 : #include "access/xlog_internal.h"
22 : #include "common/fe_memutils.h"
23 : #include "common/file_perm.h"
24 : #include "common/logging.h"
25 : #include "fe_utils/option_utils.h"
26 : #include "getopt_long.h"
27 : #include "libpq-fe.h"
28 : #include "libpq/pqsignal.h"
29 : #include "pqexpbuffer.h"
30 : #include "streamutil.h"
31 :
32 : /* Time to sleep between reconnection attempts */
33 : #define RECONNECT_SLEEP_TIME 5
34 :
35 : /* Global Options */
36 : static char *outfile = NULL;
37 : static int verbose = 0;
38 : static bool two_phase = false;
39 : static int noloop = 0;
40 : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
41 : static int fsync_interval = 10 * 1000; /* 10 sec = default */
42 : static XLogRecPtr startpos = InvalidXLogRecPtr;
43 : static XLogRecPtr endpos = InvalidXLogRecPtr;
44 : static bool do_create_slot = false;
45 : static bool slot_exists_ok = false;
46 : static bool do_start_slot = false;
47 : static bool do_drop_slot = false;
48 : static char *replication_slot = NULL;
49 :
50 : /* filled pairwise with option, value. value may be NULL */
51 : static char **options;
52 : static size_t noptions = 0;
53 : static const char *plugin = "test_decoding";
54 :
55 : /* Global State */
56 : static int outfd = -1;
57 : static volatile sig_atomic_t time_to_abort = false;
58 : static volatile sig_atomic_t output_reopen = false;
59 : static bool output_isfile;
60 : static TimestampTz output_last_fsync = -1;
61 : static bool output_needs_fsync = false;
62 : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
63 : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
64 :
65 : static void usage(void);
66 : static void StreamLogicalLog(void);
67 : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
68 : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
69 : bool keepalive, XLogRecPtr lsn);
3309 rhaas 70 ECB :
71 : static void
3309 rhaas 72 CBC 1 : usage(void)
73 : {
2942 bruce 74 1 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
3309 rhaas 75 ECB : progname);
3309 rhaas 76 CBC 1 : printf(_("Usage:\n"));
77 1 : printf(_(" %s [OPTION]...\n"), progname);
3101 peter_e 78 1 : printf(_("\nAction to be performed:\n"));
79 1 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
80 1 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
81 1 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
3309 rhaas 82 1 : printf(_("\nOptions:\n"));
2131 peter_e 83 1 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
3101 peter_e 84 GIC 1 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
3241 andres 85 CBC 1 : printf(_(" -F --fsync-interval=SECS\n"
3101 peter_e 86 ECB : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
2762 peter_e 87 CBC 1 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
3101 88 1 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
3309 rhaas 89 GIC 1 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
3101 peter_e 90 1 : printf(_(" -o, --option=NAME[=VALUE]\n"
3101 peter_e 91 ECB : " pass option NAME with optional value VALUE to the\n"
92 : " output plugin\n"));
3101 peter_e 93 GIC 1 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
3101 peter_e 94 CBC 1 : printf(_(" -s, --status-interval=SECS\n"
3101 peter_e 95 ECB : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
3101 peter_e 96 CBC 1 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
198 peter 97 1 : printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
3309 rhaas 98 1 : printf(_(" -v, --verbose output verbose messages\n"));
99 1 : printf(_(" -V, --version output version information, then exit\n"));
100 1 : printf(_(" -?, --help show this help, then exit\n"));
101 1 : printf(_("\nConnection options:\n"));
102 1 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
103 1 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
104 1 : printf(_(" -p, --port=PORT database server port number\n"));
105 1 : printf(_(" -U, --username=NAME connect as specified database user\n"));
106 1 : printf(_(" -w, --no-password never prompt for password\n"));
107 1 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
1136 peter 108 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
1136 peter 109 GIC 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
3309 rhaas 110 1 : }
111 :
112 : /*
113 : * Send a Standby Status Update message to server.
3309 rhaas 114 ECB : */
115 : static bool
2236 tgl 116 GIC 25 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
117 : {
118 : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
119 : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
3309 rhaas 120 ECB :
121 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
3309 rhaas 122 GIC 25 : int len = 0;
123 :
124 : /*
125 : * we normally don't want to send superfluous feedback, but if it's
126 : * because of a timeout we need to, otherwise wal_sender_timeout will kill
3260 bruce 127 ECB : * us.
3309 rhaas 128 EUB : */
3309 rhaas 129 GBC 25 : if (!force &&
3309 rhaas 130 UBC 0 : last_written_lsn == output_written_lsn &&
1061 noah 131 UIC 0 : last_fsync_lsn == output_fsync_lsn)
3309 rhaas 132 LBC 0 : return true;
3309 rhaas 133 EUB :
3309 rhaas 134 GIC 25 : if (verbose)
1469 peter 135 UIC 0 : pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
136 : LSN_FORMAT_ARGS(output_written_lsn),
137 : LSN_FORMAT_ARGS(output_fsync_lsn),
1418 tgl 138 ECB : replication_slot);
3309 rhaas 139 :
3309 rhaas 140 CBC 25 : replybuf[len] = 'r';
141 25 : len += 1;
3260 bruce 142 25 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
3309 rhaas 143 25 : len += 8;
2118 tgl 144 25 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
3309 rhaas 145 25 : len += 8;
3260 bruce 146 25 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
3309 rhaas 147 25 : len += 8;
3260 bruce 148 25 : fe_sendint64(now, &replybuf[len]); /* sendTime */
3309 rhaas 149 25 : len += 8;
2118 tgl 150 GIC 25 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
3309 rhaas 151 CBC 25 : len += 1;
3309 rhaas 152 ECB :
3309 rhaas 153 CBC 25 : startpos = output_written_lsn;
3309 rhaas 154 GIC 25 : last_written_lsn = output_written_lsn;
3309 rhaas 155 CBC 25 : last_fsync_lsn = output_fsync_lsn;
156 :
3309 rhaas 157 GBC 25 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
158 : {
1469 peter 159 UBC 0 : pg_log_error("could not send feedback packet: %s",
160 : PQerrorMessage(conn));
3309 rhaas 161 UIC 0 : return false;
3309 rhaas 162 ECB : }
163 :
3309 rhaas 164 GIC 25 : return true;
165 : }
3309 rhaas 166 ECB :
167 : static void
1562 peter 168 CBC 47 : disconnect_atexit(void)
3309 rhaas 169 ECB : {
3309 rhaas 170 CBC 47 : if (conn != NULL)
3309 rhaas 171 GIC 24 : PQfinish(conn);
172 47 : }
3309 rhaas 173 ECB :
174 : static bool
2236 tgl 175 CBC 25 : OutputFsync(TimestampTz now)
176 : {
3309 rhaas 177 25 : output_last_fsync = now;
178 :
179 25 : output_fsync_lsn = output_written_lsn;
3309 rhaas 180 EUB :
3309 rhaas 181 GIC 25 : if (fsync_interval <= 0)
3309 rhaas 182 LBC 0 : return true;
3309 rhaas 183 ECB :
3251 heikki.linnakangas 184 GIC 25 : if (!output_needs_fsync)
3309 rhaas 185 CBC 19 : return true;
186 :
3251 heikki.linnakangas 187 GIC 6 : output_needs_fsync = false;
3309 rhaas 188 ECB :
2833 andres 189 : /* can only fsync if it's a regular file */
2833 andres 190 GIC 6 : if (!output_isfile)
2833 andres 191 CBC 4 : return true;
2833 andres 192 EUB :
2833 andres 193 GIC 2 : if (fsync(outfd) != 0)
366 tgl 194 LBC 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
195 :
3309 rhaas 196 GIC 2 : return true;
197 : }
198 :
199 : /*
200 : * Start the log streaming
3309 rhaas 201 ECB : */
202 : static void
3114 andres 203 GIC 23 : StreamLogicalLog(void)
3309 rhaas 204 ECB : {
205 : PGresult *res;
3309 rhaas 206 GIC 23 : char *copybuf = NULL;
2236 tgl 207 23 : TimestampTz last_status = -1;
208 : int i;
3309 rhaas 209 ECB : PQExpBuffer query;
210 :
3309 rhaas 211 GIC 23 : output_written_lsn = InvalidXLogRecPtr;
212 23 : output_fsync_lsn = InvalidXLogRecPtr;
213 :
214 : /*
3309 rhaas 215 ECB : * Connect in replication mode to the server
3309 rhaas 216 EUB : */
3309 rhaas 217 CBC 23 : if (!conn)
3309 rhaas 218 UIC 0 : conn = GetConnection();
3309 rhaas 219 GBC 23 : if (!conn)
220 : /* Error message already written in GetConnection() */
3309 rhaas 221 UIC 0 : return;
222 :
223 : /*
3309 rhaas 224 ECB : * Start the replication
3309 rhaas 225 EUB : */
3309 rhaas 226 GIC 23 : if (verbose)
1469 peter 227 UIC 0 : pg_log_info("starting log streaming at %X/%X (slot %s)",
228 : LSN_FORMAT_ARGS(startpos),
229 : replication_slot);
3309 rhaas 230 ECB :
231 : /* Initiate the replication stream at specified location */
504 tgl 232 CBC 23 : query = createPQExpBuffer();
3309 rhaas 233 GIC 23 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
775 peter 234 23 : replication_slot, LSN_FORMAT_ARGS(startpos));
3309 rhaas 235 ECB :
236 : /* print options if there are any */
3309 rhaas 237 GIC 23 : if (noptions)
3309 rhaas 238 CBC 20 : appendPQExpBufferStr(query, " (");
239 :
3309 rhaas 240 GIC 63 : for (i = 0; i < noptions; i++)
3309 rhaas 241 ECB : {
242 : /* separator */
3309 rhaas 243 GIC 40 : if (i > 0)
244 20 : appendPQExpBufferStr(query, ", ");
3309 rhaas 245 ECB :
246 : /* write option name */
3309 rhaas 247 GIC 40 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
3309 rhaas 248 ECB :
249 : /* write option value if specified */
3309 rhaas 250 GIC 40 : if (options[(i * 2) + 1] != NULL)
251 40 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
3309 rhaas 252 ECB : }
253 :
3309 rhaas 254 GIC 23 : if (noptions)
3309 rhaas 255 CBC 20 : appendPQExpBufferChar(query, ')');
3309 rhaas 256 ECB :
3309 rhaas 257 GIC 23 : res = PQexec(conn, query->data);
3309 rhaas 258 CBC 23 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
259 : {
1469 peter 260 6 : pg_log_error("could not send replication command \"%s\": %s",
1469 peter 261 ECB : query->data, PQresultErrorMessage(res));
3309 rhaas 262 GIC 6 : PQclear(res);
3309 rhaas 263 CBC 6 : goto error;
3309 rhaas 264 ECB : }
3309 rhaas 265 GIC 17 : PQclear(res);
3309 rhaas 266 CBC 17 : resetPQExpBuffer(query);
3309 rhaas 267 EUB :
3309 rhaas 268 GIC 17 : if (verbose)
1469 peter 269 LBC 0 : pg_log_info("streaming initiated");
270 :
3309 rhaas 271 GIC 640 : while (!time_to_abort)
272 : {
273 : int r;
274 : int bytes_left;
275 : int bytes_written;
2236 tgl 276 ECB : TimestampTz now;
277 : int hdr_len;
2286 simon 278 CBC 639 : XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
279 :
3309 rhaas 280 639 : if (copybuf != NULL)
3309 rhaas 281 ECB : {
3309 rhaas 282 GIC 380 : PQfreemem(copybuf);
283 380 : copybuf = NULL;
284 : }
285 :
286 : /*
1029 andres 287 ECB : * Potentially send a status message to the primary.
288 : */
3309 rhaas 289 CBC 639 : now = feGetCurrentTimestamp();
3309 rhaas 290 ECB :
3309 rhaas 291 GIC 1261 : if (outfd != -1 &&
292 622 : feTimestampDifferenceExceeds(output_last_fsync, now,
3309 rhaas 293 ECB : fsync_interval))
294 : {
3309 rhaas 295 GIC 17 : if (!OutputFsync(now))
296 2 : goto error;
3309 rhaas 297 ECB : }
298 :
3309 rhaas 299 GIC 1278 : if (standby_message_timeout > 0 &&
300 639 : feTimestampDifferenceExceeds(last_status, now,
301 : standby_message_timeout))
3309 rhaas 302 ECB : {
3309 rhaas 303 EUB : /* Time to send feedback! */
3309 rhaas 304 GIC 17 : if (!sendFeedback(conn, now, true, false))
3309 rhaas 305 LBC 0 : goto error;
306 :
3309 rhaas 307 GIC 17 : last_status = now;
308 : }
3309 rhaas 309 ECB :
310 : /* got SIGHUP, close output file */
3251 heikki.linnakangas 311 GBC 639 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
3251 heikki.linnakangas 312 EUB : {
3251 heikki.linnakangas 313 UBC 0 : now = feGetCurrentTimestamp();
314 0 : if (!OutputFsync(now))
315 0 : goto error;
3251 heikki.linnakangas 316 UIC 0 : close(outfd);
3251 heikki.linnakangas 317 LBC 0 : outfd = -1;
318 : }
3251 heikki.linnakangas 319 GIC 639 : output_reopen = false;
3251 heikki.linnakangas 320 ECB :
321 : /* open the output file, if not open yet */
3250 heikki.linnakangas 322 GIC 639 : if (outfd == -1)
323 : {
2833 andres 324 ECB : struct stat statbuf;
325 :
3250 heikki.linnakangas 326 GIC 17 : if (strcmp(outfile, "-") == 0)
3250 heikki.linnakangas 327 GBC 17 : outfd = fileno(stdout);
328 : else
3250 heikki.linnakangas 329 LBC 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
330 : S_IRUSR | S_IWUSR);
3250 heikki.linnakangas 331 GBC 17 : if (outfd == -1)
3250 heikki.linnakangas 332 EUB : {
1469 peter 333 UIC 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
3250 heikki.linnakangas 334 0 : goto error;
3250 heikki.linnakangas 335 ECB : }
336 :
2833 andres 337 GBC 17 : if (fstat(outfd, &statbuf) != 0)
619 michael 338 EUB : {
1469 peter 339 UIC 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
619 michael 340 0 : goto error;
619 michael 341 ECB : }
342 :
2833 andres 343 GIC 17 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
3250 heikki.linnakangas 344 ECB : }
345 :
3309 rhaas 346 CBC 639 : r = PQgetCopyData(conn, ©buf, 1);
3309 rhaas 347 GIC 639 : if (r == 0)
348 242 : {
349 : /*
350 : * In async mode, and no data available. We block on reading but
351 : * not more than the specified timeout, so that we can send a
352 : * response back to the client.
3309 rhaas 353 ECB : */
354 : fd_set input_mask;
2236 tgl 355 GIC 245 : TimestampTz message_target = 0;
2236 tgl 356 CBC 245 : TimestampTz fsync_target = 0;
357 : struct timeval timeout;
3309 rhaas 358 245 : struct timeval *timeoutptr = NULL;
359 :
2588 peter_e 360 GBC 245 : if (PQsocket(conn) < 0)
2588 peter_e 361 ECB : {
1469 peter 362 UIC 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
2588 peter_e 363 GIC 2 : goto error;
2588 peter_e 364 ECB : }
365 :
3309 rhaas 366 GIC 4165 : FD_ZERO(&input_mask);
367 245 : FD_SET(PQsocket(conn), &input_mask);
3309 rhaas 368 ECB :
369 : /* Compute when we need to wakeup to send a keepalive message. */
3309 rhaas 370 GIC 245 : if (standby_message_timeout)
371 245 : message_target = last_status + (standby_message_timeout - 1) *
372 : ((int64) 1000);
3309 rhaas 373 ECB :
374 : /* Compute when we need to wakeup to fsync the output file. */
3251 heikki.linnakangas 375 GIC 245 : if (fsync_interval > 0 && output_needs_fsync)
3309 rhaas 376 91 : fsync_target = output_last_fsync + (fsync_interval - 1) *
377 : ((int64) 1000);
3309 rhaas 378 ECB :
379 : /* Now compute when to wakeup. */
3309 rhaas 380 GIC 245 : if (message_target > 0 || fsync_target > 0)
381 : {
382 : TimestampTz targettime;
383 : long secs;
3309 rhaas 384 ECB : int usecs;
385 :
3309 rhaas 386 CBC 245 : targettime = message_target;
3309 rhaas 387 EUB :
3309 rhaas 388 GIC 245 : if (fsync_target > 0 && fsync_target < targettime)
3309 rhaas 389 LBC 0 : targettime = fsync_target;
390 :
3309 rhaas 391 GIC 245 : feTimestampDifference(now,
392 : targettime,
3309 rhaas 393 ECB : &secs,
3309 rhaas 394 EUB : &usecs);
3309 rhaas 395 GIC 245 : if (secs <= 0)
3309 rhaas 396 LBC 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
3309 rhaas 397 ECB : else
3309 rhaas 398 CBC 245 : timeout.tv_sec = secs;
3309 rhaas 399 GIC 245 : timeout.tv_usec = usecs;
400 245 : timeoutptr = &timeout;
3309 rhaas 401 ECB : }
402 :
3309 rhaas 403 GIC 245 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
404 245 : if (r == 0 || (r < 0 && errno == EINTR))
405 : {
406 : /*
407 : * Got a timeout or signal. Continue the loop and either
408 : * deliver a status packet to the server or just go back into
3309 rhaas 409 ECB : * blocking.
410 : */
3309 rhaas 411 CBC 243 : continue;
412 : }
3309 rhaas 413 GBC 244 : else if (r < 0)
3309 rhaas 414 EUB : {
716 peter 415 UIC 0 : pg_log_error("%s() failed: %m", "select");
3309 rhaas 416 0 : goto error;
417 : }
3309 rhaas 418 ECB :
419 : /* Else there is actually data on the socket */
3309 rhaas 420 CBC 244 : if (PQconsumeInput(conn) == 0)
421 : {
1469 peter 422 2 : pg_log_error("could not receive data from WAL stream: %s",
423 : PQerrorMessage(conn));
3309 rhaas 424 2 : goto error;
425 : }
3309 rhaas 426 GIC 242 : continue;
427 : }
3309 rhaas 428 ECB :
429 : /* End of copy stream */
3309 rhaas 430 GIC 394 : if (r == -1)
431 14 : break;
3309 rhaas 432 ECB :
433 : /* Failure while reading the copy stream */
3309 rhaas 434 GBC 387 : if (r == -2)
435 : {
1469 peter 436 UBC 0 : pg_log_error("could not read COPY data: %s",
437 : PQerrorMessage(conn));
3309 rhaas 438 UIC 0 : goto error;
439 : }
3309 rhaas 440 ECB :
441 : /* Check the message type. */
3309 rhaas 442 GIC 387 : if (copybuf[0] == 'k')
443 297 : {
444 : int pos;
3309 rhaas 445 ECB : bool replyRequested;
446 : XLogRecPtr walEnd;
2286 simon 447 GIC 299 : bool endposReached = false;
448 :
449 : /*
450 : * Parse the keepalive message, enclosed in the CopyData message.
451 : * We just check if the server requested a reply, and ignore the
3309 rhaas 452 ECB : * rest.
453 : */
3309 rhaas 454 CBC 299 : pos = 1; /* skip msgtype 'k' */
3309 rhaas 455 GIC 299 : walEnd = fe_recvint64(©buf[pos]);
3309 rhaas 456 CBC 299 : output_written_lsn = Max(walEnd, output_written_lsn);
457 :
458 299 : pos += 8; /* read walEnd */
459 :
460 299 : pos += 8; /* skip sendTime */
461 :
3309 rhaas 462 GBC 299 : if (r < pos + 1)
3309 rhaas 463 EUB : {
1469 peter 464 UIC 0 : pg_log_error("streaming header too small: %d", r);
3309 rhaas 465 LBC 0 : goto error;
466 : }
3309 rhaas 467 CBC 299 : replyRequested = copybuf[pos];
468 :
2286 simon 469 GIC 299 : if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
470 : {
471 : /*
472 : * If there's nothing to read on the socket until a keepalive
473 : * we know that the server has nothing to send us; and if
474 : * walEnd has passed endpos, we know nothing else can have
2286 simon 475 ECB : * committed before endpos. So we can bail out now.
476 : */
2286 simon 477 GIC 2 : endposReached = true;
478 : }
3309 rhaas 479 ECB :
480 : /* Send a reply, if necessary */
2286 simon 481 CBC 299 : if (replyRequested || endposReached)
2286 simon 482 EUB : {
2286 simon 483 CBC 3 : if (!flushAndSendFeedback(conn, &now))
3309 rhaas 484 UIC 0 : goto error;
3309 rhaas 485 GIC 3 : last_status = now;
3309 rhaas 486 ECB : }
487 :
2286 simon 488 CBC 299 : if (endposReached)
2286 simon 489 ECB : {
2286 simon 490 CBC 2 : prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
2286 simon 491 GIC 2 : time_to_abort = true;
492 2 : break;
2286 simon 493 ECB : }
494 :
3309 rhaas 495 CBC 297 : continue;
496 : }
3309 rhaas 497 GBC 88 : else if (copybuf[0] != 'w')
498 : {
1469 peter 499 UBC 0 : pg_log_error("unrecognized streaming header: \"%c\"",
500 : copybuf[0]);
3309 rhaas 501 UIC 0 : goto error;
502 : }
503 :
504 : /*
505 : * Read the header of the XLogData message, enclosed in the CopyData
506 : * message. We only need the WAL location field (dataStart), the rest
3309 rhaas 507 ECB : * of the header is ignored.
508 : */
3309 rhaas 509 CBC 88 : hdr_len = 1; /* msgtype 'w' */
510 88 : hdr_len += 8; /* dataStart */
511 88 : hdr_len += 8; /* walEnd */
3309 rhaas 512 GIC 88 : hdr_len += 8; /* sendTime */
3309 rhaas 513 GBC 88 : if (r < hdr_len + 1)
3309 rhaas 514 EUB : {
1469 peter 515 UIC 0 : pg_log_error("streaming header too small: %d", r);
3309 rhaas 516 0 : goto error;
517 : }
3309 rhaas 518 ECB :
519 : /* Extract WAL location for this block */
2286 simon 520 CBC 88 : cur_record_lsn = fe_recvint64(©buf[1]);
521 :
2286 simon 522 GIC 88 : if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
523 : {
524 : /*
525 : * We've read past our endpoint, so prepare to go away being
2286 simon 526 EUB : * cautious about what happens to our output data.
527 : */
2286 simon 528 UBC 0 : if (!flushAndSendFeedback(conn, &now))
529 0 : goto error;
530 0 : prepareToTerminate(conn, endpos, false, cur_record_lsn);
2286 simon 531 UIC 0 : time_to_abort = true;
532 0 : break;
3309 rhaas 533 ECB : }
534 :
2286 simon 535 CBC 88 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
2286 simon 536 ECB :
3309 rhaas 537 GIC 88 : bytes_left = r - hdr_len;
538 88 : bytes_written = 0;
3309 rhaas 539 ECB :
540 : /* signal that a fsync is needed */
3251 heikki.linnakangas 541 CBC 88 : output_needs_fsync = true;
542 :
3309 rhaas 543 GIC 176 : while (bytes_left)
544 : {
3309 rhaas 545 ECB : int ret;
546 :
3309 rhaas 547 GIC 176 : ret = write(outfd,
548 88 : copybuf + hdr_len + bytes_written,
3309 rhaas 549 ECB : bytes_left);
550 :
3309 rhaas 551 GBC 88 : if (ret < 0)
552 : {
508 peter 553 UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
554 : bytes_left, outfile);
3309 rhaas 555 UIC 0 : goto error;
556 : }
3309 rhaas 557 ECB :
558 : /* Write was successful, advance our position */
3309 rhaas 559 GIC 88 : bytes_written += ret;
560 88 : bytes_left -= ret;
3309 rhaas 561 ECB : }
562 :
3309 rhaas 563 GBC 88 : if (write(outfd, "\n", 1) != 1)
564 : {
508 peter 565 UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
566 : 1, outfile);
3309 rhaas 567 UIC 0 : goto error;
3309 rhaas 568 ECB : }
569 :
2286 simon 570 GIC 88 : if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
2286 simon 571 ECB : {
2286 simon 572 EUB : /* endpos was exactly the record we just processed, we're done */
2286 simon 573 CBC 5 : if (!flushAndSendFeedback(conn, &now))
2286 simon 574 LBC 0 : goto error;
2286 simon 575 CBC 5 : prepareToTerminate(conn, endpos, false, cur_record_lsn);
2286 simon 576 GIC 5 : time_to_abort = true;
577 5 : break;
578 : }
3309 rhaas 579 ECB : }
580 :
3309 rhaas 581 GIC 15 : res = PQgetResult(conn);
2286 simon 582 CBC 15 : if (PQresultStatus(res) == PGRES_COPY_OUT)
583 : {
1061 noah 584 GIC 7 : PQclear(res);
585 :
586 : /*
587 : * We're doing a client-initiated clean exit and have sent CopyDone to
588 : * the server. Drain any messages, so we don't miss a last-minute
589 : * ErrorResponse. The walsender stops generating XLogData records once
590 : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
591 : * it's too late for sendFeedback(), even if this were to take a long
592 : * time. Hence, use synchronous-mode PQgetCopyData().
2286 simon 593 ECB : */
594 : while (1)
1061 noah 595 GIC 3 : {
1061 noah 596 ECB : int r;
597 :
1061 noah 598 CBC 10 : if (copybuf != NULL)
1061 noah 599 ECB : {
1061 noah 600 GIC 10 : PQfreemem(copybuf);
1061 noah 601 CBC 10 : copybuf = NULL;
1061 noah 602 ECB : }
1061 noah 603 CBC 10 : r = PQgetCopyData(conn, ©buf, 0);
604 10 : if (r == -1)
1061 noah 605 GIC 7 : break;
1061 noah 606 GBC 3 : if (r == -2)
607 : {
1061 noah 608 UBC 0 : pg_log_error("could not read COPY data: %s",
1061 noah 609 EUB : PQerrorMessage(conn));
1061 noah 610 UIC 0 : time_to_abort = false; /* unclean exit */
611 0 : goto error;
612 : }
1061 noah 613 ECB : }
614 :
1061 noah 615 CBC 7 : res = PQgetResult(conn);
616 : }
617 15 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
618 : {
1469 peter 619 7 : pg_log_error("unexpected termination of replication stream: %s",
620 : PQresultErrorMessage(res));
3309 rhaas 621 7 : goto error;
622 : }
623 8 : PQclear(res);
624 :
3309 rhaas 625 GBC 8 : if (outfd != -1 && strcmp(outfile, "-") != 0)
626 : {
2236 tgl 627 UIC 0 : TimestampTz t = feGetCurrentTimestamp();
3309 rhaas 628 EUB :
629 : /* no need to jump to error on failure here, we're finishing anyway */
3309 rhaas 630 UBC 0 : OutputFsync(t);
3309 rhaas 631 EUB :
3309 rhaas 632 UIC 0 : if (close(outfd) != 0)
1469 peter 633 LBC 0 : pg_log_error("could not close file \"%s\": %m", outfile);
3309 rhaas 634 ECB : }
3309 rhaas 635 CBC 8 : outfd = -1;
3309 rhaas 636 GIC 23 : error:
3261 heikki.linnakangas 637 GBC 23 : if (copybuf != NULL)
3261 heikki.linnakangas 638 EUB : {
3261 heikki.linnakangas 639 UIC 0 : PQfreemem(copybuf);
3261 heikki.linnakangas 640 LBC 0 : copybuf = NULL;
3261 heikki.linnakangas 641 ECB : }
3309 rhaas 642 CBC 23 : destroyPQExpBuffer(query);
3309 rhaas 643 GIC 23 : PQfinish(conn);
644 23 : conn = NULL;
645 : }
646 :
647 : /*
648 : * Unfortunately we can't do sensible signal handling on windows...
649 : */
650 : #ifndef WIN32
651 :
652 : /*
653 : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
654 : * possible moment.
3309 rhaas 655 ECB : */
656 : static void
207 tgl 657 GNC 1 : sigexit_handler(SIGNAL_ARGS)
3309 rhaas 658 ECB : {
3309 rhaas 659 GIC 1 : time_to_abort = true;
660 1 : }
661 :
662 : /*
663 : * Trigger the output file to be reopened.
3309 rhaas 664 EUB : */
665 : static void
207 tgl 666 UNC 0 : sighup_handler(SIGNAL_ARGS)
3309 rhaas 667 EUB : {
3309 rhaas 668 UIC 0 : output_reopen = true;
669 0 : }
670 : #endif
671 :
3309 rhaas 672 ECB :
673 : int
3309 rhaas 674 GIC 55 : main(int argc, char **argv)
675 : {
676 : static struct option long_options[] = {
677 : /* general options */
678 : {"file", required_argument, NULL, 'f'},
679 : {"fsync-interval", required_argument, NULL, 'F'},
680 : {"no-loop", no_argument, NULL, 'n'},
681 : {"verbose", no_argument, NULL, 'v'},
682 : {"two-phase", no_argument, NULL, 't'},
683 : {"version", no_argument, NULL, 'V'},
684 : {"help", no_argument, NULL, '?'},
685 : /* connection options */
686 : {"dbname", required_argument, NULL, 'd'},
687 : {"host", required_argument, NULL, 'h'},
688 : {"port", required_argument, NULL, 'p'},
689 : {"username", required_argument, NULL, 'U'},
690 : {"no-password", no_argument, NULL, 'w'},
691 : {"password", no_argument, NULL, 'W'},
692 : /* replication options */
693 : {"startpos", required_argument, NULL, 'I'},
694 : {"endpos", required_argument, NULL, 'E'},
695 : {"option", required_argument, NULL, 'o'},
696 : {"plugin", required_argument, NULL, 'P'},
697 : {"status-interval", required_argument, NULL, 's'},
698 : {"slot", required_argument, NULL, 'S'},
699 : /* action */
700 : {"create-slot", no_argument, NULL, 1},
701 : {"start", no_argument, NULL, 2},
702 : {"drop-slot", no_argument, NULL, 3},
703 : {"if-not-exists", no_argument, NULL, 4},
704 : {NULL, 0, NULL, 0}
705 : };
706 : int c;
707 : int option_index;
708 : uint32 hi,
709 : lo;
3112 andres 710 ECB : char *db_name;
3309 rhaas 711 :
1469 peter 712 CBC 55 : pg_logging_init(argv[0]);
3309 rhaas 713 GIC 55 : progname = get_progname(argv[0]);
2659 alvherre 714 CBC 55 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
715 :
3309 rhaas 716 55 : if (argc > 1)
717 : {
718 54 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
3309 rhaas 719 ECB : {
3309 rhaas 720 GIC 1 : usage();
3309 rhaas 721 CBC 1 : exit(0);
3309 rhaas 722 ECB : }
3309 rhaas 723 GIC 53 : else if (strcmp(argv[1], "-V") == 0 ||
3309 rhaas 724 CBC 53 : strcmp(argv[1], "--version") == 0)
3309 rhaas 725 ECB : {
3309 rhaas 726 GIC 1 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
727 1 : exit(0);
728 : }
3309 rhaas 729 ECB : }
730 :
118 peter 731 GNC 321 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
3309 rhaas 732 CBC 321 : long_options, &option_index)) != -1)
733 : {
3309 rhaas 734 GIC 269 : switch (c)
3309 rhaas 735 ECB : {
736 : /* general options */
3309 rhaas 737 CBC 24 : case 'f':
3309 rhaas 738 GBC 24 : outfile = pg_strdup(optarg);
739 24 : break;
3241 andres 740 UIC 0 : case 'F':
624 michael 741 0 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
624 michael 742 EUB : INT_MAX / 1000,
743 : &fsync_interval))
3241 andres 744 UBC 0 : exit(1);
624 michael 745 LBC 0 : fsync_interval *= 1000;
3241 andres 746 0 : break;
3309 rhaas 747 CBC 23 : case 'n':
748 23 : noloop = 1;
749 23 : break;
648 akapila 750 GBC 2 : case 't':
648 akapila 751 GIC 2 : two_phase = true;
648 akapila 752 CBC 2 : break;
118 peter 753 UNC 0 : case 'v':
754 0 : verbose++;
755 0 : break;
2905 andres 756 ECB : /* connection options */
3309 rhaas 757 CBC 50 : case 'd':
3309 rhaas 758 GBC 50 : dbname = pg_strdup(optarg);
759 50 : break;
3309 rhaas 760 UBC 0 : case 'h':
761 0 : dbhost = pg_strdup(optarg);
762 0 : break;
763 0 : case 'p':
764 0 : dbport = pg_strdup(optarg);
765 0 : break;
766 0 : case 'U':
767 0 : dbuser = pg_strdup(optarg);
768 0 : break;
769 0 : case 'w':
770 0 : dbgetpassword = -1;
771 0 : break;
772 0 : case 'W':
3309 rhaas 773 UIC 0 : dbgetpassword = 1;
3309 rhaas 774 UBC 0 : break;
3309 rhaas 775 EUB : /* replication options */
3241 andres 776 UBC 0 : case 'I':
777 0 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
366 tgl 778 0 : pg_fatal("could not parse start position \"%s\"", optarg);
3241 andres 779 LBC 0 : startpos = ((uint64) hi) << 32 | lo;
780 0 : break;
2286 simon 781 GBC 8 : case 'E':
2286 simon 782 CBC 8 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
366 tgl 783 LBC 0 : pg_fatal("could not parse end position \"%s\"", optarg);
2286 simon 784 CBC 8 : endpos = ((uint64) hi) << 32 | lo;
2286 simon 785 GIC 8 : break;
3309 rhaas 786 CBC 40 : case 'o':
3309 rhaas 787 ECB : {
3260 bruce 788 GIC 40 : char *data = pg_strdup(optarg);
3260 bruce 789 CBC 40 : char *val = strchr(data, '=');
790 :
3309 rhaas 791 GIC 40 : if (val != NULL)
3309 rhaas 792 ECB : {
793 : /* remove =; separate data from val */
3309 rhaas 794 GIC 40 : *val = '\0';
795 40 : val++;
3309 rhaas 796 ECB : }
797 :
3309 rhaas 798 GIC 40 : noptions += 1;
3260 bruce 799 CBC 40 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
3309 rhaas 800 ECB :
3309 rhaas 801 GIC 40 : options[(noptions - 1) * 2] = data;
802 40 : options[(noptions - 1) * 2 + 1] = val;
3309 rhaas 803 ECB : }
804 :
3309 rhaas 805 CBC 40 : break;
806 21 : case 'P':
3309 rhaas 807 GBC 21 : plugin = pg_strdup(optarg);
808 21 : break;
3309 rhaas 809 UIC 0 : case 's':
624 michael 810 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
624 michael 811 EUB : INT_MAX / 1000,
812 : &standby_message_timeout))
3309 rhaas 813 UBC 0 : exit(1);
624 michael 814 LBC 0 : standby_message_timeout *= 1000;
3309 rhaas 815 0 : break;
3309 rhaas 816 CBC 51 : case 'S':
3309 rhaas 817 GIC 51 : replication_slot = pg_strdup(optarg);
3309 rhaas 818 CBC 51 : break;
3309 rhaas 819 ECB : /* action */
3309 rhaas 820 CBC 23 : case 1:
821 23 : do_create_slot = true;
822 23 : break;
823 25 : case 2:
824 25 : do_start_slot = true;
825 25 : break;
826 1 : case 3:
3309 rhaas 827 GBC 1 : do_drop_slot = true;
828 1 : break;
2828 andres 829 UBC 0 : case 4:
2828 andres 830 UIC 0 : slot_exists_ok = true;
2828 andres 831 LBC 0 : break;
832 :
3309 rhaas 833 CBC 1 : default:
366 tgl 834 ECB : /* getopt_long already emitted a complaint */
366 tgl 835 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3309 rhaas 836 1 : exit(1);
837 : }
838 : }
839 :
840 : /*
3309 rhaas 841 ECB : * Any non-option arguments?
842 : */
3309 rhaas 843 GBC 52 : if (optind < argc)
844 : {
1469 peter 845 UBC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
1469 peter 846 EUB : argv[optind]);
366 tgl 847 UIC 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3309 rhaas 848 0 : exit(1);
849 : }
850 :
851 : /*
3309 rhaas 852 ECB : * Required arguments
853 : */
3309 rhaas 854 CBC 52 : if (replication_slot == NULL)
3309 rhaas 855 ECB : {
1469 peter 856 CBC 1 : pg_log_error("no slot specified");
366 tgl 857 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3309 rhaas 858 1 : exit(1);
3309 rhaas 859 ECB : }
860 :
3309 rhaas 861 CBC 51 : if (do_start_slot && outfile == NULL)
3309 rhaas 862 ECB : {
1469 peter 863 CBC 1 : pg_log_error("no target file specified");
366 tgl 864 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3309 rhaas 865 1 : exit(1);
3309 rhaas 866 ECB : }
867 :
3309 rhaas 868 CBC 50 : if (!do_drop_slot && dbname == NULL)
3309 rhaas 869 ECB : {
1469 peter 870 CBC 1 : pg_log_error("no database specified");
366 tgl 871 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3309 rhaas 872 1 : exit(1);
3309 rhaas 873 ECB : }
874 :
3309 rhaas 875 CBC 49 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
3309 rhaas 876 ECB : {
1469 peter 877 CBC 1 : pg_log_error("at least one action needs to be specified");
366 tgl 878 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3309 rhaas 879 1 : exit(1);
3309 rhaas 880 ECB : }
881 :
3309 rhaas 882 GBC 48 : if (do_drop_slot && (do_create_slot || do_start_slot))
3309 rhaas 883 EUB : {
1469 peter 884 UBC 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
366 tgl 885 UIC 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3309 rhaas 886 0 : exit(1);
3309 rhaas 887 ECB : }
888 :
3162 andres 889 GBC 48 : if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
3309 rhaas 890 EUB : {
1469 peter 891 UBC 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
366 tgl 892 UIC 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3309 rhaas 893 0 : exit(1);
3309 rhaas 894 ECB : }
895 :
2286 simon 896 GBC 48 : if (endpos != InvalidXLogRecPtr && !do_start_slot)
2286 simon 897 EUB : {
1469 peter 898 UBC 0 : pg_log_error("--endpos may only be specified with --start");
366 tgl 899 UIC 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2286 simon 900 0 : exit(1);
2286 simon 901 ECB : }
902 :
648 akapila 903 CBC 48 : if (two_phase && !do_create_slot)
648 akapila 904 ECB : {
648 akapila 905 CBC 1 : pg_log_error("--two-phase may only be specified with --create-slot");
366 tgl 906 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
648 akapila 907 1 : exit(1);
908 : }
909 :
910 : /*
911 : * Obtain a connection to server. Notably, if we need a password, we want
504 tgl 912 ECB : * to collect it from the user immediately.
3309 rhaas 913 : */
3112 andres 914 GIC 47 : conn = GetConnection();
3112 andres 915 GBC 47 : if (!conn)
3112 andres 916 ECB : /* Error message already written in GetConnection() */
3112 andres 917 UIC 0 : exit(1);
1562 peter 918 GIC 47 : atexit(disconnect_atexit);
919 :
920 : /*
921 : * Trap signals. (Don't do this until after the initial password prompt,
922 : * if one is needed, in GetConnection.)
504 tgl 923 ECB : */
924 : #ifndef WIN32
207 dgustafsson 925 GNC 47 : pqsignal(SIGINT, sigexit_handler);
926 47 : pqsignal(SIGTERM, sigexit_handler);
504 tgl 927 GIC 47 : pqsignal(SIGHUP, sighup_handler);
928 : #endif
929 :
930 : /*
931 : * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
932 : * replication connection.
3112 andres 933 ECB : */
3112 andres 934 GBC 47 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
1562 peter 935 UIC 0 : exit(1);
3309 rhaas 936 ECB :
3112 andres 937 GBC 47 : if (db_name == NULL)
366 tgl 938 UIC 0 : pg_fatal("could not establish database-specific replication connection");
939 :
940 : /*
941 : * Set umask so that directories/files are created with the same
942 : * permissions as directories/files in the source data directory.
943 : *
944 : * pg_mode_mask is set to owner-only by default and then updated in
945 : * GetConnection() where we get the mode from the server-side with
946 : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
1828 sfrost 947 ECB : */
1828 sfrost 948 GIC 47 : umask(pg_mode_mask);
949 :
3112 andres 950 ECB : /* Drop a replication slot. */
3309 rhaas 951 GIC 47 : if (do_drop_slot)
3309 rhaas 952 ECB : {
3309 rhaas 953 GBC 1 : if (verbose)
1469 peter 954 UIC 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
3309 rhaas 955 ECB :
3112 andres 956 GBC 1 : if (!DropReplicationSlot(conn, replication_slot))
1562 peter 957 UIC 0 : exit(1);
958 : }
959 :
3112 andres 960 ECB : /* Create a replication slot. */
3309 rhaas 961 GIC 47 : if (do_create_slot)
3309 rhaas 962 ECB : {
3309 rhaas 963 GBC 23 : if (verbose)
1469 peter 964 UIC 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
3309 rhaas 965 ECB :
2021 peter_e 966 GIC 23 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
648 akapila 967 EUB : false, false, slot_exists_ok, two_phase))
1562 peter 968 LBC 0 : exit(1);
2828 andres 969 GIC 23 : startpos = InvalidXLogRecPtr;
970 : }
3309 rhaas 971 ECB :
3309 rhaas 972 CBC 47 : if (!do_start_slot)
1562 peter 973 GIC 24 : exit(0);
974 :
975 : /* Stream loop */
976 : while (true)
3309 rhaas 977 ECB : {
3112 andres 978 CBC 23 : StreamLogicalLog();
3309 rhaas 979 GIC 23 : if (time_to_abort)
980 : {
981 : /*
982 : * We've been Ctrl-C'ed or reached an exit limit condition. That's
983 : * not an error, so exit without an errorcode.
3309 rhaas 984 ECB : */
1562 peter 985 GIC 8 : exit(0);
3309 rhaas 986 ECB : }
3309 rhaas 987 CBC 15 : else if (noloop)
366 tgl 988 GIC 15 : pg_fatal("disconnected");
989 : else
990 : {
3309 rhaas 991 EUB : /* translator: check source for value for %d */
1469 peter 992 UIC 0 : pg_log_info("disconnected; waiting %d seconds to try again",
1469 peter 993 EUB : RECONNECT_SLEEP_TIME);
3309 rhaas 994 UIC 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
995 : }
996 : }
997 : }
998 :
999 : /*
1000 : * Fsync our output data, and send a feedback message to the server. Returns
1001 : * true if successful, false otherwise.
1002 : *
1003 : * If successful, *now is updated to the current timestamp just before sending
1004 : * feedback.
1005 : */
2286 simon 1006 ECB : static bool
2286 simon 1007 GIC 8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1008 : {
2286 simon 1009 ECB : /* flush data to disk, so that we send a recent flush pointer */
2286 simon 1010 GBC 8 : if (!OutputFsync(*now))
2286 simon 1011 LBC 0 : return false;
2286 simon 1012 CBC 8 : *now = feGetCurrentTimestamp();
2286 simon 1013 GBC 8 : if (!sendFeedback(conn, *now, true, false))
2286 simon 1014 UIC 0 : return false;
2286 simon 1015 ECB :
2286 simon 1016 GIC 8 : return true;
1017 : }
1018 :
1019 : /*
1020 : * Try to inform the server about our upcoming demise, but don't wait around or
1021 : * retry on failure.
1022 : */
2286 simon 1023 ECB : static void
2286 simon 1024 GIC 7 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
2286 simon 1025 ECB : {
2286 simon 1026 CBC 7 : (void) PQputCopyEnd(conn, NULL);
2286 simon 1027 GIC 7 : (void) PQflush(conn);
2286 simon 1028 ECB :
2286 simon 1029 GIC 7 : if (verbose)
2286 simon 1030 EUB : {
2286 simon 1031 UBC 0 : if (keepalive)
1370 peter 1032 UIC 0 : pg_log_info("end position %X/%X reached by keepalive",
1033 : LSN_FORMAT_ARGS(endpos));
2286 simon 1034 EUB : else
1370 peter 1035 UIC 0 : pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1036 : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
2286 simon 1037 ECB : }
2286 simon 1038 GIC 7 : }
|