TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpqwalreceiver.c
4 : *
5 : * This file contains the libpq-specific parts of walreceiver. It's
6 : * loaded as a dynamic module to avoid linking the main server binary with
7 : * libpq.
8 : *
9 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
10 : *
11 : *
12 : * IDENTIFICATION
13 : * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 : #include "postgres.h"
18 :
19 : #include <unistd.h>
20 : #include <sys/time.h>
21 :
22 : #include "access/xlog.h"
23 : #include "catalog/pg_type.h"
24 : #include "common/connect.h"
25 : #include "funcapi.h"
26 : #include "libpq-fe.h"
27 : #include "libpq/libpq-be-fe-helpers.h"
28 : #include "mb/pg_wchar.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "pqexpbuffer.h"
32 : #include "replication/walreceiver.h"
33 : #include "utils/builtins.h"
34 : #include "utils/memutils.h"
35 : #include "utils/pg_lsn.h"
36 : #include "utils/tuplestore.h"
37 :
38 GIC 615 : PG_MODULE_MAGIC;
39 ECB :
40 : struct WalReceiverConn
41 : {
42 : /* Current connection to the primary, if any */
43 : PGconn *streamConn;
44 : /* Used to remember if the connection is logical or physical */
45 : bool logical;
46 : /* Buffer for currently read records */
47 : char *recvBuf;
48 : };
49 :
50 : /* Prototypes for interface functions */
51 : static WalReceiverConn *libpqrcv_connect(const char *conninfo,
52 : bool logical, bool must_use_password,
53 : const char *appname, char **err);
54 : static void libpqrcv_check_conninfo(const char *conninfo,
55 : bool must_use_password);
56 : static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
57 : static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
58 : char **sender_host, int *sender_port);
59 : static char *libpqrcv_identify_system(WalReceiverConn *conn,
60 : TimeLineID *primary_tli);
61 : static int libpqrcv_server_version(WalReceiverConn *conn);
62 : static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
63 : TimeLineID tli, char **filename,
64 : char **content, int *len);
65 : static bool libpqrcv_startstreaming(WalReceiverConn *conn,
66 : const WalRcvStreamOptions *options);
67 : static void libpqrcv_endstreaming(WalReceiverConn *conn,
68 : TimeLineID *next_tli);
69 : static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
70 : pgsocket *wait_fd);
71 : static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
72 : int nbytes);
73 : static char *libpqrcv_create_slot(WalReceiverConn *conn,
74 : const char *slotname,
75 : bool temporary,
76 : bool two_phase,
77 : CRSSnapshotAction snapshot_action,
78 : XLogRecPtr *lsn);
79 : static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
80 : static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
81 : const char *query,
82 : const int nRetTypes,
83 : const Oid *retTypes);
84 : static void libpqrcv_disconnect(WalReceiverConn *conn);
85 :
86 : static WalReceiverFunctionsType PQWalReceiverFunctions = {
87 : .walrcv_connect = libpqrcv_connect,
88 : .walrcv_check_conninfo = libpqrcv_check_conninfo,
89 : .walrcv_get_conninfo = libpqrcv_get_conninfo,
90 : .walrcv_get_senderinfo = libpqrcv_get_senderinfo,
91 : .walrcv_identify_system = libpqrcv_identify_system,
92 : .walrcv_server_version = libpqrcv_server_version,
93 : .walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
94 : .walrcv_startstreaming = libpqrcv_startstreaming,
95 : .walrcv_endstreaming = libpqrcv_endstreaming,
96 : .walrcv_receive = libpqrcv_receive,
97 : .walrcv_send = libpqrcv_send,
98 : .walrcv_create_slot = libpqrcv_create_slot,
99 : .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
100 : .walrcv_exec = libpqrcv_exec,
101 : .walrcv_disconnect = libpqrcv_disconnect
102 : };
103 :
104 : /* Prototypes for private functions */
105 : static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
106 : static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
107 : static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
108 :
109 : /*
110 : * Module initialization function
111 : */
112 : void
113 CBC 615 : _PG_init(void)
114 : {
115 615 : if (WalReceiverFunctions != NULL)
116 UBC 0 : elog(ERROR, "libpqwalreceiver already loaded");
117 CBC 615 : WalReceiverFunctions = &PQWalReceiverFunctions;
118 615 : }
119 :
120 : /*
121 : * Establish the connection to the primary server for XLOG streaming
122 : *
123 : * If an error occurs, this function will normally return NULL and set *err
124 : * to a palloc'ed error message. However, if must_use_password is true and
125 : * the connection fails to use the password, this function will ereport(ERROR).
126 : * We do this because in that case the error includes a detail and a hint for
127 : * consistency with other parts of the system, and it's not worth adding the
128 : * machinery to pass all of those back to the caller just to cover this one
129 : * case.
130 : */
131 : static WalReceiverConn *
132 GNC 608 : libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
133 : const char *appname, char **err)
134 : {
135 : WalReceiverConn *conn;
136 : const char *keys[6];
137 ECB : const char *vals[6];
138 GIC 608 : int i = 0;
139 :
140 : /*
141 : * We use the expand_dbname parameter to process the connection string (or
142 : * URI), and pass some extra options.
143 ECB : */
144 GIC 608 : keys[i] = "dbname";
145 608 : vals[i] = conninfo;
146 608 : keys[++i] = "replication";
147 608 : vals[i] = logical ? "database" : "true";
148 608 : if (!logical)
149 ECB : {
150 : /*
151 : * The database name is ignored by the server in replication mode, but
152 : * specify "replication" for .pgpass lookup.
153 : */
154 GIC 162 : keys[++i] = "dbname";
155 162 : vals[i] = "replication";
156 : }
157 608 : keys[++i] = "fallback_application_name";
158 608 : vals[i] = appname;
159 CBC 608 : if (logical)
160 ECB : {
161 : /* Tell the publisher to translate to our encoding */
162 CBC 446 : keys[++i] = "client_encoding";
163 446 : vals[i] = GetDatabaseEncodingName();
164 ECB :
165 : /*
166 : * Force assorted GUC parameters to settings that ensure that the
167 : * publisher will output data values in a form that is unambiguous to
168 : * the subscriber. (We don't want to modify the subscriber's GUC
169 : * settings, since that might surprise user-defined code running in
170 : * the subscriber, such as triggers.) This should match what pg_dump
171 : * does.
172 : */
173 GIC 446 : keys[++i] = "options";
174 446 : vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
175 : }
176 608 : keys[++i] = NULL;
177 608 : vals[i] = NULL;
178 ECB :
179 CBC 608 : Assert(i < sizeof(keys));
180 :
181 608 : conn = palloc0(sizeof(WalReceiverConn));
182 GNC 607 : conn->streamConn =
183 608 : libpqsrv_connect_params(keys, vals,
184 : /* expand_dbname = */ true,
185 : WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
186 ECB :
187 GIC 607 : if (PQstatus(conn->streamConn) != CONNECTION_OK)
188 71 : goto bad_connection_errmsg;
189 ECB :
190 GNC 536 : if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
191 : {
192 UNC 0 : libpqsrv_disconnect(conn->streamConn);
193 0 : pfree(conn);
194 :
195 0 : ereport(ERROR,
196 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
197 : errmsg("password is required"),
198 : errdetail("Non-superuser cannot connect if the server does not request a password."),
199 : errhint("Target server's authentication method must be changed. or set password_required=false in the subscription attributes.")));
200 : }
201 :
202 CBC 536 : if (logical)
203 : {
204 : PGresult *res;
205 ECB :
206 CBC 432 : res = libpqrcv_PQexec(conn->streamConn,
207 ECB : ALWAYS_SECURE_SEARCH_PATH_SQL);
208 CBC 432 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
209 : {
210 UIC 0 : PQclear(res);
211 0 : *err = psprintf(_("could not clear search path: %s"),
212 0 : pchomp(PQerrorMessage(conn->streamConn)));
213 0 : goto bad_connection;
214 : }
215 GIC 432 : PQclear(res);
216 : }
217 :
218 536 : conn->logical = logical;
219 :
220 CBC 536 : return conn;
221 :
222 ECB : /* error path, using libpq's error message */
223 GIC 71 : bad_connection_errmsg:
224 CBC 71 : *err = pchomp(PQerrorMessage(conn->streamConn));
225 :
226 ECB : /* error path, error already set */
227 CBC 71 : bad_connection:
228 GNC 71 : libpqsrv_disconnect(conn->streamConn);
229 GIC 71 : pfree(conn);
230 CBC 71 : return NULL;
231 : }
232 ECB :
233 : /*
234 : * Validate connection info string, and determine whether it might cause
235 : * local filesystem access to be attempted.
236 : *
237 : * If the connection string can't be parsed, this function will raise
238 : * an error and will not return. If it can, it will return true if this
239 : * connection string specifies a password and false otherwise.
240 : */
241 : static void
242 GNC 130 : libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
243 ECB : {
244 GIC 130 : PQconninfoOption *opts = NULL;
245 : PQconninfoOption *opt;
246 CBC 130 : char *err = NULL;
247 :
248 130 : opts = PQconninfoParse(conninfo, &err);
249 GIC 130 : if (opts == NULL)
250 : {
251 ECB : /* The error string is malloc'd, so we must free it explicitly */
252 CBC 9 : char *errcopy = err ? pstrdup(err) : "out of memory";
253 :
254 9 : PQfreemem(err);
255 GIC 9 : ereport(ERROR,
256 ECB : (errcode(ERRCODE_SYNTAX_ERROR),
257 : errmsg("invalid connection string syntax: %s", errcopy)));
258 : }
259 :
260 GNC 121 : if (must_use_password)
261 : {
262 6 : bool uses_password = false;
263 :
264 129 : for (opt = opts; opt->keyword != NULL; ++opt)
265 : {
266 : /* Ignore connection options that are not present. */
267 126 : if (opt->val == NULL)
268 120 : continue;
269 :
270 6 : if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
271 : {
272 3 : uses_password = true;
273 3 : break;
274 : }
275 : }
276 :
277 6 : if (!uses_password)
278 3 : ereport(ERROR,
279 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
280 : errmsg("password is required"),
281 : errdetail("Non-superusers must provide a password in the connection string.")));
282 : }
283 :
284 GIC 118 : PQconninfoFree(opts);
285 CBC 118 : }
286 ECB :
287 : /*
288 : * Return a user-displayable conninfo string. Any security-sensitive fields
289 : * are obfuscated.
290 : */
291 : static char *
292 CBC 104 : libpqrcv_get_conninfo(WalReceiverConn *conn)
293 ECB : {
294 : PQconninfoOption *conn_opts;
295 : PQconninfoOption *conn_opt;
296 : PQExpBufferData buf;
297 : char *retval;
298 :
299 GIC 104 : Assert(conn->streamConn != NULL);
300 ECB :
301 GIC 104 : initPQExpBuffer(&buf);
302 104 : conn_opts = PQconninfo(conn->streamConn);
303 :
304 104 : if (conn_opts == NULL)
305 UIC 0 : ereport(ERROR,
306 : (errcode(ERRCODE_OUT_OF_MEMORY),
307 ECB : errmsg("could not parse connection string: %s",
308 : _("out of memory"))));
309 :
310 : /* build a clean connection string from pieces */
311 GIC 4160 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
312 ECB : {
313 EUB : bool obfuscate;
314 :
315 : /* Skip debug and empty options */
316 GIC 4056 : if (strchr(conn_opt->dispchar, 'D') ||
317 3952 : conn_opt->val == NULL ||
318 1768 : conn_opt->val[0] == '\0')
319 CBC 2392 : continue;
320 :
321 : /* Obfuscate security-sensitive options */
322 GIC 1664 : obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
323 :
324 CBC 3328 : appendPQExpBuffer(&buf, "%s%s=%s",
325 1664 : buf.len == 0 ? "" : " ",
326 ECB : conn_opt->keyword,
327 : obfuscate ? "********" : conn_opt->val);
328 : }
329 :
330 CBC 104 : PQconninfoFree(conn_opts);
331 :
332 104 : retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
333 104 : termPQExpBuffer(&buf);
334 GIC 104 : return retval;
335 : }
336 :
337 : /*
338 ECB : * Provides information of sender this WAL receiver is connected to.
339 : */
340 : static void
341 CBC 104 : libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
342 ECB : int *sender_port)
343 : {
344 GIC 104 : char *ret = NULL;
345 :
346 104 : *sender_host = NULL;
347 104 : *sender_port = 0;
348 :
349 CBC 104 : Assert(conn->streamConn != NULL);
350 :
351 GIC 104 : ret = PQhost(conn->streamConn);
352 CBC 104 : if (ret && strlen(ret) != 0)
353 GIC 104 : *sender_host = pstrdup(ret);
354 ECB :
355 CBC 104 : ret = PQport(conn->streamConn);
356 GIC 104 : if (ret && strlen(ret) != 0)
357 CBC 104 : *sender_port = atoi(ret);
358 GIC 104 : }
359 ECB :
360 : /*
361 : * Check that primary's system identifier matches ours, and fetch the current
362 : * timeline ID of the primary.
363 : */
364 : static char *
365 CBC 237 : libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
366 ECB : {
367 : PGresult *res;
368 : char *primary_sysid;
369 :
370 : /*
371 : * Get the system identifier and timeline ID as a DataRow message from the
372 : * primary server.
373 : */
374 GIC 237 : res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
375 237 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
376 : {
377 UIC 0 : PQclear(res);
378 0 : ereport(ERROR,
379 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
380 : errmsg("could not receive database system identifier and timeline ID from "
381 : "the primary server: %s",
382 ECB : pchomp(PQerrorMessage(conn->streamConn)))));
383 : }
384 GIC 237 : if (PQnfields(res) < 3 || PQntuples(res) != 1)
385 EUB : {
386 UBC 0 : int ntuples = PQntuples(res);
387 UIC 0 : int nfields = PQnfields(res);
388 :
389 0 : PQclear(res);
390 0 : ereport(ERROR,
391 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
392 ECB : errmsg("invalid response from primary server"),
393 : errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
394 EUB : ntuples, nfields, 3, 1)));
395 : }
396 GIC 237 : primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
397 GBC 237 : *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
398 237 : PQclear(res);
399 :
400 GIC 237 : return primary_sysid;
401 : }
402 :
403 : /*
404 ECB : * Thin wrapper around libpq to obtain server version.
405 : */
406 : static int
407 GIC 1011 : libpqrcv_server_version(WalReceiverConn *conn)
408 ECB : {
409 GIC 1011 : return PQserverVersion(conn->streamConn);
410 : }
411 :
412 : /*
413 : * Start streaming WAL data from given streaming options.
414 : *
415 ECB : * Returns true if we switched successfully to copy-both mode. False
416 : * means the server received the command and executed it successfully, but
417 : * didn't switch to copy-mode. That means that there was no WAL on the
418 : * requested timeline and starting point, because the server switched to
419 : * another timeline at or before the requested starting point. On failure,
420 : * throws an ERROR.
421 : */
422 : static bool
423 GIC 386 : libpqrcv_startstreaming(WalReceiverConn *conn,
424 : const WalRcvStreamOptions *options)
425 : {
426 : StringInfoData cmd;
427 : PGresult *res;
428 :
429 386 : Assert(options->logical == conn->logical);
430 386 : Assert(options->slotname || !options->logical);
431 ECB :
432 GIC 386 : initStringInfo(&cmd);
433 :
434 : /* Build the command. */
435 386 : appendStringInfoString(&cmd, "START_REPLICATION");
436 386 : if (options->slotname != NULL)
437 CBC 304 : appendStringInfo(&cmd, " SLOT \"%s\"",
438 304 : options->slotname);
439 :
440 386 : if (options->logical)
441 GIC 282 : appendStringInfoString(&cmd, " LOGICAL");
442 :
443 CBC 386 : appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
444 ECB :
445 : /*
446 : * Additional options are different depending on if we are doing logical
447 : * or physical replication.
448 : */
449 CBC 386 : if (options->logical)
450 : {
451 ECB : char *pubnames_str;
452 : List *pubnames;
453 : char *pubnames_literal;
454 :
455 GIC 282 : appendStringInfoString(&cmd, " (");
456 :
457 CBC 282 : appendStringInfo(&cmd, "proto_version '%u'",
458 GIC 282 : options->proto.logical.proto_version);
459 :
460 GNC 282 : if (options->proto.logical.streaming_str)
461 33 : appendStringInfo(&cmd, ", streaming '%s'",
462 33 : options->proto.logical.streaming_str);
463 ECB :
464 GIC 285 : if (options->proto.logical.twophase &&
465 CBC 3 : PQserverVersion(conn->streamConn) >= 150000)
466 3 : appendStringInfoString(&cmd, ", two_phase 'on'");
467 :
468 GNC 564 : if (options->proto.logical.origin &&
469 282 : PQserverVersion(conn->streamConn) >= 160000)
470 282 : appendStringInfo(&cmd, ", origin '%s'",
471 282 : options->proto.logical.origin);
472 :
473 CBC 282 : pubnames = options->proto.logical.publication_names;
474 282 : pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
475 282 : if (!pubnames_str)
476 UIC 0 : ereport(ERROR,
477 ECB : (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
478 : errmsg("could not start WAL streaming: %s",
479 : pchomp(PQerrorMessage(conn->streamConn)))));
480 GIC 282 : pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
481 ECB : strlen(pubnames_str));
482 CBC 282 : if (!pubnames_literal)
483 LBC 0 : ereport(ERROR,
484 ECB : (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
485 : errmsg("could not start WAL streaming: %s",
486 : pchomp(PQerrorMessage(conn->streamConn)))));
487 CBC 282 : appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
488 282 : PQfreemem(pubnames_literal);
489 GBC 282 : pfree(pubnames_str);
490 :
491 GIC 293 : if (options->proto.logical.binary &&
492 11 : PQserverVersion(conn->streamConn) >= 140000)
493 CBC 11 : appendStringInfoString(&cmd, ", binary 'true'");
494 :
495 282 : appendStringInfoChar(&cmd, ')');
496 EUB : }
497 : else
498 GIC 104 : appendStringInfo(&cmd, " TIMELINE %u",
499 104 : options->proto.physical.startpointTLI);
500 ECB :
501 : /* Start streaming. */
502 CBC 386 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
503 GIC 386 : pfree(cmd.data);
504 ECB :
505 CBC 386 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
506 ECB : {
507 UIC 0 : PQclear(res);
508 LBC 0 : return false;
509 : }
510 GIC 386 : else if (PQresultStatus(res) != PGRES_COPY_BOTH)
511 ECB : {
512 LBC 0 : PQclear(res);
513 UIC 0 : ereport(ERROR,
514 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
515 ECB : errmsg("could not start WAL streaming: %s",
516 : pchomp(PQerrorMessage(conn->streamConn)))));
517 : }
518 CBC 386 : PQclear(res);
519 GIC 386 : return true;
520 EUB : }
521 :
522 : /*
523 ECB : * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
524 : * reported by the server, or 0 if it did not report it.
525 EUB : */
526 : static void
527 GIC 186 : libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
528 : {
529 : PGresult *res;
530 :
531 ECB : /*
532 : * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
533 : * block, but the risk seems small.
534 : */
535 GIC 347 : if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
536 161 : PQflush(conn->streamConn))
537 25 : ereport(ERROR,
538 : (errcode(ERRCODE_CONNECTION_FAILURE),
539 : errmsg("could not send end-of-streaming message to primary: %s",
540 ECB : pchomp(PQerrorMessage(conn->streamConn)))));
541 :
542 GIC 161 : *next_tli = 0;
543 :
544 : /*
545 : * After COPY is finished, we should receive a result set indicating the
546 : * next timeline's ID, or just CommandComplete if the server was shut
547 : * down.
548 ECB : *
549 : * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
550 : * also possible in case we aborted the copy in mid-stream.
551 : */
552 GIC 161 : res = libpqrcv_PQgetResult(conn->streamConn);
553 161 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
554 : {
555 ECB : /*
556 : * Read the next timeline's ID. The server also sends the timeline's
557 : * starting point, but it is ignored.
558 : */
559 GIC 12 : if (PQnfields(res) < 2 || PQntuples(res) != 1)
560 UIC 0 : ereport(ERROR,
561 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
562 : errmsg("unexpected result set after end-of-streaming")));
563 GIC 12 : *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
564 12 : PQclear(res);
565 ECB :
566 : /* the result set should be followed by CommandComplete */
567 GIC 12 : res = libpqrcv_PQgetResult(conn->streamConn);
568 : }
569 149 : else if (PQresultStatus(res) == PGRES_COPY_OUT)
570 : {
571 149 : PQclear(res);
572 ECB :
573 EUB : /* End the copy */
574 GIC 149 : if (PQendcopy(conn->streamConn))
575 1 : ereport(ERROR,
576 ECB : (errcode(ERRCODE_CONNECTION_FAILURE),
577 : errmsg("error while shutting down streaming COPY: %s",
578 : pchomp(PQerrorMessage(conn->streamConn)))));
579 :
580 : /* CommandComplete should follow */
581 GIC 148 : res = libpqrcv_PQgetResult(conn->streamConn);
582 ECB : }
583 :
584 CBC 160 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
585 UIC 0 : ereport(ERROR,
586 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
587 ECB : errmsg("error reading result of streaming command: %s",
588 : pchomp(PQerrorMessage(conn->streamConn)))));
589 GIC 160 : PQclear(res);
590 :
591 : /* Verify that there are no more results */
592 160 : res = libpqrcv_PQgetResult(conn->streamConn);
593 160 : if (res != NULL)
594 LBC 0 : ereport(ERROR,
595 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
596 : errmsg("unexpected result after CommandComplete: %s",
597 ECB : pchomp(PQerrorMessage(conn->streamConn)))));
598 GBC 160 : }
599 :
600 : /*
601 : * Fetch the timeline history file for 'tli' from primary.
602 ECB : */
603 : static void
604 GIC 11 : libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
605 ECB : TimeLineID tli, char **filename,
606 : char **content, int *len)
607 EUB : {
608 : PGresult *res;
609 : char cmd[64];
610 :
611 CBC 11 : Assert(!conn->logical);
612 :
613 : /*
614 : * Request the primary to send over the history file for given timeline.
615 : */
616 GIC 11 : snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
617 CBC 11 : res = libpqrcv_PQexec(conn->streamConn, cmd);
618 GIC 11 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
619 : {
620 UIC 0 : PQclear(res);
621 0 : ereport(ERROR,
622 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
623 : errmsg("could not receive timeline history file from "
624 ECB : "the primary server: %s",
625 : pchomp(PQerrorMessage(conn->streamConn)))));
626 : }
627 GIC 11 : if (PQnfields(res) != 2 || PQntuples(res) != 1)
628 : {
629 LBC 0 : int ntuples = PQntuples(res);
630 0 : int nfields = PQnfields(res);
631 ECB :
632 UIC 0 : PQclear(res);
633 UBC 0 : ereport(ERROR,
634 EUB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
635 : errmsg("invalid response from primary server"),
636 : errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
637 : ntuples, nfields)));
638 : }
639 GIC 11 : *filename = pstrdup(PQgetvalue(res, 0, 0));
640 ECB :
641 GIC 11 : *len = PQgetlength(res, 0, 1);
642 GBC 11 : *content = palloc(*len);
643 11 : memcpy(*content, PQgetvalue(res, 0, 1), *len);
644 GIC 11 : PQclear(res);
645 GBC 11 : }
646 EUB :
647 : /*
648 : * Send a query and wait for the results by using the asynchronous libpq
649 : * functions and socket readiness events.
650 : *
651 : * We must not use the regular blocking libpq functions like PQexec()
652 ECB : * since they are uninterruptible by signals on some platforms, such as
653 : * Windows.
654 : *
655 : * The function is modeled on PQexec() in libpq, but only implements
656 : * those parts that are in use in the walreceiver api.
657 : *
658 : * May return NULL, rather than an error result, on failure.
659 : */
660 : static PGresult *
661 GIC 2760 : libpqrcv_PQexec(PGconn *streamConn, const char *query)
662 : {
663 2760 : PGresult *lastResult = NULL;
664 :
665 : /*
666 : * PQexec() silently discards any prior query results on the connection.
667 : * This is not required for this function as it's expected that the caller
668 : * (which is this library in all cases) will behave correctly and we don't
669 : * have to be backwards compatible with old libpq.
670 : */
671 :
672 : /*
673 : * Submit the query. Since we don't use non-blocking mode, this could
674 ECB : * theoretically block. In practice, since we don't send very long query
675 : * strings, the risk seems negligible.
676 : */
677 GIC 2760 : if (!PQsendQuery(streamConn, query))
678 UIC 0 : return NULL;
679 :
680 : for (;;)
681 GIC 2219 : {
682 : /* Wait for, and collect, the next PGresult. */
683 : PGresult *result;
684 :
685 4979 : result = libpqrcv_PQgetResult(streamConn);
686 4979 : if (result == NULL)
687 2219 : break; /* query is complete, or failure */
688 :
689 : /*
690 ECB : * Emulate PQexec()'s behavior of returning the last result when there
691 EUB : * are many. We are fine with returning just last error message.
692 : */
693 GIC 2760 : PQclear(lastResult);
694 CBC 2760 : lastResult = result;
695 :
696 GIC 5520 : if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
697 5365 : PQresultStatus(lastResult) == PGRES_COPY_OUT ||
698 CBC 4824 : PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
699 2219 : PQstatus(streamConn) == CONNECTION_BAD)
700 ECB : break;
701 : }
702 :
703 GIC 2760 : return lastResult;
704 : }
705 :
706 ECB : /*
707 : * Perform the equivalent of PQgetResult(), but watch for interrupts.
708 : */
709 : static PGresult *
710 CBC 5834 : libpqrcv_PQgetResult(PGconn *streamConn)
711 ECB : {
712 : /*
713 : * Collect data until PQgetResult is ready to get the result without
714 : * blocking.
715 : */
716 CBC 8608 : while (PQisBusy(streamConn))
717 : {
718 : int rc;
719 :
720 : /*
721 : * We don't need to break down the sleep into smaller increments,
722 : * since we'll get interrupted by signals and can handle any
723 ECB : * interrupts here.
724 : */
725 GIC 2799 : rc = WaitLatchOrSocket(MyLatch,
726 : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
727 : WL_LATCH_SET,
728 : PQsocket(streamConn),
729 ECB : 0,
730 : WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
731 :
732 : /* Interrupted? */
733 GIC 2799 : if (rc & WL_LATCH_SET)
734 : {
735 2 : ResetLatch(MyLatch);
736 2 : ProcessWalRcvInterrupts();
737 : }
738 ECB :
739 : /* Consume whatever data is available from the socket */
740 GIC 2799 : if (PQconsumeInput(streamConn) == 0)
741 : {
742 : /* trouble; return NULL */
743 25 : return NULL;
744 : }
745 : }
746 ECB :
747 : /* Now we can collect and return the next PGresult */
748 CBC 5809 : return PQgetResult(streamConn);
749 ECB : }
750 :
751 : /*
752 : * Disconnect connection to primary, if any.
753 : */
754 : static void
755 GIC 536 : libpqrcv_disconnect(WalReceiverConn *conn)
756 ECB : {
757 GNC 536 : libpqsrv_disconnect(conn->streamConn);
758 536 : PQfreemem(conn->recvBuf);
759 GIC 536 : pfree(conn);
760 CBC 536 : }
761 :
762 : /*
763 : * Receive a message available from XLOG stream.
764 : *
765 : * Returns:
766 : *
767 ECB : * If data was received, returns the length of the data. *buffer is set to
768 : * point to a buffer holding the received message. The buffer is only valid
769 : * until the next libpqrcv_* call.
770 : *
771 : * If no data was available immediately, returns 0, and *wait_fd is set to a
772 : * socket descriptor which can be waited on before trying again.
773 : *
774 : * -1 if the server ended the COPY.
775 : *
776 : * ereports on error.
777 : */
778 : static int
779 GIC 310892 : libpqrcv_receive(WalReceiverConn *conn, char **buffer,
780 : pgsocket *wait_fd)
781 : {
782 : int rawlen;
783 :
784 GNC 310892 : PQfreemem(conn->recvBuf);
785 GIC 310892 : conn->recvBuf = NULL;
786 :
787 : /* Try to receive a CopyData message */
788 310892 : rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
789 310892 : if (rawlen == 0)
790 ECB : {
791 : /* Try consuming some data. */
792 GIC 199916 : if (PQconsumeInput(conn->streamConn) == 0)
793 33 : ereport(ERROR,
794 : (errcode(ERRCODE_CONNECTION_FAILURE),
795 ECB : errmsg("could not receive data from WAL stream: %s",
796 : pchomp(PQerrorMessage(conn->streamConn)))));
797 :
798 : /* Now that we've consumed some input, try again */
799 CBC 199883 : rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
800 199883 : if (rawlen == 0)
801 : {
802 : /* Tell caller to try again when our socket is ready. */
803 89644 : *wait_fd = PQsocket(conn->streamConn);
804 89644 : return 0;
805 : }
806 : }
807 GIC 221215 : if (rawlen == -1) /* end-of-streaming or error */
808 : {
809 : PGresult *res;
810 ECB :
811 CBC 196 : res = libpqrcv_PQgetResult(conn->streamConn);
812 GIC 196 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
813 : {
814 CBC 178 : PQclear(res);
815 ECB :
816 : /* Verify that there are no more results. */
817 GIC 178 : res = libpqrcv_PQgetResult(conn->streamConn);
818 CBC 178 : if (res != NULL)
819 : {
820 UIC 0 : PQclear(res);
821 :
822 ECB : /*
823 : * If the other side closed the connection orderly (otherwise
824 : * we'd seen an error, or PGRES_COPY_IN) don't report an error
825 : * here, but let callers deal with it.
826 : */
827 UIC 0 : if (PQstatus(conn->streamConn) == CONNECTION_BAD)
828 LBC 0 : return -1;
829 ECB :
830 UIC 0 : ereport(ERROR,
831 EUB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
832 : errmsg("unexpected result after CommandComplete: %s",
833 : PQerrorMessage(conn->streamConn))));
834 : }
835 :
836 GIC 178 : return -1;
837 : }
838 GBC 18 : else if (PQresultStatus(res) == PGRES_COPY_IN)
839 EUB : {
840 GIC 12 : PQclear(res);
841 GBC 12 : return -1;
842 : }
843 : else
844 : {
845 GIC 6 : PQclear(res);
846 6 : ereport(ERROR,
847 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
848 : errmsg("could not receive data from WAL stream: %s",
849 : pchomp(PQerrorMessage(conn->streamConn)))));
850 : }
851 : }
852 CBC 221019 : if (rawlen < -1)
853 UIC 0 : ereport(ERROR,
854 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
855 : errmsg("could not receive data from WAL stream: %s",
856 ECB : pchomp(PQerrorMessage(conn->streamConn)))));
857 :
858 : /* Return received messages to caller */
859 GIC 221019 : *buffer = conn->recvBuf;
860 221019 : return rawlen;
861 : }
862 :
863 ECB : /*
864 EUB : * Send a message to XLOG stream.
865 : *
866 : * ereports on error.
867 : */
868 : static void
869 GIC 93966 : libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
870 ECB : {
871 CBC 187932 : if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
872 GIC 93966 : PQflush(conn->streamConn))
873 UIC 0 : ereport(ERROR,
874 : (errcode(ERRCODE_CONNECTION_FAILURE),
875 : errmsg("could not send data to WAL stream: %s",
876 : pchomp(PQerrorMessage(conn->streamConn)))));
877 GIC 93966 : }
878 :
879 : /*
880 ECB : * Create new replication slot.
881 : * Returns the name of the exported snapshot for logical slot or NULL for
882 : * physical slot.
883 : */
884 EUB : static char *
885 GIC 224 : libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
886 : bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
887 : XLogRecPtr *lsn)
888 ECB : {
889 : PGresult *res;
890 : StringInfoData cmd;
891 : char *snapshot;
892 : int use_new_options_syntax;
893 :
894 GIC 224 : use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
895 :
896 CBC 224 : initStringInfo(&cmd);
897 :
898 GIC 224 : appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
899 :
900 224 : if (temporary)
901 UIC 0 : appendStringInfoString(&cmd, " TEMPORARY");
902 :
903 GIC 224 : if (conn->logical)
904 : {
905 CBC 224 : appendStringInfoString(&cmd, " LOGICAL pgoutput ");
906 GIC 224 : if (use_new_options_syntax)
907 CBC 224 : appendStringInfoChar(&cmd, '(');
908 GIC 224 : if (two_phase)
909 ECB : {
910 GIC 1 : appendStringInfoString(&cmd, "TWO_PHASE");
911 CBC 1 : if (use_new_options_syntax)
912 GBC 1 : appendStringInfoString(&cmd, ", ");
913 : else
914 LBC 0 : appendStringInfoChar(&cmd, ' ');
915 : }
916 ECB :
917 CBC 224 : if (use_new_options_syntax)
918 ECB : {
919 CBC 224 : switch (snapshot_action)
920 : {
921 LBC 0 : case CRS_EXPORT_SNAPSHOT:
922 0 : appendStringInfoString(&cmd, "SNAPSHOT 'export'");
923 0 : break;
924 GIC 69 : case CRS_NOEXPORT_SNAPSHOT:
925 GBC 69 : appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
926 GIC 69 : break;
927 155 : case CRS_USE_SNAPSHOT:
928 CBC 155 : appendStringInfoString(&cmd, "SNAPSHOT 'use'");
929 GIC 155 : break;
930 ECB : }
931 : }
932 EUB : else
933 : {
934 UBC 0 : switch (snapshot_action)
935 ECB : {
936 LBC 0 : case CRS_EXPORT_SNAPSHOT:
937 0 : appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
938 0 : break;
939 0 : case CRS_NOEXPORT_SNAPSHOT:
940 0 : appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
941 UIC 0 : break;
942 0 : case CRS_USE_SNAPSHOT:
943 0 : appendStringInfoString(&cmd, "USE_SNAPSHOT");
944 0 : break;
945 EUB : }
946 : }
947 :
948 GBC 224 : if (use_new_options_syntax)
949 224 : appendStringInfoChar(&cmd, ')');
950 EUB : }
951 : else
952 : {
953 UBC 0 : if (use_new_options_syntax)
954 0 : appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
955 EUB : else
956 UIC 0 : appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
957 : }
958 :
959 CBC 224 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
960 224 : pfree(cmd.data);
961 :
962 GIC 224 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
963 : {
964 UBC 0 : PQclear(res);
965 0 : ereport(ERROR,
966 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
967 EUB : errmsg("could not create replication slot \"%s\": %s",
968 : slotname, pchomp(PQerrorMessage(conn->streamConn)))));
969 : }
970 ECB :
971 CBC 224 : if (lsn)
972 GIC 155 : *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
973 CBC 155 : CStringGetDatum(PQgetvalue(res, 0, 1))));
974 :
975 GBC 224 : if (!PQgetisnull(res, 0, 2))
976 UBC 0 : snapshot = pstrdup(PQgetvalue(res, 0, 2));
977 : else
978 GIC 224 : snapshot = NULL;
979 :
980 224 : PQclear(res);
981 :
982 CBC 224 : return snapshot;
983 ECB : }
984 :
985 : /*
986 : * Return PID of remote backend process.
987 EUB : */
988 : static pid_t
989 LBC 0 : libpqrcv_get_backend_pid(WalReceiverConn *conn)
990 : {
991 0 : return PQbackendPID(conn->streamConn);
992 : }
993 ECB :
994 : /*
995 : * Convert tuple query result to tuplestore.
996 : */
997 : static void
998 GIC 823 : libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
999 : const int nRetTypes, const Oid *retTypes)
1000 EUB : {
1001 : int tupn;
1002 : int coln;
1003 GIC 823 : int nfields = PQnfields(pgres);
1004 : HeapTuple tuple;
1005 : AttInMetadata *attinmeta;
1006 : MemoryContext rowcontext;
1007 : MemoryContext oldcontext;
1008 :
1009 ECB : /* Make sure we got expected number of fields. */
1010 GIC 823 : if (nfields != nRetTypes)
1011 UIC 0 : ereport(ERROR,
1012 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1013 : errmsg("invalid query response"),
1014 ECB : errdetail("Expected %d fields, got %d fields.",
1015 : nRetTypes, nfields)));
1016 :
1017 GIC 823 : walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1018 :
1019 : /* Create tuple descriptor corresponding to expected result. */
1020 823 : walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1021 CBC 2639 : for (coln = 0; coln < nRetTypes; coln++)
1022 GBC 1816 : TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1023 GIC 1816 : PQfname(pgres, coln), retTypes[coln], -1, 0);
1024 823 : attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1025 :
1026 : /* No point in doing more here if there were no tuples returned. */
1027 823 : if (PQntuples(pgres) == 0)
1028 CBC 8 : return;
1029 :
1030 : /* Create temporary context for local allocations. */
1031 815 : rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1032 ECB : "libpqrcv query result context",
1033 : ALLOCSET_DEFAULT_SIZES);
1034 :
1035 : /* Process returned rows. */
1036 GIC 1900 : for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1037 : {
1038 ECB : char *cstrs[MaxTupleAttributeNumber];
1039 :
1040 GIC 1085 : ProcessWalRcvInterrupts();
1041 :
1042 ECB : /* Do the allocations in temporary context. */
1043 GIC 1085 : oldcontext = MemoryContextSwitchTo(rowcontext);
1044 :
1045 : /*
1046 : * Fill cstrs with null-terminated strings of column values.
1047 ECB : */
1048 GIC 3755 : for (coln = 0; coln < nfields; coln++)
1049 : {
1050 2670 : if (PQgetisnull(pgres, tupn, coln))
1051 CBC 374 : cstrs[coln] = NULL;
1052 : else
1053 GIC 2296 : cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1054 ECB : }
1055 :
1056 : /* Convert row to a tuple, and add it to the tuplestore */
1057 GIC 1085 : tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1058 1085 : tuplestore_puttuple(walres->tuplestore, tuple);
1059 ECB :
1060 : /* Clean up */
1061 CBC 1085 : MemoryContextSwitchTo(oldcontext);
1062 1085 : MemoryContextReset(rowcontext);
1063 : }
1064 ECB :
1065 GIC 815 : MemoryContextDelete(rowcontext);
1066 : }
1067 :
1068 ECB : /*
1069 : * Public interface for sending generic queries (and commands).
1070 : *
1071 : * This can only be called from process connected to database.
1072 : */
1073 : static WalRcvExecResult *
1074 GIC 1470 : libpqrcv_exec(WalReceiverConn *conn, const char *query,
1075 : const int nRetTypes, const Oid *retTypes)
1076 ECB : {
1077 GIC 1470 : PGresult *pgres = NULL;
1078 1470 : WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1079 : char *diag_sqlstate;
1080 :
1081 1470 : if (MyDatabaseId == InvalidOid)
1082 UIC 0 : ereport(ERROR,
1083 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1084 : errmsg("the query interface requires a database connection")));
1085 ECB :
1086 GIC 1470 : pgres = libpqrcv_PQexec(conn->streamConn, query);
1087 :
1088 CBC 1470 : switch (PQresultStatus(pgres))
1089 ECB : {
1090 GIC 823 : case PGRES_SINGLE_TUPLE:
1091 : case PGRES_TUPLES_OK:
1092 CBC 823 : walres->status = WALRCV_OK_TUPLES;
1093 GBC 823 : libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1094 GIC 823 : break;
1095 :
1096 UIC 0 : case PGRES_COPY_IN:
1097 LBC 0 : walres->status = WALRCV_OK_COPY_IN;
1098 UIC 0 : break;
1099 ECB :
1100 GIC 155 : case PGRES_COPY_OUT:
1101 CBC 155 : walres->status = WALRCV_OK_COPY_OUT;
1102 GIC 155 : break;
1103 ECB :
1104 LBC 0 : case PGRES_COPY_BOTH:
1105 0 : walres->status = WALRCV_OK_COPY_BOTH;
1106 UIC 0 : break;
1107 EUB :
1108 GBC 492 : case PGRES_COMMAND_OK:
1109 492 : walres->status = WALRCV_OK_COMMAND;
1110 GIC 492 : break;
1111 ECB :
1112 : /* Empty query is considered error. */
1113 LBC 0 : case PGRES_EMPTY_QUERY:
1114 UIC 0 : walres->status = WALRCV_ERROR;
1115 UBC 0 : walres->err = _("empty query");
1116 0 : break;
1117 EUB :
1118 UIC 0 : case PGRES_PIPELINE_SYNC:
1119 ECB : case PGRES_PIPELINE_ABORTED:
1120 LBC 0 : walres->status = WALRCV_ERROR;
1121 0 : walres->err = _("unexpected pipeline mode");
1122 UIC 0 : break;
1123 :
1124 UBC 0 : case PGRES_NONFATAL_ERROR:
1125 EUB : case PGRES_FATAL_ERROR:
1126 : case PGRES_BAD_RESPONSE:
1127 UBC 0 : walres->status = WALRCV_ERROR;
1128 UIC 0 : walres->err = pchomp(PQerrorMessage(conn->streamConn));
1129 UBC 0 : diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1130 UIC 0 : if (diag_sqlstate)
1131 UBC 0 : walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1132 EUB : diag_sqlstate[1],
1133 : diag_sqlstate[2],
1134 : diag_sqlstate[3],
1135 : diag_sqlstate[4]);
1136 UIC 0 : break;
1137 : }
1138 EUB :
1139 GBC 1470 : PQclear(pgres);
1140 EUB :
1141 GBC 1470 : return walres;
1142 EUB : }
1143 :
1144 : /*
1145 : * Given a List of strings, return it as single comma separated
1146 : * string, quoting identifiers as needed.
1147 : *
1148 : * This is essentially the reverse of SplitIdentifierString.
1149 : *
1150 ECB : * The caller should free the result.
1151 : */
1152 : static char *
1153 GIC 282 : stringlist_to_identifierstr(PGconn *conn, List *strings)
1154 : {
1155 : ListCell *lc;
1156 : StringInfoData res;
1157 282 : bool first = true;
1158 :
1159 282 : initStringInfo(&res);
1160 :
1161 779 : foreach(lc, strings)
1162 : {
1163 497 : char *val = strVal(lfirst(lc));
1164 ECB : char *val_escaped;
1165 :
1166 GIC 497 : if (first)
1167 282 : first = false;
1168 ECB : else
1169 GIC 215 : appendStringInfoChar(&res, ',');
1170 ECB :
1171 GIC 497 : val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1172 CBC 497 : if (!val_escaped)
1173 : {
1174 LBC 0 : free(res.data);
1175 UIC 0 : return NULL;
1176 : }
1177 CBC 497 : appendStringInfoString(&res, val_escaped);
1178 497 : PQfreemem(val_escaped);
1179 : }
1180 ECB :
1181 GIC 282 : return res.data;
1182 ECB : }
|