TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : * fashion and write it to a local file.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_recvlogical.c
10 : *-------------------------------------------------------------------------
11 : */
12 :
13 : #include "postgres_fe.h"
14 :
15 : #include <dirent.h>
16 : #include <limits.h>
17 : #include <sys/select.h>
18 : #include <sys/stat.h>
19 : #include <unistd.h>
20 :
21 : #include "access/xlog_internal.h"
22 : #include "common/fe_memutils.h"
23 : #include "common/file_perm.h"
24 : #include "common/logging.h"
25 : #include "fe_utils/option_utils.h"
26 : #include "getopt_long.h"
27 : #include "libpq-fe.h"
28 : #include "libpq/pqsignal.h"
29 : #include "pqexpbuffer.h"
30 : #include "streamutil.h"
31 :
32 : /* Time to sleep between reconnection attempts */
33 : #define RECONNECT_SLEEP_TIME 5
34 :
35 : /* Global Options */
36 : static char *outfile = NULL;
37 : static int verbose = 0;
38 : static bool two_phase = false;
39 : static int noloop = 0;
40 : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
41 : static int fsync_interval = 10 * 1000; /* 10 sec = default */
42 : static XLogRecPtr startpos = InvalidXLogRecPtr;
43 : static XLogRecPtr endpos = InvalidXLogRecPtr;
44 : static bool do_create_slot = false;
45 : static bool slot_exists_ok = false;
46 : static bool do_start_slot = false;
47 : static bool do_drop_slot = false;
48 : static char *replication_slot = NULL;
49 :
50 : /* filled pairwise with option, value. value may be NULL */
51 : static char **options;
52 : static size_t noptions = 0;
53 : static const char *plugin = "test_decoding";
54 :
55 : /* Global State */
56 : static int outfd = -1;
57 : static volatile sig_atomic_t time_to_abort = false;
58 : static volatile sig_atomic_t output_reopen = false;
59 : static bool output_isfile;
60 : static TimestampTz output_last_fsync = -1;
61 : static bool output_needs_fsync = false;
62 : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
63 : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
64 :
65 : static void usage(void);
66 : static void StreamLogicalLog(void);
67 : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
68 : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
69 : bool keepalive, XLogRecPtr lsn);
70 ECB :
71 : static void
72 CBC 1 : usage(void)
73 : {
74 1 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
75 ECB : progname);
76 CBC 1 : printf(_("Usage:\n"));
77 1 : printf(_(" %s [OPTION]...\n"), progname);
78 1 : printf(_("\nAction to be performed:\n"));
79 1 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
80 1 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
81 1 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
82 1 : printf(_("\nOptions:\n"));
83 1 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
84 GIC 1 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
85 CBC 1 : printf(_(" -F --fsync-interval=SECS\n"
86 ECB : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
87 CBC 1 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
88 1 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
89 GIC 1 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
90 1 : printf(_(" -o, --option=NAME[=VALUE]\n"
91 ECB : " pass option NAME with optional value VALUE to the\n"
92 : " output plugin\n"));
93 GIC 1 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
94 CBC 1 : printf(_(" -s, --status-interval=SECS\n"
95 ECB : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
96 CBC 1 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
97 1 : printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
98 1 : printf(_(" -v, --verbose output verbose messages\n"));
99 1 : printf(_(" -V, --version output version information, then exit\n"));
100 1 : printf(_(" -?, --help show this help, then exit\n"));
101 1 : printf(_("\nConnection options:\n"));
102 1 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
103 1 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
104 1 : printf(_(" -p, --port=PORT database server port number\n"));
105 1 : printf(_(" -U, --username=NAME connect as specified database user\n"));
106 1 : printf(_(" -w, --no-password never prompt for password\n"));
107 1 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
108 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
109 GIC 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
110 1 : }
111 :
112 : /*
113 : * Send a Standby Status Update message to server.
114 ECB : */
115 : static bool
116 GIC 25 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
117 : {
118 : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
119 : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
120 ECB :
121 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
122 GIC 25 : int len = 0;
123 :
124 : /*
125 : * we normally don't want to send superfluous feedback, but if it's
126 : * because of a timeout we need to, otherwise wal_sender_timeout will kill
127 ECB : * us.
128 EUB : */
129 GBC 25 : if (!force &&
130 UBC 0 : last_written_lsn == output_written_lsn &&
131 UIC 0 : last_fsync_lsn == output_fsync_lsn)
132 LBC 0 : return true;
133 EUB :
134 GIC 25 : if (verbose)
135 UIC 0 : pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
136 : LSN_FORMAT_ARGS(output_written_lsn),
137 : LSN_FORMAT_ARGS(output_fsync_lsn),
138 ECB : replication_slot);
139 :
140 CBC 25 : replybuf[len] = 'r';
141 25 : len += 1;
142 25 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
143 25 : len += 8;
144 25 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
145 25 : len += 8;
146 25 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
147 25 : len += 8;
148 25 : fe_sendint64(now, &replybuf[len]); /* sendTime */
149 25 : len += 8;
150 GIC 25 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
151 CBC 25 : len += 1;
152 ECB :
153 CBC 25 : startpos = output_written_lsn;
154 GIC 25 : last_written_lsn = output_written_lsn;
155 CBC 25 : last_fsync_lsn = output_fsync_lsn;
156 :
157 GBC 25 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
158 : {
159 UBC 0 : pg_log_error("could not send feedback packet: %s",
160 : PQerrorMessage(conn));
161 UIC 0 : return false;
162 ECB : }
163 :
164 GIC 25 : return true;
165 : }
166 ECB :
167 : static void
168 CBC 47 : disconnect_atexit(void)
169 ECB : {
170 CBC 47 : if (conn != NULL)
171 GIC 24 : PQfinish(conn);
172 47 : }
173 ECB :
174 : static bool
175 CBC 25 : OutputFsync(TimestampTz now)
176 : {
177 25 : output_last_fsync = now;
178 :
179 25 : output_fsync_lsn = output_written_lsn;
180 EUB :
181 GIC 25 : if (fsync_interval <= 0)
182 LBC 0 : return true;
183 ECB :
184 GIC 25 : if (!output_needs_fsync)
185 CBC 19 : return true;
186 :
187 GIC 6 : output_needs_fsync = false;
188 ECB :
189 : /* can only fsync if it's a regular file */
190 GIC 6 : if (!output_isfile)
191 CBC 4 : return true;
192 EUB :
193 GIC 2 : if (fsync(outfd) != 0)
194 LBC 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
195 :
196 GIC 2 : return true;
197 : }
198 :
199 : /*
200 : * Start the log streaming
201 ECB : */
202 : static void
203 GIC 23 : StreamLogicalLog(void)
204 ECB : {
205 : PGresult *res;
206 GIC 23 : char *copybuf = NULL;
207 23 : TimestampTz last_status = -1;
208 : int i;
209 ECB : PQExpBuffer query;
210 :
211 GIC 23 : output_written_lsn = InvalidXLogRecPtr;
212 23 : output_fsync_lsn = InvalidXLogRecPtr;
213 :
214 : /*
215 ECB : * Connect in replication mode to the server
216 EUB : */
217 CBC 23 : if (!conn)
218 UIC 0 : conn = GetConnection();
219 GBC 23 : if (!conn)
220 : /* Error message already written in GetConnection() */
221 UIC 0 : return;
222 :
223 : /*
224 ECB : * Start the replication
225 EUB : */
226 GIC 23 : if (verbose)
227 UIC 0 : pg_log_info("starting log streaming at %X/%X (slot %s)",
228 : LSN_FORMAT_ARGS(startpos),
229 : replication_slot);
230 ECB :
231 : /* Initiate the replication stream at specified location */
232 CBC 23 : query = createPQExpBuffer();
233 GIC 23 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
234 23 : replication_slot, LSN_FORMAT_ARGS(startpos));
235 ECB :
236 : /* print options if there are any */
237 GIC 23 : if (noptions)
238 CBC 20 : appendPQExpBufferStr(query, " (");
239 :
240 GIC 63 : for (i = 0; i < noptions; i++)
241 ECB : {
242 : /* separator */
243 GIC 40 : if (i > 0)
244 20 : appendPQExpBufferStr(query, ", ");
245 ECB :
246 : /* write option name */
247 GIC 40 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
248 ECB :
249 : /* write option value if specified */
250 GIC 40 : if (options[(i * 2) + 1] != NULL)
251 40 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
252 ECB : }
253 :
254 GIC 23 : if (noptions)
255 CBC 20 : appendPQExpBufferChar(query, ')');
256 ECB :
257 GIC 23 : res = PQexec(conn, query->data);
258 CBC 23 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
259 : {
260 6 : pg_log_error("could not send replication command \"%s\": %s",
261 ECB : query->data, PQresultErrorMessage(res));
262 GIC 6 : PQclear(res);
263 CBC 6 : goto error;
264 ECB : }
265 GIC 17 : PQclear(res);
266 CBC 17 : resetPQExpBuffer(query);
267 EUB :
268 GIC 17 : if (verbose)
269 LBC 0 : pg_log_info("streaming initiated");
270 :
271 GIC 640 : while (!time_to_abort)
272 : {
273 : int r;
274 : int bytes_left;
275 : int bytes_written;
276 ECB : TimestampTz now;
277 : int hdr_len;
278 CBC 639 : XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
279 :
280 639 : if (copybuf != NULL)
281 ECB : {
282 GIC 380 : PQfreemem(copybuf);
283 380 : copybuf = NULL;
284 : }
285 :
286 : /*
287 ECB : * Potentially send a status message to the primary.
288 : */
289 CBC 639 : now = feGetCurrentTimestamp();
290 ECB :
291 GIC 1261 : if (outfd != -1 &&
292 622 : feTimestampDifferenceExceeds(output_last_fsync, now,
293 ECB : fsync_interval))
294 : {
295 GIC 17 : if (!OutputFsync(now))
296 2 : goto error;
297 ECB : }
298 :
299 GIC 1278 : if (standby_message_timeout > 0 &&
300 639 : feTimestampDifferenceExceeds(last_status, now,
301 : standby_message_timeout))
302 ECB : {
303 EUB : /* Time to send feedback! */
304 GIC 17 : if (!sendFeedback(conn, now, true, false))
305 LBC 0 : goto error;
306 :
307 GIC 17 : last_status = now;
308 : }
309 ECB :
310 : /* got SIGHUP, close output file */
311 GBC 639 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
312 EUB : {
313 UBC 0 : now = feGetCurrentTimestamp();
314 0 : if (!OutputFsync(now))
315 0 : goto error;
316 UIC 0 : close(outfd);
317 LBC 0 : outfd = -1;
318 : }
319 GIC 639 : output_reopen = false;
320 ECB :
321 : /* open the output file, if not open yet */
322 GIC 639 : if (outfd == -1)
323 : {
324 ECB : struct stat statbuf;
325 :
326 GIC 17 : if (strcmp(outfile, "-") == 0)
327 GBC 17 : outfd = fileno(stdout);
328 : else
329 LBC 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
330 : S_IRUSR | S_IWUSR);
331 GBC 17 : if (outfd == -1)
332 EUB : {
333 UIC 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
334 0 : goto error;
335 ECB : }
336 :
337 GBC 17 : if (fstat(outfd, &statbuf) != 0)
338 EUB : {
339 UIC 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
340 0 : goto error;
341 ECB : }
342 :
343 GIC 17 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
344 ECB : }
345 :
346 CBC 639 : r = PQgetCopyData(conn, ©buf, 1);
347 GIC 639 : if (r == 0)
348 242 : {
349 : /*
350 : * In async mode, and no data available. We block on reading but
351 : * not more than the specified timeout, so that we can send a
352 : * response back to the client.
353 ECB : */
354 : fd_set input_mask;
355 GIC 245 : TimestampTz message_target = 0;
356 CBC 245 : TimestampTz fsync_target = 0;
357 : struct timeval timeout;
358 245 : struct timeval *timeoutptr = NULL;
359 :
360 GBC 245 : if (PQsocket(conn) < 0)
361 ECB : {
362 UIC 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
363 GIC 2 : goto error;
364 ECB : }
365 :
366 GIC 4165 : FD_ZERO(&input_mask);
367 245 : FD_SET(PQsocket(conn), &input_mask);
368 ECB :
369 : /* Compute when we need to wakeup to send a keepalive message. */
370 GIC 245 : if (standby_message_timeout)
371 245 : message_target = last_status + (standby_message_timeout - 1) *
372 : ((int64) 1000);
373 ECB :
374 : /* Compute when we need to wakeup to fsync the output file. */
375 GIC 245 : if (fsync_interval > 0 && output_needs_fsync)
376 91 : fsync_target = output_last_fsync + (fsync_interval - 1) *
377 : ((int64) 1000);
378 ECB :
379 : /* Now compute when to wakeup. */
380 GIC 245 : if (message_target > 0 || fsync_target > 0)
381 : {
382 : TimestampTz targettime;
383 : long secs;
384 ECB : int usecs;
385 :
386 CBC 245 : targettime = message_target;
387 EUB :
388 GIC 245 : if (fsync_target > 0 && fsync_target < targettime)
389 LBC 0 : targettime = fsync_target;
390 :
391 GIC 245 : feTimestampDifference(now,
392 : targettime,
393 ECB : &secs,
394 EUB : &usecs);
395 GIC 245 : if (secs <= 0)
396 LBC 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
397 ECB : else
398 CBC 245 : timeout.tv_sec = secs;
399 GIC 245 : timeout.tv_usec = usecs;
400 245 : timeoutptr = &timeout;
401 ECB : }
402 :
403 GIC 245 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
404 245 : if (r == 0 || (r < 0 && errno == EINTR))
405 : {
406 : /*
407 : * Got a timeout or signal. Continue the loop and either
408 : * deliver a status packet to the server or just go back into
409 ECB : * blocking.
410 : */
411 CBC 243 : continue;
412 : }
413 GBC 244 : else if (r < 0)
414 EUB : {
415 UIC 0 : pg_log_error("%s() failed: %m", "select");
416 0 : goto error;
417 : }
418 ECB :
419 : /* Else there is actually data on the socket */
420 CBC 244 : if (PQconsumeInput(conn) == 0)
421 : {
422 2 : pg_log_error("could not receive data from WAL stream: %s",
423 : PQerrorMessage(conn));
424 2 : goto error;
425 : }
426 GIC 242 : continue;
427 : }
428 ECB :
429 : /* End of copy stream */
430 GIC 394 : if (r == -1)
431 14 : break;
432 ECB :
433 : /* Failure while reading the copy stream */
434 GBC 387 : if (r == -2)
435 : {
436 UBC 0 : pg_log_error("could not read COPY data: %s",
437 : PQerrorMessage(conn));
438 UIC 0 : goto error;
439 : }
440 ECB :
441 : /* Check the message type. */
442 GIC 387 : if (copybuf[0] == 'k')
443 297 : {
444 : int pos;
445 ECB : bool replyRequested;
446 : XLogRecPtr walEnd;
447 GIC 299 : bool endposReached = false;
448 :
449 : /*
450 : * Parse the keepalive message, enclosed in the CopyData message.
451 : * We just check if the server requested a reply, and ignore the
452 ECB : * rest.
453 : */
454 CBC 299 : pos = 1; /* skip msgtype 'k' */
455 GIC 299 : walEnd = fe_recvint64(©buf[pos]);
456 CBC 299 : output_written_lsn = Max(walEnd, output_written_lsn);
457 :
458 299 : pos += 8; /* read walEnd */
459 :
460 299 : pos += 8; /* skip sendTime */
461 :
462 GBC 299 : if (r < pos + 1)
463 EUB : {
464 UIC 0 : pg_log_error("streaming header too small: %d", r);
465 LBC 0 : goto error;
466 : }
467 CBC 299 : replyRequested = copybuf[pos];
468 :
469 GIC 299 : if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
470 : {
471 : /*
472 : * If there's nothing to read on the socket until a keepalive
473 : * we know that the server has nothing to send us; and if
474 : * walEnd has passed endpos, we know nothing else can have
475 ECB : * committed before endpos. So we can bail out now.
476 : */
477 GIC 2 : endposReached = true;
478 : }
479 ECB :
480 : /* Send a reply, if necessary */
481 CBC 299 : if (replyRequested || endposReached)
482 EUB : {
483 CBC 3 : if (!flushAndSendFeedback(conn, &now))
484 UIC 0 : goto error;
485 GIC 3 : last_status = now;
486 ECB : }
487 :
488 CBC 299 : if (endposReached)
489 ECB : {
490 CBC 2 : prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
491 GIC 2 : time_to_abort = true;
492 2 : break;
493 ECB : }
494 :
495 CBC 297 : continue;
496 : }
497 GBC 88 : else if (copybuf[0] != 'w')
498 : {
499 UBC 0 : pg_log_error("unrecognized streaming header: \"%c\"",
500 : copybuf[0]);
501 UIC 0 : goto error;
502 : }
503 :
504 : /*
505 : * Read the header of the XLogData message, enclosed in the CopyData
506 : * message. We only need the WAL location field (dataStart), the rest
507 ECB : * of the header is ignored.
508 : */
509 CBC 88 : hdr_len = 1; /* msgtype 'w' */
510 88 : hdr_len += 8; /* dataStart */
511 88 : hdr_len += 8; /* walEnd */
512 GIC 88 : hdr_len += 8; /* sendTime */
513 GBC 88 : if (r < hdr_len + 1)
514 EUB : {
515 UIC 0 : pg_log_error("streaming header too small: %d", r);
516 0 : goto error;
517 : }
518 ECB :
519 : /* Extract WAL location for this block */
520 CBC 88 : cur_record_lsn = fe_recvint64(©buf[1]);
521 :
522 GIC 88 : if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
523 : {
524 : /*
525 : * We've read past our endpoint, so prepare to go away being
526 EUB : * cautious about what happens to our output data.
527 : */
528 UBC 0 : if (!flushAndSendFeedback(conn, &now))
529 0 : goto error;
530 0 : prepareToTerminate(conn, endpos, false, cur_record_lsn);
531 UIC 0 : time_to_abort = true;
532 0 : break;
533 ECB : }
534 :
535 CBC 88 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
536 ECB :
537 GIC 88 : bytes_left = r - hdr_len;
538 88 : bytes_written = 0;
539 ECB :
540 : /* signal that a fsync is needed */
541 CBC 88 : output_needs_fsync = true;
542 :
543 GIC 176 : while (bytes_left)
544 : {
545 ECB : int ret;
546 :
547 GIC 176 : ret = write(outfd,
548 88 : copybuf + hdr_len + bytes_written,
549 ECB : bytes_left);
550 :
551 GBC 88 : if (ret < 0)
552 : {
553 UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
554 : bytes_left, outfile);
555 UIC 0 : goto error;
556 : }
557 ECB :
558 : /* Write was successful, advance our position */
559 GIC 88 : bytes_written += ret;
560 88 : bytes_left -= ret;
561 ECB : }
562 :
563 GBC 88 : if (write(outfd, "\n", 1) != 1)
564 : {
565 UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
566 : 1, outfile);
567 UIC 0 : goto error;
568 ECB : }
569 :
570 GIC 88 : if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
571 ECB : {
572 EUB : /* endpos was exactly the record we just processed, we're done */
573 CBC 5 : if (!flushAndSendFeedback(conn, &now))
574 LBC 0 : goto error;
575 CBC 5 : prepareToTerminate(conn, endpos, false, cur_record_lsn);
576 GIC 5 : time_to_abort = true;
577 5 : break;
578 : }
579 ECB : }
580 :
581 GIC 15 : res = PQgetResult(conn);
582 CBC 15 : if (PQresultStatus(res) == PGRES_COPY_OUT)
583 : {
584 GIC 7 : PQclear(res);
585 :
586 : /*
587 : * We're doing a client-initiated clean exit and have sent CopyDone to
588 : * the server. Drain any messages, so we don't miss a last-minute
589 : * ErrorResponse. The walsender stops generating XLogData records once
590 : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
591 : * it's too late for sendFeedback(), even if this were to take a long
592 : * time. Hence, use synchronous-mode PQgetCopyData().
593 ECB : */
594 : while (1)
595 GIC 3 : {
596 ECB : int r;
597 :
598 CBC 10 : if (copybuf != NULL)
599 ECB : {
600 GIC 10 : PQfreemem(copybuf);
601 CBC 10 : copybuf = NULL;
602 ECB : }
603 CBC 10 : r = PQgetCopyData(conn, ©buf, 0);
604 10 : if (r == -1)
605 GIC 7 : break;
606 GBC 3 : if (r == -2)
607 : {
608 UBC 0 : pg_log_error("could not read COPY data: %s",
609 EUB : PQerrorMessage(conn));
610 UIC 0 : time_to_abort = false; /* unclean exit */
611 0 : goto error;
612 : }
613 ECB : }
614 :
615 CBC 7 : res = PQgetResult(conn);
616 : }
617 15 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
618 : {
619 7 : pg_log_error("unexpected termination of replication stream: %s",
620 : PQresultErrorMessage(res));
621 7 : goto error;
622 : }
623 8 : PQclear(res);
624 :
625 GBC 8 : if (outfd != -1 && strcmp(outfile, "-") != 0)
626 : {
627 UIC 0 : TimestampTz t = feGetCurrentTimestamp();
628 EUB :
629 : /* no need to jump to error on failure here, we're finishing anyway */
630 UBC 0 : OutputFsync(t);
631 EUB :
632 UIC 0 : if (close(outfd) != 0)
633 LBC 0 : pg_log_error("could not close file \"%s\": %m", outfile);
634 ECB : }
635 CBC 8 : outfd = -1;
636 GIC 23 : error:
637 GBC 23 : if (copybuf != NULL)
638 EUB : {
639 UIC 0 : PQfreemem(copybuf);
640 LBC 0 : copybuf = NULL;
641 ECB : }
642 CBC 23 : destroyPQExpBuffer(query);
643 GIC 23 : PQfinish(conn);
644 23 : conn = NULL;
645 : }
646 :
647 : /*
648 : * Unfortunately we can't do sensible signal handling on windows...
649 : */
650 : #ifndef WIN32
651 :
652 : /*
653 : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
654 : * possible moment.
655 ECB : */
656 : static void
657 GNC 1 : sigexit_handler(SIGNAL_ARGS)
658 ECB : {
659 GIC 1 : time_to_abort = true;
660 1 : }
661 :
662 : /*
663 : * Trigger the output file to be reopened.
664 EUB : */
665 : static void
666 UNC 0 : sighup_handler(SIGNAL_ARGS)
667 EUB : {
668 UIC 0 : output_reopen = true;
669 0 : }
670 : #endif
671 :
672 ECB :
673 : int
674 GIC 55 : main(int argc, char **argv)
675 : {
676 : static struct option long_options[] = {
677 : /* general options */
678 : {"file", required_argument, NULL, 'f'},
679 : {"fsync-interval", required_argument, NULL, 'F'},
680 : {"no-loop", no_argument, NULL, 'n'},
681 : {"verbose", no_argument, NULL, 'v'},
682 : {"two-phase", no_argument, NULL, 't'},
683 : {"version", no_argument, NULL, 'V'},
684 : {"help", no_argument, NULL, '?'},
685 : /* connection options */
686 : {"dbname", required_argument, NULL, 'd'},
687 : {"host", required_argument, NULL, 'h'},
688 : {"port", required_argument, NULL, 'p'},
689 : {"username", required_argument, NULL, 'U'},
690 : {"no-password", no_argument, NULL, 'w'},
691 : {"password", no_argument, NULL, 'W'},
692 : /* replication options */
693 : {"startpos", required_argument, NULL, 'I'},
694 : {"endpos", required_argument, NULL, 'E'},
695 : {"option", required_argument, NULL, 'o'},
696 : {"plugin", required_argument, NULL, 'P'},
697 : {"status-interval", required_argument, NULL, 's'},
698 : {"slot", required_argument, NULL, 'S'},
699 : /* action */
700 : {"create-slot", no_argument, NULL, 1},
701 : {"start", no_argument, NULL, 2},
702 : {"drop-slot", no_argument, NULL, 3},
703 : {"if-not-exists", no_argument, NULL, 4},
704 : {NULL, 0, NULL, 0}
705 : };
706 : int c;
707 : int option_index;
708 : uint32 hi,
709 : lo;
710 ECB : char *db_name;
711 :
712 CBC 55 : pg_logging_init(argv[0]);
713 GIC 55 : progname = get_progname(argv[0]);
714 CBC 55 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
715 :
716 55 : if (argc > 1)
717 : {
718 54 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
719 ECB : {
720 GIC 1 : usage();
721 CBC 1 : exit(0);
722 ECB : }
723 GIC 53 : else if (strcmp(argv[1], "-V") == 0 ||
724 CBC 53 : strcmp(argv[1], "--version") == 0)
725 ECB : {
726 GIC 1 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
727 1 : exit(0);
728 : }
729 ECB : }
730 :
731 GNC 321 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
732 CBC 321 : long_options, &option_index)) != -1)
733 : {
734 GIC 269 : switch (c)
735 ECB : {
736 : /* general options */
737 CBC 24 : case 'f':
738 GBC 24 : outfile = pg_strdup(optarg);
739 24 : break;
740 UIC 0 : case 'F':
741 0 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
742 EUB : INT_MAX / 1000,
743 : &fsync_interval))
744 UBC 0 : exit(1);
745 LBC 0 : fsync_interval *= 1000;
746 0 : break;
747 CBC 23 : case 'n':
748 23 : noloop = 1;
749 23 : break;
750 GBC 2 : case 't':
751 GIC 2 : two_phase = true;
752 CBC 2 : break;
753 UNC 0 : case 'v':
754 0 : verbose++;
755 0 : break;
756 ECB : /* connection options */
757 CBC 50 : case 'd':
758 GBC 50 : dbname = pg_strdup(optarg);
759 50 : break;
760 UBC 0 : case 'h':
761 0 : dbhost = pg_strdup(optarg);
762 0 : break;
763 0 : case 'p':
764 0 : dbport = pg_strdup(optarg);
765 0 : break;
766 0 : case 'U':
767 0 : dbuser = pg_strdup(optarg);
768 0 : break;
769 0 : case 'w':
770 0 : dbgetpassword = -1;
771 0 : break;
772 0 : case 'W':
773 UIC 0 : dbgetpassword = 1;
774 UBC 0 : break;
775 EUB : /* replication options */
776 UBC 0 : case 'I':
777 0 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
778 0 : pg_fatal("could not parse start position \"%s\"", optarg);
779 LBC 0 : startpos = ((uint64) hi) << 32 | lo;
780 0 : break;
781 GBC 8 : case 'E':
782 CBC 8 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
783 LBC 0 : pg_fatal("could not parse end position \"%s\"", optarg);
784 CBC 8 : endpos = ((uint64) hi) << 32 | lo;
785 GIC 8 : break;
786 CBC 40 : case 'o':
787 ECB : {
788 GIC 40 : char *data = pg_strdup(optarg);
789 CBC 40 : char *val = strchr(data, '=');
790 :
791 GIC 40 : if (val != NULL)
792 ECB : {
793 : /* remove =; separate data from val */
794 GIC 40 : *val = '\0';
795 40 : val++;
796 ECB : }
797 :
798 GIC 40 : noptions += 1;
799 CBC 40 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
800 ECB :
801 GIC 40 : options[(noptions - 1) * 2] = data;
802 40 : options[(noptions - 1) * 2 + 1] = val;
803 ECB : }
804 :
805 CBC 40 : break;
806 21 : case 'P':
807 GBC 21 : plugin = pg_strdup(optarg);
808 21 : break;
809 UIC 0 : case 's':
810 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
811 EUB : INT_MAX / 1000,
812 : &standby_message_timeout))
813 UBC 0 : exit(1);
814 LBC 0 : standby_message_timeout *= 1000;
815 0 : break;
816 CBC 51 : case 'S':
817 GIC 51 : replication_slot = pg_strdup(optarg);
818 CBC 51 : break;
819 ECB : /* action */
820 CBC 23 : case 1:
821 23 : do_create_slot = true;
822 23 : break;
823 25 : case 2:
824 25 : do_start_slot = true;
825 25 : break;
826 1 : case 3:
827 GBC 1 : do_drop_slot = true;
828 1 : break;
829 UBC 0 : case 4:
830 UIC 0 : slot_exists_ok = true;
831 LBC 0 : break;
832 :
833 CBC 1 : default:
834 ECB : /* getopt_long already emitted a complaint */
835 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
836 1 : exit(1);
837 : }
838 : }
839 :
840 : /*
841 ECB : * Any non-option arguments?
842 : */
843 GBC 52 : if (optind < argc)
844 : {
845 UBC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
846 EUB : argv[optind]);
847 UIC 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
848 0 : exit(1);
849 : }
850 :
851 : /*
852 ECB : * Required arguments
853 : */
854 CBC 52 : if (replication_slot == NULL)
855 ECB : {
856 CBC 1 : pg_log_error("no slot specified");
857 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
858 1 : exit(1);
859 ECB : }
860 :
861 CBC 51 : if (do_start_slot && outfile == NULL)
862 ECB : {
863 CBC 1 : pg_log_error("no target file specified");
864 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
865 1 : exit(1);
866 ECB : }
867 :
868 CBC 50 : if (!do_drop_slot && dbname == NULL)
869 ECB : {
870 CBC 1 : pg_log_error("no database specified");
871 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
872 1 : exit(1);
873 ECB : }
874 :
875 CBC 49 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
876 ECB : {
877 CBC 1 : pg_log_error("at least one action needs to be specified");
878 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
879 1 : exit(1);
880 ECB : }
881 :
882 GBC 48 : if (do_drop_slot && (do_create_slot || do_start_slot))
883 EUB : {
884 UBC 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
885 UIC 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
886 0 : exit(1);
887 ECB : }
888 :
889 GBC 48 : if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
890 EUB : {
891 UBC 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
892 UIC 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
893 0 : exit(1);
894 ECB : }
895 :
896 GBC 48 : if (endpos != InvalidXLogRecPtr && !do_start_slot)
897 EUB : {
898 UBC 0 : pg_log_error("--endpos may only be specified with --start");
899 UIC 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
900 0 : exit(1);
901 ECB : }
902 :
903 CBC 48 : if (two_phase && !do_create_slot)
904 ECB : {
905 CBC 1 : pg_log_error("--two-phase may only be specified with --create-slot");
906 GIC 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
907 1 : exit(1);
908 : }
909 :
910 : /*
911 : * Obtain a connection to server. Notably, if we need a password, we want
912 ECB : * to collect it from the user immediately.
913 : */
914 GIC 47 : conn = GetConnection();
915 GBC 47 : if (!conn)
916 ECB : /* Error message already written in GetConnection() */
917 UIC 0 : exit(1);
918 GIC 47 : atexit(disconnect_atexit);
919 :
920 : /*
921 : * Trap signals. (Don't do this until after the initial password prompt,
922 : * if one is needed, in GetConnection.)
923 ECB : */
924 : #ifndef WIN32
925 GNC 47 : pqsignal(SIGINT, sigexit_handler);
926 47 : pqsignal(SIGTERM, sigexit_handler);
927 GIC 47 : pqsignal(SIGHUP, sighup_handler);
928 : #endif
929 :
930 : /*
931 : * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
932 : * replication connection.
933 ECB : */
934 GBC 47 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
935 UIC 0 : exit(1);
936 ECB :
937 GBC 47 : if (db_name == NULL)
938 UIC 0 : pg_fatal("could not establish database-specific replication connection");
939 :
940 : /*
941 : * Set umask so that directories/files are created with the same
942 : * permissions as directories/files in the source data directory.
943 : *
944 : * pg_mode_mask is set to owner-only by default and then updated in
945 : * GetConnection() where we get the mode from the server-side with
946 : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
947 ECB : */
948 GIC 47 : umask(pg_mode_mask);
949 :
950 ECB : /* Drop a replication slot. */
951 GIC 47 : if (do_drop_slot)
952 ECB : {
953 GBC 1 : if (verbose)
954 UIC 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
955 ECB :
956 GBC 1 : if (!DropReplicationSlot(conn, replication_slot))
957 UIC 0 : exit(1);
958 : }
959 :
960 ECB : /* Create a replication slot. */
961 GIC 47 : if (do_create_slot)
962 ECB : {
963 GBC 23 : if (verbose)
964 UIC 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
965 ECB :
966 GIC 23 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
967 EUB : false, false, slot_exists_ok, two_phase))
968 LBC 0 : exit(1);
969 GIC 23 : startpos = InvalidXLogRecPtr;
970 : }
971 ECB :
972 CBC 47 : if (!do_start_slot)
973 GIC 24 : exit(0);
974 :
975 : /* Stream loop */
976 : while (true)
977 ECB : {
978 CBC 23 : StreamLogicalLog();
979 GIC 23 : if (time_to_abort)
980 : {
981 : /*
982 : * We've been Ctrl-C'ed or reached an exit limit condition. That's
983 : * not an error, so exit without an errorcode.
984 ECB : */
985 GIC 8 : exit(0);
986 ECB : }
987 CBC 15 : else if (noloop)
988 GIC 15 : pg_fatal("disconnected");
989 : else
990 : {
991 EUB : /* translator: check source for value for %d */
992 UIC 0 : pg_log_info("disconnected; waiting %d seconds to try again",
993 EUB : RECONNECT_SLEEP_TIME);
994 UIC 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
995 : }
996 : }
997 : }
998 :
999 : /*
1000 : * Fsync our output data, and send a feedback message to the server. Returns
1001 : * true if successful, false otherwise.
1002 : *
1003 : * If successful, *now is updated to the current timestamp just before sending
1004 : * feedback.
1005 : */
1006 ECB : static bool
1007 GIC 8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1008 : {
1009 ECB : /* flush data to disk, so that we send a recent flush pointer */
1010 GBC 8 : if (!OutputFsync(*now))
1011 LBC 0 : return false;
1012 CBC 8 : *now = feGetCurrentTimestamp();
1013 GBC 8 : if (!sendFeedback(conn, *now, true, false))
1014 UIC 0 : return false;
1015 ECB :
1016 GIC 8 : return true;
1017 : }
1018 :
1019 : /*
1020 : * Try to inform the server about our upcoming demise, but don't wait around or
1021 : * retry on failure.
1022 : */
1023 ECB : static void
1024 GIC 7 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
1025 ECB : {
1026 CBC 7 : (void) PQputCopyEnd(conn, NULL);
1027 GIC 7 : (void) PQflush(conn);
1028 ECB :
1029 GIC 7 : if (verbose)
1030 EUB : {
1031 UBC 0 : if (keepalive)
1032 UIC 0 : pg_log_info("end position %X/%X reached by keepalive",
1033 : LSN_FORMAT_ARGS(endpos));
1034 EUB : else
1035 UIC 0 : pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1036 : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1037 ECB : }
1038 GIC 7 : }
|