Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * libpqwalreceiver.c
4 : : *
5 : : * This file contains the libpq-specific parts of walreceiver. It's
6 : : * loaded as a dynamic module to avoid linking the main server binary with
7 : : * libpq.
8 : : *
9 : : * Apart from walreceiver, the libpq-specific routines are now being used by
10 : : * logical replication workers and slot synchronization.
11 : : *
12 : : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
13 : : *
14 : : *
15 : : * IDENTIFICATION
16 : : * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
17 : : *
18 : : *-------------------------------------------------------------------------
19 : : */
20 : : #include "postgres.h"
21 : :
22 : : #include <unistd.h>
23 : : #include <sys/time.h>
24 : :
25 : : #include "common/connect.h"
26 : : #include "funcapi.h"
27 : : #include "libpq-fe.h"
28 : : #include "mb/pg_wchar.h"
29 : : #include "miscadmin.h"
30 : : #include "pgstat.h"
31 : : #include "pqexpbuffer.h"
32 : : #include "replication/walreceiver.h"
33 : : #include "utils/builtins.h"
34 : : #include "utils/memutils.h"
35 : : #include "utils/pg_lsn.h"
36 : : #include "utils/tuplestore.h"
37 : :
5198 heikki.linnakangas@i 38 :CBC 928 : PG_MODULE_MAGIC;
39 : :
40 : : struct WalReceiverConn
41 : : {
42 : : /* Current connection to the primary, if any */
43 : : PGconn *streamConn;
44 : : /* Used to remember if the connection is logical or physical */
45 : : bool logical;
46 : : /* Buffer for currently read records */
47 : : char *recvBuf;
48 : : };
49 : :
50 : : /* Prototypes for interface functions */
51 : : static WalReceiverConn *libpqrcv_connect(const char *conninfo,
52 : : bool replication, bool logical,
53 : : bool must_use_password,
54 : : const char *appname, char **err);
55 : : static void libpqrcv_check_conninfo(const char *conninfo,
56 : : bool must_use_password);
57 : : static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
58 : : static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
59 : : char **sender_host, int *sender_port);
60 : : static char *libpqrcv_identify_system(WalReceiverConn *conn,
61 : : TimeLineID *primary_tli);
62 : : static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo);
63 : : static int libpqrcv_server_version(WalReceiverConn *conn);
64 : : static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
65 : : TimeLineID tli, char **filename,
66 : : char **content, int *len);
67 : : static bool libpqrcv_startstreaming(WalReceiverConn *conn,
68 : : const WalRcvStreamOptions *options);
69 : : static void libpqrcv_endstreaming(WalReceiverConn *conn,
70 : : TimeLineID *next_tli);
71 : : static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
72 : : pgsocket *wait_fd);
73 : : static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
74 : : int nbytes);
75 : : static char *libpqrcv_create_slot(WalReceiverConn *conn,
76 : : const char *slotname,
77 : : bool temporary,
78 : : bool two_phase,
79 : : bool failover,
80 : : CRSSnapshotAction snapshot_action,
81 : : XLogRecPtr *lsn);
82 : : static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
83 : : bool failover);
84 : : static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
85 : : static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
86 : : const char *query,
87 : : const int nRetTypes,
88 : : const Oid *retTypes);
89 : : static void libpqrcv_disconnect(WalReceiverConn *conn);
90 : :
91 : : static WalReceiverFunctionsType PQWalReceiverFunctions = {
92 : : .walrcv_connect = libpqrcv_connect,
93 : : .walrcv_check_conninfo = libpqrcv_check_conninfo,
94 : : .walrcv_get_conninfo = libpqrcv_get_conninfo,
95 : : .walrcv_get_senderinfo = libpqrcv_get_senderinfo,
96 : : .walrcv_identify_system = libpqrcv_identify_system,
97 : : .walrcv_server_version = libpqrcv_server_version,
98 : : .walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
99 : : .walrcv_startstreaming = libpqrcv_startstreaming,
100 : : .walrcv_endstreaming = libpqrcv_endstreaming,
101 : : .walrcv_receive = libpqrcv_receive,
102 : : .walrcv_send = libpqrcv_send,
103 : : .walrcv_create_slot = libpqrcv_create_slot,
104 : : .walrcv_alter_slot = libpqrcv_alter_slot,
105 : : .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
106 : : .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
107 : : .walrcv_exec = libpqrcv_exec,
108 : : .walrcv_disconnect = libpqrcv_disconnect
109 : : };
110 : :
111 : : /* Prototypes for private functions */
112 : : static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
113 : : static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
114 : : static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
115 : :
116 : : /*
117 : : * Module initialization function
118 : : */
119 : : void
120 : 928 : _PG_init(void)
121 : : {
2692 peter_e@gmx.net 122 [ - + ]: 928 : if (WalReceiverFunctions != NULL)
5198 heikki.linnakangas@i 123 [ # # ]:UBC 0 : elog(ERROR, "libpqwalreceiver already loaded");
2692 peter_e@gmx.net 124 :CBC 928 : WalReceiverFunctions = &PQWalReceiverFunctions;
5198 heikki.linnakangas@i 125 : 928 : }
126 : :
127 : : /*
128 : : * Establish the connection to the primary server.
129 : : *
130 : : * This function can be used for both replication and regular connections.
131 : : * If it is a replication connection, it could be either logical or physical
132 : : * based on input argument 'logical'.
133 : : *
134 : : * If an error occurs, this function will normally return NULL and set *err
135 : : * to a palloc'ed error message. However, if must_use_password is true and
136 : : * the connection fails to use the password, this function will ereport(ERROR).
137 : : * We do this because in that case the error includes a detail and a hint for
138 : : * consistency with other parts of the system, and it's not worth adding the
139 : : * machinery to pass all of those back to the caller just to cover this one
140 : : * case.
141 : : */
142 : : static WalReceiverConn *
69 akapila@postgresql.o 143 :GNC 927 : libpqrcv_connect(const char *conninfo, bool replication, bool logical,
144 : : bool must_use_password, const char *appname, char **err)
145 : : {
146 : : WalReceiverConn *conn;
147 : : PostgresPollingStatusType status;
148 : : const char *keys[6];
149 : : const char *vals[6];
2692 peter_e@gmx.net 150 :CBC 927 : int i = 0;
151 : :
152 : : /*
153 : : * Re-validate connection string. The validation already happened at DDL
154 : : * time, but the subscription owner may have changed. If we don't recheck
155 : : * with the correct must_use_password, it's possible that the connection
156 : : * will obtain the password from a different source, such as PGPASSFILE or
157 : : * PGPASSWORD.
158 : : */
93 jdavis@postgresql.or 159 : 927 : libpqrcv_check_conninfo(conninfo, must_use_password);
160 : :
161 : : /*
162 : : * We use the expand_dbname parameter to process the connection string (or
163 : : * URI), and pass some extra options.
164 : : */
2692 peter_e@gmx.net 165 : 913 : keys[i] = "dbname";
166 : 913 : vals[i] = conninfo;
167 : :
168 : : /* We can not have logical without replication */
69 akapila@postgresql.o 169 [ + + - + ]:GNC 913 : Assert(replication || !logical);
170 : :
171 [ + + ]: 913 : if (replication)
172 : : {
173 : 902 : keys[++i] = "replication";
174 [ + + ]: 902 : vals[i] = logical ? "database" : "true";
175 : :
176 [ + + ]: 902 : if (logical)
177 : : {
178 : : /* Tell the publisher to translate to our encoding */
179 : 600 : keys[++i] = "client_encoding";
180 : 600 : vals[i] = GetDatabaseEncodingName();
181 : :
182 : : /*
183 : : * Force assorted GUC parameters to settings that ensure that the
184 : : * publisher will output data values in a form that is unambiguous
185 : : * to the subscriber. (We don't want to modify the subscriber's
186 : : * GUC settings, since that might surprise user-defined code
187 : : * running in the subscriber, such as triggers.) This should
188 : : * match what pg_dump does.
189 : : */
190 : 600 : keys[++i] = "options";
191 : 600 : vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
192 : : }
193 : : else
194 : : {
195 : : /*
196 : : * The database name is ignored by the server in replication mode,
197 : : * but specify "replication" for .pgpass lookup.
198 : : */
199 : 302 : keys[++i] = "dbname";
200 : 302 : vals[i] = "replication";
201 : : }
202 : : }
203 : :
2692 peter_e@gmx.net 204 :CBC 913 : keys[++i] = "fallback_application_name";
205 : 913 : vals[i] = appname;
206 : :
207 : 913 : keys[++i] = NULL;
208 : 913 : vals[i] = NULL;
209 : :
2607 210 [ - + ]: 913 : Assert(i < sizeof(keys));
211 : :
2692 212 : 913 : conn = palloc0(sizeof(WalReceiverConn));
82 heikki.linnakangas@i 213 : 913 : conn->streamConn = PQconnectStartParams(keys, vals,
214 : : /* expand_dbname = */ true);
215 [ + + ]: 913 : if (PQstatus(conn->streamConn) == CONNECTION_BAD)
216 : 232 : goto bad_connection_errmsg;
217 : :
218 : : /*
219 : : * Poll connection until we have OK or FAILED status.
220 : : *
221 : : * Per spec for PQconnectPoll, first wait till socket is write-ready.
222 : : */
223 : 681 : status = PGRES_POLLING_WRITING;
224 : : do
225 : : {
226 : : int io_flag;
227 : : int rc;
228 : :
229 [ + + ]: 1878 : if (status == PGRES_POLLING_READING)
230 : 685 : io_flag = WL_SOCKET_READABLE;
231 : : #ifdef WIN32
232 : : /* Windows needs a different test while waiting for connection-made */
233 : : else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
234 : : io_flag = WL_SOCKET_CONNECTED;
235 : : #endif
236 : : else
237 : 1193 : io_flag = WL_SOCKET_WRITEABLE;
238 : :
239 : 1878 : rc = WaitLatchOrSocket(MyLatch,
240 : : WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
241 : 1878 : PQsocket(conn->streamConn),
242 : : 0,
243 : : WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
244 : :
245 : : /* Interrupted? */
246 [ + + ]: 1878 : if (rc & WL_LATCH_SET)
247 : : {
248 : 513 : ResetLatch(MyLatch);
249 : 513 : ProcessWalRcvInterrupts();
250 : : }
251 : :
252 : : /* If socket is ready, advance the libpq state machine */
253 [ + + ]: 1877 : if (rc & io_flag)
254 : 1365 : status = PQconnectPoll(conn->streamConn);
255 [ + + + + ]: 1877 : } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
256 : :
2692 peter_e@gmx.net 257 [ + + ]: 680 : if (PQstatus(conn->streamConn) != CONNECTION_OK)
447 andres@anarazel.de 258 : 22 : goto bad_connection_errmsg;
259 : :
381 rhaas@postgresql.org 260 [ + + - + ]: 658 : if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
261 : : {
82 heikki.linnakangas@i 262 :UBC 0 : PQfinish(conn->streamConn);
381 rhaas@postgresql.org 263 : 0 : pfree(conn);
264 : :
265 [ # # ]: 0 : ereport(ERROR,
266 : : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
267 : : errmsg("password is required"),
268 : : errdetail("Non-superuser cannot connect if the server does not request a password."),
269 : : errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
270 : : }
271 : :
272 : : /*
273 : : * Set always-secure search path for the cases where the connection is
274 : : * used to run SQL queries, so malicious users can't get control.
275 : : */
45 akapila@postgresql.o 276 [ + + + + ]:GNC 658 : if (!replication || logical)
277 : : {
278 : : PGresult *res;
279 : :
1343 noah@leadboat.com 280 :CBC 530 : res = libpqrcv_PQexec(conn->streamConn,
281 : : ALWAYS_SECURE_SEARCH_PATH_SQL);
282 [ - + ]: 529 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
283 : : {
1343 noah@leadboat.com 284 :UBC 0 : PQclear(res);
447 andres@anarazel.de 285 : 0 : *err = psprintf(_("could not clear search path: %s"),
286 : 0 : pchomp(PQerrorMessage(conn->streamConn)));
287 : 0 : goto bad_connection;
288 : : }
1343 noah@leadboat.com 289 :CBC 529 : PQclear(res);
290 : : }
291 : :
2692 peter_e@gmx.net 292 : 657 : conn->logical = logical;
293 : :
294 : 657 : return conn;
295 : :
296 : : /* error path, using libpq's error message */
447 andres@anarazel.de 297 : 254 : bad_connection_errmsg:
298 : 254 : *err = pchomp(PQerrorMessage(conn->streamConn));
299 : :
300 : : /* error path, error already set */
301 : 254 : bad_connection:
82 heikki.linnakangas@i 302 : 254 : PQfinish(conn->streamConn);
447 andres@anarazel.de 303 : 254 : pfree(conn);
304 : 254 : return NULL;
305 : : }
306 : :
307 : : /*
308 : : * Validate connection info string, and determine whether it might cause
309 : : * local filesystem access to be attempted.
310 : : *
311 : : * If the connection string can't be parsed, this function will raise
312 : : * an error and will not return. If it can, it will return true if this
313 : : * connection string specifies a password and false otherwise.
314 : : */
315 : : static void
381 rhaas@postgresql.org 316 : 1091 : libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
317 : : {
2524 bruce@momjian.us 318 : 1091 : PQconninfoOption *opts = NULL;
319 : : PQconninfoOption *opt;
320 : 1091 : char *err = NULL;
321 : :
2642 peter_e@gmx.net 322 : 1091 : opts = PQconninfoParse(conninfo, &err);
323 [ + + ]: 1091 : if (opts == NULL)
324 : : {
325 : : /* The error string is malloc'd, so we must free it explicitly */
1123 tgl@sss.pgh.pa.us 326 [ + - ]: 9 : char *errcopy = err ? pstrdup(err) : "out of memory";
327 : :
328 : 9 : PQfreemem(err);
2642 peter_e@gmx.net 329 [ + - ]: 9 : ereport(ERROR,
330 : : (errcode(ERRCODE_SYNTAX_ERROR),
331 : : errmsg("invalid connection string syntax: %s", errcopy)));
332 : : }
333 : :
381 rhaas@postgresql.org 334 [ + + ]: 1082 : if (must_use_password)
335 : : {
331 tgl@sss.pgh.pa.us 336 : 24 : bool uses_password = false;
337 : :
381 rhaas@postgresql.org 338 [ + + ]: 735 : for (opt = opts; opt->keyword != NULL; ++opt)
339 : : {
340 : : /* Ignore connection options that are not present. */
341 [ + + ]: 718 : if (opt->val == NULL)
342 : 660 : continue;
343 : :
344 [ + + + - ]: 58 : if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
345 : : {
346 : 7 : uses_password = true;
347 : 7 : break;
348 : : }
349 : : }
350 : :
351 [ + + ]: 24 : if (!uses_password)
352 : : {
353 : : /* malloc'd, so we must free it explicitly */
93 jdavis@postgresql.or 354 : 17 : PQconninfoFree(opts);
355 : :
381 rhaas@postgresql.org 356 [ + - ]: 17 : ereport(ERROR,
357 : : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
358 : : errmsg("password is required"),
359 : : errdetail("Non-superusers must provide a password in the connection string.")));
360 : : }
361 : : }
362 : :
2642 peter_e@gmx.net 363 : 1065 : PQconninfoFree(opts);
364 : 1065 : }
365 : :
366 : : /*
367 : : * Return a user-displayable conninfo string. Any security-sensitive fields
368 : : * are obfuscated.
369 : : */
370 : : static char *
2692 371 : 128 : libpqrcv_get_conninfo(WalReceiverConn *conn)
372 : : {
373 : : PQconninfoOption *conn_opts;
374 : : PQconninfoOption *conn_opt;
375 : : PQExpBufferData buf;
376 : : char *retval;
377 : :
378 [ - + ]: 128 : Assert(conn->streamConn != NULL);
379 : :
2846 alvherre@alvh.no-ip. 380 : 128 : initPQExpBuffer(&buf);
2692 peter_e@gmx.net 381 : 128 : conn_opts = PQconninfo(conn->streamConn);
382 : :
2846 alvherre@alvh.no-ip. 383 [ - + ]: 128 : if (conn_opts == NULL)
2846 alvherre@alvh.no-ip. 384 [ # # ]:UBC 0 : ereport(ERROR,
385 : : (errcode(ERRCODE_OUT_OF_MEMORY),
386 : : errmsg("could not parse connection string: %s",
387 : : _("out of memory"))));
388 : :
389 : : /* build a clean connection string from pieces */
2846 alvherre@alvh.no-ip. 390 [ + + ]:CBC 5376 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
391 : : {
392 : : bool obfuscate;
393 : :
394 : : /* Skip debug and empty options */
395 [ + + ]: 5248 : if (strchr(conn_opt->dispchar, 'D') ||
396 [ + + ]: 5120 : conn_opt->val == NULL ||
397 [ + + ]: 2441 : conn_opt->val[0] == '\0')
398 : 2935 : continue;
399 : :
400 : : /* Obfuscate security-sensitive options */
401 : 2313 : obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
402 : :
403 [ - + ]: 4626 : appendPQExpBuffer(&buf, "%s%s=%s",
404 [ + + ]: 2313 : buf.len == 0 ? "" : " ",
405 : : conn_opt->keyword,
406 : : obfuscate ? "********" : conn_opt->val);
407 : : }
408 : :
409 : 128 : PQconninfoFree(conn_opts);
410 : :
411 [ + - ]: 128 : retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
412 : 128 : termPQExpBuffer(&buf);
413 : 128 : return retval;
414 : : }
415 : :
416 : : /*
417 : : * Provides information of sender this WAL receiver is connected to.
418 : : */
419 : : static void
2206 fujii@postgresql.org 420 : 128 : libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
421 : : int *sender_port)
422 : : {
2180 tgl@sss.pgh.pa.us 423 : 128 : char *ret = NULL;
424 : :
2206 fujii@postgresql.org 425 : 128 : *sender_host = NULL;
426 : 128 : *sender_port = 0;
427 : :
428 [ - + ]: 128 : Assert(conn->streamConn != NULL);
429 : :
430 : 128 : ret = PQhost(conn->streamConn);
431 [ + - + - ]: 128 : if (ret && strlen(ret) != 0)
432 : 128 : *sender_host = pstrdup(ret);
433 : :
434 : 128 : ret = PQport(conn->streamConn);
435 [ + - + - ]: 128 : if (ret && strlen(ret) != 0)
436 : 128 : *sender_port = atoi(ret);
437 : 128 : }
438 : :
439 : : /*
440 : : * Check that primary's system identifier matches ours, and fetch the current
441 : : * timeline ID of the primary.
442 : : */
443 : : static char *
1857 peter@eisentraut.org 444 : 294 : libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
445 : : {
446 : : PGresult *res;
447 : : char *primary_sysid;
448 : :
449 : : /*
450 : : * Get the system identifier and timeline ID as a DataRow message from the
451 : : * primary server.
452 : : */
2692 peter_e@gmx.net 453 : 294 : res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
5198 heikki.linnakangas@i 454 [ - + ]: 294 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
455 : : {
5198 heikki.linnakangas@i 456 :UBC 0 : PQclear(res);
457 [ # # ]: 0 : ereport(ERROR,
458 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
459 : : errmsg("could not receive database system identifier and timeline ID from "
460 : : "the primary server: %s",
461 : : pchomp(PQerrorMessage(conn->streamConn)))));
462 : : }
463 : :
464 : : /*
465 : : * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
466 : : * 9.4 and onwards.
467 : : */
3526 fujii@postgresql.org 468 [ + - - + ]:CBC 294 : if (PQnfields(res) < 3 || PQntuples(res) != 1)
469 : : {
5161 bruce@momjian.us 470 :UBC 0 : int ntuples = PQntuples(res);
471 : 0 : int nfields = PQnfields(res);
472 : :
5198 heikki.linnakangas@i 473 : 0 : PQclear(res);
474 [ # # ]: 0 : ereport(ERROR,
475 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
476 : : errmsg("invalid response from primary server"),
477 : : errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
478 : : ntuples, nfields, 1, 3)));
479 : : }
2692 peter_e@gmx.net 480 :CBC 294 : primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
2093 andres@anarazel.de 481 : 294 : *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
5198 heikki.linnakangas@i 482 : 294 : PQclear(res);
483 : :
2692 peter_e@gmx.net 484 : 294 : return primary_sysid;
485 : : }
486 : :
487 : : /*
488 : : * Thin wrapper around libpq to obtain server version.
489 : : */
490 : : static int
1857 peter@eisentraut.org 491 : 1130 : libpqrcv_server_version(WalReceiverConn *conn)
492 : : {
493 : 1130 : return PQserverVersion(conn->streamConn);
494 : : }
495 : :
496 : : /*
497 : : * Get database name from the primary server's conninfo.
498 : : *
499 : : * If dbname is not found in connInfo, return NULL value.
500 : : */
501 : : static char *
69 akapila@postgresql.o 502 :GNC 12 : libpqrcv_get_dbname_from_conninfo(const char *connInfo)
503 : : {
504 : : PQconninfoOption *opts;
505 : 12 : char *dbname = NULL;
506 : 12 : char *err = NULL;
507 : :
508 : 12 : opts = PQconninfoParse(connInfo, &err);
509 [ - + ]: 12 : if (opts == NULL)
510 : : {
511 : : /* The error string is malloc'd, so we must free it explicitly */
69 akapila@postgresql.o 512 [ # # ]:UNC 0 : char *errcopy = err ? pstrdup(err) : "out of memory";
513 : :
514 : 0 : PQfreemem(err);
515 [ # # ]: 0 : ereport(ERROR,
516 : : (errcode(ERRCODE_SYNTAX_ERROR),
517 : : errmsg("invalid connection string syntax: %s", errcopy)));
518 : : }
519 : :
69 akapila@postgresql.o 520 [ + + ]:GNC 504 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
521 : : {
522 : : /*
523 : : * If multiple dbnames are specified, then the last one will be
524 : : * returned
525 : : */
526 [ + + + + ]: 492 : if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
527 [ + - ]: 11 : *opt->val)
528 : : {
529 [ - + ]: 11 : if (dbname)
69 akapila@postgresql.o 530 :UNC 0 : pfree(dbname);
531 : :
69 akapila@postgresql.o 532 :GNC 11 : dbname = pstrdup(opt->val);
533 : : }
534 : : }
535 : :
536 : 12 : PQconninfoFree(opts);
537 : 12 : return dbname;
538 : : }
539 : :
540 : : /*
541 : : * Start streaming WAL data from given streaming options.
542 : : *
543 : : * Returns true if we switched successfully to copy-both mode. False
544 : : * means the server received the command and executed it successfully, but
545 : : * didn't switch to copy-mode. That means that there was no WAL on the
546 : : * requested timeline and starting point, because the server switched to
547 : : * another timeline at or before the requested starting point. On failure,
548 : : * throws an ERROR.
549 : : */
550 : : static bool
2692 peter_e@gmx.net 551 :CBC 454 : libpqrcv_startstreaming(WalReceiverConn *conn,
552 : : const WalRcvStreamOptions *options)
553 : : {
554 : : StringInfoData cmd;
555 : : PGresult *res;
556 : :
2642 557 [ - + ]: 454 : Assert(options->logical == conn->logical);
558 [ + + - + ]: 454 : Assert(options->slotname || !options->logical);
559 : :
2692 560 : 454 : initStringInfo(&cmd);
561 : :
562 : : /* Build the command. */
2642 563 : 454 : appendStringInfoString(&cmd, "START_REPLICATION");
564 [ + + ]: 454 : if (options->slotname != NULL)
565 : 366 : appendStringInfo(&cmd, " SLOT \"%s\"",
566 : 366 : options->slotname);
567 : :
568 [ + + ]: 454 : if (options->logical)
2434 569 : 326 : appendStringInfoString(&cmd, " LOGICAL");
570 : :
1146 peter@eisentraut.org 571 : 454 : appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
572 : :
573 : : /*
574 : : * Additional options are different depending on if we are doing logical
575 : : * or physical replication.
576 : : */
2642 peter_e@gmx.net 577 [ + + ]: 454 : if (options->logical)
578 : : {
579 : : char *pubnames_str;
580 : : List *pubnames;
581 : : char *pubnames_literal;
582 : :
583 : 326 : appendStringInfoString(&cmd, " (");
584 : :
585 : 326 : appendStringInfo(&cmd, "proto_version '%u'",
586 : 326 : options->proto.logical.proto_version);
587 : :
461 akapila@postgresql.o 588 [ + + ]: 326 : if (options->proto.logical.streaming_str)
589 : 33 : appendStringInfo(&cmd, ", streaming '%s'",
590 : 33 : options->proto.logical.streaming_str);
591 : :
1005 592 [ + + + - ]: 330 : if (options->proto.logical.twophase &&
593 : 4 : PQserverVersion(conn->streamConn) >= 150000)
594 : 4 : appendStringInfoString(&cmd, ", two_phase 'on'");
595 : :
633 596 [ + - + - ]: 652 : if (options->proto.logical.origin &&
597 : 326 : PQserverVersion(conn->streamConn) >= 160000)
598 : 326 : appendStringInfo(&cmd, ", origin '%s'",
599 : 326 : options->proto.logical.origin);
600 : :
2642 peter_e@gmx.net 601 : 326 : pubnames = options->proto.logical.publication_names;
602 : 326 : pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
2638 603 [ - + ]: 326 : if (!pubnames_str)
2638 peter_e@gmx.net 604 [ # # ]:UBC 0 : ereport(ERROR,
605 : : (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
606 : : errmsg("could not start WAL streaming: %s",
607 : : pchomp(PQerrorMessage(conn->streamConn)))));
2638 peter_e@gmx.net 608 :CBC 326 : pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
609 : : strlen(pubnames_str));
610 [ - + ]: 326 : if (!pubnames_literal)
2638 peter_e@gmx.net 611 [ # # ]:UBC 0 : ereport(ERROR,
612 : : (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
613 : : errmsg("could not start WAL streaming: %s",
614 : : pchomp(PQerrorMessage(conn->streamConn)))));
2638 peter_e@gmx.net 615 :CBC 326 : appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
616 : 326 : PQfreemem(pubnames_literal);
2642 617 : 326 : pfree(pubnames_str);
618 : :
1366 tgl@sss.pgh.pa.us 619 [ + + + - ]: 337 : if (options->proto.logical.binary &&
620 : 11 : PQserverVersion(conn->streamConn) >= 140000)
621 : 11 : appendStringInfoString(&cmd, ", binary 'true'");
622 : :
2638 peter_e@gmx.net 623 : 326 : appendStringInfoChar(&cmd, ')');
624 : : }
625 : : else
2642 626 : 128 : appendStringInfo(&cmd, " TIMELINE %u",
627 : 128 : options->proto.physical.startpointTLI);
628 : :
629 : : /* Start streaming. */
2692 630 : 454 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
631 : 454 : pfree(cmd.data);
632 : :
4140 heikki.linnakangas@i 633 [ - + ]: 454 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
634 : : {
4140 heikki.linnakangas@i 635 :UBC 0 : PQclear(res);
636 : 0 : return false;
637 : : }
4140 heikki.linnakangas@i 638 [ - + ]:CBC 454 : else if (PQresultStatus(res) != PGRES_COPY_BOTH)
639 : : {
5109 magnus@hagander.net 640 :UBC 0 : PQclear(res);
5198 heikki.linnakangas@i 641 [ # # ]: 0 : ereport(ERROR,
642 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
643 : : errmsg("could not start WAL streaming: %s",
644 : : pchomp(PQerrorMessage(conn->streamConn)))));
645 : : }
5198 heikki.linnakangas@i 646 :CBC 454 : PQclear(res);
4140 647 : 454 : return true;
648 : : }
649 : :
650 : : /*
651 : : * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
652 : : * reported by the server, or 0 if it did not report it.
653 : : */
654 : : static void
2692 peter_e@gmx.net 655 : 202 : libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
656 : : {
657 : : PGresult *res;
658 : :
659 : : /*
660 : : * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
661 : : * block, but the risk seems small.
662 : : */
663 [ + + - + ]: 374 : if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
664 : 172 : PQflush(conn->streamConn))
4140 heikki.linnakangas@i 665 [ + - ]: 30 : ereport(ERROR,
666 : : (errcode(ERRCODE_CONNECTION_FAILURE),
667 : : errmsg("could not send end-of-streaming message to primary: %s",
668 : : pchomp(PQerrorMessage(conn->streamConn)))));
669 : :
2692 peter_e@gmx.net 670 : 172 : *next_tli = 0;
671 : :
672 : : /*
673 : : * After COPY is finished, we should receive a result set indicating the
674 : : * next timeline's ID, or just CommandComplete if the server was shut
675 : : * down.
676 : : *
677 : : * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
678 : : * also possible in case we aborted the copy in mid-stream.
679 : : */
1812 tgl@sss.pgh.pa.us 680 : 172 : res = libpqrcv_PQgetResult(conn->streamConn);
4104 heikki.linnakangas@i 681 [ + + ]: 172 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
682 : : {
683 : : /*
684 : : * Read the next timeline's ID. The server also sends the timeline's
685 : : * starting point, but it is ignored.
686 : : */
3994 687 [ + - - + ]: 12 : if (PQnfields(res) < 2 || PQntuples(res) != 1)
4104 heikki.linnakangas@i 688 [ # # ]:UBC 0 : ereport(ERROR,
689 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
690 : : errmsg("unexpected result set after end-of-streaming")));
2093 andres@anarazel.de 691 :CBC 12 : *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
4140 heikki.linnakangas@i 692 : 12 : PQclear(res);
693 : :
694 : : /* the result set should be followed by CommandComplete */
1812 tgl@sss.pgh.pa.us 695 : 12 : res = libpqrcv_PQgetResult(conn->streamConn);
696 : : }
2692 peter_e@gmx.net 697 [ + - ]: 160 : else if (PQresultStatus(res) == PGRES_COPY_OUT)
698 : : {
699 : 160 : PQclear(res);
700 : :
701 : : /* End the copy */
2480 tgl@sss.pgh.pa.us 702 [ - + ]: 160 : if (PQendcopy(conn->streamConn))
2480 tgl@sss.pgh.pa.us 703 [ # # ]:UBC 0 : ereport(ERROR,
704 : : (errcode(ERRCODE_CONNECTION_FAILURE),
705 : : errmsg("error while shutting down streaming COPY: %s",
706 : : pchomp(PQerrorMessage(conn->streamConn)))));
707 : :
708 : : /* CommandComplete should follow */
1812 tgl@sss.pgh.pa.us 709 :CBC 160 : res = libpqrcv_PQgetResult(conn->streamConn);
710 : : }
711 : :
4104 heikki.linnakangas@i 712 [ - + ]: 172 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
4104 heikki.linnakangas@i 713 [ # # ]:UBC 0 : ereport(ERROR,
714 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
715 : : errmsg("error reading result of streaming command: %s",
716 : : pchomp(PQerrorMessage(conn->streamConn)))));
3350 tgl@sss.pgh.pa.us 717 :CBC 172 : PQclear(res);
718 : :
719 : : /* Verify that there are no more results */
1812 720 : 172 : res = libpqrcv_PQgetResult(conn->streamConn);
4104 heikki.linnakangas@i 721 [ - + ]: 172 : if (res != NULL)
4104 heikki.linnakangas@i 722 [ # # ]:UBC 0 : ereport(ERROR,
723 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
724 : : errmsg("unexpected result after CommandComplete: %s",
725 : : pchomp(PQerrorMessage(conn->streamConn)))));
4140 heikki.linnakangas@i 726 :CBC 172 : }
727 : :
728 : : /*
729 : : * Fetch the timeline history file for 'tli' from primary.
730 : : */
731 : : static void
2692 peter_e@gmx.net 732 : 11 : libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
733 : : TimeLineID tli, char **filename,
734 : : char **content, int *len)
735 : : {
736 : : PGresult *res;
737 : : char cmd[64];
738 : :
739 [ - + ]: 11 : Assert(!conn->logical);
740 : :
741 : : /*
742 : : * Request the primary to send over the history file for given timeline.
743 : : */
4140 heikki.linnakangas@i 744 : 11 : snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
2692 peter_e@gmx.net 745 : 11 : res = libpqrcv_PQexec(conn->streamConn, cmd);
4140 heikki.linnakangas@i 746 [ - + ]: 11 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
747 : : {
4140 heikki.linnakangas@i 748 :UBC 0 : PQclear(res);
749 [ # # ]: 0 : ereport(ERROR,
750 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
751 : : errmsg("could not receive timeline history file from "
752 : : "the primary server: %s",
753 : : pchomp(PQerrorMessage(conn->streamConn)))));
754 : : }
4140 heikki.linnakangas@i 755 [ + - - + ]:CBC 11 : if (PQnfields(res) != 2 || PQntuples(res) != 1)
756 : : {
4140 heikki.linnakangas@i 757 :UBC 0 : int ntuples = PQntuples(res);
758 : 0 : int nfields = PQnfields(res);
759 : :
760 : 0 : PQclear(res);
761 [ # # ]: 0 : ereport(ERROR,
762 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
763 : : errmsg("invalid response from primary server"),
764 : : errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
765 : : ntuples, nfields)));
766 : : }
4140 heikki.linnakangas@i 767 :CBC 11 : *filename = pstrdup(PQgetvalue(res, 0, 0));
768 : :
769 : 11 : *len = PQgetlength(res, 0, 1);
770 : 11 : *content = palloc(*len);
771 : 11 : memcpy(*content, PQgetvalue(res, 0, 1), *len);
772 : 11 : PQclear(res);
5198 773 : 11 : }
774 : :
775 : : /*
776 : : * Send a query and wait for the results by using the asynchronous libpq
777 : : * functions and socket readiness events.
778 : : *
779 : : * The function is modeled on libpqsrv_exec(), with the behavior difference
780 : : * being that it calls ProcessWalRcvInterrupts(). As an optimization, it
781 : : * skips try/catch, since all errors terminate the process.
782 : : *
783 : : * May return NULL, rather than an error result, on failure.
784 : : */
785 : : static PGresult *
2692 peter_e@gmx.net 786 : 3219 : libpqrcv_PQexec(PGconn *streamConn, const char *query)
787 : : {
5031 bruce@momjian.us 788 : 3219 : PGresult *lastResult = NULL;
789 : :
790 : : /*
791 : : * PQexec() silently discards any prior query results on the connection.
792 : : * This is not required for this function as it's expected that the caller
793 : : * (which is this library in all cases) will behave correctly and we don't
794 : : * have to be backwards compatible with old libpq.
795 : : */
796 : :
797 : : /*
798 : : * Submit the query. Since we don't use non-blocking mode, this could
799 : : * theoretically block. In practice, since we don't send very long query
800 : : * strings, the risk seems negligible.
801 : : */
5109 magnus@hagander.net 802 [ + - ]: 3219 : if (!PQsendQuery(streamConn, query))
5109 magnus@hagander.net 803 :UBC 0 : return NULL;
804 : :
805 : : for (;;)
5109 magnus@hagander.net 806 :CBC 2596 : {
807 : : /* Wait for, and collect, the next PGresult. */
808 : : PGresult *result;
809 : :
1812 tgl@sss.pgh.pa.us 810 : 5815 : result = libpqrcv_PQgetResult(streamConn);
811 [ + + ]: 5814 : if (result == NULL)
812 : 2596 : break; /* query is complete, or failure */
813 : :
814 : : /*
815 : : * Emulate PQexec()'s behavior of returning the last result when there
816 : : * are many. We are fine with returning just last error message.
817 : : */
5109 magnus@hagander.net 818 : 3218 : PQclear(lastResult);
819 : 3218 : lastResult = result;
820 : :
821 [ + - + + ]: 6436 : if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
822 [ + + ]: 6268 : PQresultStatus(lastResult) == PGRES_COPY_OUT ||
4873 rhaas@postgresql.org 823 [ + - ]: 5646 : PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
5109 magnus@hagander.net 824 : 2596 : PQstatus(streamConn) == CONNECTION_BAD)
825 : : break;
826 : : }
827 : :
828 : 3218 : return lastResult;
829 : : }
830 : :
831 : : /*
832 : : * Perform the equivalent of PQgetResult(), but watch for interrupts.
833 : : */
834 : : static PGresult *
1812 tgl@sss.pgh.pa.us 835 : 6741 : libpqrcv_PQgetResult(PGconn *streamConn)
836 : : {
837 : : /*
838 : : * Collect data until PQgetResult is ready to get the result without
839 : : * blocking.
840 : : */
841 [ + + ]: 9972 : while (PQisBusy(streamConn))
842 : : {
843 : : int rc;
844 : :
845 : : /*
846 : : * We don't need to break down the sleep into smaller increments,
847 : : * since we'll get interrupted by signals and can handle any
848 : : * interrupts here.
849 : : */
850 : 3262 : rc = WaitLatchOrSocket(MyLatch,
851 : : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
852 : : WL_LATCH_SET,
853 : : PQsocket(streamConn),
854 : : 0,
855 : : WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
856 : :
857 : : /* Interrupted? */
858 [ + + ]: 3262 : if (rc & WL_LATCH_SET)
859 : : {
860 : 2 : ResetLatch(MyLatch);
861 : 2 : ProcessWalRcvInterrupts();
862 : : }
863 : :
864 : : /* Consume whatever data is available from the socket */
865 [ + + ]: 3261 : if (PQconsumeInput(streamConn) == 0)
866 : : {
867 : : /* trouble; return NULL */
868 : 30 : return NULL;
869 : : }
870 : : }
871 : :
872 : : /* Now we can collect and return the next PGresult */
873 : 6710 : return PQgetResult(streamConn);
874 : : }
875 : :
876 : : /*
877 : : * Disconnect connection to primary, if any.
878 : : */
879 : : static void
2692 peter_e@gmx.net 880 : 657 : libpqrcv_disconnect(WalReceiverConn *conn)
881 : : {
82 heikki.linnakangas@i 882 : 657 : PQfinish(conn->streamConn);
597 peter@eisentraut.org 883 : 657 : PQfreemem(conn->recvBuf);
2692 peter_e@gmx.net 884 : 657 : pfree(conn);
5198 heikki.linnakangas@i 885 : 657 : }
886 : :
887 : : /*
888 : : * Receive a message available from XLOG stream.
889 : : *
890 : : * Returns:
891 : : *
892 : : * If data was received, returns the length of the data. *buffer is set to
893 : : * point to a buffer holding the received message. The buffer is only valid
894 : : * until the next libpqrcv_* call.
895 : : *
896 : : * If no data was available immediately, returns 0, and *wait_fd is set to a
897 : : * socket descriptor which can be waited on before trying again.
898 : : *
899 : : * -1 if the server ended the COPY.
900 : : *
901 : : * ereports on error.
902 : : */
903 : : static int
2692 peter_e@gmx.net 904 : 331197 : libpqrcv_receive(WalReceiverConn *conn, char **buffer,
905 : : pgsocket *wait_fd)
906 : : {
907 : : int rawlen;
908 : :
597 peter@eisentraut.org 909 : 331197 : PQfreemem(conn->recvBuf);
2692 peter_e@gmx.net 910 : 331197 : conn->recvBuf = NULL;
911 : :
912 : : /* Try to receive a CopyData message */
913 : 331197 : rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
4840 heikki.linnakangas@i 914 [ + + ]: 331197 : if (rawlen == 0)
915 : : {
916 : : /* Try consuming some data. */
2692 peter_e@gmx.net 917 [ + + ]: 211556 : if (PQconsumeInput(conn->streamConn) == 0)
5198 heikki.linnakangas@i 918 [ + - ]: 46 : ereport(ERROR,
919 : : (errcode(ERRCODE_CONNECTION_FAILURE),
920 : : errmsg("could not receive data from WAL stream: %s",
921 : : pchomp(PQerrorMessage(conn->streamConn)))));
922 : :
923 : : /* Now that we've consumed some input, try again */
2692 peter_e@gmx.net 924 : 211510 : rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
4840 heikki.linnakangas@i 925 [ + + ]: 211510 : if (rawlen == 0)
926 : : {
927 : : /* Tell caller to try again when our socket is ready. */
2692 peter_e@gmx.net 928 : 104720 : *wait_fd = PQsocket(conn->streamConn);
4140 heikki.linnakangas@i 929 : 104720 : return 0;
930 : : }
931 : : }
5161 bruce@momjian.us 932 [ + + ]: 226431 : if (rawlen == -1) /* end-of-streaming or error */
933 : : {
934 : : PGresult *res;
935 : :
1812 tgl@sss.pgh.pa.us 936 : 214 : res = libpqrcv_PQgetResult(conn->streamConn);
2579 peter_e@gmx.net 937 [ + + ]: 214 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
938 : : {
939 : 196 : PQclear(res);
940 : :
941 : : /* Verify that there are no more results. */
1812 tgl@sss.pgh.pa.us 942 : 196 : res = libpqrcv_PQgetResult(conn->streamConn);
2579 peter_e@gmx.net 943 [ - + ]: 196 : if (res != NULL)
944 : : {
2502 andres@anarazel.de 945 :UBC 0 : PQclear(res);
946 : :
947 : : /*
948 : : * If the other side closed the connection orderly (otherwise
949 : : * we'd seen an error, or PGRES_COPY_IN) don't report an error
950 : : * here, but let callers deal with it.
951 : : */
952 [ # # ]: 0 : if (PQstatus(conn->streamConn) == CONNECTION_BAD)
953 : 0 : return -1;
954 : :
2579 peter_e@gmx.net 955 [ # # ]: 0 : ereport(ERROR,
956 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
957 : : errmsg("unexpected result after CommandComplete: %s",
958 : : PQerrorMessage(conn->streamConn))));
959 : : }
960 : :
2579 peter_e@gmx.net 961 :CBC 196 : return -1;
962 : : }
963 [ + + ]: 18 : else if (PQresultStatus(res) == PGRES_COPY_IN)
964 : : {
4140 heikki.linnakangas@i 965 : 12 : PQclear(res);
966 : 12 : return -1;
967 : : }
968 : : else
969 : : {
5198 970 : 6 : PQclear(res);
971 [ + - ]: 6 : ereport(ERROR,
972 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
973 : : errmsg("could not receive data from WAL stream: %s",
974 : : pchomp(PQerrorMessage(conn->streamConn)))));
975 : : }
976 : : }
977 [ - + ]: 226217 : if (rawlen < -1)
5198 heikki.linnakangas@i 978 [ # # ]:UBC 0 : ereport(ERROR,
979 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
980 : : errmsg("could not receive data from WAL stream: %s",
981 : : pchomp(PQerrorMessage(conn->streamConn)))));
982 : :
983 : : /* Return received messages to caller */
2692 peter_e@gmx.net 984 :CBC 226217 : *buffer = conn->recvBuf;
4140 heikki.linnakangas@i 985 : 226217 : return rawlen;
986 : : }
987 : :
988 : : /*
989 : : * Send a message to XLOG stream.
990 : : *
991 : : * ereports on error.
992 : : */
993 : : static void
2692 peter_e@gmx.net 994 : 101063 : libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
995 : : {
996 [ + - - + ]: 202126 : if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
997 : 101063 : PQflush(conn->streamConn))
4873 rhaas@postgresql.org 998 [ # # ]:UBC 0 : ereport(ERROR,
999 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1000 : : errmsg("could not send data to WAL stream: %s",
1001 : : pchomp(PQerrorMessage(conn->streamConn)))));
4873 rhaas@postgresql.org 1002 :CBC 101063 : }
1003 : :
1004 : : /*
1005 : : * Create new replication slot.
1006 : : * Returns the name of the exported snapshot for logical slot or NULL for
1007 : : * physical slot.
1008 : : */
1009 : : static char *
2642 peter_e@gmx.net 1010 : 262 : libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
1011 : : bool temporary, bool two_phase, bool failover,
1012 : : CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
1013 : : {
1014 : : PGresult *res;
1015 : : StringInfoData cmd;
1016 : : char *snapshot;
1017 : : int use_new_options_syntax;
1018 : :
922 rhaas@postgresql.org 1019 : 262 : use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
1020 : :
2642 peter_e@gmx.net 1021 : 262 : initStringInfo(&cmd);
1022 : :
2588 1023 : 262 : appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
1024 : :
2642 1025 [ - + ]: 262 : if (temporary)
2434 peter_e@gmx.net 1026 :UBC 0 : appendStringInfoString(&cmd, " TEMPORARY");
1027 : :
2642 peter_e@gmx.net 1028 [ + - ]:CBC 262 : if (conn->logical)
1029 : : {
922 rhaas@postgresql.org 1030 : 262 : appendStringInfoString(&cmd, " LOGICAL pgoutput ");
1031 [ + - ]: 262 : if (use_new_options_syntax)
1032 : 262 : appendStringInfoChar(&cmd, '(');
1005 akapila@postgresql.o 1033 [ + + ]: 262 : if (two_phase)
1034 : : {
922 rhaas@postgresql.org 1035 : 1 : appendStringInfoString(&cmd, "TWO_PHASE");
1036 [ + - ]: 1 : if (use_new_options_syntax)
1037 : 1 : appendStringInfoString(&cmd, ", ");
1038 : : else
922 rhaas@postgresql.org 1039 :UBC 0 : appendStringInfoChar(&cmd, ' ');
1040 : : }
1041 : :
76 akapila@postgresql.o 1042 [ + + ]:GNC 262 : if (failover)
1043 : : {
1044 : 5 : appendStringInfoString(&cmd, "FAILOVER");
1045 [ + - ]: 5 : if (use_new_options_syntax)
1046 : 5 : appendStringInfoString(&cmd, ", ");
1047 : : else
76 akapila@postgresql.o 1048 :UNC 0 : appendStringInfoChar(&cmd, ' ');
1049 : : }
1050 : :
922 rhaas@postgresql.org 1051 [ + - ]:CBC 262 : if (use_new_options_syntax)
1052 : : {
1053 [ - + + - ]: 262 : switch (snapshot_action)
1054 : : {
922 rhaas@postgresql.org 1055 :UBC 0 : case CRS_EXPORT_SNAPSHOT:
1056 : 0 : appendStringInfoString(&cmd, "SNAPSHOT 'export'");
1057 : 0 : break;
922 rhaas@postgresql.org 1058 :CBC 93 : case CRS_NOEXPORT_SNAPSHOT:
1059 : 93 : appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
1060 : 93 : break;
1061 : 169 : case CRS_USE_SNAPSHOT:
1062 : 169 : appendStringInfoString(&cmd, "SNAPSHOT 'use'");
1063 : 169 : break;
1064 : : }
1065 : : }
1066 : : else
1067 : : {
922 rhaas@postgresql.org 1068 [ # # # # ]:UBC 0 : switch (snapshot_action)
1069 : : {
1070 : 0 : case CRS_EXPORT_SNAPSHOT:
1071 : 0 : appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
1072 : 0 : break;
1073 : 0 : case CRS_NOEXPORT_SNAPSHOT:
1074 : 0 : appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
1075 : 0 : break;
1076 : 0 : case CRS_USE_SNAPSHOT:
1077 : 0 : appendStringInfoString(&cmd, "USE_SNAPSHOT");
1078 : 0 : break;
1079 : : }
1080 : : }
1081 : :
922 rhaas@postgresql.org 1082 [ + - ]:CBC 262 : if (use_new_options_syntax)
1083 : 262 : appendStringInfoChar(&cmd, ')');
1084 : : }
1085 : : else
1086 : : {
922 rhaas@postgresql.org 1087 [ # # ]:UBC 0 : if (use_new_options_syntax)
1088 : 0 : appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
1089 : : else
1090 : 0 : appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
1091 : : }
1092 : :
2642 peter_e@gmx.net 1093 :CBC 262 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1094 : 262 : pfree(cmd.data);
1095 : :
1096 [ - + ]: 262 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1097 : : {
2642 peter_e@gmx.net 1098 :UBC 0 : PQclear(res);
1099 [ # # ]: 0 : ereport(ERROR,
1100 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1101 : : errmsg("could not create replication slot \"%s\": %s",
1102 : : slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1103 : : }
1104 : :
1555 peter@eisentraut.org 1105 [ + + ]:CBC 262 : if (lsn)
1106 : 169 : *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
1107 : 169 : CStringGetDatum(PQgetvalue(res, 0, 1))));
1108 : :
2642 peter_e@gmx.net 1109 [ - + ]: 262 : if (!PQgetisnull(res, 0, 2))
2642 peter_e@gmx.net 1110 :UBC 0 : snapshot = pstrdup(PQgetvalue(res, 0, 2));
1111 : : else
2642 peter_e@gmx.net 1112 :CBC 262 : snapshot = NULL;
1113 : :
1114 : 262 : PQclear(res);
1115 : :
1116 : 262 : return snapshot;
1117 : : }
1118 : :
1119 : : /*
1120 : : * Change the definition of the replication slot.
1121 : : */
1122 : : static void
76 akapila@postgresql.o 1123 :GNC 9 : libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
1124 : : bool failover)
1125 : : {
1126 : : StringInfoData cmd;
1127 : : PGresult *res;
1128 : :
1129 : 9 : initStringInfo(&cmd);
1130 [ + + ]: 9 : appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
1131 : : quote_identifier(slotname),
1132 : : failover ? "true" : "false");
1133 : :
1134 : 9 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1135 : 9 : pfree(cmd.data);
1136 : :
1137 [ - + ]: 9 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
76 akapila@postgresql.o 1138 [ # # ]:UNC 0 : ereport(ERROR,
1139 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1140 : : errmsg("could not alter replication slot \"%s\": %s",
1141 : : slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1142 : :
76 akapila@postgresql.o 1143 :GNC 9 : PQclear(res);
1144 : 9 : }
1145 : :
1146 : : /*
1147 : : * Return PID of remote backend process.
1148 : : */
1149 : : static pid_t
1552 peter@eisentraut.org 1150 :UBC 0 : libpqrcv_get_backend_pid(WalReceiverConn *conn)
1151 : : {
1152 : 0 : return PQbackendPID(conn->streamConn);
1153 : : }
1154 : :
1155 : : /*
1156 : : * Convert tuple query result to tuplestore.
1157 : : */
1158 : : static void
2579 peter_e@gmx.net 1159 :CBC 942 : libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
1160 : : const int nRetTypes, const Oid *retTypes)
1161 : : {
1162 : : int tupn;
1163 : : int coln;
2524 bruce@momjian.us 1164 : 942 : int nfields = PQnfields(pgres);
1165 : : HeapTuple tuple;
1166 : : AttInMetadata *attinmeta;
1167 : : MemoryContext rowcontext;
1168 : : MemoryContext oldcontext;
1169 : :
1170 : : /* Make sure we got expected number of fields. */
2579 peter_e@gmx.net 1171 [ - + ]: 942 : if (nfields != nRetTypes)
2579 peter_e@gmx.net 1172 [ # # ]:UBC 0 : ereport(ERROR,
1173 : : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1174 : : errmsg("invalid query response"),
1175 : : errdetail("Expected %d fields, got %d fields.",
1176 : : nRetTypes, nfields)));
1177 : :
2579 peter_e@gmx.net 1178 :CBC 942 : walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1179 : :
1180 : : /* Create tuple descriptor corresponding to expected result. */
1972 andres@anarazel.de 1181 : 942 : walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
2579 peter_e@gmx.net 1182 [ + + ]: 3129 : for (coln = 0; coln < nRetTypes; coln++)
1183 : 2187 : TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1184 : 2187 : PQfname(pgres, coln), retTypes[coln], -1, 0);
1185 : 942 : attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1186 : :
1187 : : /* No point in doing more here if there were no tuples returned. */
2578 1188 [ + + ]: 942 : if (PQntuples(pgres) == 0)
1189 : 15 : return;
1190 : :
1191 : : /* Create temporary context for local allocations. */
2579 1192 : 927 : rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1193 : : "libpqrcv query result context",
1194 : : ALLOCSET_DEFAULT_SIZES);
1195 : :
1196 : : /* Process returned rows. */
1197 [ + + ]: 2143 : for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1198 : : {
1199 : : char *cstrs[MaxTupleAttributeNumber];
1200 : :
1812 tgl@sss.pgh.pa.us 1201 : 1217 : ProcessWalRcvInterrupts();
1202 : :
1203 : : /* Do the allocations in temporary context. */
2579 peter_e@gmx.net 1204 : 1216 : oldcontext = MemoryContextSwitchTo(rowcontext);
1205 : :
1206 : : /*
1207 : : * Fill cstrs with null-terminated strings of column values.
1208 : : */
1209 [ + + ]: 4384 : for (coln = 0; coln < nfields; coln++)
1210 : : {
1211 [ + + ]: 3168 : if (PQgetisnull(pgres, tupn, coln))
1212 : 445 : cstrs[coln] = NULL;
1213 : : else
1214 : 2723 : cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1215 : : }
1216 : :
1217 : : /* Convert row to a tuple, and add it to the tuplestore */
1218 : 1216 : tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1219 : 1216 : tuplestore_puttuple(walres->tuplestore, tuple);
1220 : :
1221 : : /* Clean up */
1222 : 1216 : MemoryContextSwitchTo(oldcontext);
1223 : 1216 : MemoryContextReset(rowcontext);
1224 : : }
1225 : :
1226 : 926 : MemoryContextDelete(rowcontext);
1227 : : }
1228 : :
1229 : : /*
1230 : : * Public interface for sending generic queries (and commands).
1231 : : *
1232 : : * This can only be called from process connected to database.
1233 : : */
1234 : : static WalRcvExecResult *
1235 : 1659 : libpqrcv_exec(WalReceiverConn *conn, const char *query,
1236 : : const int nRetTypes, const Oid *retTypes)
1237 : : {
1238 : 1659 : PGresult *pgres = NULL;
1239 : 1659 : WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1240 : : char *diag_sqlstate;
1241 : :
1242 [ - + ]: 1659 : if (MyDatabaseId == InvalidOid)
2579 peter_e@gmx.net 1243 [ # # ]:UBC 0 : ereport(ERROR,
1244 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1245 : : errmsg("the query interface requires a database connection")));
1246 : :
2579 peter_e@gmx.net 1247 :CBC 1659 : pgres = libpqrcv_PQexec(conn->streamConn, query);
1248 : :
1249 [ + - + - : 1659 : switch (PQresultStatus(pgres))
+ - - -
- ]
1250 : : {
2579 peter_e@gmx.net 1251 :GIC 942 : case PGRES_TUPLES_OK:
1252 : : case PGRES_SINGLE_TUPLE:
1253 : : case PGRES_TUPLES_CHUNK:
2579 peter_e@gmx.net 1254 :CBC 942 : walres->status = WALRCV_OK_TUPLES;
1255 : 942 : libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1256 : 941 : break;
1257 : :
2579 peter_e@gmx.net 1258 :UBC 0 : case PGRES_COPY_IN:
1259 : 0 : walres->status = WALRCV_OK_COPY_IN;
1260 : 0 : break;
1261 : :
2579 peter_e@gmx.net 1262 :CBC 168 : case PGRES_COPY_OUT:
1263 : 168 : walres->status = WALRCV_OK_COPY_OUT;
1264 : 168 : break;
1265 : :
2579 peter_e@gmx.net 1266 :UBC 0 : case PGRES_COPY_BOTH:
1267 : 0 : walres->status = WALRCV_OK_COPY_BOTH;
1268 : 0 : break;
1269 : :
2579 peter_e@gmx.net 1270 :CBC 549 : case PGRES_COMMAND_OK:
1271 : 549 : walres->status = WALRCV_OK_COMMAND;
1272 : 549 : break;
1273 : :
1274 : : /* Empty query is considered error. */
2579 peter_e@gmx.net 1275 :UBC 0 : case PGRES_EMPTY_QUERY:
1276 : 0 : walres->status = WALRCV_ERROR;
1277 : 0 : walres->err = _("empty query");
1278 : 0 : break;
1279 : :
1126 alvherre@alvh.no-ip. 1280 : 0 : case PGRES_PIPELINE_SYNC:
1281 : : case PGRES_PIPELINE_ABORTED:
1282 : 0 : walres->status = WALRCV_ERROR;
1283 : 0 : walres->err = _("unexpected pipeline mode");
1284 : 0 : break;
1285 : :
2579 peter_e@gmx.net 1286 : 0 : case PGRES_NONFATAL_ERROR:
1287 : : case PGRES_FATAL_ERROR:
1288 : : case PGRES_BAD_RESPONSE:
1289 : 0 : walres->status = WALRCV_ERROR;
1290 : 0 : walres->err = pchomp(PQerrorMessage(conn->streamConn));
1157 akapila@postgresql.o 1291 : 0 : diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1292 [ # # ]: 0 : if (diag_sqlstate)
1293 : 0 : walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1294 : : diag_sqlstate[1],
1295 : : diag_sqlstate[2],
1296 : : diag_sqlstate[3],
1297 : : diag_sqlstate[4]);
2579 peter_e@gmx.net 1298 : 0 : break;
1299 : : }
1300 : :
2579 peter_e@gmx.net 1301 :CBC 1658 : PQclear(pgres);
1302 : :
1303 : 1658 : return walres;
1304 : : }
1305 : :
1306 : : /*
1307 : : * Given a List of strings, return it as single comma separated
1308 : : * string, quoting identifiers as needed.
1309 : : *
1310 : : * This is essentially the reverse of SplitIdentifierString.
1311 : : *
1312 : : * The caller should free the result.
1313 : : */
1314 : : static char *
2642 1315 : 326 : stringlist_to_identifierstr(PGconn *conn, List *strings)
1316 : : {
1317 : : ListCell *lc;
1318 : : StringInfoData res;
2524 bruce@momjian.us 1319 : 326 : bool first = true;
1320 : :
2642 peter_e@gmx.net 1321 : 326 : initStringInfo(&res);
1322 : :
2524 bruce@momjian.us 1323 [ + - + + : 871 : foreach(lc, strings)
+ + ]
1324 : : {
1325 : 545 : char *val = strVal(lfirst(lc));
1326 : : char *val_escaped;
1327 : :
2642 peter_e@gmx.net 1328 [ + + ]: 545 : if (first)
1329 : 326 : first = false;
1330 : : else
1331 : 219 : appendStringInfoChar(&res, ',');
1332 : :
2638 1333 : 545 : val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1334 [ - + ]: 545 : if (!val_escaped)
1335 : : {
2638 peter_e@gmx.net 1336 :UBC 0 : free(res.data);
1337 : 0 : return NULL;
1338 : : }
2638 peter_e@gmx.net 1339 :CBC 545 : appendStringInfoString(&res, val_escaped);
1340 : 545 : PQfreemem(val_escaped);
1341 : : }
1342 : :
2642 1343 : 326 : return res.data;
1344 : : }
|