Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_createsubscriber.c
4 : : * Create a new logical replica from a standby server
5 : : *
6 : : * Copyright (C) 2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/bin/pg_basebackup/pg_createsubscriber.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : :
14 : : #include "postgres_fe.h"
15 : :
16 : : #include <sys/time.h>
17 : : #include <sys/wait.h>
18 : : #include <time.h>
19 : :
20 : : #include "catalog/pg_authid_d.h"
21 : : #include "common/connect.h"
22 : : #include "common/controldata_utils.h"
23 : : #include "common/file_perm.h"
24 : : #include "common/logging.h"
25 : : #include "common/pg_prng.h"
26 : : #include "common/restricted_token.h"
27 : : #include "fe_utils/recovery_gen.h"
28 : : #include "fe_utils/simple_list.h"
29 : : #include "getopt_long.h"
30 : :
31 : : #define DEFAULT_SUB_PORT "50432"
32 : :
33 : : /* Command-line options */
34 : : struct CreateSubscriberOptions
35 : : {
36 : : char *config_file; /* configuration file */
37 : : char *pub_conninfo_str; /* publisher connection string */
38 : : char *socket_dir; /* directory for Unix-domain socket, if any */
39 : : char *sub_port; /* subscriber port number */
40 : : const char *sub_username; /* subscriber username */
41 : : SimpleStringList database_names; /* list of database names */
42 : : SimpleStringList pub_names; /* list of publication names */
43 : : SimpleStringList sub_names; /* list of subscription names */
44 : : SimpleStringList replslot_names; /* list of replication slot names */
45 : : int recovery_timeout; /* stop recovery after this time */
46 : : };
47 : :
48 : : struct LogicalRepInfo
49 : : {
50 : : Oid oid; /* database OID */
51 : : char *dbname; /* database name */
52 : : char *pubconninfo; /* publisher connection string */
53 : : char *subconninfo; /* subscriber connection string */
54 : : char *pubname; /* publication name */
55 : : char *subname; /* subscription name */
56 : : char *replslotname; /* replication slot name */
57 : :
58 : : bool made_replslot; /* replication slot was created */
59 : : bool made_publication; /* publication was created */
60 : : };
61 : :
62 : : static void cleanup_objects_atexit(void);
63 : : static void usage();
64 : : static char *get_base_conninfo(const char *conninfo, char **dbname);
65 : : static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
66 : : static char *get_exec_path(const char *argv0, const char *progname);
67 : : static void check_data_directory(const char *datadir);
68 : : static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
69 : : static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
70 : : const char *pub_base_conninfo,
71 : : const char *sub_base_conninfo);
72 : : static PGconn *connect_database(const char *conninfo, bool exit_on_error);
73 : : static void disconnect_database(PGconn *conn, bool exit_on_error);
74 : : static uint64 get_primary_sysid(const char *conninfo);
75 : : static uint64 get_standby_sysid(const char *datadir);
76 : : static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
77 : : static bool server_is_in_recovery(PGconn *conn);
78 : : static char *generate_object_name(PGconn *conn);
79 : : static void check_publisher(const struct LogicalRepInfo *dbinfo);
80 : : static char *setup_publisher(struct LogicalRepInfo *dbinfo);
81 : : static void check_subscriber(const struct LogicalRepInfo *dbinfo);
82 : : static void setup_subscriber(struct LogicalRepInfo *dbinfo,
83 : : const char *consistent_lsn);
84 : : static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
85 : : const char *lsn);
86 : : static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
87 : : const char *slotname);
88 : : static char *create_logical_replication_slot(PGconn *conn,
89 : : struct LogicalRepInfo *dbinfo);
90 : : static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
91 : : const char *slot_name);
92 : : static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
93 : : static void start_standby_server(const struct CreateSubscriberOptions *opt,
94 : : bool restricted_access);
95 : : static void stop_standby_server(const char *datadir);
96 : : static void wait_for_end_recovery(const char *conninfo,
97 : : const struct CreateSubscriberOptions *opt);
98 : : static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
99 : : static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
100 : : static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
101 : : static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
102 : : const char *lsn);
103 : : static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
104 : :
105 : : #define USEC_PER_SEC 1000000
106 : : #define WAIT_INTERVAL 1 /* 1 second */
107 : :
108 : : static const char *progname;
109 : :
110 : : static char *primary_slot_name = NULL;
111 : : static bool dry_run = false;
112 : :
113 : : static bool success = false;
114 : :
115 : : static struct LogicalRepInfo *dbinfo;
116 : : static int num_dbs = 0; /* number of specified databases */
117 : : static int num_pubs = 0; /* number of specified publications */
118 : : static int num_subs = 0; /* number of specified subscriptions */
119 : : static int num_replslots = 0; /* number of specified replication slots */
120 : :
121 : : static pg_prng_state prng_state;
122 : :
123 : : static char *pg_ctl_path = NULL;
124 : : static char *pg_resetwal_path = NULL;
125 : :
126 : : /* standby / subscriber data directory */
127 : : static char *subscriber_dir = NULL;
128 : :
129 : : static bool recovery_ended = false;
130 : : static bool standby_running = false;
131 : :
132 : : enum WaitPMResult
133 : : {
134 : : POSTMASTER_READY,
135 : : POSTMASTER_STILL_STARTING
136 : : };
137 : :
138 : :
139 : : /*
140 : : * Cleanup objects that were created by pg_createsubscriber if there is an
141 : : * error.
142 : : *
143 : : * Publications and replication slots are created on primary. Depending on the
144 : : * step it failed, it should remove the already created objects if it is
145 : : * possible (sometimes it won't work due to a connection issue).
146 : : * There is no cleanup on the target server. The steps on the target server are
147 : : * executed *after* promotion, hence, at this point, a failure means recreate
148 : : * the physical replica and start again.
149 : : */
150 : : static void
20 peter@eisentraut.org 151 :GNC 9 : cleanup_objects_atexit(void)
152 : : {
153 [ + + ]: 9 : if (success)
154 : 3 : return;
155 : :
156 : : /*
157 : : * If the server is promoted, there is no way to use the current setup
158 : : * again. Warn the user that a new replication setup should be done before
159 : : * trying again.
160 : : */
161 [ - + ]: 6 : if (recovery_ended)
162 : : {
20 peter@eisentraut.org 163 :UNC 0 : pg_log_warning("failed after the end of recovery");
164 : 0 : pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
165 : : "You must recreate the physical replica before continuing.");
166 : : }
167 : :
20 peter@eisentraut.org 168 [ + + ]:GNC 18 : for (int i = 0; i < num_dbs; i++)
169 : : {
170 [ + - - + ]: 12 : if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
171 : : {
172 : : PGconn *conn;
173 : :
20 peter@eisentraut.org 174 :UNC 0 : conn = connect_database(dbinfo[i].pubconninfo, false);
175 [ # # ]: 0 : if (conn != NULL)
176 : : {
177 [ # # ]: 0 : if (dbinfo[i].made_publication)
178 : 0 : drop_publication(conn, &dbinfo[i]);
179 [ # # ]: 0 : if (dbinfo[i].made_replslot)
180 : 0 : drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
181 : 0 : disconnect_database(conn, false);
182 : : }
183 : : else
184 : : {
185 : : /*
186 : : * If a connection could not be established, inform the user
187 : : * that some objects were left on primary and should be
188 : : * removed before trying again.
189 : : */
190 [ # # ]: 0 : if (dbinfo[i].made_publication)
191 : : {
192 : 0 : pg_log_warning("publication \"%s\" in database \"%s\" on primary might be left behind",
193 : : dbinfo[i].pubname, dbinfo[i].dbname);
194 : 0 : pg_log_warning_hint("Consider dropping this publication before trying again.");
195 : : }
196 [ # # ]: 0 : if (dbinfo[i].made_replslot)
197 : : {
198 : 0 : pg_log_warning("replication slot \"%s\" in database \"%s\" on primary might be left behind",
199 : : dbinfo[i].replslotname, dbinfo[i].dbname);
200 : 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
201 : : }
202 : : }
203 : : }
204 : : }
205 : :
20 peter@eisentraut.org 206 [ + + ]:GNC 6 : if (standby_running)
207 : 4 : stop_standby_server(subscriber_dir);
208 : : }
209 : :
210 : : static void
211 : 1 : usage(void)
212 : : {
213 : 1 : printf(_("%s creates a new logical replica from a standby server.\n\n"),
214 : : progname);
215 : 1 : printf(_("Usage:\n"));
216 : 1 : printf(_(" %s [OPTION]...\n"), progname);
217 : 1 : printf(_("\nOptions:\n"));
218 : 1 : printf(_(" -d, --database=DBNAME database to create a subscription\n"));
219 : 1 : printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
220 : 1 : printf(_(" -n, --dry-run dry run, just show what would be done\n"));
221 : 1 : printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
222 : 1 : printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
223 : 1 : printf(_(" -s, --socket-directory=DIR socket directory to use (default current directory)\n"));
224 : 1 : printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
225 : 1 : printf(_(" -U, --subscriber-username=NAME subscriber username\n"));
226 : 1 : printf(_(" -v, --verbose output verbose messages\n"));
227 : 1 : printf(_(" --config-file=FILENAME use specified main server configuration\n"
228 : : " file when running target cluster\n"));
229 : 1 : printf(_(" --publication=NAME publication name\n"));
230 : 1 : printf(_(" --replication-slot=NAME replication slot name\n"));
231 : 1 : printf(_(" --subscription=NAME subscription name\n"));
232 : 1 : printf(_(" -V, --version output version information, then exit\n"));
233 : 1 : printf(_(" -?, --help show this help, then exit\n"));
234 : 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
235 : 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
236 : 1 : }
237 : :
238 : : /*
239 : : * Validate a connection string. Returns a base connection string that is a
240 : : * connection string without a database name.
241 : : *
242 : : * Since we might process multiple databases, each database name will be
243 : : * appended to this base connection string to provide a final connection
244 : : * string. If the second argument (dbname) is not null, returns dbname if the
245 : : * provided connection string contains it. If option --database is not
246 : : * provided, uses dbname as the only database to setup the logical replica.
247 : : *
248 : : * It is the caller's responsibility to free the returned connection string and
249 : : * dbname.
250 : : */
251 : : static char *
252 : 13 : get_base_conninfo(const char *conninfo, char **dbname)
253 : : {
254 : : PQExpBuffer buf;
255 : : PQconninfoOption *conn_opts;
256 : : PQconninfoOption *conn_opt;
257 : 13 : char *errmsg = NULL;
258 : : char *ret;
259 : : int i;
260 : :
261 : 13 : conn_opts = PQconninfoParse(conninfo, &errmsg);
262 [ - + ]: 13 : if (conn_opts == NULL)
263 : : {
20 peter@eisentraut.org 264 :UNC 0 : pg_log_error("could not parse connection string: %s", errmsg);
13 tgl@sss.pgh.pa.us 265 : 0 : PQfreemem(errmsg);
20 peter@eisentraut.org 266 : 0 : return NULL;
267 : : }
268 : :
13 tgl@sss.pgh.pa.us 269 :GNC 13 : buf = createPQExpBuffer();
20 peter@eisentraut.org 270 : 13 : i = 0;
271 [ + + ]: 546 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
272 : : {
273 [ + + + + ]: 533 : if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL)
274 : : {
275 [ + - ]: 9 : if (dbname)
276 : 9 : *dbname = pg_strdup(conn_opt->val);
277 : 9 : continue;
278 : : }
279 : :
280 [ + + + - ]: 524 : if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
281 : : {
282 [ + + ]: 22 : if (i > 0)
283 : 9 : appendPQExpBufferChar(buf, ' ');
284 : 22 : appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val);
285 : 22 : i++;
286 : : }
287 : : }
288 : :
289 : 13 : ret = pg_strdup(buf->data);
290 : :
291 : 13 : destroyPQExpBuffer(buf);
292 : 13 : PQconninfoFree(conn_opts);
293 : :
294 : 13 : return ret;
295 : : }
296 : :
297 : : /*
298 : : * Build a subscriber connection string. Only a few parameters are supported
299 : : * since it starts a server with restricted access.
300 : : */
301 : : static char *
302 : 13 : get_sub_conninfo(const struct CreateSubscriberOptions *opt)
303 : : {
304 : 13 : PQExpBuffer buf = createPQExpBuffer();
305 : : char *ret;
306 : :
307 : 13 : appendPQExpBuffer(buf, "port=%s", opt->sub_port);
308 : : #if !defined(WIN32)
309 : 13 : appendPQExpBuffer(buf, " host=%s", opt->socket_dir);
310 : : #endif
311 [ - + ]: 13 : if (opt->sub_username != NULL)
20 peter@eisentraut.org 312 :UNC 0 : appendPQExpBuffer(buf, " user=%s", opt->sub_username);
20 peter@eisentraut.org 313 :GNC 13 : appendPQExpBuffer(buf, " fallback_application_name=%s", progname);
314 : :
315 : 13 : ret = pg_strdup(buf->data);
316 : :
317 : 13 : destroyPQExpBuffer(buf);
318 : :
319 : 13 : return ret;
320 : : }
321 : :
322 : : /*
323 : : * Verify if a PostgreSQL binary (progname) is available in the same directory as
324 : : * pg_createsubscriber and it has the same version. It returns the absolute
325 : : * path of the progname.
326 : : */
327 : : static char *
328 : 18 : get_exec_path(const char *argv0, const char *progname)
329 : : {
330 : : char *versionstr;
331 : : char *exec_path;
332 : : int ret;
333 : :
334 : 18 : versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
335 : 18 : exec_path = pg_malloc(MAXPGPATH);
336 : 18 : ret = find_other_exec(argv0, progname, versionstr, exec_path);
337 : :
338 [ - + ]: 18 : if (ret < 0)
339 : : {
340 : : char full_path[MAXPGPATH];
341 : :
20 peter@eisentraut.org 342 [ # # ]:UNC 0 : if (find_my_exec(argv0, full_path) < 0)
343 : 0 : strlcpy(full_path, progname, sizeof(full_path));
344 : :
345 [ # # ]: 0 : if (ret == -1)
346 : 0 : pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
347 : : progname, "pg_createsubscriber", full_path);
348 : : else
349 : 0 : pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
350 : : progname, full_path, "pg_createsubscriber");
351 : : }
352 : :
20 peter@eisentraut.org 353 [ + + ]:GNC 18 : pg_log_debug("%s path is: %s", progname, exec_path);
354 : :
355 : 18 : return exec_path;
356 : : }
357 : :
358 : : /*
359 : : * Is it a cluster directory? These are preliminary checks. It is far from
360 : : * making an accurate check. If it is not a clone from the publisher, it will
361 : : * eventually fail in a future step.
362 : : */
363 : : static void
364 : 9 : check_data_directory(const char *datadir)
365 : : {
366 : : struct stat statbuf;
367 : : char versionfile[MAXPGPATH];
368 : :
369 : 9 : pg_log_info("checking if directory \"%s\" is a cluster data directory",
370 : : datadir);
371 : :
372 [ - + ]: 9 : if (stat(datadir, &statbuf) != 0)
373 : : {
20 peter@eisentraut.org 374 [ # # ]:UNC 0 : if (errno == ENOENT)
375 : 0 : pg_fatal("data directory \"%s\" does not exist", datadir);
376 : : else
19 377 : 0 : pg_fatal("could not access directory \"%s\": %m", datadir);
378 : : }
379 : :
20 peter@eisentraut.org 380 :GNC 9 : snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
381 [ - + - - ]: 9 : if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
382 : : {
20 peter@eisentraut.org 383 :UNC 0 : pg_fatal("directory \"%s\" is not a database cluster directory",
384 : : datadir);
385 : : }
20 peter@eisentraut.org 386 :GNC 9 : }
387 : :
388 : : /*
389 : : * Append database name into a base connection string.
390 : : *
391 : : * dbname is the only parameter that changes so it is not included in the base
392 : : * connection string. This function concatenates dbname to build a "real"
393 : : * connection string.
394 : : */
395 : : static char *
396 : 34 : concat_conninfo_dbname(const char *conninfo, const char *dbname)
397 : : {
398 : 34 : PQExpBuffer buf = createPQExpBuffer();
399 : : char *ret;
400 : :
401 [ - + ]: 34 : Assert(conninfo != NULL);
402 : :
403 : 34 : appendPQExpBufferStr(buf, conninfo);
404 : 34 : appendPQExpBuffer(buf, " dbname=%s", dbname);
405 : :
406 : 34 : ret = pg_strdup(buf->data);
407 : 34 : destroyPQExpBuffer(buf);
408 : :
409 : 34 : return ret;
410 : : }
411 : :
412 : : /*
413 : : * Store publication and subscription information.
414 : : *
415 : : * If publication, replication slot and subscription names were specified,
416 : : * store it here. Otherwise, a generated name will be assigned to the object in
417 : : * setup_publisher().
418 : : */
419 : : static struct LogicalRepInfo *
420 : 9 : store_pub_sub_info(const struct CreateSubscriberOptions *opt,
421 : : const char *pub_base_conninfo,
422 : : const char *sub_base_conninfo)
423 : : {
424 : : struct LogicalRepInfo *dbinfo;
425 : 9 : SimpleStringListCell *pubcell = NULL;
426 : 9 : SimpleStringListCell *subcell = NULL;
427 : 9 : SimpleStringListCell *replslotcell = NULL;
428 : 9 : int i = 0;
429 : :
430 : 9 : dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
431 : :
432 [ + + ]: 9 : if (num_pubs > 0)
433 : 2 : pubcell = opt->pub_names.head;
434 [ + + ]: 9 : if (num_subs > 0)
435 : 1 : subcell = opt->sub_names.head;
436 [ + + ]: 9 : if (num_replslots > 0)
437 : 2 : replslotcell = opt->replslot_names.head;
438 : :
439 [ + + ]: 26 : for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
440 : : {
441 : : char *conninfo;
442 : :
443 : : /* Fill publisher attributes */
444 : 17 : conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
445 : 17 : dbinfo[i].pubconninfo = conninfo;
446 : 17 : dbinfo[i].dbname = cell->val;
447 [ + + ]: 17 : if (num_pubs > 0)
448 : 4 : dbinfo[i].pubname = pubcell->val;
449 : : else
450 : 13 : dbinfo[i].pubname = NULL;
451 [ + + ]: 17 : if (num_replslots > 0)
452 : 3 : dbinfo[i].replslotname = replslotcell->val;
453 : : else
454 : 14 : dbinfo[i].replslotname = NULL;
455 : 17 : dbinfo[i].made_replslot = false;
456 : 17 : dbinfo[i].made_publication = false;
457 : : /* Fill subscriber attributes */
458 : 17 : conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
459 : 17 : dbinfo[i].subconninfo = conninfo;
460 [ + + ]: 17 : if (num_subs > 0)
461 : 2 : dbinfo[i].subname = subcell->val;
462 : : else
463 : 15 : dbinfo[i].subname = NULL;
464 : : /* Other fields will be filled later */
465 : :
466 [ + + + - : 17 : pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
+ - ]
467 : : dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
468 : : dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
469 : : dbinfo[i].pubconninfo);
470 [ + + - + ]: 17 : pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
471 : : dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
472 : : dbinfo[i].subconninfo);
473 : :
474 [ + + ]: 17 : if (num_pubs > 0)
475 : 4 : pubcell = pubcell->next;
476 [ + + ]: 17 : if (num_subs > 0)
477 : 2 : subcell = subcell->next;
478 [ + + ]: 17 : if (num_replslots > 0)
479 : 3 : replslotcell = replslotcell->next;
480 : :
481 : 17 : i++;
482 : : }
483 : :
484 : 9 : return dbinfo;
485 : : }
486 : :
487 : : /*
488 : : * Open a new connection. If exit_on_error is true, it has an undesired
489 : : * condition and it should exit immediately.
490 : : */
491 : : static PGconn *
492 : 40 : connect_database(const char *conninfo, bool exit_on_error)
493 : : {
494 : : PGconn *conn;
495 : : PGresult *res;
496 : :
497 : 40 : conn = PQconnectdb(conninfo);
498 [ - + ]: 40 : if (PQstatus(conn) != CONNECTION_OK)
499 : : {
20 peter@eisentraut.org 500 :UNC 0 : pg_log_error("connection to database failed: %s",
501 : : PQerrorMessage(conn));
13 tgl@sss.pgh.pa.us 502 : 0 : PQfinish(conn);
503 : :
20 peter@eisentraut.org 504 [ # # ]: 0 : if (exit_on_error)
505 : 0 : exit(1);
506 : 0 : return NULL;
507 : : }
508 : :
509 : : /* Secure search_path */
20 peter@eisentraut.org 510 :GNC 40 : res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
511 [ - + ]: 40 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
512 : : {
20 peter@eisentraut.org 513 :UNC 0 : pg_log_error("could not clear search_path: %s",
514 : : PQresultErrorMessage(res));
13 tgl@sss.pgh.pa.us 515 : 0 : PQclear(res);
516 : 0 : PQfinish(conn);
517 : :
20 peter@eisentraut.org 518 [ # # ]: 0 : if (exit_on_error)
519 : 0 : exit(1);
520 : 0 : return NULL;
521 : : }
20 peter@eisentraut.org 522 :GNC 40 : PQclear(res);
523 : :
524 : 40 : return conn;
525 : : }
526 : :
527 : : /*
528 : : * Close the connection. If exit_on_error is true, it has an undesired
529 : : * condition and it should exit immediately.
530 : : */
531 : : static void
532 : 40 : disconnect_database(PGconn *conn, bool exit_on_error)
533 : : {
534 [ - + ]: 40 : Assert(conn != NULL);
535 : :
536 : 40 : PQfinish(conn);
537 : :
538 [ + + ]: 40 : if (exit_on_error)
539 : 2 : exit(1);
540 : 38 : }
541 : :
542 : : /*
543 : : * Obtain the system identifier using the provided connection. It will be used
544 : : * to compare if a data directory is a clone of another one.
545 : : */
546 : : static uint64
547 : 9 : get_primary_sysid(const char *conninfo)
548 : : {
549 : : PGconn *conn;
550 : : PGresult *res;
551 : : uint64 sysid;
552 : :
553 : 9 : pg_log_info("getting system identifier from publisher");
554 : :
555 : 9 : conn = connect_database(conninfo, true);
556 : :
557 : 9 : res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
558 [ - + ]: 9 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
559 : : {
20 peter@eisentraut.org 560 :UNC 0 : pg_log_error("could not get system identifier: %s",
561 : : PQresultErrorMessage(res));
562 : 0 : disconnect_database(conn, true);
563 : : }
20 peter@eisentraut.org 564 [ - + ]:GNC 9 : if (PQntuples(res) != 1)
565 : : {
20 peter@eisentraut.org 566 :UNC 0 : pg_log_error("could not get system identifier: got %d rows, expected %d row",
567 : : PQntuples(res), 1);
568 : 0 : disconnect_database(conn, true);
569 : : }
570 : :
20 peter@eisentraut.org 571 :GNC 9 : sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
572 : :
573 : 9 : pg_log_info("system identifier is %llu on publisher",
574 : : (unsigned long long) sysid);
575 : :
576 : 9 : PQclear(res);
577 : 9 : disconnect_database(conn, false);
578 : :
579 : 9 : return sysid;
580 : : }
581 : :
582 : : /*
583 : : * Obtain the system identifier from control file. It will be used to compare
584 : : * if a data directory is a clone of another one. This routine is used locally
585 : : * and avoids a connection.
586 : : */
587 : : static uint64
588 : 9 : get_standby_sysid(const char *datadir)
589 : : {
590 : : ControlFileData *cf;
591 : : bool crc_ok;
592 : : uint64 sysid;
593 : :
594 : 9 : pg_log_info("getting system identifier from subscriber");
595 : :
596 : 9 : cf = get_controlfile(datadir, &crc_ok);
597 [ - + ]: 9 : if (!crc_ok)
20 peter@eisentraut.org 598 :UNC 0 : pg_fatal("control file appears to be corrupt");
599 : :
20 peter@eisentraut.org 600 :GNC 9 : sysid = cf->system_identifier;
601 : :
602 : 9 : pg_log_info("system identifier is %llu on subscriber",
603 : : (unsigned long long) sysid);
604 : :
605 : 9 : pg_free(cf);
606 : :
607 : 9 : return sysid;
608 : : }
609 : :
610 : : /*
611 : : * Modify the system identifier. Since a standby server preserves the system
612 : : * identifier, it makes sense to change it to avoid situations in which WAL
613 : : * files from one of the systems might be used in the other one.
614 : : */
615 : : static void
616 : 3 : modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
617 : : {
618 : : ControlFileData *cf;
619 : : bool crc_ok;
620 : : struct timeval tv;
621 : :
622 : : char *cmd_str;
623 : :
624 : 3 : pg_log_info("modifying system identifier of subscriber");
625 : :
626 : 3 : cf = get_controlfile(subscriber_dir, &crc_ok);
627 [ - + ]: 3 : if (!crc_ok)
20 peter@eisentraut.org 628 :UNC 0 : pg_fatal("control file appears to be corrupt");
629 : :
630 : : /*
631 : : * Select a new system identifier.
632 : : *
633 : : * XXX this code was extracted from BootStrapXLOG().
634 : : */
20 peter@eisentraut.org 635 :GNC 3 : gettimeofday(&tv, NULL);
636 : 3 : cf->system_identifier = ((uint64) tv.tv_sec) << 32;
637 : 3 : cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
638 : 3 : cf->system_identifier |= getpid() & 0xFFF;
639 : :
640 [ + + ]: 3 : if (!dry_run)
641 : 1 : update_controlfile(subscriber_dir, cf, true);
642 : :
643 : 3 : pg_log_info("system identifier is %llu on subscriber",
644 : : (unsigned long long) cf->system_identifier);
645 : :
646 : 3 : pg_log_info("running pg_resetwal on the subscriber");
647 : :
648 : 3 : cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
649 : : subscriber_dir, DEVNULL);
650 : :
651 [ + + ]: 3 : pg_log_debug("pg_resetwal command is: %s", cmd_str);
652 : :
653 [ + + ]: 3 : if (!dry_run)
654 : : {
655 : 1 : int rc = system(cmd_str);
656 : :
657 [ + - ]: 1 : if (rc == 0)
658 : 1 : pg_log_info("subscriber successfully changed the system identifier");
659 : : else
20 peter@eisentraut.org 660 :UNC 0 : pg_fatal("subscriber failed to change system identifier: exit code: %d", rc);
661 : : }
662 : :
20 peter@eisentraut.org 663 :GNC 3 : pg_free(cf);
664 : 3 : }
665 : :
666 : : /*
667 : : * Generate an object name using a prefix, database oid and a random integer.
668 : : * It is used in case the user does not specify an object name (publication,
669 : : * subscription, replication slot).
670 : : */
671 : : static char *
672 : 5 : generate_object_name(PGconn *conn)
673 : : {
674 : : PGresult *res;
675 : : Oid oid;
676 : : uint32 rand;
677 : : char *objname;
678 : :
679 : 5 : res = PQexec(conn,
680 : : "SELECT oid FROM pg_catalog.pg_database "
681 : : "WHERE datname = pg_catalog.current_database()");
682 [ - + ]: 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
683 : : {
20 peter@eisentraut.org 684 :UNC 0 : pg_log_error("could not obtain database OID: %s",
685 : : PQresultErrorMessage(res));
686 : 0 : disconnect_database(conn, true);
687 : : }
688 : :
20 peter@eisentraut.org 689 [ - + ]:GNC 5 : if (PQntuples(res) != 1)
690 : : {
19 peter@eisentraut.org 691 :UNC 0 : pg_log_error("could not obtain database OID: got %d rows, expected %d row",
692 : : PQntuples(res), 1);
20 693 : 0 : disconnect_database(conn, true);
694 : : }
695 : :
696 : : /* Database OID */
20 peter@eisentraut.org 697 :GNC 5 : oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
698 : :
699 : 5 : PQclear(res);
700 : :
701 : : /* Random unsigned integer */
702 : 5 : rand = pg_prng_uint32(&prng_state);
703 : :
704 : : /*
705 : : * Build the object name. The name must not exceed NAMEDATALEN - 1. This
706 : : * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
707 : : * '\0').
708 : : */
709 : 5 : objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
710 : :
711 : 5 : return objname;
712 : : }
713 : :
714 : : /*
715 : : * Create the publications and replication slots in preparation for logical
716 : : * replication. Returns the LSN from latest replication slot. It will be the
717 : : * replication start point that is used to adjust the subscriptions (see
718 : : * set_replication_progress).
719 : : */
720 : : static char *
721 : 3 : setup_publisher(struct LogicalRepInfo *dbinfo)
722 : : {
723 : 3 : char *lsn = NULL;
724 : :
725 : 3 : pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
726 : :
727 [ + + ]: 8 : for (int i = 0; i < num_dbs; i++)
728 : : {
729 : : PGconn *conn;
730 : 5 : char *genname = NULL;
731 : :
732 : 5 : conn = connect_database(dbinfo[i].pubconninfo, true);
733 : :
734 : : /*
735 : : * If an object name was not specified as command-line options, assign
736 : : * a generated object name. The replication slot has a different rule.
737 : : * The subscription name is assigned to the replication slot name if
738 : : * no replication slot is specified. It follows the same rule as
739 : : * CREATE SUBSCRIPTION.
740 : : */
741 [ + + + + : 5 : if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
+ - ]
742 : 5 : genname = generate_object_name(conn);
743 [ + + ]: 5 : if (num_pubs == 0)
744 : 1 : dbinfo[i].pubname = pg_strdup(genname);
745 [ + + ]: 5 : if (num_subs == 0)
746 : 3 : dbinfo[i].subname = pg_strdup(genname);
747 [ + + ]: 5 : if (num_replslots == 0)
748 : 2 : dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
749 : :
750 : : /*
751 : : * Create publication on publisher. This step should be executed
752 : : * *before* promoting the subscriber to avoid any transactions between
753 : : * consistent LSN and the new publication rows (such transactions
754 : : * wouldn't see the new publication rows resulting in an error).
755 : : */
756 : 5 : create_publication(conn, &dbinfo[i]);
757 : :
758 : : /* Create replication slot on publisher */
759 [ + + ]: 5 : if (lsn)
760 : 1 : pg_free(lsn);
761 : 5 : lsn = create_logical_replication_slot(conn, &dbinfo[i]);
762 [ + + + - ]: 5 : if (lsn != NULL || dry_run)
763 : 5 : pg_log_info("create replication slot \"%s\" on publisher",
764 : : dbinfo[i].replslotname);
765 : : else
20 peter@eisentraut.org 766 :UNC 0 : exit(1);
767 : :
20 peter@eisentraut.org 768 :GNC 5 : disconnect_database(conn, false);
769 : : }
770 : :
771 : 3 : return lsn;
772 : : }
773 : :
774 : : /*
775 : : * Is recovery still in progress?
776 : : */
777 : : static bool
778 : 27 : server_is_in_recovery(PGconn *conn)
779 : : {
780 : : PGresult *res;
781 : : int ret;
782 : :
783 : 27 : res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
784 : :
785 [ - + ]: 27 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
786 : : {
20 peter@eisentraut.org 787 :UNC 0 : pg_log_error("could not obtain recovery progress: %s",
788 : : PQresultErrorMessage(res));
789 : 0 : disconnect_database(conn, true);
790 : : }
791 : :
792 : :
20 peter@eisentraut.org 793 :GNC 27 : ret = strcmp("t", PQgetvalue(res, 0, 0));
794 : :
795 : 27 : PQclear(res);
796 : :
797 : 27 : return ret == 0;
798 : : }
799 : :
800 : : /*
801 : : * Is the primary server ready for logical replication?
802 : : *
803 : : * XXX Does it not allow a synchronous replica?
804 : : */
805 : : static void
806 : 5 : check_publisher(const struct LogicalRepInfo *dbinfo)
807 : : {
808 : : PGconn *conn;
809 : : PGresult *res;
810 : 5 : bool failed = false;
811 : :
812 : : char *wal_level;
813 : : int max_repslots;
814 : : int cur_repslots;
815 : : int max_walsenders;
816 : : int cur_walsenders;
817 : :
818 : 5 : pg_log_info("checking settings on publisher");
819 : :
820 : 5 : conn = connect_database(dbinfo[0].pubconninfo, true);
821 : :
822 : : /*
823 : : * If the primary server is in recovery (i.e. cascading replication),
824 : : * objects (publication) cannot be created because it is read only.
825 : : */
826 [ + + ]: 5 : if (server_is_in_recovery(conn))
827 : : {
828 : 1 : pg_log_error("primary server cannot be in recovery");
829 : 1 : disconnect_database(conn, true);
830 : : }
831 : :
832 : : /*------------------------------------------------------------------------
833 : : * Logical replication requires a few parameters to be set on publisher.
834 : : * Since these parameters are not a requirement for physical replication,
835 : : * we should check it to make sure it won't fail.
836 : : *
837 : : * - wal_level = logical
838 : : * - max_replication_slots >= current + number of dbs to be converted
839 : : * - max_wal_senders >= current + number of dbs to be converted
840 : : * -----------------------------------------------------------------------
841 : : */
842 : 4 : res = PQexec(conn,
843 : : "WITH wl AS "
844 : : "(SELECT setting AS wallevel FROM pg_catalog.pg_settings "
845 : : "WHERE name = 'wal_level'), "
846 : : "total_mrs AS "
847 : : "(SELECT setting AS tmrs FROM pg_catalog.pg_settings "
848 : : "WHERE name = 'max_replication_slots'), "
849 : : "cur_mrs AS "
850 : : "(SELECT count(*) AS cmrs "
851 : : "FROM pg_catalog.pg_replication_slots), "
852 : : "total_mws AS "
853 : : "(SELECT setting AS tmws FROM pg_catalog.pg_settings "
854 : : "WHERE name = 'max_wal_senders'), "
855 : : "cur_mws AS "
856 : : "(SELECT count(*) AS cmws FROM pg_catalog.pg_stat_activity "
857 : : "WHERE backend_type = 'walsender') "
858 : : "SELECT wallevel, tmrs, cmrs, tmws, cmws "
859 : : "FROM wl, total_mrs, cur_mrs, total_mws, cur_mws");
860 : :
861 [ - + ]: 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
862 : : {
20 peter@eisentraut.org 863 :UNC 0 : pg_log_error("could not obtain publisher settings: %s",
864 : : PQresultErrorMessage(res));
865 : 0 : disconnect_database(conn, true);
866 : : }
867 : :
20 peter@eisentraut.org 868 :GNC 4 : wal_level = pg_strdup(PQgetvalue(res, 0, 0));
869 : 4 : max_repslots = atoi(PQgetvalue(res, 0, 1));
870 : 4 : cur_repslots = atoi(PQgetvalue(res, 0, 2));
871 : 4 : max_walsenders = atoi(PQgetvalue(res, 0, 3));
872 : 4 : cur_walsenders = atoi(PQgetvalue(res, 0, 4));
873 : :
874 : 4 : PQclear(res);
875 : :
876 [ + + ]: 4 : pg_log_debug("publisher: wal_level: %s", wal_level);
877 [ + + ]: 4 : pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
878 [ + + ]: 4 : pg_log_debug("publisher: current replication slots: %d", cur_repslots);
879 [ + + ]: 4 : pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
880 [ + + ]: 4 : pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
881 : :
882 : : /*
883 : : * If standby sets primary_slot_name, check if this replication slot is in
884 : : * use on primary for WAL retention purposes. This replication slot has no
885 : : * use after the transformation, hence, it will be removed at the end of
886 : : * this process.
887 : : */
888 [ + - ]: 4 : if (primary_slot_name)
889 : : {
890 : 4 : PQExpBuffer str = createPQExpBuffer();
891 : 4 : char *psn_esc = PQescapeLiteral(conn, primary_slot_name, strlen(primary_slot_name));
892 : :
893 : 4 : appendPQExpBuffer(str,
894 : : "SELECT 1 FROM pg_catalog.pg_replication_slots "
895 : : "WHERE active AND slot_name = %s",
896 : : psn_esc);
897 : :
898 : 4 : pg_free(psn_esc);
899 : :
900 [ + + ]: 4 : pg_log_debug("command is: %s", str->data);
901 : :
902 : 4 : res = PQexec(conn, str->data);
903 [ - + ]: 4 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
904 : : {
20 peter@eisentraut.org 905 :UNC 0 : pg_log_error("could not obtain replication slot information: %s",
906 : : PQresultErrorMessage(res));
907 : 0 : disconnect_database(conn, true);
908 : : }
909 : :
20 peter@eisentraut.org 910 [ - + ]:GNC 4 : if (PQntuples(res) != 1)
911 : : {
20 peter@eisentraut.org 912 :UNC 0 : pg_log_error("could not obtain replication slot information: got %d rows, expected %d row",
913 : : PQntuples(res), 1);
914 : 0 : disconnect_database(conn, true);
915 : : }
916 : : else
20 peter@eisentraut.org 917 :GNC 4 : pg_log_info("primary has replication slot \"%s\"",
918 : : primary_slot_name);
919 : :
920 : 4 : PQclear(res);
921 : : }
922 : :
923 : 4 : disconnect_database(conn, false);
924 : :
925 [ + + ]: 4 : if (strcmp(wal_level, "logical") != 0)
926 : : {
19 927 : 1 : pg_log_error("publisher requires wal_level >= \"logical\"");
20 928 : 1 : failed = true;
929 : : }
930 : :
931 [ + + ]: 4 : if (max_repslots - cur_repslots < num_dbs)
932 : : {
933 : 1 : pg_log_error("publisher requires %d replication slots, but only %d remain",
934 : : num_dbs, max_repslots - cur_repslots);
935 : 1 : pg_log_error_hint("Consider increasing max_replication_slots to at least %d.",
936 : : cur_repslots + num_dbs);
937 : 1 : failed = true;
938 : : }
939 : :
940 [ + + ]: 4 : if (max_walsenders - cur_walsenders < num_dbs)
941 : : {
942 : 1 : pg_log_error("publisher requires %d wal sender processes, but only %d remain",
943 : : num_dbs, max_walsenders - cur_walsenders);
944 : 1 : pg_log_error_hint("Consider increasing max_wal_senders to at least %d.",
945 : : cur_walsenders + num_dbs);
946 : 1 : failed = true;
947 : : }
948 : :
13 tgl@sss.pgh.pa.us 949 : 4 : pg_free(wal_level);
950 : :
20 peter@eisentraut.org 951 [ + + ]: 4 : if (failed)
952 : 1 : exit(1);
953 : 3 : }
954 : :
955 : : /*
956 : : * Is the standby server ready for logical replication?
957 : : *
958 : : * XXX Does it not allow a time-delayed replica?
959 : : *
960 : : * XXX In a cascaded replication scenario (P -> S -> C), if the target server
961 : : * is S, it cannot detect there is a replica (server C) because server S starts
962 : : * accepting only local connections and server C cannot connect to it. Hence,
963 : : * there is not a reliable way to provide a suitable error saying the server C
964 : : * will be broken at the end of this process (due to pg_resetwal).
965 : : */
966 : : static void
967 : 7 : check_subscriber(const struct LogicalRepInfo *dbinfo)
968 : : {
969 : : PGconn *conn;
970 : : PGresult *res;
971 : 7 : bool failed = false;
972 : :
973 : : int max_lrworkers;
974 : : int max_repslots;
975 : : int max_wprocs;
976 : :
977 : 7 : pg_log_info("checking settings on subscriber");
978 : :
979 : 7 : conn = connect_database(dbinfo[0].subconninfo, true);
980 : :
981 : : /* The target server must be a standby */
982 [ + + ]: 7 : if (!server_is_in_recovery(conn))
983 : : {
984 : 1 : pg_log_error("target server must be a standby");
985 : 1 : disconnect_database(conn, true);
986 : : }
987 : :
988 : : /*------------------------------------------------------------------------
989 : : * Logical replication requires a few parameters to be set on subscriber.
990 : : * Since these parameters are not a requirement for physical replication,
991 : : * we should check it to make sure it won't fail.
992 : : *
993 : : * - max_replication_slots >= number of dbs to be converted
994 : : * - max_logical_replication_workers >= number of dbs to be converted
995 : : * - max_worker_processes >= 1 + number of dbs to be converted
996 : : *------------------------------------------------------------------------
997 : : */
998 : 6 : res = PQexec(conn,
999 : : "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1000 : : "'max_logical_replication_workers', "
1001 : : "'max_replication_slots', "
1002 : : "'max_worker_processes', "
1003 : : "'primary_slot_name') "
1004 : : "ORDER BY name");
1005 : :
1006 [ - + ]: 6 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1007 : : {
20 peter@eisentraut.org 1008 :UNC 0 : pg_log_error("could not obtain subscriber settings: %s",
1009 : : PQresultErrorMessage(res));
1010 : 0 : disconnect_database(conn, true);
1011 : : }
1012 : :
20 peter@eisentraut.org 1013 :GNC 6 : max_lrworkers = atoi(PQgetvalue(res, 0, 0));
1014 : 6 : max_repslots = atoi(PQgetvalue(res, 1, 0));
1015 : 6 : max_wprocs = atoi(PQgetvalue(res, 2, 0));
1016 [ + + ]: 6 : if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1017 : 5 : primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1018 : :
1019 [ + + ]: 6 : pg_log_debug("subscriber: max_logical_replication_workers: %d",
1020 : : max_lrworkers);
1021 [ + + ]: 6 : pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
1022 [ + + ]: 6 : pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1023 [ + + ]: 6 : if (primary_slot_name)
1024 [ + + ]: 5 : pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1025 : :
1026 : 6 : PQclear(res);
1027 : :
1028 : 6 : disconnect_database(conn, false);
1029 : :
1030 [ + + ]: 6 : if (max_repslots < num_dbs)
1031 : : {
1032 : 1 : pg_log_error("subscriber requires %d replication slots, but only %d remain",
1033 : : num_dbs, max_repslots);
1034 : 1 : pg_log_error_hint("Consider increasing max_replication_slots to at least %d.",
1035 : : num_dbs);
1036 : 1 : failed = true;
1037 : : }
1038 : :
1039 [ + + ]: 6 : if (max_lrworkers < num_dbs)
1040 : : {
1041 : 1 : pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1042 : : num_dbs, max_lrworkers);
1043 : 1 : pg_log_error_hint("Consider increasing max_logical_replication_workers to at least %d.",
1044 : : num_dbs);
1045 : 1 : failed = true;
1046 : : }
1047 : :
1048 [ + + ]: 6 : if (max_wprocs < num_dbs + 1)
1049 : : {
1050 : 1 : pg_log_error("subscriber requires %d worker processes, but only %d remain",
1051 : : num_dbs + 1, max_wprocs);
1052 : 1 : pg_log_error_hint("Consider increasing max_worker_processes to at least %d.",
1053 : : num_dbs + 1);
1054 : 1 : failed = true;
1055 : : }
1056 : :
1057 [ + + ]: 6 : if (failed)
1058 : 1 : exit(1);
1059 : 5 : }
1060 : :
1061 : : /*
1062 : : * Create the subscriptions, adjust the initial location for logical
1063 : : * replication and enable the subscriptions. That's the last step for logical
1064 : : * replication setup.
1065 : : */
1066 : : static void
1067 : 3 : setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1068 : : {
1069 [ + + ]: 8 : for (int i = 0; i < num_dbs; i++)
1070 : : {
1071 : : PGconn *conn;
1072 : :
1073 : : /* Connect to subscriber. */
1074 : 5 : conn = connect_database(dbinfo[i].subconninfo, true);
1075 : :
1076 : : /*
1077 : : * Since the publication was created before the consistent LSN, it is
1078 : : * available on the subscriber when the physical replica is promoted.
1079 : : * Remove publications from the subscriber because it has no use.
1080 : : */
1081 : 5 : drop_publication(conn, &dbinfo[i]);
1082 : :
1083 : 5 : create_subscription(conn, &dbinfo[i]);
1084 : :
1085 : : /* Set the replication progress to the correct LSN */
1086 : 5 : set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1087 : :
1088 : : /* Enable subscription */
1089 : 5 : enable_subscription(conn, &dbinfo[i]);
1090 : :
1091 : 5 : disconnect_database(conn, false);
1092 : : }
1093 : 3 : }
1094 : :
1095 : : /*
1096 : : * Write the required recovery parameters.
1097 : : */
1098 : : static void
1099 : 3 : setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1100 : : {
1101 : : PGconn *conn;
1102 : : PQExpBuffer recoveryconfcontents;
1103 : :
1104 : : /*
1105 : : * Despite of the recovery parameters will be written to the subscriber,
1106 : : * use a publisher connection. The primary_conninfo is generated using the
1107 : : * connection settings.
1108 : : */
1109 : 3 : conn = connect_database(dbinfo[0].pubconninfo, true);
1110 : :
1111 : : /*
1112 : : * Write recovery parameters.
1113 : : *
1114 : : * The subscriber is not running yet. In dry run mode, the recovery
1115 : : * parameters *won't* be written. An invalid LSN is used for printing
1116 : : * purposes. Additional recovery parameters are added here. It avoids
1117 : : * unexpected behavior such as end of recovery as soon as a consistent
1118 : : * state is reached (recovery_target) and failure due to multiple recovery
1119 : : * targets (name, time, xid, LSN).
1120 : : */
1121 : 3 : recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1122 : 3 : appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
1123 : 3 : appendPQExpBuffer(recoveryconfcontents,
1124 : : "recovery_target_timeline = 'latest'\n");
1125 : 3 : appendPQExpBuffer(recoveryconfcontents,
1126 : : "recovery_target_inclusive = true\n");
1127 : 3 : appendPQExpBuffer(recoveryconfcontents,
1128 : : "recovery_target_action = promote\n");
1129 : 3 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
1130 : 3 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
1131 : 3 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
1132 : :
1133 [ + + ]: 3 : if (dry_run)
1134 : : {
1135 : 2 : appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
1136 : 2 : appendPQExpBuffer(recoveryconfcontents,
1137 : : "recovery_target_lsn = '%X/%X'\n",
1138 : 2 : LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1139 : : }
1140 : : else
1141 : : {
1142 : 1 : appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1143 : : lsn);
1144 : 1 : WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1145 : : }
1146 : 3 : disconnect_database(conn, false);
1147 : :
1148 [ + + ]: 3 : pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1149 : 3 : }
1150 : :
1151 : : /*
1152 : : * Drop physical replication slot on primary if the standby was using it. After
1153 : : * the transformation, it has no use.
1154 : : *
1155 : : * XXX we might not fail here. Instead, we provide a warning so the user
1156 : : * eventually drops this replication slot later.
1157 : : */
1158 : : static void
1159 : 3 : drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1160 : : {
1161 : : PGconn *conn;
1162 : :
1163 : : /* Replication slot does not exist, do nothing */
1164 [ - + ]: 3 : if (!primary_slot_name)
20 peter@eisentraut.org 1165 :UNC 0 : return;
1166 : :
20 peter@eisentraut.org 1167 :GNC 3 : conn = connect_database(dbinfo[0].pubconninfo, false);
1168 [ + - ]: 3 : if (conn != NULL)
1169 : : {
1170 : 3 : drop_replication_slot(conn, &dbinfo[0], slotname);
1171 : 3 : disconnect_database(conn, false);
1172 : : }
1173 : : else
1174 : : {
20 peter@eisentraut.org 1175 :UNC 0 : pg_log_warning("could not drop replication slot \"%s\" on primary",
1176 : : slotname);
1177 : 0 : pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1178 : : }
1179 : : }
1180 : :
1181 : : /*
1182 : : * Create a logical replication slot and returns a LSN.
1183 : : *
1184 : : * CreateReplicationSlot() is not used because it does not provide the one-row
1185 : : * result set that contains the LSN.
1186 : : */
1187 : : static char *
20 peter@eisentraut.org 1188 :GNC 5 : create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
1189 : : {
1190 : 5 : PQExpBuffer str = createPQExpBuffer();
1191 : 5 : PGresult *res = NULL;
1192 : 5 : const char *slot_name = dbinfo->replslotname;
1193 : : char *slot_name_esc;
1194 : 5 : char *lsn = NULL;
1195 : :
1196 [ - + ]: 5 : Assert(conn != NULL);
1197 : :
1198 : 5 : pg_log_info("creating the replication slot \"%s\" on database \"%s\"",
1199 : : slot_name, dbinfo->dbname);
1200 : :
1201 : 5 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1202 : :
1203 : 5 : appendPQExpBuffer(str,
1204 : : "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1205 : : slot_name_esc);
1206 : :
1207 : 5 : pg_free(slot_name_esc);
1208 : :
1209 [ + + ]: 5 : pg_log_debug("command is: %s", str->data);
1210 : :
1211 [ + + ]: 5 : if (!dry_run)
1212 : : {
1213 : 2 : res = PQexec(conn, str->data);
1214 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1215 : : {
20 peter@eisentraut.org 1216 :UNC 0 : pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s",
1217 : : slot_name, dbinfo->dbname,
1218 : : PQresultErrorMessage(res));
13 tgl@sss.pgh.pa.us 1219 : 0 : PQclear(res);
1220 : 0 : destroyPQExpBuffer(str);
20 peter@eisentraut.org 1221 : 0 : return NULL;
1222 : : }
1223 : :
20 peter@eisentraut.org 1224 :GNC 2 : lsn = pg_strdup(PQgetvalue(res, 0, 0));
1225 : 2 : PQclear(res);
1226 : : }
1227 : :
1228 : : /* For cleanup purposes */
1229 : 5 : dbinfo->made_replslot = true;
1230 : :
1231 : 5 : destroyPQExpBuffer(str);
1232 : :
1233 : 5 : return lsn;
1234 : : }
1235 : :
1236 : : static void
1237 : 3 : drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
1238 : : const char *slot_name)
1239 : : {
1240 : 3 : PQExpBuffer str = createPQExpBuffer();
1241 : : char *slot_name_esc;
1242 : : PGresult *res;
1243 : :
1244 [ - + ]: 3 : Assert(conn != NULL);
1245 : :
1246 : 3 : pg_log_info("dropping the replication slot \"%s\" on database \"%s\"",
1247 : : slot_name, dbinfo->dbname);
1248 : :
1249 : 3 : slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1250 : :
1251 : 3 : appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1252 : :
1253 : 3 : pg_free(slot_name_esc);
1254 : :
1255 [ + + ]: 3 : pg_log_debug("command is: %s", str->data);
1256 : :
1257 [ + + ]: 3 : if (!dry_run)
1258 : : {
1259 : 1 : res = PQexec(conn, str->data);
1260 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1261 : : {
20 peter@eisentraut.org 1262 :UNC 0 : pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s",
1263 : : slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1264 : 0 : dbinfo->made_replslot = false; /* don't try again. */
1265 : : }
1266 : :
20 peter@eisentraut.org 1267 :GNC 1 : PQclear(res);
1268 : : }
1269 : :
1270 : 3 : destroyPQExpBuffer(str);
1271 : 3 : }
1272 : :
1273 : : /*
1274 : : * Reports a suitable message if pg_ctl fails.
1275 : : */
1276 : : static void
1277 : 20 : pg_ctl_status(const char *pg_ctl_cmd, int rc)
1278 : : {
1279 [ - + ]: 20 : if (rc != 0)
1280 : : {
20 peter@eisentraut.org 1281 [ # # ]:UNC 0 : if (WIFEXITED(rc))
1282 : : {
1283 : 0 : pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1284 : : }
1285 [ # # ]: 0 : else if (WIFSIGNALED(rc))
1286 : : {
1287 : : #if defined(WIN32)
1288 : : pg_log_error("pg_ctl was terminated by exception 0x%X",
1289 : : WTERMSIG(rc));
1290 : : pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1291 : : #else
1292 : 0 : pg_log_error("pg_ctl was terminated by signal %d: %s",
1293 : : WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1294 : : #endif
1295 : : }
1296 : : else
1297 : : {
1298 : 0 : pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1299 : : }
1300 : :
1301 : 0 : pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
1302 : 0 : exit(1);
1303 : : }
20 peter@eisentraut.org 1304 :GNC 20 : }
1305 : :
1306 : : static void
1307 : 10 : start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
1308 : : {
1309 : 10 : PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1310 : : int rc;
1311 : :
1312 : 10 : appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D \"%s\" -s",
1313 : : pg_ctl_path, subscriber_dir);
1314 [ + - ]: 10 : if (restricted_access)
1315 : : {
1316 : 10 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1317 : : #if !defined(WIN32)
1318 : :
1319 : : /*
1320 : : * An empty listen_addresses list means the server does not listen on
1321 : : * any IP interfaces; only Unix-domain sockets can be used to connect
1322 : : * to the server. Prevent external connections to minimize the chance
1323 : : * of failure.
1324 : : */
1325 : 10 : appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1326 [ + - ]: 10 : if (opt->socket_dir)
1327 : 10 : appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1328 : 10 : opt->socket_dir);
1329 : 10 : appendPQExpBufferChar(pg_ctl_cmd, '"');
1330 : : #endif
1331 : : }
1332 [ - + ]: 10 : if (opt->config_file != NULL)
20 peter@eisentraut.org 1333 :UNC 0 : appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1334 : 0 : opt->config_file);
20 peter@eisentraut.org 1335 [ + + ]:GNC 10 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1336 : 10 : rc = system(pg_ctl_cmd->data);
1337 : 10 : pg_ctl_status(pg_ctl_cmd->data, rc);
1338 : 10 : standby_running = true;
1339 : 10 : destroyPQExpBuffer(pg_ctl_cmd);
1340 : 10 : pg_log_info("server was started");
1341 : 10 : }
1342 : :
1343 : : static void
1344 : 10 : stop_standby_server(const char *datadir)
1345 : : {
1346 : : char *pg_ctl_cmd;
1347 : : int rc;
1348 : :
1349 : 10 : pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1350 : : datadir);
1351 [ + + ]: 10 : pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1352 : 10 : rc = system(pg_ctl_cmd);
1353 : 10 : pg_ctl_status(pg_ctl_cmd, rc);
1354 : 10 : standby_running = false;
1355 : 10 : pg_log_info("server was stopped");
1356 : 10 : }
1357 : :
1358 : : /*
1359 : : * Returns after the server finishes the recovery process.
1360 : : *
1361 : : * If recovery_timeout option is set, terminate abnormally without finishing
1362 : : * the recovery process. By default, it waits forever.
1363 : : */
1364 : : static void
1365 : 3 : wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1366 : : {
1367 : : PGconn *conn;
1368 : 3 : int status = POSTMASTER_STILL_STARTING;
1369 : 3 : int timer = 0;
1370 : 3 : int count = 0; /* number of consecutive connection attempts */
1371 : :
1372 : : #define NUM_CONN_ATTEMPTS 10
1373 : :
1374 : 3 : pg_log_info("waiting for the target server to reach the consistent state");
1375 : :
1376 : 3 : conn = connect_database(conninfo, true);
1377 : :
1378 : : for (;;)
1379 : 12 : {
1380 : : PGresult *res;
1381 : 15 : bool in_recovery = server_is_in_recovery(conn);
1382 : :
1383 : : /*
1384 : : * Does the recovery process finish? In dry run mode, there is no
1385 : : * recovery mode. Bail out as the recovery process has ended.
1386 : : */
1387 [ + + + + ]: 15 : if (!in_recovery || dry_run)
1388 : : {
1389 : 3 : status = POSTMASTER_READY;
1390 : 3 : recovery_ended = true;
1391 : 3 : break;
1392 : : }
1393 : :
1394 : : /*
1395 : : * If it is still in recovery, make sure the target server is
1396 : : * connected to the primary so it can receive the required WAL to
1397 : : * finish the recovery process. If it is disconnected try
1398 : : * NUM_CONN_ATTEMPTS in a row and bail out if not succeed.
1399 : : */
1400 : 12 : res = PQexec(conn,
1401 : : "SELECT 1 FROM pg_catalog.pg_stat_wal_receiver");
1402 [ - + ]: 12 : if (PQntuples(res) == 0)
1403 : : {
20 peter@eisentraut.org 1404 [ # # ]:UNC 0 : if (++count > NUM_CONN_ATTEMPTS)
1405 : : {
1406 : 0 : stop_standby_server(subscriber_dir);
1407 : 0 : pg_log_error("standby server disconnected from the primary");
1408 : 0 : break;
1409 : : }
1410 : : }
1411 : : else
20 peter@eisentraut.org 1412 :GNC 12 : count = 0; /* reset counter if it connects again */
1413 : :
1414 : 12 : PQclear(res);
1415 : :
1416 : : /* Bail out after recovery_timeout seconds if this option is set */
1417 [ - + - - ]: 12 : if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1418 : : {
20 peter@eisentraut.org 1419 :UNC 0 : stop_standby_server(subscriber_dir);
1420 : 0 : pg_log_error("recovery timed out");
1421 : 0 : disconnect_database(conn, true);
1422 : : }
1423 : :
1424 : : /* Keep waiting */
20 peter@eisentraut.org 1425 :GNC 12 : pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
1426 : :
1427 : 12 : timer += WAIT_INTERVAL;
1428 : : }
1429 : :
1430 : 3 : disconnect_database(conn, false);
1431 : :
1432 [ - + ]: 3 : if (status == POSTMASTER_STILL_STARTING)
20 peter@eisentraut.org 1433 :UNC 0 : pg_fatal("server did not end recovery");
1434 : :
20 peter@eisentraut.org 1435 :GNC 3 : pg_log_info("target server reached the consistent state");
1436 : 3 : pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1437 : 3 : }
1438 : :
1439 : : /*
1440 : : * Create a publication that includes all tables in the database.
1441 : : */
1442 : : static void
1443 : 5 : create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1444 : : {
1445 : 5 : PQExpBuffer str = createPQExpBuffer();
1446 : : PGresult *res;
1447 : : char *ipubname_esc;
1448 : : char *spubname_esc;
1449 : :
1450 [ - + ]: 5 : Assert(conn != NULL);
1451 : :
1452 : 5 : ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1453 : 5 : spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1454 : :
1455 : : /* Check if the publication already exists */
1456 : 5 : appendPQExpBuffer(str,
1457 : : "SELECT 1 FROM pg_catalog.pg_publication "
1458 : : "WHERE pubname = %s",
1459 : : spubname_esc);
1460 : 5 : res = PQexec(conn, str->data);
1461 [ - + ]: 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1462 : : {
20 peter@eisentraut.org 1463 :UNC 0 : pg_log_error("could not obtain publication information: %s",
1464 : : PQresultErrorMessage(res));
1465 : 0 : disconnect_database(conn, true);
1466 : : }
1467 : :
20 peter@eisentraut.org 1468 [ - + ]:GNC 5 : if (PQntuples(res) == 1)
1469 : : {
1470 : : /*
1471 : : * Unfortunately, if it reaches this code path, it will always fail
1472 : : * (unless you decide to change the existing publication name). That's
1473 : : * bad but it is very unlikely that the user will choose a name with
1474 : : * pg_createsubscriber_ prefix followed by the exact database oid and
1475 : : * a random number.
1476 : : */
20 peter@eisentraut.org 1477 :UNC 0 : pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1478 : 0 : pg_log_error_hint("Consider renaming this publication before continuing.");
1479 : 0 : disconnect_database(conn, true);
1480 : : }
1481 : :
20 peter@eisentraut.org 1482 :GNC 5 : PQclear(res);
1483 : 5 : resetPQExpBuffer(str);
1484 : :
1485 : 5 : pg_log_info("creating publication \"%s\" on database \"%s\"",
1486 : : dbinfo->pubname, dbinfo->dbname);
1487 : :
1488 : 5 : appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1489 : : ipubname_esc);
1490 : :
1491 [ + + ]: 5 : pg_log_debug("command is: %s", str->data);
1492 : :
1493 [ + + ]: 5 : if (!dry_run)
1494 : : {
1495 : 2 : res = PQexec(conn, str->data);
1496 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1497 : : {
20 peter@eisentraut.org 1498 :UNC 0 : pg_log_error("could not create publication \"%s\" on database \"%s\": %s",
1499 : : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1500 : 0 : disconnect_database(conn, true);
1501 : : }
20 peter@eisentraut.org 1502 :GNC 2 : PQclear(res);
1503 : : }
1504 : :
1505 : : /* For cleanup purposes */
1506 : 5 : dbinfo->made_publication = true;
1507 : :
1508 : 5 : pg_free(ipubname_esc);
1509 : 5 : pg_free(spubname_esc);
1510 : 5 : destroyPQExpBuffer(str);
1511 : 5 : }
1512 : :
1513 : : /*
1514 : : * Remove publication if it couldn't finish all steps.
1515 : : */
1516 : : static void
1517 : 5 : drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
1518 : : {
1519 : 5 : PQExpBuffer str = createPQExpBuffer();
1520 : : PGresult *res;
1521 : : char *pubname_esc;
1522 : :
1523 [ - + ]: 5 : Assert(conn != NULL);
1524 : :
1525 : 5 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1526 : :
1527 : 5 : pg_log_info("dropping publication \"%s\" on database \"%s\"",
1528 : : dbinfo->pubname, dbinfo->dbname);
1529 : :
1530 : 5 : appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1531 : :
1532 : 5 : pg_free(pubname_esc);
1533 : :
1534 [ + + ]: 5 : pg_log_debug("command is: %s", str->data);
1535 : :
1536 [ + + ]: 5 : if (!dry_run)
1537 : : {
1538 : 2 : res = PQexec(conn, str->data);
1539 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1540 : : {
20 peter@eisentraut.org 1541 :UNC 0 : pg_log_error("could not drop publication \"%s\" on database \"%s\": %s",
1542 : : dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1543 : 0 : dbinfo->made_publication = false; /* don't try again. */
1544 : :
1545 : : /*
1546 : : * Don't disconnect and exit here. This routine is used by primary
1547 : : * (cleanup publication / replication slot due to an error) and
1548 : : * subscriber (remove the replicated publications). In both cases,
1549 : : * it can continue and provide instructions for the user to remove
1550 : : * it later if cleanup fails.
1551 : : */
1552 : : }
20 peter@eisentraut.org 1553 :GNC 2 : PQclear(res);
1554 : : }
1555 : :
1556 : 5 : destroyPQExpBuffer(str);
1557 : 5 : }
1558 : :
1559 : : /*
1560 : : * Create a subscription with some predefined options.
1561 : : *
1562 : : * A replication slot was already created in a previous step. Let's use it. It
1563 : : * is not required to copy data. The subscription will be created but it will
1564 : : * not be enabled now. That's because the replication progress must be set and
1565 : : * the replication origin name (one of the function arguments) contains the
1566 : : * subscription OID in its name. Once the subscription is created,
1567 : : * set_replication_progress() can obtain the chosen origin name and set up its
1568 : : * initial location.
1569 : : */
1570 : : static void
1571 : 5 : create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1572 : : {
1573 : 5 : PQExpBuffer str = createPQExpBuffer();
1574 : : PGresult *res;
1575 : : char *pubname_esc;
1576 : : char *subname_esc;
1577 : : char *pubconninfo_esc;
1578 : : char *replslotname_esc;
1579 : :
1580 [ - + ]: 5 : Assert(conn != NULL);
1581 : :
1582 : 5 : pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1583 : 5 : subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1584 : 5 : pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1585 : 5 : replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1586 : :
1587 : 5 : pg_log_info("creating subscription \"%s\" on database \"%s\"",
1588 : : dbinfo->subname, dbinfo->dbname);
1589 : :
1590 : 5 : appendPQExpBuffer(str,
1591 : : "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1592 : : "WITH (create_slot = false, enabled = false, "
1593 : : "slot_name = %s, copy_data = false)",
1594 : : subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
1595 : :
1596 : 5 : pg_free(pubname_esc);
1597 : 5 : pg_free(subname_esc);
1598 : 5 : pg_free(pubconninfo_esc);
1599 : 5 : pg_free(replslotname_esc);
1600 : :
1601 [ + + ]: 5 : pg_log_debug("command is: %s", str->data);
1602 : :
1603 [ + + ]: 5 : if (!dry_run)
1604 : : {
1605 : 2 : res = PQexec(conn, str->data);
1606 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1607 : : {
20 peter@eisentraut.org 1608 :UNC 0 : pg_log_error("could not create subscription \"%s\" on database \"%s\": %s",
1609 : : dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1610 : 0 : disconnect_database(conn, true);
1611 : : }
20 peter@eisentraut.org 1612 :GNC 2 : PQclear(res);
1613 : : }
1614 : :
1615 : 5 : destroyPQExpBuffer(str);
1616 : 5 : }
1617 : :
1618 : : /*
1619 : : * Sets the replication progress to the consistent LSN.
1620 : : *
1621 : : * The subscriber caught up to the consistent LSN provided by the last
1622 : : * replication slot that was created. The goal is to set up the initial
1623 : : * location for the logical replication that is the exact LSN that the
1624 : : * subscriber was promoted. Once the subscription is enabled it will start
1625 : : * streaming from that location onwards. In dry run mode, the subscription OID
1626 : : * and LSN are set to invalid values for printing purposes.
1627 : : */
1628 : : static void
1629 : 5 : set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1630 : : {
1631 : 5 : PQExpBuffer str = createPQExpBuffer();
1632 : : PGresult *res;
1633 : : Oid suboid;
1634 : : char *subname;
1635 : : char *dbname;
1636 : : char *originname;
1637 : : char *lsnstr;
1638 : :
1639 [ - + ]: 5 : Assert(conn != NULL);
1640 : :
1641 : 5 : subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1642 : 5 : dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1643 : :
1644 : 5 : appendPQExpBuffer(str,
1645 : : "SELECT s.oid FROM pg_catalog.pg_subscription s "
1646 : : "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1647 : : "WHERE s.subname = %s AND d.datname = %s",
1648 : : subname, dbname);
1649 : :
1650 : 5 : res = PQexec(conn, str->data);
1651 [ - + ]: 5 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1652 : : {
20 peter@eisentraut.org 1653 :UNC 0 : pg_log_error("could not obtain subscription OID: %s",
1654 : : PQresultErrorMessage(res));
1655 : 0 : disconnect_database(conn, true);
1656 : : }
1657 : :
20 peter@eisentraut.org 1658 [ + + - + ]:GNC 5 : if (PQntuples(res) != 1 && !dry_run)
1659 : : {
19 peter@eisentraut.org 1660 :UNC 0 : pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1661 : : PQntuples(res), 1);
20 1662 : 0 : disconnect_database(conn, true);
1663 : : }
1664 : :
20 peter@eisentraut.org 1665 [ + + ]:GNC 5 : if (dry_run)
1666 : : {
1667 : 3 : suboid = InvalidOid;
1668 : 3 : lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1669 : : }
1670 : : else
1671 : : {
1672 : 2 : suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1673 : 2 : lsnstr = psprintf("%s", lsn);
1674 : : }
1675 : :
1676 : 5 : PQclear(res);
1677 : :
1678 : : /*
1679 : : * The origin name is defined as pg_%u. %u is the subscription OID. See
1680 : : * ApplyWorkerMain().
1681 : : */
1682 : 5 : originname = psprintf("pg_%u", suboid);
1683 : :
1684 : 5 : pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"",
1685 : : originname, lsnstr, dbinfo->dbname);
1686 : :
1687 : 5 : resetPQExpBuffer(str);
1688 : 5 : appendPQExpBuffer(str,
1689 : : "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1690 : : originname, lsnstr);
1691 : :
1692 [ + + ]: 5 : pg_log_debug("command is: %s", str->data);
1693 : :
1694 [ + + ]: 5 : if (!dry_run)
1695 : : {
1696 : 2 : res = PQexec(conn, str->data);
1697 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1698 : : {
20 peter@eisentraut.org 1699 :UNC 0 : pg_log_error("could not set replication progress for the subscription \"%s\": %s",
1700 : : dbinfo->subname, PQresultErrorMessage(res));
1701 : 0 : disconnect_database(conn, true);
1702 : : }
20 peter@eisentraut.org 1703 :GNC 2 : PQclear(res);
1704 : : }
1705 : :
1706 : 5 : pg_free(subname);
1707 : 5 : pg_free(dbname);
1708 : 5 : pg_free(originname);
1709 : 5 : pg_free(lsnstr);
1710 : 5 : destroyPQExpBuffer(str);
1711 : 5 : }
1712 : :
1713 : : /*
1714 : : * Enables the subscription.
1715 : : *
1716 : : * The subscription was created in a previous step but it was disabled. After
1717 : : * adjusting the initial logical replication location, enable the subscription.
1718 : : */
1719 : : static void
1720 : 5 : enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
1721 : : {
1722 : 5 : PQExpBuffer str = createPQExpBuffer();
1723 : : PGresult *res;
1724 : : char *subname;
1725 : :
1726 [ - + ]: 5 : Assert(conn != NULL);
1727 : :
1728 : 5 : subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1729 : :
1730 : 5 : pg_log_info("enabling subscription \"%s\" on database \"%s\"",
1731 : : dbinfo->subname, dbinfo->dbname);
1732 : :
1733 : 5 : appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1734 : :
1735 [ + + ]: 5 : pg_log_debug("command is: %s", str->data);
1736 : :
1737 [ + + ]: 5 : if (!dry_run)
1738 : : {
1739 : 2 : res = PQexec(conn, str->data);
1740 [ - + ]: 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1741 : : {
20 peter@eisentraut.org 1742 :UNC 0 : pg_log_error("could not enable subscription \"%s\": %s",
1743 : : dbinfo->subname, PQresultErrorMessage(res));
1744 : 0 : disconnect_database(conn, true);
1745 : : }
1746 : :
20 peter@eisentraut.org 1747 :GNC 2 : PQclear(res);
1748 : : }
1749 : :
1750 : 5 : pg_free(subname);
1751 : 5 : destroyPQExpBuffer(str);
1752 : 5 : }
1753 : :
1754 : : int
1755 : 20 : main(int argc, char **argv)
1756 : : {
1757 : : static struct option long_options[] =
1758 : : {
1759 : : {"database", required_argument, NULL, 'd'},
1760 : : {"pgdata", required_argument, NULL, 'D'},
1761 : : {"dry-run", no_argument, NULL, 'n'},
1762 : : {"subscriber-port", required_argument, NULL, 'p'},
1763 : : {"publisher-server", required_argument, NULL, 'P'},
1764 : : {"socket-directory", required_argument, NULL, 's'},
1765 : : {"recovery-timeout", required_argument, NULL, 't'},
1766 : : {"subscriber-username", required_argument, NULL, 'U'},
1767 : : {"verbose", no_argument, NULL, 'v'},
1768 : : {"version", no_argument, NULL, 'V'},
1769 : : {"help", no_argument, NULL, '?'},
1770 : : {"config-file", required_argument, NULL, 1},
1771 : : {"publication", required_argument, NULL, 2},
1772 : : {"replication-slot", required_argument, NULL, 3},
1773 : : {"subscription", required_argument, NULL, 4},
1774 : : {NULL, 0, NULL, 0}
1775 : : };
1776 : :
1777 : 20 : struct CreateSubscriberOptions opt = {0};
1778 : :
1779 : : int c;
1780 : : int option_index;
1781 : :
1782 : : char *pub_base_conninfo;
1783 : : char *sub_base_conninfo;
1784 : 20 : char *dbname_conninfo = NULL;
1785 : :
1786 : : uint64 pub_sysid;
1787 : : uint64 sub_sysid;
1788 : : struct stat statbuf;
1789 : :
1790 : : char *consistent_lsn;
1791 : :
1792 : : char pidfile[MAXPGPATH];
1793 : :
1794 : 20 : pg_logging_init(argv[0]);
1795 : 20 : pg_logging_set_level(PG_LOG_WARNING);
1796 : 20 : progname = get_progname(argv[0]);
1797 : 20 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber"));
1798 : :
1799 [ + + ]: 20 : if (argc > 1)
1800 : : {
1801 [ + + - + ]: 19 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
1802 : : {
1803 : 1 : usage();
1804 : 1 : exit(0);
1805 : : }
1806 [ + - ]: 18 : else if (strcmp(argv[1], "-V") == 0
1807 [ + + ]: 18 : || strcmp(argv[1], "--version") == 0)
1808 : : {
1809 : 1 : puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
1810 : 1 : exit(0);
1811 : : }
1812 : : }
1813 : :
1814 : : /* Default settings */
1815 : 18 : subscriber_dir = NULL;
1816 : 18 : opt.config_file = NULL;
1817 : 18 : opt.pub_conninfo_str = NULL;
1818 : 18 : opt.socket_dir = NULL;
1819 : 18 : opt.sub_port = DEFAULT_SUB_PORT;
1820 : 18 : opt.sub_username = NULL;
1821 : 18 : opt.database_names = (SimpleStringList)
1822 : : {
1823 : : 0
1824 : : };
1825 : 18 : opt.recovery_timeout = 0;
1826 : :
1827 : : /*
1828 : : * Don't allow it to be run as root. It uses pg_ctl which does not allow
1829 : : * it either.
1830 : : */
1831 : : #ifndef WIN32
1832 [ - + ]: 18 : if (geteuid() == 0)
1833 : : {
20 peter@eisentraut.org 1834 :UNC 0 : pg_log_error("cannot be executed by \"root\"");
1835 : 0 : pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
1836 : : progname);
1837 : 0 : exit(1);
1838 : : }
1839 : : #endif
1840 : :
20 peter@eisentraut.org 1841 :GNC 18 : get_restricted_token();
1842 : :
1843 : 132 : while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
1844 [ + + ]: 132 : long_options, &option_index)) != -1)
1845 : : {
1846 [ + + + + : 117 : switch (c)
+ + - - +
- + + +
+ ]
1847 : : {
1848 : 24 : case 'd':
1849 [ + + ]: 24 : if (!simple_string_list_member(&opt.database_names, optarg))
1850 : : {
1851 : 23 : simple_string_list_append(&opt.database_names, optarg);
1852 : 23 : num_dbs++;
1853 : : }
1854 : : else
1855 : : {
1856 : 1 : pg_log_error("duplicate database \"%s\"", optarg);
1857 : 1 : exit(1);
1858 : : }
1859 : 23 : break;
1860 : 16 : case 'D':
1861 : 16 : subscriber_dir = pg_strdup(optarg);
1862 : 16 : canonicalize_path(subscriber_dir);
1863 : 16 : break;
1864 : 7 : case 'n':
1865 : 7 : dry_run = true;
1866 : 7 : break;
1867 : 9 : case 'p':
1868 : 9 : opt.sub_port = pg_strdup(optarg);
1869 : 9 : break;
1870 : 15 : case 'P':
1871 : 15 : opt.pub_conninfo_str = pg_strdup(optarg);
1872 : 15 : break;
1873 : 9 : case 's':
1874 : 9 : opt.socket_dir = pg_strdup(optarg);
1875 : 9 : canonicalize_path(opt.socket_dir);
1876 : 9 : break;
20 peter@eisentraut.org 1877 :UNC 0 : case 't':
1878 : 0 : opt.recovery_timeout = atoi(optarg);
1879 : 0 : break;
1880 : 0 : case 'U':
1881 : 0 : opt.sub_username = pg_strdup(optarg);
1882 : 0 : break;
20 peter@eisentraut.org 1883 :GNC 16 : case 'v':
1884 : 16 : pg_logging_increase_verbosity();
1885 : 16 : break;
20 peter@eisentraut.org 1886 :UNC 0 : case 1:
1887 : 0 : opt.config_file = pg_strdup(optarg);
1888 : 0 : break;
20 peter@eisentraut.org 1889 :GNC 11 : case 2:
1890 [ + + ]: 11 : if (!simple_string_list_member(&opt.pub_names, optarg))
1891 : : {
1892 : 10 : simple_string_list_append(&opt.pub_names, optarg);
1893 : 10 : num_pubs++;
1894 : : }
1895 : : else
1896 : : {
1897 : 1 : pg_log_error("duplicate publication \"%s\"", optarg);
1898 : 1 : exit(1);
1899 : : }
1900 : 10 : break;
1901 : 4 : case 3:
1902 [ + - ]: 4 : if (!simple_string_list_member(&opt.replslot_names, optarg))
1903 : : {
1904 : 4 : simple_string_list_append(&opt.replslot_names, optarg);
1905 : 4 : num_replslots++;
1906 : : }
1907 : : else
1908 : : {
20 peter@eisentraut.org 1909 :UNC 0 : pg_log_error("duplicate replication slot \"%s\"", optarg);
1910 : 0 : exit(1);
1911 : : }
20 peter@eisentraut.org 1912 :GNC 4 : break;
1913 : 5 : case 4:
1914 [ + - ]: 5 : if (!simple_string_list_member(&opt.sub_names, optarg))
1915 : : {
1916 : 5 : simple_string_list_append(&opt.sub_names, optarg);
1917 : 5 : num_subs++;
1918 : : }
1919 : : else
1920 : : {
20 peter@eisentraut.org 1921 :UNC 0 : pg_log_error("duplicate subscription \"%s\"", optarg);
1922 : 0 : exit(1);
1923 : : }
20 peter@eisentraut.org 1924 :GNC 5 : break;
1925 : 1 : default:
1926 : : /* getopt_long already emitted a complaint */
1927 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
1928 : 1 : exit(1);
1929 : : }
1930 : : }
1931 : :
1932 : : /* Any non-option arguments? */
1933 [ - + ]: 15 : if (optind < argc)
1934 : : {
20 peter@eisentraut.org 1935 :UNC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
1936 : : argv[optind]);
1937 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
1938 : 0 : exit(1);
1939 : : }
1940 : :
1941 : : /* Required arguments */
20 peter@eisentraut.org 1942 [ + + ]:GNC 15 : if (subscriber_dir == NULL)
1943 : : {
1944 : 1 : pg_log_error("no subscriber data directory specified");
1945 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
1946 : 1 : exit(1);
1947 : : }
1948 : :
1949 : : /* If socket directory is not provided, use the current directory */
1950 [ + + ]: 14 : if (opt.socket_dir == NULL)
1951 : : {
1952 : : char cwd[MAXPGPATH];
1953 : :
1954 [ - + ]: 5 : if (!getcwd(cwd, MAXPGPATH))
20 peter@eisentraut.org 1955 :UNC 0 : pg_fatal("could not determine current directory");
20 peter@eisentraut.org 1956 :GNC 5 : opt.socket_dir = pg_strdup(cwd);
1957 : 5 : canonicalize_path(opt.socket_dir);
1958 : : }
1959 : :
1960 : : /*
1961 : : * Parse connection string. Build a base connection string that might be
1962 : : * reused by multiple databases.
1963 : : */
1964 [ + + ]: 14 : if (opt.pub_conninfo_str == NULL)
1965 : : {
1966 : : /*
1967 : : * TODO use primary_conninfo (if available) from subscriber and
1968 : : * extract publisher connection string. Assume that there are
1969 : : * identical entries for physical and logical replication. If there is
1970 : : * not, we would fail anyway.
1971 : : */
1972 : 1 : pg_log_error("no publisher connection string specified");
1973 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
1974 : 1 : exit(1);
1975 : : }
1976 : 13 : pg_log_info("validating connection string on publisher");
1977 : 13 : pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
1978 : : &dbname_conninfo);
1979 [ - + ]: 13 : if (pub_base_conninfo == NULL)
20 peter@eisentraut.org 1980 :UNC 0 : exit(1);
1981 : :
20 peter@eisentraut.org 1982 :GNC 13 : pg_log_info("validating connection string on subscriber");
1983 : 13 : sub_base_conninfo = get_sub_conninfo(&opt);
1984 : :
1985 [ + + ]: 13 : if (opt.database_names.head == NULL)
1986 : : {
1987 : 2 : pg_log_info("no database was specified");
1988 : :
1989 : : /*
1990 : : * If --database option is not provided, try to obtain the dbname from
1991 : : * the publisher conninfo. If dbname parameter is not available, error
1992 : : * out.
1993 : : */
1994 [ + + ]: 2 : if (dbname_conninfo)
1995 : : {
1996 : 1 : simple_string_list_append(&opt.database_names, dbname_conninfo);
1997 : 1 : num_dbs++;
1998 : :
1999 : 1 : pg_log_info("database \"%s\" was extracted from the publisher connection string",
2000 : : dbname_conninfo);
2001 : : }
2002 : : else
2003 : : {
2004 : 1 : pg_log_error("no database name specified");
2005 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.",
2006 : : progname);
2007 : 1 : exit(1);
2008 : : }
2009 : : }
2010 : :
2011 : : /* Number of object names must match number of databases */
2012 [ + + + + ]: 12 : if (num_pubs > 0 && num_pubs != num_dbs)
2013 : : {
2014 : 1 : pg_log_error("wrong number of publication names");
2015 : 1 : pg_log_error_hint("Number of publication names (%d) must match number of database names (%d).",
2016 : : num_pubs, num_dbs);
2017 : 1 : exit(1);
2018 : : }
2019 [ + + + + ]: 11 : if (num_subs > 0 && num_subs != num_dbs)
2020 : : {
2021 : 1 : pg_log_error("wrong number of subscription names");
2022 : 1 : pg_log_error_hint("Number of subscription names (%d) must match number of database names (%d).",
2023 : : num_subs, num_dbs);
2024 : 1 : exit(1);
2025 : : }
2026 [ + + + + ]: 10 : if (num_replslots > 0 && num_replslots != num_dbs)
2027 : : {
2028 : 1 : pg_log_error("wrong number of replication slot names");
2029 : 1 : pg_log_error_hint("Number of replication slot names (%d) must match number of database names (%d).",
2030 : : num_replslots, num_dbs);
2031 : 1 : exit(1);
2032 : : }
2033 : :
2034 : : /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2035 : 9 : pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2036 : 9 : pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2037 : :
2038 : : /* Rudimentary check for a data directory */
2039 : 9 : check_data_directory(subscriber_dir);
2040 : :
2041 : : /*
2042 : : * Store database information for publisher and subscriber. It should be
2043 : : * called before atexit() because its return is used in the
2044 : : * cleanup_objects_atexit().
2045 : : */
2046 : 9 : dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2047 : :
2048 : : /* Register a function to clean up objects in case of failure */
2049 : 9 : atexit(cleanup_objects_atexit);
2050 : :
2051 : : /*
2052 : : * Check if the subscriber data directory has the same system identifier
2053 : : * than the publisher data directory.
2054 : : */
2055 : 9 : pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
2056 : 9 : sub_sysid = get_standby_sysid(subscriber_dir);
2057 [ + + ]: 9 : if (pub_sysid != sub_sysid)
2058 : 1 : pg_fatal("subscriber data directory is not a copy of the source database cluster");
2059 : :
2060 : : /* Subscriber PID file */
2061 : 8 : snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2062 : :
2063 : : /*
2064 : : * The standby server must not be running. If the server is started under
2065 : : * service manager and pg_createsubscriber stops it, the service manager
2066 : : * might react to this action and start the server again. Therefore,
2067 : : * refuse to proceed if the server is running to avoid possible failures.
2068 : : */
2069 [ + + ]: 8 : if (stat(pidfile, &statbuf) == 0)
2070 : : {
2071 : 1 : pg_log_error("standby is up and running");
2072 : 1 : pg_log_error_hint("Stop the standby and try again.");
2073 : 1 : exit(1);
2074 : : }
2075 : :
2076 : : /*
2077 : : * Start a short-lived standby server with temporary parameters (provided
2078 : : * by command-line options). The goal is to avoid connections during the
2079 : : * transformation steps.
2080 : : */
2081 : 7 : pg_log_info("starting the standby with command-line options");
2082 : 7 : start_standby_server(&opt, true);
2083 : :
2084 : : /* Check if the standby server is ready for logical replication */
2085 : 7 : check_subscriber(dbinfo);
2086 : :
2087 : : /*
2088 : : * Check if the primary server is ready for logical replication. This
2089 : : * routine checks if a replication slot is in use on primary so it relies
2090 : : * on check_subscriber() to obtain the primary_slot_name. That's why it is
2091 : : * called after it.
2092 : : */
2093 : 5 : check_publisher(dbinfo);
2094 : :
2095 : : /*
2096 : : * Stop the target server. The recovery process requires that the server
2097 : : * reaches a consistent state before targeting the recovery stop point.
2098 : : * Make sure a consistent state is reached (stop the target server
2099 : : * guarantees it) *before* creating the replication slots in
2100 : : * setup_publisher().
2101 : : */
2102 : 3 : pg_log_info("stopping the subscriber");
2103 : 3 : stop_standby_server(subscriber_dir);
2104 : :
2105 : : /*
2106 : : * Create the required objects for each database on publisher. This step
2107 : : * is here mainly because if we stop the standby we cannot verify if the
2108 : : * primary slot is in use. We could use an extra connection for it but it
2109 : : * doesn't seem worth.
2110 : : */
2111 : 3 : consistent_lsn = setup_publisher(dbinfo);
2112 : :
2113 : : /* Write the required recovery parameters */
2114 : 3 : setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
2115 : :
2116 : : /*
2117 : : * Start subscriber so the recovery parameters will take effect. Wait
2118 : : * until accepting connections.
2119 : : */
2120 : 3 : pg_log_info("starting the subscriber");
2121 : 3 : start_standby_server(&opt, true);
2122 : :
2123 : : /* Waiting the subscriber to be promoted */
2124 : 3 : wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
2125 : :
2126 : : /*
2127 : : * Create the subscription for each database on subscriber. It does not
2128 : : * enable it immediately because it needs to adjust the replication start
2129 : : * point to the LSN reported by setup_publisher(). It also cleans up
2130 : : * publications created by this tool and replication to the standby.
2131 : : */
2132 : 3 : setup_subscriber(dbinfo, consistent_lsn);
2133 : :
2134 : : /* Remove primary_slot_name if it exists on primary */
2135 : 3 : drop_primary_replication_slot(dbinfo, primary_slot_name);
2136 : :
2137 : : /* Stop the subscriber */
2138 : 3 : pg_log_info("stopping the subscriber");
2139 : 3 : stop_standby_server(subscriber_dir);
2140 : :
2141 : : /* Change system identifier from subscriber */
2142 : 3 : modify_subscriber_sysid(&opt);
2143 : :
2144 : 3 : success = true;
2145 : :
2146 : 3 : pg_log_info("Done!");
2147 : :
2148 : 3 : return 0;
2149 : : }
|