Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpqwalreceiver.c
4 : *
5 : * This file contains the libpq-specific parts of walreceiver. It's
6 : * loaded as a dynamic module to avoid linking the main server binary with
7 : * libpq.
8 : *
9 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
10 : *
11 : *
12 : * IDENTIFICATION
13 : * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 : #include "postgres.h"
18 :
19 : #include <unistd.h>
20 : #include <sys/time.h>
21 :
22 : #include "access/xlog.h"
23 : #include "catalog/pg_type.h"
24 : #include "common/connect.h"
25 : #include "funcapi.h"
26 : #include "libpq-fe.h"
27 : #include "libpq/libpq-be-fe-helpers.h"
28 : #include "mb/pg_wchar.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "pqexpbuffer.h"
32 : #include "replication/walreceiver.h"
33 : #include "utils/builtins.h"
34 : #include "utils/memutils.h"
35 : #include "utils/pg_lsn.h"
36 : #include "utils/tuplestore.h"
37 :
4827 heikki.linnakangas 38 GIC 615 : PG_MODULE_MAGIC;
4827 heikki.linnakangas 39 ECB :
40 : struct WalReceiverConn
41 : {
42 : /* Current connection to the primary, if any */
43 : PGconn *streamConn;
44 : /* Used to remember if the connection is logical or physical */
45 : bool logical;
46 : /* Buffer for currently read records */
47 : char *recvBuf;
48 : };
49 :
50 : /* Prototypes for interface functions */
51 : static WalReceiverConn *libpqrcv_connect(const char *conninfo,
52 : bool logical, bool must_use_password,
53 : const char *appname, char **err);
54 : static void libpqrcv_check_conninfo(const char *conninfo,
55 : bool must_use_password);
56 : static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
57 : static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
58 : char **sender_host, int *sender_port);
59 : static char *libpqrcv_identify_system(WalReceiverConn *conn,
60 : TimeLineID *primary_tli);
61 : static int libpqrcv_server_version(WalReceiverConn *conn);
62 : static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
63 : TimeLineID tli, char **filename,
64 : char **content, int *len);
65 : static bool libpqrcv_startstreaming(WalReceiverConn *conn,
66 : const WalRcvStreamOptions *options);
67 : static void libpqrcv_endstreaming(WalReceiverConn *conn,
68 : TimeLineID *next_tli);
69 : static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
70 : pgsocket *wait_fd);
71 : static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
72 : int nbytes);
73 : static char *libpqrcv_create_slot(WalReceiverConn *conn,
74 : const char *slotname,
75 : bool temporary,
76 : bool two_phase,
77 : CRSSnapshotAction snapshot_action,
78 : XLogRecPtr *lsn);
79 : static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
80 : static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
81 : const char *query,
82 : const int nRetTypes,
83 : const Oid *retTypes);
84 : static void libpqrcv_disconnect(WalReceiverConn *conn);
85 :
86 : static WalReceiverFunctionsType PQWalReceiverFunctions = {
87 : .walrcv_connect = libpqrcv_connect,
88 : .walrcv_check_conninfo = libpqrcv_check_conninfo,
89 : .walrcv_get_conninfo = libpqrcv_get_conninfo,
90 : .walrcv_get_senderinfo = libpqrcv_get_senderinfo,
91 : .walrcv_identify_system = libpqrcv_identify_system,
92 : .walrcv_server_version = libpqrcv_server_version,
93 : .walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
94 : .walrcv_startstreaming = libpqrcv_startstreaming,
95 : .walrcv_endstreaming = libpqrcv_endstreaming,
96 : .walrcv_receive = libpqrcv_receive,
97 : .walrcv_send = libpqrcv_send,
98 : .walrcv_create_slot = libpqrcv_create_slot,
99 : .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
100 : .walrcv_exec = libpqrcv_exec,
101 : .walrcv_disconnect = libpqrcv_disconnect
102 : };
103 :
104 : /* Prototypes for private functions */
105 : static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
106 : static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
107 : static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
108 :
109 : /*
110 : * Module initialization function
111 : */
112 : void
4827 heikki.linnakangas 113 CBC 615 : _PG_init(void)
114 : {
2321 peter_e 115 615 : if (WalReceiverFunctions != NULL)
4827 heikki.linnakangas 116 UBC 0 : elog(ERROR, "libpqwalreceiver already loaded");
2321 peter_e 117 CBC 615 : WalReceiverFunctions = &PQWalReceiverFunctions;
4827 heikki.linnakangas 118 615 : }
119 :
120 : /*
121 : * Establish the connection to the primary server for XLOG streaming
122 : *
123 : * If an error occurs, this function will normally return NULL and set *err
124 : * to a palloc'ed error message. However, if must_use_password is true and
125 : * the connection fails to use the password, this function will ereport(ERROR).
126 : * We do this because in that case the error includes a detail and a hint for
127 : * consistency with other parts of the system, and it's not worth adding the
128 : * machinery to pass all of those back to the caller just to cover this one
129 : * case.
130 : */
131 : static WalReceiverConn *
10 rhaas 132 GNC 608 : libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
133 : const char *appname, char **err)
134 : {
135 : WalReceiverConn *conn;
136 : const char *keys[6];
523 tgl 137 ECB : const char *vals[6];
2321 peter_e 138 GIC 608 : int i = 0;
139 :
140 : /*
141 : * We use the expand_dbname parameter to process the connection string (or
142 : * URI), and pass some extra options.
4685 heikki.linnakangas 143 ECB : */
2321 peter_e 144 GIC 608 : keys[i] = "dbname";
145 608 : vals[i] = conninfo;
146 608 : keys[++i] = "replication";
147 608 : vals[i] = logical ? "database" : "true";
148 608 : if (!logical)
2321 peter_e 149 ECB : {
1821 150 : /*
151 : * The database name is ignored by the server in replication mode, but
152 : * specify "replication" for .pgpass lookup.
153 : */
2321 peter_e 154 GIC 162 : keys[++i] = "dbname";
155 162 : vals[i] = "replication";
156 : }
157 608 : keys[++i] = "fallback_application_name";
158 608 : vals[i] = appname;
2236 peter_e 159 CBC 608 : if (logical)
2236 peter_e 160 ECB : {
161 : /* Tell the publisher to translate to our encoding */
2236 peter_e 162 CBC 446 : keys[++i] = "client_encoding";
163 446 : vals[i] = GetDatabaseEncodingName();
523 tgl 164 ECB :
165 : /*
166 : * Force assorted GUC parameters to settings that ensure that the
167 : * publisher will output data values in a form that is unambiguous to
168 : * the subscriber. (We don't want to modify the subscriber's GUC
169 : * settings, since that might surprise user-defined code running in
170 : * the subscriber, such as triggers.) This should match what pg_dump
171 : * does.
172 : */
523 tgl 173 GIC 446 : keys[++i] = "options";
174 446 : vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
175 : }
2321 peter_e 176 608 : keys[++i] = NULL;
177 608 : vals[i] = NULL;
2321 peter_e 178 ECB :
2236 peter_e 179 CBC 608 : Assert(i < sizeof(keys));
180 :
2321 181 608 : conn = palloc0(sizeof(WalReceiverConn));
76 andres 182 GNC 607 : conn->streamConn =
183 608 : libpqsrv_connect_params(keys, vals,
184 : /* expand_dbname = */ true,
185 : WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
2228 peter_e 186 ECB :
2321 peter_e 187 GIC 607 : if (PQstatus(conn->streamConn) != CONNECTION_OK)
76 andres 188 71 : goto bad_connection_errmsg;
2271 peter_e 189 ECB :
10 rhaas 190 GNC 536 : if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
191 : {
10 rhaas 192 UNC 0 : libpqsrv_disconnect(conn->streamConn);
193 0 : pfree(conn);
194 :
195 0 : ereport(ERROR,
196 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
197 : errmsg("password is required"),
198 : errdetail("Non-superuser cannot connect if the server does not request a password."),
199 : errhint("Target server's authentication method must be changed. or set password_required=false in the subscription attributes.")));
200 : }
201 :
972 noah 202 CBC 536 : if (logical)
203 : {
204 : PGresult *res;
972 noah 205 ECB :
972 noah 206 CBC 432 : res = libpqrcv_PQexec(conn->streamConn,
972 noah 207 ECB : ALWAYS_SECURE_SEARCH_PATH_SQL);
972 noah 208 CBC 432 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
209 : {
972 noah 210 UIC 0 : PQclear(res);
76 andres 211 0 : *err = psprintf(_("could not clear search path: %s"),
212 0 : pchomp(PQerrorMessage(conn->streamConn)));
213 0 : goto bad_connection;
214 : }
972 noah 215 GIC 432 : PQclear(res);
216 : }
217 :
2321 peter_e 218 536 : conn->logical = logical;
219 :
2321 peter_e 220 CBC 536 : return conn;
221 :
76 andres 222 ECB : /* error path, using libpq's error message */
76 andres 223 GIC 71 : bad_connection_errmsg:
76 andres 224 CBC 71 : *err = pchomp(PQerrorMessage(conn->streamConn));
225 :
76 andres 226 ECB : /* error path, error already set */
76 andres 227 CBC 71 : bad_connection:
76 andres 228 GNC 71 : libpqsrv_disconnect(conn->streamConn);
76 andres 229 GIC 71 : pfree(conn);
76 andres 230 CBC 71 : return NULL;
231 : }
3769 heikki.linnakangas 232 ECB :
2271 peter_e 233 : /*
234 : * Validate connection info string, and determine whether it might cause
235 : * local filesystem access to be attempted.
236 : *
237 : * If the connection string can't be parsed, this function will raise
238 : * an error and will not return. If it can, it will return true if this
239 : * connection string specifies a password and false otherwise.
240 : */
241 : static void
10 rhaas 242 GNC 130 : libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
2271 peter_e 243 ECB : {
2153 bruce 244 GIC 130 : PQconninfoOption *opts = NULL;
245 : PQconninfoOption *opt;
2153 bruce 246 CBC 130 : char *err = NULL;
247 :
2271 peter_e 248 130 : opts = PQconninfoParse(conninfo, &err);
2271 peter_e 249 GIC 130 : if (opts == NULL)
250 : {
752 tgl 251 ECB : /* The error string is malloc'd, so we must free it explicitly */
752 tgl 252 CBC 9 : char *errcopy = err ? pstrdup(err) : "out of memory";
253 :
254 9 : PQfreemem(err);
2271 peter_e 255 GIC 9 : ereport(ERROR,
2271 peter_e 256 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
752 tgl 257 : errmsg("invalid connection string syntax: %s", errcopy)));
258 : }
259 :
10 rhaas 260 GNC 121 : if (must_use_password)
261 : {
262 6 : bool uses_password = false;
263 :
264 129 : for (opt = opts; opt->keyword != NULL; ++opt)
265 : {
266 : /* Ignore connection options that are not present. */
267 126 : if (opt->val == NULL)
268 120 : continue;
269 :
270 6 : if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
271 : {
272 3 : uses_password = true;
273 3 : break;
274 : }
275 : }
276 :
277 6 : if (!uses_password)
278 3 : ereport(ERROR,
279 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
280 : errmsg("password is required"),
281 : errdetail("Non-superusers must provide a password in the connection string.")));
282 : }
283 :
2271 peter_e 284 GIC 118 : PQconninfoFree(opts);
2271 peter_e 285 CBC 118 : }
2271 peter_e 286 ECB :
287 : /*
288 : * Return a user-displayable conninfo string. Any security-sensitive fields
289 : * are obfuscated.
290 : */
291 : static char *
2321 peter_e 292 CBC 104 : libpqrcv_get_conninfo(WalReceiverConn *conn)
2475 alvherre 293 ECB : {
294 : PQconninfoOption *conn_opts;
295 : PQconninfoOption *conn_opt;
296 : PQExpBufferData buf;
297 : char *retval;
298 :
2321 peter_e 299 GIC 104 : Assert(conn->streamConn != NULL);
2475 alvherre 300 ECB :
2475 alvherre 301 GIC 104 : initPQExpBuffer(&buf);
2321 peter_e 302 104 : conn_opts = PQconninfo(conn->streamConn);
303 :
2475 alvherre 304 104 : if (conn_opts == NULL)
2475 alvherre 305 UIC 0 : ereport(ERROR,
306 : (errcode(ERRCODE_OUT_OF_MEMORY),
662 tgl 307 ECB : errmsg("could not parse connection string: %s",
308 : _("out of memory"))));
2475 alvherre 309 :
310 : /* build a clean connection string from pieces */
2475 alvherre 311 GIC 4160 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
2475 alvherre 312 ECB : {
2428 tgl 313 EUB : bool obfuscate;
314 :
315 : /* Skip debug and empty options */
2475 alvherre 316 GIC 4056 : if (strchr(conn_opt->dispchar, 'D') ||
317 3952 : conn_opt->val == NULL ||
318 1768 : conn_opt->val[0] == '\0')
2475 alvherre 319 CBC 2392 : continue;
320 :
321 : /* Obfuscate security-sensitive options */
2475 alvherre 322 GIC 1664 : obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
323 :
2475 alvherre 324 CBC 3328 : appendPQExpBuffer(&buf, "%s%s=%s",
325 1664 : buf.len == 0 ? "" : " ",
2475 alvherre 326 ECB : conn_opt->keyword,
327 : obfuscate ? "********" : conn_opt->val);
328 : }
329 :
2475 alvherre 330 CBC 104 : PQconninfoFree(conn_opts);
331 :
332 104 : retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
333 104 : termPQExpBuffer(&buf);
2475 alvherre 334 GIC 104 : return retval;
335 : }
336 :
337 : /*
1835 fujii 338 ECB : * Provides information of sender this WAL receiver is connected to.
339 : */
340 : static void
1835 fujii 341 CBC 104 : libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
1809 tgl 342 ECB : int *sender_port)
343 : {
1809 tgl 344 GIC 104 : char *ret = NULL;
345 :
1835 fujii 346 104 : *sender_host = NULL;
347 104 : *sender_port = 0;
348 :
1835 fujii 349 CBC 104 : Assert(conn->streamConn != NULL);
350 :
1835 fujii 351 GIC 104 : ret = PQhost(conn->streamConn);
1835 fujii 352 CBC 104 : if (ret && strlen(ret) != 0)
1835 fujii 353 GIC 104 : *sender_host = pstrdup(ret);
1835 fujii 354 ECB :
1835 fujii 355 CBC 104 : ret = PQport(conn->streamConn);
1835 fujii 356 GIC 104 : if (ret && strlen(ret) != 0)
1835 fujii 357 CBC 104 : *sender_port = atoi(ret);
1835 fujii 358 GIC 104 : }
1835 fujii 359 ECB :
3769 heikki.linnakangas 360 : /*
361 : * Check that primary's system identifier matches ours, and fetch the current
362 : * timeline ID of the primary.
363 : */
2321 peter_e 364 : static char *
1486 peter 365 CBC 237 : libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
3769 heikki.linnakangas 366 ECB : {
367 : PGresult *res;
368 : char *primary_sysid;
369 :
370 : /*
371 : * Get the system identifier and timeline ID as a DataRow message from the
372 : * primary server.
4827 373 : */
2321 peter_e 374 GIC 237 : res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
4827 heikki.linnakangas 375 237 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
376 : {
4827 heikki.linnakangas 377 UIC 0 : PQclear(res);
378 0 : ereport(ERROR,
379 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
380 : errmsg("could not receive database system identifier and timeline ID from "
381 : "the primary server: %s",
2232 peter_e 382 ECB : pchomp(PQerrorMessage(conn->streamConn)))));
4790 bruce 383 : }
3155 fujii 384 GIC 237 : if (PQnfields(res) < 3 || PQntuples(res) != 1)
4827 heikki.linnakangas 385 EUB : {
4790 bruce 386 UBC 0 : int ntuples = PQntuples(res);
4790 bruce 387 UIC 0 : int nfields = PQnfields(res);
388 :
4827 heikki.linnakangas 389 0 : PQclear(res);
390 0 : ereport(ERROR,
391 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
662 tgl 392 ECB : errmsg("invalid response from primary server"),
393 : errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
3155 fujii 394 EUB : ntuples, nfields, 3, 1)));
4827 heikki.linnakangas 395 : }
2321 peter_e 396 GIC 237 : primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
1722 andres 397 GBC 237 : *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
4827 heikki.linnakangas 398 237 : PQclear(res);
399 :
2321 peter_e 400 GIC 237 : return primary_sysid;
401 : }
402 :
403 : /*
1486 peter 404 ECB : * Thin wrapper around libpq to obtain server version.
405 : */
406 : static int
1486 peter 407 GIC 1011 : libpqrcv_server_version(WalReceiverConn *conn)
1486 peter 408 ECB : {
1486 peter 409 GIC 1011 : return PQserverVersion(conn->streamConn);
410 : }
411 :
412 : /*
413 : * Start streaming WAL data from given streaming options.
414 : *
3769 heikki.linnakangas 415 ECB : * Returns true if we switched successfully to copy-both mode. False
416 : * means the server received the command and executed it successfully, but
417 : * didn't switch to copy-mode. That means that there was no WAL on the
418 : * requested timeline and starting point, because the server switched to
419 : * another timeline at or before the requested starting point. On failure,
420 : * throws an ERROR.
421 : */
422 : static bool
2321 peter_e 423 GIC 386 : libpqrcv_startstreaming(WalReceiverConn *conn,
424 : const WalRcvStreamOptions *options)
425 : {
426 : StringInfoData cmd;
427 : PGresult *res;
428 :
2271 429 386 : Assert(options->logical == conn->logical);
430 386 : Assert(options->slotname || !options->logical);
2321 peter_e 431 ECB :
2321 peter_e 432 GIC 386 : initStringInfo(&cmd);
433 :
434 : /* Build the command. */
2271 435 386 : appendStringInfoString(&cmd, "START_REPLICATION");
436 386 : if (options->slotname != NULL)
2271 peter_e 437 CBC 304 : appendStringInfo(&cmd, " SLOT \"%s\"",
438 304 : options->slotname);
439 :
440 386 : if (options->logical)
2063 peter_e 441 GIC 282 : appendStringInfoString(&cmd, " LOGICAL");
442 :
775 peter 443 CBC 386 : appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
2271 peter_e 444 ECB :
445 : /*
446 : * Additional options are different depending on if we are doing logical
447 : * or physical replication.
448 : */
2271 peter_e 449 CBC 386 : if (options->logical)
450 : {
2153 bruce 451 ECB : char *pubnames_str;
452 : List *pubnames;
453 : char *pubnames_literal;
454 :
2271 peter_e 455 GIC 282 : appendStringInfoString(&cmd, " (");
456 :
2271 peter_e 457 CBC 282 : appendStringInfo(&cmd, "proto_version '%u'",
2271 peter_e 458 GIC 282 : options->proto.logical.proto_version);
459 :
90 akapila 460 GNC 282 : if (options->proto.logical.streaming_str)
461 33 : appendStringInfo(&cmd, ", streaming '%s'",
462 33 : options->proto.logical.streaming_str);
948 akapila 463 ECB :
634 akapila 464 GIC 285 : if (options->proto.logical.twophase &&
634 akapila 465 CBC 3 : PQserverVersion(conn->streamConn) >= 150000)
466 3 : appendStringInfoString(&cmd, ", two_phase 'on'");
467 :
262 akapila 468 GNC 564 : if (options->proto.logical.origin &&
469 282 : PQserverVersion(conn->streamConn) >= 160000)
470 282 : appendStringInfo(&cmd, ", origin '%s'",
471 282 : options->proto.logical.origin);
472 :
2271 peter_e 473 CBC 282 : pubnames = options->proto.logical.publication_names;
474 282 : pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
2267 475 282 : if (!pubnames_str)
2267 peter_e 476 UIC 0 : ereport(ERROR,
662 tgl 477 ECB : (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
478 : errmsg("could not start WAL streaming: %s",
2232 peter_e 479 : pchomp(PQerrorMessage(conn->streamConn)))));
2267 peter_e 480 GIC 282 : pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
2267 peter_e 481 ECB : strlen(pubnames_str));
2267 peter_e 482 CBC 282 : if (!pubnames_literal)
2267 peter_e 483 LBC 0 : ereport(ERROR,
662 tgl 484 ECB : (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
485 : errmsg("could not start WAL streaming: %s",
2232 peter_e 486 : pchomp(PQerrorMessage(conn->streamConn)))));
2267 peter_e 487 CBC 282 : appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
488 282 : PQfreemem(pubnames_literal);
2271 peter_e 489 GBC 282 : pfree(pubnames_str);
490 :
995 tgl 491 GIC 293 : if (options->proto.logical.binary &&
492 11 : PQserverVersion(conn->streamConn) >= 140000)
995 tgl 493 CBC 11 : appendStringInfoString(&cmd, ", binary 'true'");
494 :
2267 peter_e 495 282 : appendStringInfoChar(&cmd, ')');
2271 peter_e 496 EUB : }
497 : else
2271 peter_e 498 GIC 104 : appendStringInfo(&cmd, " TIMELINE %u",
499 104 : options->proto.physical.startpointTLI);
2271 peter_e 500 ECB :
501 : /* Start streaming. */
2321 peter_e 502 CBC 386 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
2321 peter_e 503 GIC 386 : pfree(cmd.data);
3769 heikki.linnakangas 504 ECB :
3769 heikki.linnakangas 505 CBC 386 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
3769 heikki.linnakangas 506 ECB : {
3769 heikki.linnakangas 507 UIC 0 : PQclear(res);
3769 heikki.linnakangas 508 LBC 0 : return false;
509 : }
3769 heikki.linnakangas 510 GIC 386 : else if (PQresultStatus(res) != PGRES_COPY_BOTH)
4738 magnus 511 ECB : {
4738 magnus 512 LBC 0 : PQclear(res);
4827 heikki.linnakangas 513 UIC 0 : ereport(ERROR,
514 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
662 tgl 515 ECB : errmsg("could not start WAL streaming: %s",
2232 peter_e 516 : pchomp(PQerrorMessage(conn->streamConn)))));
517 : }
4827 heikki.linnakangas 518 CBC 386 : PQclear(res);
3769 heikki.linnakangas 519 GIC 386 : return true;
3769 heikki.linnakangas 520 EUB : }
521 :
522 : /*
3733 heikki.linnakangas 523 ECB : * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
524 : * reported by the server, or 0 if it did not report it.
3769 heikki.linnakangas 525 EUB : */
526 : static void
2321 peter_e 527 GIC 186 : libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
528 : {
529 : PGresult *res;
530 :
1441 tgl 531 ECB : /*
532 : * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
533 : * block, but the risk seems small.
534 : */
2321 peter_e 535 GIC 347 : if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
536 161 : PQflush(conn->streamConn))
3769 heikki.linnakangas 537 25 : ereport(ERROR,
538 : (errcode(ERRCODE_CONNECTION_FAILURE),
539 : errmsg("could not send end-of-streaming message to primary: %s",
2118 tgl 540 ECB : pchomp(PQerrorMessage(conn->streamConn)))));
541 :
2321 peter_e 542 GIC 161 : *next_tli = 0;
543 :
544 : /*
545 : * After COPY is finished, we should receive a result set indicating the
546 : * next timeline's ID, or just CommandComplete if the server was shut
547 : * down.
3733 heikki.linnakangas 548 ECB : *
2153 bruce 549 : * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
550 : * also possible in case we aborted the copy in mid-stream.
551 : */
1441 tgl 552 GIC 161 : res = libpqrcv_PQgetResult(conn->streamConn);
3733 heikki.linnakangas 553 161 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
554 : {
3623 heikki.linnakangas 555 ECB : /*
556 : * Read the next timeline's ID. The server also sends the timeline's
557 : * starting point, but it is ignored.
558 : */
3623 heikki.linnakangas 559 GIC 12 : if (PQnfields(res) < 2 || PQntuples(res) != 1)
3733 heikki.linnakangas 560 UIC 0 : ereport(ERROR,
561 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
562 : errmsg("unexpected result set after end-of-streaming")));
1722 andres 563 GIC 12 : *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
3769 heikki.linnakangas 564 12 : PQclear(res);
3733 heikki.linnakangas 565 ECB :
566 : /* the result set should be followed by CommandComplete */
1441 tgl 567 GIC 12 : res = libpqrcv_PQgetResult(conn->streamConn);
568 : }
2321 peter_e 569 149 : else if (PQresultStatus(res) == PGRES_COPY_OUT)
570 : {
571 149 : PQclear(res);
2321 peter_e 572 ECB :
2321 peter_e 573 EUB : /* End the copy */
2109 tgl 574 GIC 149 : if (PQendcopy(conn->streamConn))
575 1 : ereport(ERROR,
662 tgl 576 ECB : (errcode(ERRCODE_CONNECTION_FAILURE),
577 : errmsg("error while shutting down streaming COPY: %s",
578 : pchomp(PQerrorMessage(conn->streamConn)))));
579 :
2321 peter_e 580 : /* CommandComplete should follow */
1441 tgl 581 GIC 148 : res = libpqrcv_PQgetResult(conn->streamConn);
3769 heikki.linnakangas 582 ECB : }
583 :
3733 heikki.linnakangas 584 CBC 160 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
3733 heikki.linnakangas 585 UIC 0 : ereport(ERROR,
586 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
662 tgl 587 ECB : errmsg("error reading result of streaming command: %s",
2232 peter_e 588 : pchomp(PQerrorMessage(conn->streamConn)))));
2979 tgl 589 GIC 160 : PQclear(res);
590 :
591 : /* Verify that there are no more results */
1441 592 160 : res = libpqrcv_PQgetResult(conn->streamConn);
3733 heikki.linnakangas 593 160 : if (res != NULL)
3733 heikki.linnakangas 594 LBC 0 : ereport(ERROR,
595 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
596 : errmsg("unexpected result after CommandComplete: %s",
2232 peter_e 597 ECB : pchomp(PQerrorMessage(conn->streamConn)))));
3769 heikki.linnakangas 598 GBC 160 : }
599 :
600 : /*
601 : * Fetch the timeline history file for 'tli' from primary.
3769 heikki.linnakangas 602 ECB : */
603 : static void
2321 peter_e 604 GIC 11 : libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
2321 peter_e 605 ECB : TimeLineID tli, char **filename,
606 : char **content, int *len)
3769 heikki.linnakangas 607 EUB : {
608 : PGresult *res;
609 : char cmd[64];
610 :
2321 peter_e 611 CBC 11 : Assert(!conn->logical);
612 :
613 : /*
614 : * Request the primary to send over the history file for given timeline.
615 : */
3769 heikki.linnakangas 616 GIC 11 : snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
2321 peter_e 617 CBC 11 : res = libpqrcv_PQexec(conn->streamConn, cmd);
3769 heikki.linnakangas 618 GIC 11 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
619 : {
3769 heikki.linnakangas 620 UIC 0 : PQclear(res);
621 0 : ereport(ERROR,
622 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
623 : errmsg("could not receive timeline history file from "
3769 heikki.linnakangas 624 ECB : "the primary server: %s",
625 : pchomp(PQerrorMessage(conn->streamConn)))));
626 : }
3769 heikki.linnakangas 627 GIC 11 : if (PQnfields(res) != 2 || PQntuples(res) != 1)
628 : {
3769 heikki.linnakangas 629 LBC 0 : int ntuples = PQntuples(res);
630 0 : int nfields = PQnfields(res);
3769 heikki.linnakangas 631 ECB :
3769 heikki.linnakangas 632 UIC 0 : PQclear(res);
3769 heikki.linnakangas 633 UBC 0 : ereport(ERROR,
662 tgl 634 EUB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
635 : errmsg("invalid response from primary server"),
636 : errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
637 : ntuples, nfields)));
638 : }
3769 heikki.linnakangas 639 GIC 11 : *filename = pstrdup(PQgetvalue(res, 0, 0));
3769 heikki.linnakangas 640 ECB :
3769 heikki.linnakangas 641 GIC 11 : *len = PQgetlength(res, 0, 1);
3769 heikki.linnakangas 642 GBC 11 : *content = palloc(*len);
643 11 : memcpy(*content, PQgetvalue(res, 0, 1), *len);
3769 heikki.linnakangas 644 GIC 11 : PQclear(res);
4827 heikki.linnakangas 645 GBC 11 : }
4827 heikki.linnakangas 646 EUB :
647 : /*
648 : * Send a query and wait for the results by using the asynchronous libpq
649 : * functions and socket readiness events.
650 : *
651 : * We must not use the regular blocking libpq functions like PQexec()
4738 magnus 652 ECB : * since they are uninterruptible by signals on some platforms, such as
653 : * Windows.
654 : *
655 : * The function is modeled on PQexec() in libpq, but only implements
2208 peter_e 656 : * those parts that are in use in the walreceiver api.
4738 magnus 657 : *
1441 tgl 658 : * May return NULL, rather than an error result, on failure.
659 : */
660 : static PGresult *
2321 peter_e 661 GIC 2760 : libpqrcv_PQexec(PGconn *streamConn, const char *query)
662 : {
4660 bruce 663 2760 : PGresult *lastResult = NULL;
664 :
665 : /*
666 : * PQexec() silently discards any prior query results on the connection.
667 : * This is not required for this function as it's expected that the caller
668 : * (which is this library in all cases) will behave correctly and we don't
669 : * have to be backwards compatible with old libpq.
670 : */
671 :
672 : /*
673 : * Submit the query. Since we don't use non-blocking mode, this could
1441 tgl 674 ECB : * theoretically block. In practice, since we don't send very long query
675 : * strings, the risk seems negligible.
4738 magnus 676 : */
4738 magnus 677 GIC 2760 : if (!PQsendQuery(streamConn, query))
4738 magnus 678 UIC 0 : return NULL;
679 :
680 : for (;;)
4738 magnus 681 GIC 2219 : {
682 : /* Wait for, and collect, the next PGresult. */
683 : PGresult *result;
684 :
1441 tgl 685 4979 : result = libpqrcv_PQgetResult(streamConn);
686 4979 : if (result == NULL)
687 2219 : break; /* query is complete, or failure */
688 :
689 : /*
2124 tgl 690 ECB : * Emulate PQexec()'s behavior of returning the last result when there
2124 tgl 691 EUB : * are many. We are fine with returning just last error message.
692 : */
4738 magnus 693 GIC 2760 : PQclear(lastResult);
4738 magnus 694 CBC 2760 : lastResult = result;
695 :
4738 magnus 696 GIC 5520 : if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
697 5365 : PQresultStatus(lastResult) == PGRES_COPY_OUT ||
4502 rhaas 698 CBC 4824 : PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
4738 magnus 699 2219 : PQstatus(streamConn) == CONNECTION_BAD)
4738 magnus 700 ECB : break;
701 : }
702 :
4738 magnus 703 GIC 2760 : return lastResult;
704 : }
705 :
1441 tgl 706 ECB : /*
707 : * Perform the equivalent of PQgetResult(), but watch for interrupts.
708 : */
709 : static PGresult *
1441 tgl 710 CBC 5834 : libpqrcv_PQgetResult(PGconn *streamConn)
1441 tgl 711 ECB : {
712 : /*
713 : * Collect data until PQgetResult is ready to get the result without
714 : * blocking.
715 : */
1441 tgl 716 CBC 8608 : while (PQisBusy(streamConn))
717 : {
718 : int rc;
719 :
720 : /*
721 : * We don't need to break down the sleep into smaller increments,
722 : * since we'll get interrupted by signals and can handle any
1441 tgl 723 ECB : * interrupts here.
724 : */
1441 tgl 725 GIC 2799 : rc = WaitLatchOrSocket(MyLatch,
726 : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
727 : WL_LATCH_SET,
728 : PQsocket(streamConn),
1441 tgl 729 ECB : 0,
730 : WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
731 :
732 : /* Interrupted? */
1441 tgl 733 GIC 2799 : if (rc & WL_LATCH_SET)
734 : {
735 2 : ResetLatch(MyLatch);
736 2 : ProcessWalRcvInterrupts();
737 : }
1441 tgl 738 ECB :
739 : /* Consume whatever data is available from the socket */
1441 tgl 740 GIC 2799 : if (PQconsumeInput(streamConn) == 0)
741 : {
742 : /* trouble; return NULL */
743 25 : return NULL;
744 : }
745 : }
1441 tgl 746 ECB :
747 : /* Now we can collect and return the next PGresult */
1441 tgl 748 CBC 5809 : return PQgetResult(streamConn);
1441 tgl 749 ECB : }
750 :
751 : /*
752 : * Disconnect connection to primary, if any.
4827 heikki.linnakangas 753 : */
754 : static void
2321 peter_e 755 GIC 536 : libpqrcv_disconnect(WalReceiverConn *conn)
4827 heikki.linnakangas 756 ECB : {
76 andres 757 GNC 536 : libpqsrv_disconnect(conn->streamConn);
226 peter 758 536 : PQfreemem(conn->recvBuf);
2321 peter_e 759 GIC 536 : pfree(conn);
4827 heikki.linnakangas 760 CBC 536 : }
761 :
762 : /*
763 : * Receive a message available from XLOG stream.
764 : *
765 : * Returns:
766 : *
3769 heikki.linnakangas 767 ECB : * If data was received, returns the length of the data. *buffer is set to
768 : * point to a buffer holding the received message. The buffer is only valid
769 : * until the next libpqrcv_* call.
4827 770 : *
2567 rhaas 771 : * If no data was available immediately, returns 0, and *wait_fd is set to a
2551 tgl 772 : * socket descriptor which can be waited on before trying again.
773 : *
774 : * -1 if the server ended the COPY.
775 : *
776 : * ereports on error.
777 : */
778 : static int
2321 peter_e 779 GIC 310892 : libpqrcv_receive(WalReceiverConn *conn, char **buffer,
780 : pgsocket *wait_fd)
781 : {
782 : int rawlen;
783 :
226 peter 784 GNC 310892 : PQfreemem(conn->recvBuf);
2321 peter_e 785 GIC 310892 : conn->recvBuf = NULL;
786 :
787 : /* Try to receive a CopyData message */
788 310892 : rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
4469 heikki.linnakangas 789 310892 : if (rawlen == 0)
4827 heikki.linnakangas 790 ECB : {
791 : /* Try consuming some data. */
2321 peter_e 792 GIC 199916 : if (PQconsumeInput(conn->streamConn) == 0)
4827 heikki.linnakangas 793 33 : ereport(ERROR,
794 : (errcode(ERRCODE_CONNECTION_FAILURE),
662 tgl 795 ECB : errmsg("could not receive data from WAL stream: %s",
2232 peter_e 796 : pchomp(PQerrorMessage(conn->streamConn)))));
797 :
798 : /* Now that we've consumed some input, try again */
2321 peter_e 799 CBC 199883 : rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
4469 heikki.linnakangas 800 199883 : if (rawlen == 0)
801 : {
802 : /* Tell caller to try again when our socket is ready. */
2321 peter_e 803 89644 : *wait_fd = PQsocket(conn->streamConn);
3769 heikki.linnakangas 804 89644 : return 0;
805 : }
806 : }
4790 bruce 807 GIC 221215 : if (rawlen == -1) /* end-of-streaming or error */
808 : {
809 : PGresult *res;
4827 heikki.linnakangas 810 ECB :
1441 tgl 811 CBC 196 : res = libpqrcv_PQgetResult(conn->streamConn);
2208 peter_e 812 GIC 196 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
813 : {
2208 peter_e 814 CBC 178 : PQclear(res);
2208 peter_e 815 ECB :
816 : /* Verify that there are no more results. */
1441 tgl 817 GIC 178 : res = libpqrcv_PQgetResult(conn->streamConn);
2208 peter_e 818 CBC 178 : if (res != NULL)
819 : {
2131 andres 820 UIC 0 : PQclear(res);
821 :
2131 andres 822 ECB : /*
823 : * If the other side closed the connection orderly (otherwise
824 : * we'd seen an error, or PGRES_COPY_IN) don't report an error
825 : * here, but let callers deal with it.
826 : */
2131 andres 827 UIC 0 : if (PQstatus(conn->streamConn) == CONNECTION_BAD)
2131 andres 828 LBC 0 : return -1;
2131 andres 829 ECB :
2208 peter_e 830 UIC 0 : ereport(ERROR,
662 tgl 831 EUB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
832 : errmsg("unexpected result after CommandComplete: %s",
833 : PQerrorMessage(conn->streamConn))));
834 : }
835 :
2208 peter_e 836 GIC 178 : return -1;
837 : }
2208 peter_e 838 GBC 18 : else if (PQresultStatus(res) == PGRES_COPY_IN)
3769 heikki.linnakangas 839 EUB : {
3769 heikki.linnakangas 840 GIC 12 : PQclear(res);
3769 heikki.linnakangas 841 GBC 12 : return -1;
842 : }
843 : else
844 : {
4827 heikki.linnakangas 845 GIC 6 : PQclear(res);
846 6 : ereport(ERROR,
662 tgl 847 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
848 : errmsg("could not receive data from WAL stream: %s",
2232 peter_e 849 : pchomp(PQerrorMessage(conn->streamConn)))));
850 : }
4827 heikki.linnakangas 851 : }
4827 heikki.linnakangas 852 CBC 221019 : if (rawlen < -1)
4827 heikki.linnakangas 853 UIC 0 : ereport(ERROR,
854 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
855 : errmsg("could not receive data from WAL stream: %s",
2232 peter_e 856 ECB : pchomp(PQerrorMessage(conn->streamConn)))));
4827 heikki.linnakangas 857 :
858 : /* Return received messages to caller */
2321 peter_e 859 GIC 221019 : *buffer = conn->recvBuf;
3769 heikki.linnakangas 860 221019 : return rawlen;
861 : }
862 :
4502 rhaas 863 ECB : /*
4502 rhaas 864 EUB : * Send a message to XLOG stream.
865 : *
866 : * ereports on error.
867 : */
868 : static void
2321 peter_e 869 GIC 93966 : libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
4502 rhaas 870 ECB : {
2321 peter_e 871 CBC 187932 : if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
2321 peter_e 872 GIC 93966 : PQflush(conn->streamConn))
4502 rhaas 873 UIC 0 : ereport(ERROR,
874 : (errcode(ERRCODE_CONNECTION_FAILURE),
875 : errmsg("could not send data to WAL stream: %s",
876 : pchomp(PQerrorMessage(conn->streamConn)))));
4502 rhaas 877 GIC 93966 : }
878 :
879 : /*
2271 peter_e 880 ECB : * Create new replication slot.
881 : * Returns the name of the exported snapshot for logical slot or NULL for
882 : * physical slot.
883 : */
2271 peter_e 884 EUB : static char *
2271 peter_e 885 GIC 224 : libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
886 : bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
887 : XLogRecPtr *lsn)
2271 peter_e 888 ECB : {
889 : PGresult *res;
890 : StringInfoData cmd;
891 : char *snapshot;
892 : int use_new_options_syntax;
893 :
551 rhaas 894 GIC 224 : use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
895 :
2271 peter_e 896 CBC 224 : initStringInfo(&cmd);
897 :
2217 peter_e 898 GIC 224 : appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
899 :
2271 900 224 : if (temporary)
2063 peter_e 901 UIC 0 : appendStringInfoString(&cmd, " TEMPORARY");
902 :
2271 peter_e 903 GIC 224 : if (conn->logical)
904 : {
551 rhaas 905 CBC 224 : appendStringInfoString(&cmd, " LOGICAL pgoutput ");
551 rhaas 906 GIC 224 : if (use_new_options_syntax)
551 rhaas 907 CBC 224 : appendStringInfoChar(&cmd, '(');
634 akapila 908 GIC 224 : if (two_phase)
551 rhaas 909 ECB : {
551 rhaas 910 GIC 1 : appendStringInfoString(&cmd, "TWO_PHASE");
551 rhaas 911 CBC 1 : if (use_new_options_syntax)
551 rhaas 912 GBC 1 : appendStringInfoString(&cmd, ", ");
913 : else
551 rhaas 914 LBC 0 : appendStringInfoChar(&cmd, ' ');
915 : }
634 akapila 916 ECB :
551 rhaas 917 CBC 224 : if (use_new_options_syntax)
551 rhaas 918 ECB : {
551 rhaas 919 CBC 224 : switch (snapshot_action)
920 : {
551 rhaas 921 LBC 0 : case CRS_EXPORT_SNAPSHOT:
922 0 : appendStringInfoString(&cmd, "SNAPSHOT 'export'");
923 0 : break;
551 rhaas 924 GIC 69 : case CRS_NOEXPORT_SNAPSHOT:
551 rhaas 925 GBC 69 : appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
551 rhaas 926 GIC 69 : break;
927 155 : case CRS_USE_SNAPSHOT:
551 rhaas 928 CBC 155 : appendStringInfoString(&cmd, "SNAPSHOT 'use'");
551 rhaas 929 GIC 155 : break;
551 rhaas 930 ECB : }
931 : }
551 rhaas 932 EUB : else
2208 peter_e 933 : {
551 rhaas 934 UBC 0 : switch (snapshot_action)
551 rhaas 935 ECB : {
551 rhaas 936 LBC 0 : case CRS_EXPORT_SNAPSHOT:
937 0 : appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
938 0 : break;
939 0 : case CRS_NOEXPORT_SNAPSHOT:
940 0 : appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
551 rhaas 941 UIC 0 : break;
942 0 : case CRS_USE_SNAPSHOT:
943 0 : appendStringInfoString(&cmd, "USE_SNAPSHOT");
944 0 : break;
551 rhaas 945 EUB : }
946 : }
947 :
551 rhaas 948 GBC 224 : if (use_new_options_syntax)
949 224 : appendStringInfoChar(&cmd, ')');
2217 peter_e 950 EUB : }
1181 peter 951 : else
952 : {
551 rhaas 953 UBC 0 : if (use_new_options_syntax)
954 0 : appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
551 rhaas 955 EUB : else
551 rhaas 956 UIC 0 : appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
957 : }
958 :
2271 peter_e 959 CBC 224 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
960 224 : pfree(cmd.data);
961 :
2271 peter_e 962 GIC 224 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
963 : {
2271 peter_e 964 UBC 0 : PQclear(res);
965 0 : ereport(ERROR,
966 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
662 tgl 967 EUB : errmsg("could not create replication slot \"%s\": %s",
968 : slotname, pchomp(PQerrorMessage(conn->streamConn)))));
969 : }
2271 peter_e 970 ECB :
1184 peter 971 CBC 224 : if (lsn)
1184 peter 972 GIC 155 : *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
1184 peter 973 CBC 155 : CStringGetDatum(PQgetvalue(res, 0, 1))));
974 :
2271 peter_e 975 GBC 224 : if (!PQgetisnull(res, 0, 2))
2271 peter_e 976 UBC 0 : snapshot = pstrdup(PQgetvalue(res, 0, 2));
977 : else
2271 peter_e 978 GIC 224 : snapshot = NULL;
979 :
980 224 : PQclear(res);
981 :
2271 peter_e 982 CBC 224 : return snapshot;
2271 peter_e 983 ECB : }
984 :
985 : /*
1181 peter 986 : * Return PID of remote backend process.
1181 peter 987 EUB : */
988 : static pid_t
1181 peter 989 LBC 0 : libpqrcv_get_backend_pid(WalReceiverConn *conn)
990 : {
991 0 : return PQbackendPID(conn->streamConn);
992 : }
1181 peter 993 ECB :
994 : /*
995 : * Convert tuple query result to tuplestore.
996 : */
997 : static void
2208 peter_e 998 GIC 823 : libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
999 : const int nRetTypes, const Oid *retTypes)
2208 peter_e 1000 EUB : {
1001 : int tupn;
2153 bruce 1002 : int coln;
2153 bruce 1003 GIC 823 : int nfields = PQnfields(pgres);
1004 : HeapTuple tuple;
1005 : AttInMetadata *attinmeta;
1006 : MemoryContext rowcontext;
1007 : MemoryContext oldcontext;
1008 :
2208 peter_e 1009 ECB : /* Make sure we got expected number of fields. */
2208 peter_e 1010 GIC 823 : if (nfields != nRetTypes)
2208 peter_e 1011 UIC 0 : ereport(ERROR,
1012 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1013 : errmsg("invalid query response"),
2208 peter_e 1014 ECB : errdetail("Expected %d fields, got %d fields.",
1015 : nRetTypes, nfields)));
1016 :
2208 peter_e 1017 GIC 823 : walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1018 :
1019 : /* Create tuple descriptor corresponding to expected result. */
1601 andres 1020 823 : walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
2208 peter_e 1021 CBC 2639 : for (coln = 0; coln < nRetTypes; coln++)
2208 peter_e 1022 GBC 1816 : TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
2208 peter_e 1023 GIC 1816 : PQfname(pgres, coln), retTypes[coln], -1, 0);
1024 823 : attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1025 :
1026 : /* No point in doing more here if there were no tuples returned. */
2207 1027 823 : if (PQntuples(pgres) == 0)
2207 peter_e 1028 CBC 8 : return;
1029 :
1030 : /* Create temporary context for local allocations. */
2208 1031 815 : rowcontext = AllocSetContextCreate(CurrentMemoryContext,
2208 peter_e 1032 ECB : "libpqrcv query result context",
1033 : ALLOCSET_DEFAULT_SIZES);
1034 :
1035 : /* Process returned rows. */
2208 peter_e 1036 GIC 1900 : for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1037 : {
2153 bruce 1038 ECB : char *cstrs[MaxTupleAttributeNumber];
2208 peter_e 1039 :
1441 tgl 1040 GIC 1085 : ProcessWalRcvInterrupts();
1041 :
2208 peter_e 1042 ECB : /* Do the allocations in temporary context. */
2208 peter_e 1043 GIC 1085 : oldcontext = MemoryContextSwitchTo(rowcontext);
1044 :
1045 : /*
1046 : * Fill cstrs with null-terminated strings of column values.
2208 peter_e 1047 ECB : */
2208 peter_e 1048 GIC 3755 : for (coln = 0; coln < nfields; coln++)
1049 : {
1050 2670 : if (PQgetisnull(pgres, tupn, coln))
2208 peter_e 1051 CBC 374 : cstrs[coln] = NULL;
1052 : else
2208 peter_e 1053 GIC 2296 : cstrs[coln] = PQgetvalue(pgres, tupn, coln);
2208 peter_e 1054 ECB : }
1055 :
1056 : /* Convert row to a tuple, and add it to the tuplestore */
2208 peter_e 1057 GIC 1085 : tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1058 1085 : tuplestore_puttuple(walres->tuplestore, tuple);
2208 peter_e 1059 ECB :
1060 : /* Clean up */
2208 peter_e 1061 CBC 1085 : MemoryContextSwitchTo(oldcontext);
1062 1085 : MemoryContextReset(rowcontext);
1063 : }
2208 peter_e 1064 ECB :
2208 peter_e 1065 GIC 815 : MemoryContextDelete(rowcontext);
1066 : }
1067 :
2208 peter_e 1068 ECB : /*
1069 : * Public interface for sending generic queries (and commands).
1070 : *
1071 : * This can only be called from process connected to database.
2271 1072 : */
2208 1073 : static WalRcvExecResult *
2208 peter_e 1074 GIC 1470 : libpqrcv_exec(WalReceiverConn *conn, const char *query,
1075 : const int nRetTypes, const Oid *retTypes)
2271 peter_e 1076 ECB : {
2208 peter_e 1077 GIC 1470 : PGresult *pgres = NULL;
1078 1470 : WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1079 : char *diag_sqlstate;
1080 :
1081 1470 : if (MyDatabaseId == InvalidOid)
2208 peter_e 1082 UIC 0 : ereport(ERROR,
1083 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1084 : errmsg("the query interface requires a database connection")));
2271 peter_e 1085 ECB :
2208 peter_e 1086 GIC 1470 : pgres = libpqrcv_PQexec(conn->streamConn, query);
1087 :
2208 peter_e 1088 CBC 1470 : switch (PQresultStatus(pgres))
2271 peter_e 1089 ECB : {
2208 peter_e 1090 GIC 823 : case PGRES_SINGLE_TUPLE:
1091 : case PGRES_TUPLES_OK:
2208 peter_e 1092 CBC 823 : walres->status = WALRCV_OK_TUPLES;
2208 peter_e 1093 GBC 823 : libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
2208 peter_e 1094 GIC 823 : break;
1095 :
2208 peter_e 1096 UIC 0 : case PGRES_COPY_IN:
2208 peter_e 1097 LBC 0 : walres->status = WALRCV_OK_COPY_IN;
2208 peter_e 1098 UIC 0 : break;
2208 peter_e 1099 ECB :
2208 peter_e 1100 GIC 155 : case PGRES_COPY_OUT:
2208 peter_e 1101 CBC 155 : walres->status = WALRCV_OK_COPY_OUT;
2208 peter_e 1102 GIC 155 : break;
2208 peter_e 1103 ECB :
2208 peter_e 1104 LBC 0 : case PGRES_COPY_BOTH:
1105 0 : walres->status = WALRCV_OK_COPY_BOTH;
2208 peter_e 1106 UIC 0 : break;
2208 peter_e 1107 EUB :
2208 peter_e 1108 GBC 492 : case PGRES_COMMAND_OK:
1109 492 : walres->status = WALRCV_OK_COMMAND;
2208 peter_e 1110 GIC 492 : break;
2208 peter_e 1111 ECB :
2153 bruce 1112 : /* Empty query is considered error. */
2208 peter_e 1113 LBC 0 : case PGRES_EMPTY_QUERY:
2208 peter_e 1114 UIC 0 : walres->status = WALRCV_ERROR;
2208 peter_e 1115 UBC 0 : walres->err = _("empty query");
1116 0 : break;
2208 peter_e 1117 EUB :
755 alvherre 1118 UIC 0 : case PGRES_PIPELINE_SYNC:
755 alvherre 1119 ECB : case PGRES_PIPELINE_ABORTED:
755 alvherre 1120 LBC 0 : walres->status = WALRCV_ERROR;
1121 0 : walres->err = _("unexpected pipeline mode");
755 alvherre 1122 UIC 0 : break;
1123 :
2208 peter_e 1124 UBC 0 : case PGRES_NONFATAL_ERROR:
2208 peter_e 1125 EUB : case PGRES_FATAL_ERROR:
1126 : case PGRES_BAD_RESPONSE:
2208 peter_e 1127 UBC 0 : walres->status = WALRCV_ERROR;
2208 peter_e 1128 UIC 0 : walres->err = pchomp(PQerrorMessage(conn->streamConn));
786 akapila 1129 UBC 0 : diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
786 akapila 1130 UIC 0 : if (diag_sqlstate)
786 akapila 1131 UBC 0 : walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
786 akapila 1132 EUB : diag_sqlstate[1],
1133 : diag_sqlstate[2],
1134 : diag_sqlstate[3],
1135 : diag_sqlstate[4]);
2208 peter_e 1136 UIC 0 : break;
1137 : }
2271 peter_e 1138 EUB :
2208 peter_e 1139 GBC 1470 : PQclear(pgres);
2271 peter_e 1140 EUB :
2208 peter_e 1141 GBC 1470 : return walres;
2271 peter_e 1142 EUB : }
1143 :
1144 : /*
1145 : * Given a List of strings, return it as single comma separated
1146 : * string, quoting identifiers as needed.
1147 : *
1148 : * This is essentially the reverse of SplitIdentifierString.
1149 : *
2271 peter_e 1150 ECB : * The caller should free the result.
1151 : */
1152 : static char *
2271 peter_e 1153 GIC 282 : stringlist_to_identifierstr(PGconn *conn, List *strings)
1154 : {
1155 : ListCell *lc;
1156 : StringInfoData res;
2153 bruce 1157 282 : bool first = true;
1158 :
2271 peter_e 1159 282 : initStringInfo(&res);
1160 :
2153 bruce 1161 779 : foreach(lc, strings)
1162 : {
1163 497 : char *val = strVal(lfirst(lc));
2153 bruce 1164 ECB : char *val_escaped;
1165 :
2271 peter_e 1166 GIC 497 : if (first)
1167 282 : first = false;
2271 peter_e 1168 ECB : else
2271 peter_e 1169 GIC 215 : appendStringInfoChar(&res, ',');
2271 peter_e 1170 ECB :
2267 peter_e 1171 GIC 497 : val_escaped = PQescapeIdentifier(conn, val, strlen(val));
2267 peter_e 1172 CBC 497 : if (!val_escaped)
1173 : {
2267 peter_e 1174 LBC 0 : free(res.data);
2267 peter_e 1175 UIC 0 : return NULL;
1176 : }
2267 peter_e 1177 CBC 497 : appendStringInfoString(&res, val_escaped);
1178 497 : PQfreemem(val_escaped);
1179 : }
2271 peter_e 1180 ECB :
2271 peter_e 1181 GIC 282 : return res.data;
2271 peter_e 1182 ECB : }
|