Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : : * fashion and write it to a local file.
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/bin/pg_basebackup/pg_recvlogical.c
10 : : *-------------------------------------------------------------------------
11 : : */
12 : :
13 : : #include "postgres_fe.h"
14 : :
15 : : #include <dirent.h>
16 : : #include <limits.h>
17 : : #include <sys/select.h>
18 : : #include <sys/stat.h>
19 : : #include <unistd.h>
20 : :
21 : : #include "access/xlog_internal.h"
22 : : #include "common/fe_memutils.h"
23 : : #include "common/file_perm.h"
24 : : #include "common/logging.h"
25 : : #include "fe_utils/option_utils.h"
26 : : #include "getopt_long.h"
27 : : #include "libpq-fe.h"
28 : : #include "libpq/pqsignal.h"
29 : : #include "pqexpbuffer.h"
30 : : #include "streamutil.h"
31 : :
32 : : /* Time to sleep between reconnection attempts */
33 : : #define RECONNECT_SLEEP_TIME 5
34 : :
35 : : typedef enum
36 : : {
37 : : STREAM_STOP_NONE,
38 : : STREAM_STOP_END_OF_WAL,
39 : : STREAM_STOP_KEEPALIVE,
40 : : STREAM_STOP_SIGNAL
41 : : } StreamStopReason;
42 : :
43 : : /* Global Options */
44 : : static char *outfile = NULL;
45 : : static int verbose = 0;
46 : : static bool two_phase = false;
47 : : static int noloop = 0;
48 : : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
49 : : static int fsync_interval = 10 * 1000; /* 10 sec = default */
50 : : static XLogRecPtr startpos = InvalidXLogRecPtr;
51 : : static XLogRecPtr endpos = InvalidXLogRecPtr;
52 : : static bool do_create_slot = false;
53 : : static bool slot_exists_ok = false;
54 : : static bool do_start_slot = false;
55 : : static bool do_drop_slot = false;
56 : : static char *replication_slot = NULL;
57 : :
58 : : /* filled pairwise with option, value. value may be NULL */
59 : : static char **options;
60 : : static size_t noptions = 0;
61 : : static const char *plugin = "test_decoding";
62 : :
63 : : /* Global State */
64 : : static int outfd = -1;
65 : : static volatile sig_atomic_t time_to_abort = false;
66 : : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
67 : : static volatile sig_atomic_t output_reopen = false;
68 : : static bool output_isfile;
69 : : static TimestampTz output_last_fsync = -1;
70 : : static bool output_needs_fsync = false;
71 : : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
72 : : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
73 : :
74 : : static void usage(void);
75 : : static void StreamLogicalLog(void);
76 : : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
77 : : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
78 : : StreamStopReason reason,
79 : : XLogRecPtr lsn);
80 : :
81 : : static void
3680 rhaas@postgresql.org 82 :CBC 1 : usage(void)
83 : : {
3313 bruce@momjian.us 84 : 1 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
85 : : progname);
3680 rhaas@postgresql.org 86 : 1 : printf(_("Usage:\n"));
87 : 1 : printf(_(" %s [OPTION]...\n"), progname);
3472 peter_e@gmx.net 88 : 1 : printf(_("\nAction to be performed:\n"));
89 : 1 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
90 : 1 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
91 : 1 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
3680 rhaas@postgresql.org 92 : 1 : printf(_("\nOptions:\n"));
2502 peter_e@gmx.net 93 : 1 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
3472 94 : 1 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
3612 andres@anarazel.de 95 : 1 : printf(_(" -F --fsync-interval=SECS\n"
96 : : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
3133 peter_e@gmx.net 97 : 1 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
3472 98 : 1 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
3680 rhaas@postgresql.org 99 : 1 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
3472 peter_e@gmx.net 100 : 1 : printf(_(" -o, --option=NAME[=VALUE]\n"
101 : : " pass option NAME with optional value VALUE to the\n"
102 : : " output plugin\n"));
103 : 1 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
104 : 1 : printf(_(" -s, --status-interval=SECS\n"
105 : : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
106 : 1 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
569 peter@eisentraut.org 107 : 1 : printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
3680 rhaas@postgresql.org 108 : 1 : printf(_(" -v, --verbose output verbose messages\n"));
109 : 1 : printf(_(" -V, --version output version information, then exit\n"));
110 : 1 : printf(_(" -?, --help show this help, then exit\n"));
111 : 1 : printf(_("\nConnection options:\n"));
112 : 1 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
113 : 1 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
114 : 1 : printf(_(" -p, --port=PORT database server port number\n"));
115 : 1 : printf(_(" -U, --username=NAME connect as specified database user\n"));
116 : 1 : printf(_(" -w, --no-password never prompt for password\n"));
117 : 1 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
1507 peter@eisentraut.org 118 : 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
119 : 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
3680 rhaas@postgresql.org 120 : 1 : }
121 : :
122 : : /*
123 : : * Send a Standby Status Update message to server.
124 : : */
125 : : static bool
2607 tgl@sss.pgh.pa.us 126 : 25 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
127 : : {
128 : : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
129 : : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
130 : :
131 : : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
3680 rhaas@postgresql.org 132 : 25 : int len = 0;
133 : :
134 : : /*
135 : : * we normally don't want to send superfluous feedback, but if it's
136 : : * because of a timeout we need to, otherwise wal_sender_timeout will kill
137 : : * us.
138 : : */
139 [ - + ]: 25 : if (!force &&
3680 rhaas@postgresql.org 140 [ # # ]:UBC 0 : last_written_lsn == output_written_lsn &&
1432 noah@leadboat.com 141 [ # # ]: 0 : last_fsync_lsn == output_fsync_lsn)
3680 rhaas@postgresql.org 142 : 0 : return true;
143 : :
3680 rhaas@postgresql.org 144 [ - + ]:CBC 25 : if (verbose)
1840 peter@eisentraut.org 145 :UBC 0 : pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
146 : : LSN_FORMAT_ARGS(output_written_lsn),
147 : : LSN_FORMAT_ARGS(output_fsync_lsn),
148 : : replication_slot);
149 : :
3680 rhaas@postgresql.org 150 :CBC 25 : replybuf[len] = 'r';
151 : 25 : len += 1;
3631 bruce@momjian.us 152 : 25 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
3680 rhaas@postgresql.org 153 : 25 : len += 8;
2489 tgl@sss.pgh.pa.us 154 : 25 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
3680 rhaas@postgresql.org 155 : 25 : len += 8;
3631 bruce@momjian.us 156 : 25 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
3680 rhaas@postgresql.org 157 : 25 : len += 8;
3631 bruce@momjian.us 158 : 25 : fe_sendint64(now, &replybuf[len]); /* sendTime */
3680 rhaas@postgresql.org 159 : 25 : len += 8;
2489 tgl@sss.pgh.pa.us 160 : 25 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
3680 rhaas@postgresql.org 161 : 25 : len += 1;
162 : :
163 : 25 : startpos = output_written_lsn;
164 : 25 : last_written_lsn = output_written_lsn;
165 : 25 : last_fsync_lsn = output_fsync_lsn;
166 : :
167 [ + - - + ]: 25 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
168 : : {
1840 peter@eisentraut.org 169 :UBC 0 : pg_log_error("could not send feedback packet: %s",
170 : : PQerrorMessage(conn));
3680 rhaas@postgresql.org 171 : 0 : return false;
172 : : }
173 : :
3680 rhaas@postgresql.org 174 :CBC 25 : return true;
175 : : }
176 : :
177 : : static void
1933 peter@eisentraut.org 178 : 47 : disconnect_atexit(void)
179 : : {
3680 rhaas@postgresql.org 180 [ + + ]: 47 : if (conn != NULL)
181 : 24 : PQfinish(conn);
182 : 47 : }
183 : :
184 : : static bool
2607 tgl@sss.pgh.pa.us 185 : 24 : OutputFsync(TimestampTz now)
186 : : {
3680 rhaas@postgresql.org 187 : 24 : output_last_fsync = now;
188 : :
189 : 24 : output_fsync_lsn = output_written_lsn;
190 : :
191 [ - + ]: 24 : if (fsync_interval <= 0)
3680 rhaas@postgresql.org 192 :UBC 0 : return true;
193 : :
3622 heikki.linnakangas@i 194 [ + + ]:CBC 24 : if (!output_needs_fsync)
3680 rhaas@postgresql.org 195 : 18 : return true;
196 : :
3622 heikki.linnakangas@i 197 : 6 : output_needs_fsync = false;
198 : :
199 : : /* can only fsync if it's a regular file */
3204 andres@anarazel.de 200 [ + + ]: 6 : if (!output_isfile)
201 : 4 : return true;
202 : :
203 [ - + ]: 2 : if (fsync(outfd) != 0)
737 tgl@sss.pgh.pa.us 204 :UBC 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
205 : :
3680 rhaas@postgresql.org 206 :CBC 2 : return true;
207 : : }
208 : :
209 : : /*
210 : : * Start the log streaming
211 : : */
212 : : static void
3485 andres@anarazel.de 213 : 23 : StreamLogicalLog(void)
214 : : {
215 : : PGresult *res;
3680 rhaas@postgresql.org 216 : 23 : char *copybuf = NULL;
2607 tgl@sss.pgh.pa.us 217 : 23 : TimestampTz last_status = -1;
218 : : int i;
219 : : PQExpBuffer query;
220 : : XLogRecPtr cur_record_lsn;
221 : :
3680 rhaas@postgresql.org 222 : 23 : output_written_lsn = InvalidXLogRecPtr;
223 : 23 : output_fsync_lsn = InvalidXLogRecPtr;
269 michael@paquier.xyz 224 :GNC 23 : cur_record_lsn = InvalidXLogRecPtr;
225 : :
226 : : /*
227 : : * Connect in replication mode to the server
228 : : */
3680 rhaas@postgresql.org 229 [ - + ]:CBC 23 : if (!conn)
3680 rhaas@postgresql.org 230 :UBC 0 : conn = GetConnection();
3680 rhaas@postgresql.org 231 [ - + ]:CBC 23 : if (!conn)
232 : : /* Error message already written in GetConnection() */
3680 rhaas@postgresql.org 233 :UBC 0 : return;
234 : :
235 : : /*
236 : : * Start the replication
237 : : */
3680 rhaas@postgresql.org 238 [ - + ]:CBC 23 : if (verbose)
1840 peter@eisentraut.org 239 :UBC 0 : pg_log_info("starting log streaming at %X/%X (slot %s)",
240 : : LSN_FORMAT_ARGS(startpos),
241 : : replication_slot);
242 : :
243 : : /* Initiate the replication stream at specified location */
875 tgl@sss.pgh.pa.us 244 :CBC 23 : query = createPQExpBuffer();
3680 rhaas@postgresql.org 245 : 23 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
1146 peter@eisentraut.org 246 : 23 : replication_slot, LSN_FORMAT_ARGS(startpos));
247 : :
248 : : /* print options if there are any */
3680 rhaas@postgresql.org 249 [ + + ]: 23 : if (noptions)
250 : 20 : appendPQExpBufferStr(query, " (");
251 : :
252 [ + + ]: 63 : for (i = 0; i < noptions; i++)
253 : : {
254 : : /* separator */
255 [ + + ]: 40 : if (i > 0)
256 : 20 : appendPQExpBufferStr(query, ", ");
257 : :
258 : : /* write option name */
259 : 40 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
260 : :
261 : : /* write option value if specified */
262 [ + - ]: 40 : if (options[(i * 2) + 1] != NULL)
263 : 40 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
264 : : }
265 : :
266 [ + + ]: 23 : if (noptions)
267 : 20 : appendPQExpBufferChar(query, ')');
268 : :
269 : 23 : res = PQexec(conn, query->data);
270 [ + + ]: 23 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
271 : : {
1840 peter@eisentraut.org 272 : 6 : pg_log_error("could not send replication command \"%s\": %s",
273 : : query->data, PQresultErrorMessage(res));
3680 rhaas@postgresql.org 274 : 6 : PQclear(res);
275 : 6 : goto error;
276 : : }
277 : 17 : PQclear(res);
278 : 17 : resetPQExpBuffer(query);
279 : :
280 [ - + ]: 17 : if (verbose)
1840 peter@eisentraut.org 281 :UBC 0 : pg_log_info("streaming initiated");
282 : :
3680 rhaas@postgresql.org 283 [ + + ]:CBC 857 : while (!time_to_abort)
284 : : {
285 : : int r;
286 : : int bytes_left;
287 : : int bytes_written;
288 : : TimestampTz now;
289 : : int hdr_len;
290 : :
269 michael@paquier.xyz 291 :GNC 856 : cur_record_lsn = InvalidXLogRecPtr;
292 : :
3680 rhaas@postgresql.org 293 [ + + ]:CBC 856 : if (copybuf != NULL)
294 : : {
295 : 496 : PQfreemem(copybuf);
296 : 496 : copybuf = NULL;
297 : : }
298 : :
299 : : /*
300 : : * Potentially send a status message to the primary.
301 : : */
302 : 856 : now = feGetCurrentTimestamp();
303 : :
304 [ + + + + ]: 1695 : if (outfd != -1 &&
305 : 839 : feTimestampDifferenceExceeds(output_last_fsync, now,
306 : : fsync_interval))
307 : : {
308 [ - + ]: 16 : if (!OutputFsync(now))
3680 rhaas@postgresql.org 309 :UBC 0 : goto error;
310 : : }
311 : :
3680 rhaas@postgresql.org 312 [ + - + + ]:CBC 1712 : if (standby_message_timeout > 0 &&
313 : 856 : feTimestampDifferenceExceeds(last_status, now,
314 : : standby_message_timeout))
315 : : {
316 : : /* Time to send feedback! */
317 [ - + ]: 17 : if (!sendFeedback(conn, now, true, false))
3680 rhaas@postgresql.org 318 :UBC 0 : goto error;
319 : :
3680 rhaas@postgresql.org 320 :CBC 17 : last_status = now;
321 : : }
322 : :
323 : : /* got SIGHUP, close output file */
3622 heikki.linnakangas@i 324 [ + + - + : 856 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
- - ]
325 : : {
3622 heikki.linnakangas@i 326 :UBC 0 : now = feGetCurrentTimestamp();
327 [ # # ]: 0 : if (!OutputFsync(now))
328 : 0 : goto error;
329 : 0 : close(outfd);
330 : 0 : outfd = -1;
331 : : }
3622 heikki.linnakangas@i 332 :CBC 856 : output_reopen = false;
333 : :
334 : : /* open the output file, if not open yet */
3621 335 [ + + ]: 856 : if (outfd == -1)
336 : : {
337 : : struct stat statbuf;
338 : :
339 [ + - ]: 17 : if (strcmp(outfile, "-") == 0)
340 : 17 : outfd = fileno(stdout);
341 : : else
3621 heikki.linnakangas@i 342 :UBC 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
343 : : S_IRUSR | S_IWUSR);
3621 heikki.linnakangas@i 344 [ - + ]:CBC 17 : if (outfd == -1)
345 : : {
1840 peter@eisentraut.org 346 :UBC 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
3621 heikki.linnakangas@i 347 : 0 : goto error;
348 : : }
349 : :
3204 andres@anarazel.de 350 [ - + ]:CBC 17 : if (fstat(outfd, &statbuf) != 0)
351 : : {
1840 peter@eisentraut.org 352 :UBC 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
990 michael@paquier.xyz 353 : 0 : goto error;
354 : : }
355 : :
3204 andres@anarazel.de 356 [ + + + - ]:CBC 17 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
357 : : }
358 : :
3680 rhaas@postgresql.org 359 : 856 : r = PQgetCopyData(conn, ©buf, 1);
360 [ + + ]: 856 : if (r == 0)
361 : 343 : {
362 : : /*
363 : : * In async mode, and no data available. We block on reading but
364 : : * not more than the specified timeout, so that we can send a
365 : : * response back to the client.
366 : : */
367 : : fd_set input_mask;
2607 tgl@sss.pgh.pa.us 368 : 346 : TimestampTz message_target = 0;
369 : 346 : TimestampTz fsync_target = 0;
370 : : struct timeval timeout;
3680 rhaas@postgresql.org 371 : 346 : struct timeval *timeoutptr = NULL;
372 : :
2959 peter_e@gmx.net 373 [ - + ]: 346 : if (PQsocket(conn) < 0)
374 : : {
1840 peter@eisentraut.org 375 :UBC 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
2959 peter_e@gmx.net 376 : 0 : goto error;
377 : : }
378 : :
3680 rhaas@postgresql.org 379 [ + + ]:CBC 5882 : FD_ZERO(&input_mask);
380 : 346 : FD_SET(PQsocket(conn), &input_mask);
381 : :
382 : : /* Compute when we need to wakeup to send a keepalive message. */
383 [ + - ]: 346 : if (standby_message_timeout)
384 : 346 : message_target = last_status + (standby_message_timeout - 1) *
385 : : ((int64) 1000);
386 : :
387 : : /* Compute when we need to wakeup to fsync the output file. */
3622 heikki.linnakangas@i 388 [ + - + + ]: 346 : if (fsync_interval > 0 && output_needs_fsync)
3680 rhaas@postgresql.org 389 : 136 : fsync_target = output_last_fsync + (fsync_interval - 1) *
390 : : ((int64) 1000);
391 : :
392 : : /* Now compute when to wakeup. */
393 [ - + - - ]: 346 : if (message_target > 0 || fsync_target > 0)
394 : : {
395 : : TimestampTz targettime;
396 : : long secs;
397 : : int usecs;
398 : :
399 : 346 : targettime = message_target;
400 : :
401 [ + + - + ]: 346 : if (fsync_target > 0 && fsync_target < targettime)
3680 rhaas@postgresql.org 402 :UBC 0 : targettime = fsync_target;
403 : :
3680 rhaas@postgresql.org 404 :CBC 346 : feTimestampDifference(now,
405 : : targettime,
406 : : &secs,
407 : : &usecs);
408 [ - + ]: 346 : if (secs <= 0)
3680 rhaas@postgresql.org 409 :UBC 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
410 : : else
3680 rhaas@postgresql.org 411 :CBC 346 : timeout.tv_sec = secs;
412 : 346 : timeout.tv_usec = usecs;
413 : 346 : timeoutptr = &timeout;
414 : : }
415 : :
416 : 346 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
417 [ + - + + : 346 : if (r == 0 || (r < 0 && errno == EINTR))
+ - ]
418 : : {
419 : : /*
420 : : * Got a timeout or signal. Continue the loop and either
421 : : * deliver a status packet to the server or just go back into
422 : : * blocking.
423 : : */
424 : 344 : continue;
425 : : }
426 [ - + ]: 345 : else if (r < 0)
427 : : {
1087 peter@eisentraut.org 428 :UBC 0 : pg_log_error("%s() failed: %m", "select");
3680 rhaas@postgresql.org 429 : 0 : goto error;
430 : : }
431 : :
432 : : /* Else there is actually data on the socket */
3680 rhaas@postgresql.org 433 [ + + ]:CBC 345 : if (PQconsumeInput(conn) == 0)
434 : : {
1840 peter@eisentraut.org 435 : 2 : pg_log_error("could not receive data from WAL stream: %s",
436 : : PQerrorMessage(conn));
3680 rhaas@postgresql.org 437 : 2 : goto error;
438 : : }
439 : 343 : continue;
440 : : }
441 : :
442 : : /* End of copy stream */
443 [ + + ]: 510 : if (r == -1)
444 : 14 : break;
445 : :
446 : : /* Failure while reading the copy stream */
447 [ - + ]: 503 : if (r == -2)
448 : : {
1840 peter@eisentraut.org 449 :UBC 0 : pg_log_error("could not read COPY data: %s",
450 : : PQerrorMessage(conn));
3680 rhaas@postgresql.org 451 : 0 : goto error;
452 : : }
453 : :
454 : : /* Check the message type. */
3680 rhaas@postgresql.org 455 [ + + ]:CBC 503 : if (copybuf[0] == 'k')
456 : 410 : {
457 : : int pos;
458 : : bool replyRequested;
459 : : XLogRecPtr walEnd;
2657 simon@2ndQuadrant.co 460 : 412 : bool endposReached = false;
461 : :
462 : : /*
463 : : * Parse the keepalive message, enclosed in the CopyData message.
464 : : * We just check if the server requested a reply, and ignore the
465 : : * rest.
466 : : */
3680 rhaas@postgresql.org 467 : 412 : pos = 1; /* skip msgtype 'k' */
468 : 412 : walEnd = fe_recvint64(©buf[pos]);
469 : 412 : output_written_lsn = Max(walEnd, output_written_lsn);
470 : :
471 : 412 : pos += 8; /* read walEnd */
472 : :
473 : 412 : pos += 8; /* skip sendTime */
474 : :
475 [ - + ]: 412 : if (r < pos + 1)
476 : : {
1840 peter@eisentraut.org 477 :UBC 0 : pg_log_error("streaming header too small: %d", r);
3680 rhaas@postgresql.org 478 : 0 : goto error;
479 : : }
3680 rhaas@postgresql.org 480 :CBC 412 : replyRequested = copybuf[pos];
481 : :
2657 simon@2ndQuadrant.co 482 [ + + + + ]: 412 : if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
483 : : {
484 : : /*
485 : : * If there's nothing to read on the socket until a keepalive
486 : : * we know that the server has nothing to send us; and if
487 : : * walEnd has passed endpos, we know nothing else can have
488 : : * committed before endpos. So we can bail out now.
489 : : */
490 : 2 : endposReached = true;
491 : : }
492 : :
493 : : /* Send a reply, if necessary */
494 [ + + + + ]: 412 : if (replyRequested || endposReached)
495 : : {
496 [ - + ]: 3 : if (!flushAndSendFeedback(conn, &now))
3680 rhaas@postgresql.org 497 :UBC 0 : goto error;
3680 rhaas@postgresql.org 498 :CBC 3 : last_status = now;
499 : : }
500 : :
2657 simon@2ndQuadrant.co 501 [ + + ]: 412 : if (endposReached)
502 : : {
269 michael@paquier.xyz 503 :GNC 2 : stop_reason = STREAM_STOP_KEEPALIVE;
2657 simon@2ndQuadrant.co 504 :CBC 2 : time_to_abort = true;
505 : 2 : break;
506 : : }
507 : :
3680 rhaas@postgresql.org 508 : 410 : continue;
509 : : }
510 [ - + ]: 91 : else if (copybuf[0] != 'w')
511 : : {
1840 peter@eisentraut.org 512 :UBC 0 : pg_log_error("unrecognized streaming header: \"%c\"",
513 : : copybuf[0]);
3680 rhaas@postgresql.org 514 : 0 : goto error;
515 : : }
516 : :
517 : : /*
518 : : * Read the header of the XLogData message, enclosed in the CopyData
519 : : * message. We only need the WAL location field (dataStart), the rest
520 : : * of the header is ignored.
521 : : */
3680 rhaas@postgresql.org 522 :CBC 91 : hdr_len = 1; /* msgtype 'w' */
523 : 91 : hdr_len += 8; /* dataStart */
524 : 91 : hdr_len += 8; /* walEnd */
525 : 91 : hdr_len += 8; /* sendTime */
526 [ - + ]: 91 : if (r < hdr_len + 1)
527 : : {
1840 peter@eisentraut.org 528 :UBC 0 : pg_log_error("streaming header too small: %d", r);
3680 rhaas@postgresql.org 529 : 0 : goto error;
530 : : }
531 : :
532 : : /* Extract WAL location for this block */
2657 simon@2ndQuadrant.co 533 :CBC 91 : cur_record_lsn = fe_recvint64(©buf[1]);
534 : :
535 [ + + - + ]: 91 : if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
536 : : {
537 : : /*
538 : : * We've read past our endpoint, so prepare to go away being
539 : : * cautious about what happens to our output data.
540 : : */
2657 simon@2ndQuadrant.co 541 [ # # ]:UBC 0 : if (!flushAndSendFeedback(conn, &now))
542 : 0 : goto error;
269 michael@paquier.xyz 543 :UNC 0 : stop_reason = STREAM_STOP_END_OF_WAL;
2657 simon@2ndQuadrant.co 544 :UBC 0 : time_to_abort = true;
545 : 0 : break;
546 : : }
547 : :
2657 simon@2ndQuadrant.co 548 :CBC 91 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
549 : :
3680 rhaas@postgresql.org 550 : 91 : bytes_left = r - hdr_len;
551 : 91 : bytes_written = 0;
552 : :
553 : : /* signal that a fsync is needed */
3622 heikki.linnakangas@i 554 : 91 : output_needs_fsync = true;
555 : :
3680 rhaas@postgresql.org 556 [ + + ]: 182 : while (bytes_left)
557 : : {
558 : : int ret;
559 : :
560 : 182 : ret = write(outfd,
561 : 91 : copybuf + hdr_len + bytes_written,
562 : : bytes_left);
563 : :
564 [ - + ]: 91 : if (ret < 0)
565 : : {
879 peter@eisentraut.org 566 :UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
567 : : bytes_left, outfile);
3680 rhaas@postgresql.org 568 : 0 : goto error;
569 : : }
570 : :
571 : : /* Write was successful, advance our position */
3680 rhaas@postgresql.org 572 :CBC 91 : bytes_written += ret;
573 : 91 : bytes_left -= ret;
574 : : }
575 : :
576 [ - + ]: 91 : if (write(outfd, "\n", 1) != 1)
577 : : {
879 peter@eisentraut.org 578 :UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
579 : : 1, outfile);
3680 rhaas@postgresql.org 580 : 0 : goto error;
581 : : }
582 : :
2657 simon@2ndQuadrant.co 583 [ + + + + ]:CBC 91 : if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
584 : : {
585 : : /* endpos was exactly the record we just processed, we're done */
586 [ - + ]: 5 : if (!flushAndSendFeedback(conn, &now))
2657 simon@2ndQuadrant.co 587 :UBC 0 : goto error;
269 michael@paquier.xyz 588 :GNC 5 : stop_reason = STREAM_STOP_END_OF_WAL;
2657 simon@2ndQuadrant.co 589 :CBC 5 : time_to_abort = true;
590 : 5 : break;
591 : : }
592 : : }
593 : :
594 : : /* Clean up connection state if stream has been aborted */
269 michael@paquier.xyz 595 [ + + ]:GNC 15 : if (time_to_abort)
596 : 8 : prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
597 : :
3680 rhaas@postgresql.org 598 :CBC 15 : res = PQgetResult(conn);
2657 simon@2ndQuadrant.co 599 [ + + ]: 15 : if (PQresultStatus(res) == PGRES_COPY_OUT)
600 : : {
1432 noah@leadboat.com 601 : 8 : PQclear(res);
602 : :
603 : : /*
604 : : * We're doing a client-initiated clean exit and have sent CopyDone to
605 : : * the server. Drain any messages, so we don't miss a last-minute
606 : : * ErrorResponse. The walsender stops generating XLogData records once
607 : : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
608 : : * it's too late for sendFeedback(), even if this were to take a long
609 : : * time. Hence, use synchronous-mode PQgetCopyData().
610 : : */
611 : : while (1)
612 : 52 : {
613 : : int r;
614 : :
615 [ + + ]: 60 : if (copybuf != NULL)
616 : : {
617 : 59 : PQfreemem(copybuf);
618 : 59 : copybuf = NULL;
619 : : }
620 : 60 : r = PQgetCopyData(conn, ©buf, 0);
621 [ + + ]: 60 : if (r == -1)
622 : 8 : break;
623 [ - + ]: 52 : if (r == -2)
624 : : {
1432 noah@leadboat.com 625 :UBC 0 : pg_log_error("could not read COPY data: %s",
626 : : PQerrorMessage(conn));
627 : 0 : time_to_abort = false; /* unclean exit */
628 : 0 : goto error;
629 : : }
630 : : }
631 : :
1432 noah@leadboat.com 632 :CBC 8 : res = PQgetResult(conn);
633 : : }
634 [ + + ]: 15 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
635 : : {
1840 peter@eisentraut.org 636 : 6 : pg_log_error("unexpected termination of replication stream: %s",
637 : : PQresultErrorMessage(res));
3680 rhaas@postgresql.org 638 : 6 : goto error;
639 : : }
640 : 9 : PQclear(res);
641 : :
642 [ + - - + ]: 9 : if (outfd != -1 && strcmp(outfile, "-") != 0)
643 : : {
2607 tgl@sss.pgh.pa.us 644 :UBC 0 : TimestampTz t = feGetCurrentTimestamp();
645 : :
646 : : /* no need to jump to error on failure here, we're finishing anyway */
3680 rhaas@postgresql.org 647 : 0 : OutputFsync(t);
648 : :
649 [ # # ]: 0 : if (close(outfd) != 0)
1840 peter@eisentraut.org 650 : 0 : pg_log_error("could not close file \"%s\": %m", outfile);
651 : : }
3680 rhaas@postgresql.org 652 :CBC 9 : outfd = -1;
653 : 23 : error:
3632 heikki.linnakangas@i 654 [ - + ]: 23 : if (copybuf != NULL)
655 : : {
3632 heikki.linnakangas@i 656 :UBC 0 : PQfreemem(copybuf);
657 : 0 : copybuf = NULL;
658 : : }
3680 rhaas@postgresql.org 659 :CBC 23 : destroyPQExpBuffer(query);
660 : 23 : PQfinish(conn);
661 : 23 : conn = NULL;
662 : : }
663 : :
664 : : /*
665 : : * Unfortunately we can't do sensible signal handling on windows...
666 : : */
667 : : #ifndef WIN32
668 : :
669 : : /*
670 : : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
671 : : * possible moment.
672 : : */
673 : : static void
578 tgl@sss.pgh.pa.us 674 : 1 : sigexit_handler(SIGNAL_ARGS)
675 : : {
269 michael@paquier.xyz 676 :GNC 1 : stop_reason = STREAM_STOP_SIGNAL;
3680 rhaas@postgresql.org 677 :CBC 1 : time_to_abort = true;
678 : 1 : }
679 : :
680 : : /*
681 : : * Trigger the output file to be reopened.
682 : : */
683 : : static void
578 tgl@sss.pgh.pa.us 684 :UBC 0 : sighup_handler(SIGNAL_ARGS)
685 : : {
3680 rhaas@postgresql.org 686 : 0 : output_reopen = true;
687 : 0 : }
688 : : #endif
689 : :
690 : :
691 : : int
3680 rhaas@postgresql.org 692 :CBC 55 : main(int argc, char **argv)
693 : : {
694 : : static struct option long_options[] = {
695 : : /* general options */
696 : : {"file", required_argument, NULL, 'f'},
697 : : {"fsync-interval", required_argument, NULL, 'F'},
698 : : {"no-loop", no_argument, NULL, 'n'},
699 : : {"verbose", no_argument, NULL, 'v'},
700 : : {"two-phase", no_argument, NULL, 't'},
701 : : {"version", no_argument, NULL, 'V'},
702 : : {"help", no_argument, NULL, '?'},
703 : : /* connection options */
704 : : {"dbname", required_argument, NULL, 'd'},
705 : : {"host", required_argument, NULL, 'h'},
706 : : {"port", required_argument, NULL, 'p'},
707 : : {"username", required_argument, NULL, 'U'},
708 : : {"no-password", no_argument, NULL, 'w'},
709 : : {"password", no_argument, NULL, 'W'},
710 : : /* replication options */
711 : : {"startpos", required_argument, NULL, 'I'},
712 : : {"endpos", required_argument, NULL, 'E'},
713 : : {"option", required_argument, NULL, 'o'},
714 : : {"plugin", required_argument, NULL, 'P'},
715 : : {"status-interval", required_argument, NULL, 's'},
716 : : {"slot", required_argument, NULL, 'S'},
717 : : /* action */
718 : : {"create-slot", no_argument, NULL, 1},
719 : : {"start", no_argument, NULL, 2},
720 : : {"drop-slot", no_argument, NULL, 3},
721 : : {"if-not-exists", no_argument, NULL, 4},
722 : : {NULL, 0, NULL, 0}
723 : : };
724 : : int c;
725 : : int option_index;
726 : : uint32 hi,
727 : : lo;
728 : : char *db_name;
729 : :
1840 peter@eisentraut.org 730 : 55 : pg_logging_init(argv[0]);
3680 rhaas@postgresql.org 731 : 55 : progname = get_progname(argv[0]);
3030 alvherre@alvh.no-ip. 732 : 55 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
733 : :
3680 rhaas@postgresql.org 734 [ + + ]: 55 : if (argc > 1)
735 : : {
736 [ + + - + ]: 54 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
737 : : {
738 : 1 : usage();
739 : 1 : exit(0);
740 : : }
741 [ + - ]: 53 : else if (strcmp(argv[1], "-V") == 0 ||
742 [ + + ]: 53 : strcmp(argv[1], "--version") == 0)
743 : : {
744 : 1 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
745 : 1 : exit(0);
746 : : }
747 : : }
748 : :
489 peter@eisentraut.org 749 : 321 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
3680 rhaas@postgresql.org 750 [ + + ]: 321 : long_options, &option_index)) != -1)
751 : : {
752 [ + - + + : 269 : switch (c)
- + - - -
- - - + +
+ - + + +
+ - + ]
753 : : {
754 : : /* general options */
755 : 24 : case 'f':
756 : 24 : outfile = pg_strdup(optarg);
757 : 24 : break;
3612 andres@anarazel.de 758 :UBC 0 : case 'F':
995 michael@paquier.xyz 759 [ # # ]: 0 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
760 : : INT_MAX / 1000,
761 : : &fsync_interval))
3612 andres@anarazel.de 762 : 0 : exit(1);
995 michael@paquier.xyz 763 : 0 : fsync_interval *= 1000;
3612 andres@anarazel.de 764 : 0 : break;
3680 rhaas@postgresql.org 765 :CBC 23 : case 'n':
766 : 23 : noloop = 1;
767 : 23 : break;
1019 akapila@postgresql.o 768 : 2 : case 't':
769 : 2 : two_phase = true;
770 : 2 : break;
489 peter@eisentraut.org 771 :UBC 0 : case 'v':
772 : 0 : verbose++;
773 : 0 : break;
774 : : /* connection options */
3680 rhaas@postgresql.org 775 :CBC 50 : case 'd':
776 : 50 : dbname = pg_strdup(optarg);
777 : 50 : break;
3680 rhaas@postgresql.org 778 :UBC 0 : case 'h':
779 : 0 : dbhost = pg_strdup(optarg);
780 : 0 : break;
781 : 0 : case 'p':
782 : 0 : dbport = pg_strdup(optarg);
783 : 0 : break;
784 : 0 : case 'U':
785 : 0 : dbuser = pg_strdup(optarg);
786 : 0 : break;
787 : 0 : case 'w':
788 : 0 : dbgetpassword = -1;
789 : 0 : break;
790 : 0 : case 'W':
791 : 0 : dbgetpassword = 1;
792 : 0 : break;
793 : : /* replication options */
3612 andres@anarazel.de 794 : 0 : case 'I':
795 [ # # ]: 0 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
737 tgl@sss.pgh.pa.us 796 : 0 : pg_fatal("could not parse start position \"%s\"", optarg);
3612 andres@anarazel.de 797 : 0 : startpos = ((uint64) hi) << 32 | lo;
798 : 0 : break;
2657 simon@2ndQuadrant.co 799 :CBC 8 : case 'E':
800 [ - + ]: 8 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
737 tgl@sss.pgh.pa.us 801 :UBC 0 : pg_fatal("could not parse end position \"%s\"", optarg);
2657 simon@2ndQuadrant.co 802 :CBC 8 : endpos = ((uint64) hi) << 32 | lo;
803 : 8 : break;
3680 rhaas@postgresql.org 804 : 40 : case 'o':
805 : : {
3631 bruce@momjian.us 806 : 40 : char *data = pg_strdup(optarg);
807 : 40 : char *val = strchr(data, '=');
808 : :
3680 rhaas@postgresql.org 809 [ + - ]: 40 : if (val != NULL)
810 : : {
811 : : /* remove =; separate data from val */
812 : 40 : *val = '\0';
813 : 40 : val++;
814 : : }
815 : :
816 : 40 : noptions += 1;
3631 bruce@momjian.us 817 : 40 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
818 : :
3680 rhaas@postgresql.org 819 : 40 : options[(noptions - 1) * 2] = data;
820 : 40 : options[(noptions - 1) * 2 + 1] = val;
821 : : }
822 : :
823 : 40 : break;
824 : 21 : case 'P':
825 : 21 : plugin = pg_strdup(optarg);
826 : 21 : break;
3680 rhaas@postgresql.org 827 :UBC 0 : case 's':
995 michael@paquier.xyz 828 [ # # ]: 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
829 : : INT_MAX / 1000,
830 : : &standby_message_timeout))
3680 rhaas@postgresql.org 831 : 0 : exit(1);
995 michael@paquier.xyz 832 : 0 : standby_message_timeout *= 1000;
3680 rhaas@postgresql.org 833 : 0 : break;
3680 rhaas@postgresql.org 834 :CBC 51 : case 'S':
835 : 51 : replication_slot = pg_strdup(optarg);
836 : 51 : break;
837 : : /* action */
838 : 23 : case 1:
839 : 23 : do_create_slot = true;
840 : 23 : break;
841 : 25 : case 2:
842 : 25 : do_start_slot = true;
843 : 25 : break;
844 : 1 : case 3:
845 : 1 : do_drop_slot = true;
846 : 1 : break;
3199 andres@anarazel.de 847 :UBC 0 : case 4:
848 : 0 : slot_exists_ok = true;
849 : 0 : break;
850 : :
3680 rhaas@postgresql.org 851 :CBC 1 : default:
852 : : /* getopt_long already emitted a complaint */
737 tgl@sss.pgh.pa.us 853 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3680 rhaas@postgresql.org 854 : 1 : exit(1);
855 : : }
856 : : }
857 : :
858 : : /*
859 : : * Any non-option arguments?
860 : : */
861 [ - + ]: 52 : if (optind < argc)
862 : : {
1840 peter@eisentraut.org 863 :UBC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
864 : : argv[optind]);
737 tgl@sss.pgh.pa.us 865 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3680 rhaas@postgresql.org 866 : 0 : exit(1);
867 : : }
868 : :
869 : : /*
870 : : * Required arguments
871 : : */
3680 rhaas@postgresql.org 872 [ + + ]:CBC 52 : if (replication_slot == NULL)
873 : : {
1840 peter@eisentraut.org 874 : 1 : pg_log_error("no slot specified");
737 tgl@sss.pgh.pa.us 875 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3680 rhaas@postgresql.org 876 : 1 : exit(1);
877 : : }
878 : :
879 [ + + + + ]: 51 : if (do_start_slot && outfile == NULL)
880 : : {
1840 peter@eisentraut.org 881 : 1 : pg_log_error("no target file specified");
737 tgl@sss.pgh.pa.us 882 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3680 rhaas@postgresql.org 883 : 1 : exit(1);
884 : : }
885 : :
886 [ + + + + ]: 50 : if (!do_drop_slot && dbname == NULL)
887 : : {
1840 peter@eisentraut.org 888 : 1 : pg_log_error("no database specified");
737 tgl@sss.pgh.pa.us 889 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3680 rhaas@postgresql.org 890 : 1 : exit(1);
891 : : }
892 : :
893 [ + + + + : 49 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
+ + ]
894 : : {
1840 peter@eisentraut.org 895 : 1 : pg_log_error("at least one action needs to be specified");
737 tgl@sss.pgh.pa.us 896 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3680 rhaas@postgresql.org 897 : 1 : exit(1);
898 : : }
899 : :
900 [ + + + - : 48 : if (do_drop_slot && (do_create_slot || do_start_slot))
- + ]
901 : : {
1840 peter@eisentraut.org 902 :UBC 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
737 tgl@sss.pgh.pa.us 903 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3680 rhaas@postgresql.org 904 : 0 : exit(1);
905 : : }
906 : :
3533 andres@anarazel.de 907 [ - + - - :CBC 48 : if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
- - ]
908 : : {
1840 peter@eisentraut.org 909 :UBC 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
737 tgl@sss.pgh.pa.us 910 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3680 rhaas@postgresql.org 911 : 0 : exit(1);
912 : : }
913 : :
2657 simon@2ndQuadrant.co 914 [ + + - + ]:CBC 48 : if (endpos != InvalidXLogRecPtr && !do_start_slot)
915 : : {
1840 peter@eisentraut.org 916 :UBC 0 : pg_log_error("--endpos may only be specified with --start");
737 tgl@sss.pgh.pa.us 917 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2657 simon@2ndQuadrant.co 918 : 0 : exit(1);
919 : : }
920 : :
1019 akapila@postgresql.o 921 [ + + + + ]:CBC 48 : if (two_phase && !do_create_slot)
922 : : {
923 : 1 : pg_log_error("--two-phase may only be specified with --create-slot");
737 tgl@sss.pgh.pa.us 924 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
1019 akapila@postgresql.o 925 : 1 : exit(1);
926 : : }
927 : :
928 : : /*
929 : : * Obtain a connection to server. Notably, if we need a password, we want
930 : : * to collect it from the user immediately.
931 : : */
3483 andres@anarazel.de 932 : 47 : conn = GetConnection();
933 [ - + ]: 47 : if (!conn)
934 : : /* Error message already written in GetConnection() */
3483 andres@anarazel.de 935 :UBC 0 : exit(1);
1933 peter@eisentraut.org 936 :CBC 47 : atexit(disconnect_atexit);
937 : :
938 : : /*
939 : : * Trap signals. (Don't do this until after the initial password prompt,
940 : : * if one is needed, in GetConnection.)
941 : : */
942 : : #ifndef WIN32
578 dgustafsson@postgres 943 : 47 : pqsignal(SIGINT, sigexit_handler);
944 : 47 : pqsignal(SIGTERM, sigexit_handler);
875 tgl@sss.pgh.pa.us 945 : 47 : pqsignal(SIGHUP, sighup_handler);
946 : : #endif
947 : :
948 : : /*
949 : : * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
950 : : * replication connection.
951 : : */
3483 andres@anarazel.de 952 [ - + ]: 47 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
1933 peter@eisentraut.org 953 :UBC 0 : exit(1);
954 : :
3483 andres@anarazel.de 955 [ - + ]:CBC 47 : if (db_name == NULL)
737 tgl@sss.pgh.pa.us 956 :UBC 0 : pg_fatal("could not establish database-specific replication connection");
957 : :
958 : : /*
959 : : * Set umask so that directories/files are created with the same
960 : : * permissions as directories/files in the source data directory.
961 : : *
962 : : * pg_mode_mask is set to owner-only by default and then updated in
963 : : * GetConnection() where we get the mode from the server-side with
964 : : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
965 : : */
2199 sfrost@snowman.net 966 :CBC 47 : umask(pg_mode_mask);
967 : :
968 : : /* Drop a replication slot. */
3680 rhaas@postgresql.org 969 [ + + ]: 47 : if (do_drop_slot)
970 : : {
971 [ - + ]: 1 : if (verbose)
1840 peter@eisentraut.org 972 :UBC 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
973 : :
3483 andres@anarazel.de 974 [ - + ]:CBC 1 : if (!DropReplicationSlot(conn, replication_slot))
1933 peter@eisentraut.org 975 :UBC 0 : exit(1);
976 : : }
977 : :
978 : : /* Create a replication slot. */
3680 rhaas@postgresql.org 979 [ + + ]:CBC 47 : if (do_create_slot)
980 : : {
981 [ - + ]: 23 : if (verbose)
1840 peter@eisentraut.org 982 :UBC 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
983 : :
2392 peter_e@gmx.net 984 [ - + ]:CBC 23 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
985 : : false, false, slot_exists_ok, two_phase))
1933 peter@eisentraut.org 986 :UBC 0 : exit(1);
3199 andres@anarazel.de 987 :CBC 23 : startpos = InvalidXLogRecPtr;
988 : : }
989 : :
3680 rhaas@postgresql.org 990 [ + + ]: 47 : if (!do_start_slot)
1933 peter@eisentraut.org 991 : 24 : exit(0);
992 : :
993 : : /* Stream loop */
994 : : while (true)
995 : : {
3483 andres@anarazel.de 996 :UBC 0 : StreamLogicalLog();
3680 rhaas@postgresql.org 997 [ + + ]:CBC 23 : if (time_to_abort)
998 : : {
999 : : /*
1000 : : * We've been Ctrl-C'ed or reached an exit limit condition. That's
1001 : : * not an error, so exit without an errorcode.
1002 : : */
1933 peter@eisentraut.org 1003 : 8 : exit(0);
1004 : : }
3680 rhaas@postgresql.org 1005 [ + - ]: 15 : else if (noloop)
737 tgl@sss.pgh.pa.us 1006 : 15 : pg_fatal("disconnected");
1007 : : else
1008 : : {
1009 : : /* translator: check source for value for %d */
1840 peter@eisentraut.org 1010 :UBC 0 : pg_log_info("disconnected; waiting %d seconds to try again",
1011 : : RECONNECT_SLEEP_TIME);
3680 rhaas@postgresql.org 1012 : 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1013 : : }
1014 : : }
1015 : : }
1016 : :
1017 : : /*
1018 : : * Fsync our output data, and send a feedback message to the server. Returns
1019 : : * true if successful, false otherwise.
1020 : : *
1021 : : * If successful, *now is updated to the current timestamp just before sending
1022 : : * feedback.
1023 : : */
1024 : : static bool
2657 simon@2ndQuadrant.co 1025 :CBC 8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1026 : : {
1027 : : /* flush data to disk, so that we send a recent flush pointer */
1028 [ - + ]: 8 : if (!OutputFsync(*now))
2657 simon@2ndQuadrant.co 1029 :UBC 0 : return false;
2657 simon@2ndQuadrant.co 1030 :CBC 8 : *now = feGetCurrentTimestamp();
1031 [ - + ]: 8 : if (!sendFeedback(conn, *now, true, false))
2657 simon@2ndQuadrant.co 1032 :UBC 0 : return false;
1033 : :
2657 simon@2ndQuadrant.co 1034 :CBC 8 : return true;
1035 : : }
1036 : :
1037 : : /*
1038 : : * Try to inform the server about our upcoming demise, but don't wait around or
1039 : : * retry on failure.
1040 : : */
1041 : : static void
269 michael@paquier.xyz 1042 :GNC 8 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
1043 : : XLogRecPtr lsn)
1044 : : {
2657 simon@2ndQuadrant.co 1045 :CBC 8 : (void) PQputCopyEnd(conn, NULL);
1046 : 8 : (void) PQflush(conn);
1047 : :
1048 [ - + ]: 8 : if (verbose)
1049 : : {
269 michael@paquier.xyz 1050 [ # # # # :UNC 0 : switch (reason)
# ]
1051 : : {
1052 : 0 : case STREAM_STOP_SIGNAL:
1053 : 0 : pg_log_info("received interrupt signal, exiting");
1054 : 0 : break;
1055 : 0 : case STREAM_STOP_KEEPALIVE:
1056 : 0 : pg_log_info("end position %X/%X reached by keepalive",
1057 : : LSN_FORMAT_ARGS(endpos));
1058 : 0 : break;
1059 : 0 : case STREAM_STOP_END_OF_WAL:
1060 [ # # ]: 0 : Assert(!XLogRecPtrIsInvalid(lsn));
1061 : 0 : pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1062 : : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1063 : 0 : break;
1064 : 0 : case STREAM_STOP_NONE:
1065 : 0 : Assert(false);
1066 : : break;
1067 : : }
1068 : : }
2657 simon@2ndQuadrant.co 1069 :CBC 8 : }
|