TLA Line data Source code
1 : /*
2 : * dblink.c
3 : *
4 : * Functions returning results from a remote database
5 : *
6 : * Joe Conway <mail@joeconway.com>
7 : * And contributors:
8 : * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 : * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10 : *
11 : * contrib/dblink/dblink.c
12 : * Copyright (c) 2001-2023, PostgreSQL Global Development Group
13 : * ALL RIGHTS RESERVED;
14 : *
15 : * Permission to use, copy, modify, and distribute this software and its
16 : * documentation for any purpose, without fee, and without a written agreement
17 : * is hereby granted, provided that the above copyright notice and this
18 : * paragraph and the following two paragraphs appear in all copies.
19 : *
20 : * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 : * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 : * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 : * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 : * POSSIBILITY OF SUCH DAMAGE.
25 : *
26 : * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 : * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 : * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 : * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 : * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 : *
32 : */
33 : #include "postgres.h"
34 :
35 : #include <limits.h>
36 :
37 : #include "access/htup_details.h"
38 : #include "access/relation.h"
39 : #include "access/reloptions.h"
40 : #include "access/table.h"
41 : #include "catalog/namespace.h"
42 : #include "catalog/pg_foreign_data_wrapper.h"
43 : #include "catalog/pg_foreign_server.h"
44 : #include "catalog/pg_type.h"
45 : #include "catalog/pg_user_mapping.h"
46 : #include "executor/spi.h"
47 : #include "foreign/foreign.h"
48 : #include "funcapi.h"
49 : #include "lib/stringinfo.h"
50 : #include "libpq-fe.h"
51 : #include "libpq/libpq-be-fe-helpers.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "parser/scansup.h"
55 : #include "utils/acl.h"
56 : #include "utils/builtins.h"
57 : #include "utils/fmgroids.h"
58 : #include "utils/guc.h"
59 : #include "utils/lsyscache.h"
60 : #include "utils/memutils.h"
61 : #include "utils/rel.h"
62 : #include "utils/varlena.h"
63 :
64 GIC 3 : PG_MODULE_MAGIC;
65 ECB :
/*
 * remoteConn
 *		Bookkeeping for one remote connection: the libpq handle plus the
 *		cursor/transaction state maintained by dblink_open() and
 *		dblink_close().
 */
66 : typedef struct remoteConn
67 : {
68 : PGconn *conn; /* libpq handle for the remote connection; NULL when not connected */
69 : int openCursorCount; /* cursors opened since dblink_open() started a transaction */
70 : bool newXactForCursor; /* true if dblink_open() issued BEGIN on behalf of a cursor */
71 : } remoteConn;
72 :
/*
 * storeInfo
 *		Working state handed to storeQueryResult() and storeRow() by
 *		materializeQueryResult().
 */
73 : typedef struct storeInfo
74 : {
75 : FunctionCallInfo fcinfo; /* call info of the SQL function being executed */
76 : Tuplestorestate *tuplestore; /* NOTE(review): presumably the result tuplestore, filled by storeRow() -- confirm */
77 : AttInMetadata *attinmeta; /* NOTE(review): presumably input-conversion metadata for the result rowtype -- confirm */
78 : MemoryContext tmpcontext; /* short-lived context for data conversions */
79 : char **cstrs; /* NOTE(review): presumably per-row array of column value strings -- confirm */
80 : /* temp storage for results to avoid leaks on exception */
81 : PGresult *last_res;
82 : PGresult *cur_res;
83 : } storeInfo;
84 :
85 : /*
86 : * Internal declarations
87 : */
88 : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
89 : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
90 : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
91 : PGresult *res);
92 : static void materializeQueryResult(FunctionCallInfo fcinfo,
93 : PGconn *conn,
94 : const char *conname,
95 : const char *sql,
96 : bool fail);
97 : static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
98 : static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
99 : static remoteConn *getConnectionByName(const char *name);
100 : static HTAB *createConnHash(void);
101 : static void createNewConnection(const char *name, remoteConn *rconn);
102 : static void deleteConnection(const char *name);
103 : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
104 : static char **get_text_array_contents(ArrayType *array, int *numitems);
105 : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
106 : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
107 : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
108 : static char *quote_ident_cstr(char *rawstr);
109 : static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
110 : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
111 : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
112 : static char *generate_relation_name(Relation rel);
113 : static void dblink_connstr_check(const char *connstr);
114 : static void dblink_security_check(PGconn *conn, remoteConn *rconn);
115 : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
116 : bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
117 : static char *get_connect_string(const char *servername);
118 : static char *escape_param_str(const char *str);
119 : static void validate_pkattnums(Relation rel,
120 : int2vector *pkattnums_arg, int32 pknumatts_arg,
121 : int **pkattnums, int *pknumatts);
122 : static bool is_valid_dblink_option(const PQconninfoOption *options,
123 : const char *option, Oid context);
124 : static int applyRemoteGucs(PGconn *conn);
125 : static void restoreLocalGucs(int nestlevel);
126 :
127 : /* Global state, private to this file */
128 : static remoteConn *pconn = NULL; /* the unnamed connection, used when the caller gives no connection name; set up by dblink_init() */
129 : static HTAB *remoteConnHash = NULL; /* NOTE(review): presumably the table of named connections built by createConnHash() -- confirm */
130 :
131 : /*
132 : * The following is the list that holds multiple remote connections.
133 : * Calling convention of each dblink function changes to accept
134 : * connection name as the first parameter. The connection list is
135 : * much like ecpg e.g. a mapping between a name and a PGconn object.
136 : */
137 :
/*
 * remoteConnHashEnt
 *		Pairs a connection name with its remoteConn.
 *		NOTE(review): presumably the entry type stored in remoteConnHash,
 *		keyed by name -- confirm against createConnHash().
 */
138 : typedef struct remoteConnHashEnt
139 : {
140 : char name[NAMEDATALEN]; /* connection name, at most NAMEDATALEN bytes including the terminator */
141 : remoteConn *rconn; /* the connection registered under that name */
142 : } remoteConnHashEnt;
143 :
144 : /* initial number of connection hashes */
145 : #define NUMCONN 16
146 :
147 : static char *
148 GIC 48 : xpstrdup(const char *in)
149 ECB : {
150 GIC 48 : if (in == NULL)
151 CBC 36 : return NULL;
152 12 : return pstrdup(in);
153 ECB : }
154 :
155 : static void
156 : pg_attribute_noreturn()
157 UIC 0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
158 EUB : {
159 UIC 0 : char *msg = pchomp(PQerrorMessage(conn));
160 EUB :
161 UNC 0 : PQclear(res);
162 UBC 0 : elog(ERROR, "%s: %s", p2, msg);
163 : }
164 :
165 : static void
166 : pg_attribute_noreturn()
167 CBC 3 : dblink_conn_not_avail(const char *conname)
168 : {
169 3 : if (conname)
170 1 : ereport(ERROR,
171 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
172 : errmsg("connection \"%s\" not available", conname)));
173 : else
174 2 : ereport(ERROR,
175 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
176 : errmsg("connection not available")));
177 : }
178 :
179 : static void
180 33 : dblink_get_conn(char *conname_or_str,
181 : PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
182 : {
183 33 : remoteConn *rconn = getConnectionByName(conname_or_str);
184 : PGconn *conn;
185 : char *conname;
186 : bool freeconn;
187 :
188 33 : if (rconn)
189 : {
190 27 : conn = rconn->conn;
191 27 : conname = conname_or_str;
192 27 : freeconn = false;
193 : }
194 : else
195 : {
196 : const char *connstr;
197 :
198 6 : connstr = get_connect_string(conname_or_str);
199 6 : if (connstr == NULL)
200 6 : connstr = conname_or_str;
201 6 : dblink_connstr_check(connstr);
202 :
203 ECB : /* OK to make connection */
204 GNC 6 : conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
205 :
206 GIC 6 : if (PQstatus(conn) == CONNECTION_BAD)
207 ECB : {
208 GIC 2 : char *msg = pchomp(PQerrorMessage(conn));
209 ECB :
210 GNC 2 : libpqsrv_disconnect(conn);
211 CBC 2 : ereport(ERROR,
212 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
213 EUB : errmsg("could not establish connection"),
214 : errdetail_internal("%s", msg)));
215 : }
216 GIC 4 : dblink_security_check(conn, rconn);
217 4 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
218 LBC 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
219 GIC 4 : freeconn = true;
220 CBC 4 : conname = NULL;
221 : }
222 ECB :
223 CBC 31 : *conn_p = conn;
224 31 : *conname_p = conname;
225 31 : *freeconn_p = freeconn;
226 GIC 31 : }
227 ECB :
228 : static PGconn *
229 GIC 17 : dblink_get_named_conn(const char *conname)
230 : {
231 17 : remoteConn *rconn = getConnectionByName(conname);
232 ECB :
233 GIC 17 : if (rconn)
234 CBC 17 : return rconn->conn;
235 :
236 LBC 0 : dblink_conn_not_avail(conname);
237 ECB : return NULL; /* keep compiler quiet */
238 : }
239 :
240 : static void
241 CBC 120 : dblink_init(void)
242 : {
243 120 : if (!pconn)
244 : {
245 3 : pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
246 GIC 3 : pconn->conn = NULL;
247 CBC 3 : pconn->openCursorCount = 0;
248 3 : pconn->newXactForCursor = false;
249 : }
250 120 : }
251 ECB :
252 : /*
253 : * Create a persistent connection to another database
254 : */
255 CBC 9 : PG_FUNCTION_INFO_V1(dblink_connect);
256 : Datum
257 16 : dblink_connect(PG_FUNCTION_ARGS)
258 ECB : {
259 CBC 16 : char *conname_or_str = NULL;
260 GIC 16 : char *connstr = NULL;
261 16 : char *connname = NULL;
262 : char *msg;
263 CBC 16 : PGconn *conn = NULL;
264 16 : remoteConn *rconn = NULL;
265 ECB :
266 GIC 16 : dblink_init();
267 :
268 CBC 16 : if (PG_NARGS() == 2)
269 : {
270 GIC 11 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
271 CBC 11 : connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
272 : }
273 5 : else if (PG_NARGS() == 1)
274 GIC 5 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
275 EUB :
276 GBC 16 : if (connname)
277 EUB : {
278 GBC 11 : rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
279 : sizeof(remoteConn));
280 11 : rconn->conn = NULL;
281 GIC 11 : rconn->openCursorCount = 0;
282 11 : rconn->newXactForCursor = false;
283 : }
284 :
285 : /* first check for valid foreign data server */
286 16 : connstr = get_connect_string(conname_or_str);
287 CBC 16 : if (connstr == NULL)
288 GIC 14 : connstr = conname_or_str;
289 :
290 ECB : /* check password in connection string if not superuser */
291 GBC 16 : dblink_connstr_check(connstr);
292 :
293 ECB : /* OK to make connection */
294 GNC 15 : conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
295 ECB :
296 GIC 15 : if (PQstatus(conn) == CONNECTION_BAD)
297 ECB : {
298 UIC 0 : msg = pchomp(PQerrorMessage(conn));
299 UNC 0 : libpqsrv_disconnect(conn);
300 LBC 0 : if (rconn)
301 0 : pfree(rconn);
302 ECB :
303 LBC 0 : ereport(ERROR,
304 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
305 : errmsg("could not establish connection"),
306 ECB : errdetail_internal("%s", msg)));
307 : }
308 :
309 : /* check password actually used if not superuser */
310 GIC 15 : dblink_security_check(conn, rconn);
311 ECB :
312 : /* attempt to set client encoding to match server encoding, if needed */
313 GIC 15 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
314 LBC 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
315 ECB :
316 GIC 15 : if (connname)
317 : {
318 CBC 10 : rconn->conn = conn;
319 GIC 10 : createNewConnection(connname, rconn);
320 ECB : }
321 : else
322 : {
323 GIC 5 : if (pconn->conn)
324 GNC 1 : libpqsrv_disconnect(pconn->conn);
325 CBC 5 : pconn->conn = conn;
326 : }
327 ECB :
328 GIC 14 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
329 ECB : }
330 :
331 : /*
332 : * Clear a persistent connection to another database
333 : */
334 CBC 6 : PG_FUNCTION_INFO_V1(dblink_disconnect);
335 : Datum
336 13 : dblink_disconnect(PG_FUNCTION_ARGS)
337 ECB : {
338 GIC 13 : char *conname = NULL;
339 CBC 13 : remoteConn *rconn = NULL;
340 GIC 13 : PGconn *conn = NULL;
341 :
342 CBC 13 : dblink_init();
343 ECB :
344 CBC 13 : if (PG_NARGS() == 1)
345 : {
346 9 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
347 GIC 9 : rconn = getConnectionByName(conname);
348 9 : if (rconn)
349 CBC 8 : conn = rconn->conn;
350 : }
351 ECB : else
352 CBC 4 : conn = pconn->conn;
353 ECB :
354 CBC 13 : if (!conn)
355 GIC 1 : dblink_conn_not_avail(conname);
356 :
357 GNC 12 : libpqsrv_disconnect(conn);
358 CBC 12 : if (rconn)
359 ECB : {
360 CBC 8 : deleteConnection(conname);
361 GIC 8 : pfree(rconn);
362 : }
363 ECB : else
364 GIC 4 : pconn->conn = NULL;
365 :
366 CBC 12 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
367 ECB : }
368 :
369 : /*
370 : * opens a cursor using a persistent connection
371 : */
372 GIC 9 : PG_FUNCTION_INFO_V1(dblink_open);
373 ECB : Datum
374 GBC 9 : dblink_open(PG_FUNCTION_ARGS)
375 : {
376 CBC 9 : PGresult *res = NULL;
377 : PGconn *conn;
378 GIC 9 : char *curname = NULL;
379 CBC 9 : char *sql = NULL;
380 GIC 9 : char *conname = NULL;
381 ECB : StringInfoData buf;
382 CBC 9 : remoteConn *rconn = NULL;
383 GBC 9 : bool fail = true; /* default to backward compatible behavior */
384 ECB :
385 CBC 9 : dblink_init();
386 GIC 9 : initStringInfo(&buf);
387 :
388 9 : if (PG_NARGS() == 2)
389 : {
390 : /* text,text */
391 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
392 CBC 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
393 GIC 2 : rconn = pconn;
394 : }
395 7 : else if (PG_NARGS() == 3)
396 ECB : {
397 : /* might be text,text,text or text,text,bool */
398 GIC 6 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
399 ECB : {
400 CBC 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
401 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
402 GIC 1 : fail = PG_GETARG_BOOL(2);
403 CBC 1 : rconn = pconn;
404 : }
405 ECB : else
406 : {
407 GIC 5 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
408 CBC 5 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
409 5 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
410 GIC 5 : rconn = getConnectionByName(conname);
411 : }
412 : }
413 1 : else if (PG_NARGS() == 4)
414 : {
415 ECB : /* text,text,text,bool */
416 GIC 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
417 CBC 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
418 GIC 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
419 1 : fail = PG_GETARG_BOOL(3);
420 CBC 1 : rconn = getConnectionByName(conname);
421 ECB : }
422 :
423 GIC 9 : if (!rconn || !rconn->conn)
424 LBC 0 : dblink_conn_not_avail(conname);
425 ECB :
426 GIC 9 : conn = rconn->conn;
427 ECB :
428 : /* If we are not in a transaction, start one */
429 GIC 9 : if (PQtransactionStatus(conn) == PQTRANS_IDLE)
430 ECB : {
431 GIC 7 : res = PQexec(conn, "BEGIN");
432 7 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
433 UBC 0 : dblink_res_internalerror(conn, res, "begin error");
434 GBC 7 : PQclear(res);
435 GIC 7 : rconn->newXactForCursor = true;
436 ECB :
437 : /*
438 : * Since transaction state was IDLE, we force cursor count to
439 : * initially be 0. This is needed as a previous ABORT might have wiped
440 : * out our transaction without maintaining the cursor count for us.
441 : */
442 CBC 7 : rconn->openCursorCount = 0;
443 ECB : }
444 :
445 : /* if we started a transaction, increment cursor count */
446 GIC 9 : if (rconn->newXactForCursor)
447 CBC 9 : (rconn->openCursorCount)++;
448 ECB :
449 CBC 9 : appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
450 GIC 9 : res = PQexec(conn, buf.data);
451 9 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
452 ECB : {
453 GIC 2 : dblink_res_error(conn, conname, res, fail,
454 : "while opening cursor \"%s\"", curname);
455 GBC 2 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
456 EUB : }
457 :
458 GBC 7 : PQclear(res);
459 GIC 7 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
460 : }
461 ECB :
462 EUB : /*
463 : * closes a cursor
464 ECB : */
465 GIC 6 : PG_FUNCTION_INFO_V1(dblink_close);
466 ECB : Datum
467 GIC 5 : dblink_close(PG_FUNCTION_ARGS)
468 : {
469 ECB : PGconn *conn;
470 CBC 5 : PGresult *res = NULL;
471 GIC 5 : char *curname = NULL;
472 CBC 5 : char *conname = NULL;
473 : StringInfoData buf;
474 5 : remoteConn *rconn = NULL;
475 GIC 5 : bool fail = true; /* default to backward compatible behavior */
476 :
477 CBC 5 : dblink_init();
478 GIC 5 : initStringInfo(&buf);
479 :
480 CBC 5 : if (PG_NARGS() == 1)
481 : {
482 ECB : /* text */
483 UIC 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
484 0 : rconn = pconn;
485 ECB : }
486 GIC 5 : else if (PG_NARGS() == 2)
487 ECB : {
488 : /* might be text,text or text,bool */
489 CBC 5 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
490 ECB : {
491 GBC 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
492 CBC 2 : fail = PG_GETARG_BOOL(1);
493 GIC 2 : rconn = pconn;
494 : }
495 : else
496 ECB : {
497 GIC 3 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
498 3 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
499 3 : rconn = getConnectionByName(conname);
500 : }
501 : }
502 CBC 5 : if (PG_NARGS() == 3)
503 : {
504 ECB : /* text,text,bool */
505 UIC 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
506 LBC 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
507 0 : fail = PG_GETARG_BOOL(2);
508 0 : rconn = getConnectionByName(conname);
509 ECB : }
510 :
511 CBC 5 : if (!rconn || !rconn->conn)
512 LBC 0 : dblink_conn_not_avail(conname);
513 ECB :
514 GIC 5 : conn = rconn->conn;
515 ECB :
516 GIC 5 : appendStringInfo(&buf, "CLOSE %s", curname);
517 ECB :
518 : /* close the cursor */
519 CBC 5 : res = PQexec(conn, buf.data);
520 GIC 5 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
521 : {
522 CBC 1 : dblink_res_error(conn, conname, res, fail,
523 ECB : "while closing cursor \"%s\"", curname);
524 CBC 1 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
525 ECB : }
526 :
527 CBC 4 : PQclear(res);
528 ECB :
529 : /* if we started a transaction, decrement cursor count */
530 GIC 4 : if (rconn->newXactForCursor)
531 ECB : {
532 GIC 4 : (rconn->openCursorCount)--;
533 :
534 ECB : /* if count is zero, commit the transaction */
535 GIC 4 : if (rconn->openCursorCount == 0)
536 ECB : {
537 CBC 2 : rconn->newXactForCursor = false;
538 ECB :
539 CBC 2 : res = PQexec(conn, "COMMIT");
540 GIC 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
541 UIC 0 : dblink_res_internalerror(conn, res, "commit error");
542 GIC 2 : PQclear(res);
543 ECB : }
544 : }
545 :
546 GIC 4 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
547 ECB : }
548 :
549 : /*
550 : * Fetch results from an open cursor
551 : */
552 CBC 9 : PG_FUNCTION_INFO_V1(dblink_fetch);
553 : Datum
554 GIC 13 : dblink_fetch(PG_FUNCTION_ARGS)
555 ECB : {
556 CBC 13 : PGresult *res = NULL;
557 13 : char *conname = NULL;
558 GIC 13 : remoteConn *rconn = NULL;
559 13 : PGconn *conn = NULL;
560 ECB : StringInfoData buf;
561 GBC 13 : char *curname = NULL;
562 GIC 13 : int howmany = 0;
563 CBC 13 : bool fail = true; /* default to backward compatible */
564 ECB :
565 GIC 13 : prepTuplestoreResult(fcinfo);
566 :
567 13 : dblink_init();
568 :
569 13 : if (PG_NARGS() == 4)
570 : {
571 ECB : /* text,text,int,bool */
572 CBC 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
573 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
574 1 : howmany = PG_GETARG_INT32(2);
575 GIC 1 : fail = PG_GETARG_BOOL(3);
576 ECB :
577 GIC 1 : rconn = getConnectionByName(conname);
578 CBC 1 : if (rconn)
579 GIC 1 : conn = rconn->conn;
580 ECB : }
581 GIC 12 : else if (PG_NARGS() == 3)
582 : {
583 EUB : /* text,text,int or text,int,bool */
584 GBC 8 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
585 : {
586 GIC 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
587 2 : howmany = PG_GETARG_INT32(1);
588 2 : fail = PG_GETARG_BOOL(2);
589 CBC 2 : conn = pconn->conn;
590 ECB : }
591 : else
592 : {
593 GIC 6 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
594 6 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
595 6 : howmany = PG_GETARG_INT32(2);
596 ECB :
597 GIC 6 : rconn = getConnectionByName(conname);
598 CBC 6 : if (rconn)
599 GIC 6 : conn = rconn->conn;
600 ECB : }
601 : }
602 GIC 4 : else if (PG_NARGS() == 2)
603 ECB : {
604 : /* text,int */
605 CBC 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
606 GIC 4 : howmany = PG_GETARG_INT32(1);
607 4 : conn = pconn->conn;
608 : }
609 :
610 13 : if (!conn)
611 LBC 0 : dblink_conn_not_avail(conname);
612 :
613 CBC 13 : initStringInfo(&buf);
614 13 : appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
615 :
616 : /*
617 : * Try to execute the query. Note that since libpq uses malloc, the
618 EUB : * PGresult will be long-lived even though we are still in a short-lived
619 : * memory context.
620 : */
621 CBC 13 : res = PQexec(conn, buf.data);
622 26 : if (!res ||
623 GBC 26 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
624 GIC 13 : PQresultStatus(res) != PGRES_TUPLES_OK))
625 ECB : {
626 GIC 5 : dblink_res_error(conn, conname, res, fail,
627 : "while fetching from cursor \"%s\"", curname);
628 CBC 3 : return (Datum) 0;
629 : }
630 8 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
631 : {
632 ECB : /* cursor does not exist - closed already or bad name */
633 UIC 0 : PQclear(res);
634 0 : ereport(ERROR,
635 : (errcode(ERRCODE_INVALID_CURSOR_NAME),
636 ECB : errmsg("cursor \"%s\" does not exist", curname)));
637 : }
638 :
639 CBC 8 : materializeResult(fcinfo, conn, res);
640 GIC 7 : return (Datum) 0;
641 ECB : }
642 :
643 : /*
644 : * Note: this is the new preferred version of dblink
645 : */
646 GIC 11 : PG_FUNCTION_INFO_V1(dblink_record);
647 ECB : Datum
648 CBC 25 : dblink_record(PG_FUNCTION_ARGS)
649 ECB : {
650 GIC 25 : return dblink_record_internal(fcinfo, false);
651 ECB : }
652 :
653 CBC 3 : PG_FUNCTION_INFO_V1(dblink_send_query);
654 : Datum
655 GIC 6 : dblink_send_query(PG_FUNCTION_ARGS)
656 ECB : {
657 : PGconn *conn;
658 : char *sql;
659 : int retval;
660 :
661 CBC 6 : if (PG_NARGS() == 2)
662 : {
663 GIC 6 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
664 CBC 6 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
665 : }
666 ECB : else
667 : /* shouldn't happen */
668 LBC 0 : elog(ERROR, "wrong number of arguments");
669 :
670 : /* async query send */
671 GIC 6 : retval = PQsendQuery(conn, sql);
672 CBC 6 : if (retval != 1)
673 LBC 0 : elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
674 ECB :
675 GIC 6 : PG_RETURN_INT32(retval);
676 : }
677 ECB :
678 GIC 4 : PG_FUNCTION_INFO_V1(dblink_get_result);
679 : Datum
680 CBC 8 : dblink_get_result(PG_FUNCTION_ARGS)
681 ECB : {
682 GIC 8 : return dblink_record_internal(fcinfo, true);
683 : }
684 :
685 EUB : static Datum
686 GIC 33 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
687 : {
688 33 : PGconn *volatile conn = NULL;
689 33 : volatile bool freeconn = false;
690 ECB :
691 GIC 33 : prepTuplestoreResult(fcinfo);
692 ECB :
693 GIC 33 : dblink_init();
694 :
695 GBC 33 : PG_TRY();
696 EUB : {
697 GIC 33 : char *sql = NULL;
698 CBC 33 : char *conname = NULL;
699 GIC 33 : bool fail = true; /* default to backward compatible */
700 :
701 CBC 33 : if (!is_async)
702 : {
703 GIC 25 : if (PG_NARGS() == 3)
704 : {
705 EUB : /* text,text,bool */
706 GIC 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
707 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
708 CBC 1 : fail = PG_GETARG_BOOL(2);
709 1 : dblink_get_conn(conname, &conn, &conname, &freeconn);
710 : }
711 24 : else if (PG_NARGS() == 2)
712 : {
713 : /* text,text or text,bool */
714 17 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
715 : {
716 GIC 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
717 1 : fail = PG_GETARG_BOOL(1);
718 1 : conn = pconn->conn;
719 ECB : }
720 : else
721 : {
722 CBC 16 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
723 GIC 16 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
724 CBC 16 : dblink_get_conn(conname, &conn, &conname, &freeconn);
725 ECB : }
726 : }
727 GBC 7 : else if (PG_NARGS() == 1)
728 : {
729 : /* text */
730 GIC 7 : conn = pconn->conn;
731 7 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
732 : }
733 ECB : else
734 : /* shouldn't happen */
735 UIC 0 : elog(ERROR, "wrong number of arguments");
736 : }
737 : else /* is_async */
738 ECB : {
739 : /* get async result */
740 GIC 8 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
741 ECB :
742 CBC 8 : if (PG_NARGS() == 2)
743 : {
744 ECB : /* text,bool */
745 UIC 0 : fail = PG_GETARG_BOOL(1);
746 LBC 0 : conn = dblink_get_named_conn(conname);
747 : }
748 GIC 8 : else if (PG_NARGS() == 1)
749 : {
750 : /* text */
751 8 : conn = dblink_get_named_conn(conname);
752 : }
753 : else
754 : /* shouldn't happen */
755 UIC 0 : elog(ERROR, "wrong number of arguments");
756 ECB : }
757 :
758 CBC 31 : if (!conn)
759 GIC 2 : dblink_conn_not_avail(conname);
760 :
761 CBC 29 : if (!is_async)
762 EUB : {
763 : /* synchronous query, use efficient tuple collection method */
764 GIC 21 : materializeQueryResult(fcinfo, conn, conname, sql, fail);
765 ECB : }
766 EUB : else
767 : {
768 : /* async result retrieval, do it the old way */
769 GIC 8 : PGresult *res = PQgetResult(conn);
770 :
771 ECB : /* NULL means we're all done with the async results */
772 GIC 8 : if (res)
773 : {
774 CBC 5 : if (PQresultStatus(res) != PGRES_COMMAND_OK &&
775 5 : PQresultStatus(res) != PGRES_TUPLES_OK)
776 ECB : {
777 UIC 0 : dblink_res_error(conn, conname, res, fail,
778 : "while executing query");
779 : /* if fail isn't set, we'll return an empty query result */
780 : }
781 : else
782 : {
783 GIC 5 : materializeResult(fcinfo, conn, res);
784 ECB : }
785 : }
786 : }
787 : }
788 GIC 4 : PG_FINALLY();
789 ECB : {
790 : /* if needed, close the connection to the database */
791 CBC 33 : if (freeconn)
792 GNC 3 : libpqsrv_disconnect(conn);
793 : }
794 GIC 33 : PG_END_TRY();
795 ECB :
796 GIC 29 : return (Datum) 0;
797 EUB : }
798 :
799 : /*
800 : * Verify function caller can handle a tuplestore result, and set up for that.
801 : *
802 : * Note: if the caller returns without actually creating a tuplestore, the
803 : * executor will treat the function result as an empty set.
804 : */
805 : static void
806 GBC 46 : prepTuplestoreResult(FunctionCallInfo fcinfo)
807 EUB : {
808 GIC 46 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
809 :
810 : /* check to see if query supports us returning a tuplestore */
811 CBC 46 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
812 UIC 0 : ereport(ERROR,
813 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
814 : errmsg("set-valued function called in context that cannot accept a set")));
815 GIC 46 : if (!(rsinfo->allowedModes & SFRM_Materialize))
816 LBC 0 : ereport(ERROR,
817 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
818 ECB : errmsg("materialize mode required, but it is not allowed in this context")));
819 :
820 : /* let the executor know we're sending back a tuplestore */
821 GBC 46 : rsinfo->returnMode = SFRM_Materialize;
822 :
823 EUB : /* caller must fill these to return a non-empty result */
824 GIC 46 : rsinfo->setResult = NULL;
825 46 : rsinfo->setDesc = NULL;
826 46 : }
827 :
828 EUB : /*
829 : * Copy the contents of the PGresult into a tuplestore to be returned
830 : * as the result of the current function.
831 : * The PGresult will be released in this function.
832 : */
833 : static void
834 GIC 13 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
835 ECB : {
836 CBC 13 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
837 ECB :
838 : /* prepTuplestoreResult must have been called previously */
839 GIC 13 : Assert(rsinfo->returnMode == SFRM_Materialize);
840 :
841 13 : PG_TRY();
842 : {
843 ECB : TupleDesc tupdesc;
844 EUB : bool is_sql_cmd;
845 : int ntuples;
846 : int nfields;
847 :
848 GIC 13 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
849 ECB : {
850 UIC 0 : is_sql_cmd = true;
851 :
852 ECB : /*
853 : * need a tuple descriptor representing one TEXT column to return
854 : * the command status string as our result tuple
855 : */
856 UIC 0 : tupdesc = CreateTemplateTupleDesc(1);
857 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
858 ECB : TEXTOID, -1, 0);
859 UIC 0 : ntuples = 1;
860 0 : nfields = 1;
861 ECB : }
862 : else
863 : {
864 CBC 13 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
865 ECB :
866 CBC 13 : is_sql_cmd = false;
867 ECB :
868 : /* get a tuple descriptor for our result type */
869 GIC 13 : switch (get_call_result_type(fcinfo, NULL, &tupdesc))
870 ECB : {
871 GIC 13 : case TYPEFUNC_COMPOSITE:
872 : /* success */
873 CBC 13 : break;
874 UIC 0 : case TYPEFUNC_RECORD:
875 : /* failed to determine actual type of RECORD */
876 0 : ereport(ERROR,
877 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
878 : errmsg("function returning record called in context "
879 : "that cannot accept type record")));
880 : break;
881 LBC 0 : default:
882 : /* result type isn't composite */
883 0 : elog(ERROR, "return type must be a row type");
884 EUB : break;
885 : }
886 ECB :
887 : /* make sure we have a persistent copy of the tupdesc */
888 GIC 13 : tupdesc = CreateTupleDescCopy(tupdesc);
889 13 : ntuples = PQntuples(res);
890 13 : nfields = PQnfields(res);
891 EUB : }
892 :
893 : /*
894 : * check result and tuple descriptor have the same number of columns
895 ECB : */
896 CBC 13 : if (nfields != tupdesc->natts)
897 UIC 0 : ereport(ERROR,
898 : (errcode(ERRCODE_DATATYPE_MISMATCH),
899 : errmsg("remote query result rowtype does not match "
900 ECB : "the specified FROM clause rowtype")));
901 :
902 GIC 13 : if (ntuples > 0)
903 ECB : {
904 : AttInMetadata *attinmeta;
905 GIC 13 : int nestlevel = -1;
906 ECB : Tuplestorestate *tupstore;
907 : MemoryContext oldcontext;
908 : int row;
909 : char **values;
910 :
911 GIC 13 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
912 :
913 : /* Set GUCs to ensure we read GUC-sensitive data types correctly */
914 13 : if (!is_sql_cmd)
915 13 : nestlevel = applyRemoteGucs(conn);
916 :
917 13 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
918 13 : tupstore = tuplestore_begin_heap(true, false, work_mem);
919 13 : rsinfo->setResult = tupstore;
920 CBC 13 : rsinfo->setDesc = tupdesc;
921 GIC 13 : MemoryContextSwitchTo(oldcontext);
922 :
923 GNC 13 : values = palloc_array(char *, nfields);
924 :
925 : /* put all tuples into the tuplestore */
926 CBC 49 : for (row = 0; row < ntuples; row++)
927 ECB : {
928 : HeapTuple tuple;
929 :
930 GIC 37 : if (!is_sql_cmd)
931 ECB : {
932 : int i;
933 :
934 GIC 138 : for (i = 0; i < nfields; i++)
935 ECB : {
936 GIC 101 : if (PQgetisnull(res, row, i))
937 UIC 0 : values[i] = NULL;
938 ECB : else
939 GIC 101 : values[i] = PQgetvalue(res, row, i);
940 : }
941 : }
942 : else
943 ECB : {
944 UIC 0 : values[0] = PQcmdStatus(res);
945 ECB : }
946 :
947 : /* build the tuple and put it into the tuplestore. */
948 CBC 37 : tuple = BuildTupleFromCStrings(attinmeta, values);
949 GIC 36 : tuplestore_puttuple(tupstore, tuple);
950 : }
951 :
952 : /* clean up GUC settings, if we changed any */
953 CBC 12 : restoreLocalGucs(nestlevel);
954 : }
955 ECB : }
956 CBC 1 : PG_FINALLY();
957 : {
958 : /* be sure to release the libpq result */
959 GIC 13 : PQclear(res);
960 ECB : }
961 GIC 13 : PG_END_TRY();
962 12 : }
963 :
964 : /*
965 : * Execute the given SQL command and store its results into a tuplestore
966 : * to be returned as the result of the current function.
967 : *
968 : * This is equivalent to PQexec followed by materializeResult, but we make
969 : * use of libpq's single-row mode to avoid accumulating the whole result
970 : * inside libpq before it gets transferred to the tuplestore.
971 : */
972 : static void
973 21 : materializeQueryResult(FunctionCallInfo fcinfo,
974 : PGconn *conn,
975 : const char *conname,
976 : const char *sql,
977 EUB : bool fail)
978 : {
979 GIC 21 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
980 GBC 21 : PGresult *volatile res = NULL;
981 GIC 21 : volatile storeInfo sinfo = {0};
982 EUB :
983 : /* prepTuplestoreResult must have been called previously */
984 GBC 21 : Assert(rsinfo->returnMode == SFRM_Materialize);
985 EUB :
986 GBC 21 : sinfo.fcinfo = fcinfo;
987 :
988 21 : PG_TRY();
989 : {
990 : /* Create short-lived memory context for data conversions */
991 21 : sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
992 EUB : "dblink temporary context",
993 : ALLOCSET_DEFAULT_SIZES);
994 :
995 : /* execute query, collecting any tuples into the tuplestore */
996 GIC 21 : res = storeQueryResult(&sinfo, conn, sql);
997 :
998 21 : if (!res ||
999 CBC 21 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1000 GIC 21 : PQresultStatus(res) != PGRES_TUPLES_OK))
1001 CBC 2 : {
1002 : /*
1003 ECB : * dblink_res_error will clear the passed PGresult, so we need
1004 : * this ugly dance to avoid doing so twice during error exit
1005 : */
1006 GIC 2 : PGresult *res1 = res;
1007 :
1008 CBC 2 : res = NULL;
1009 2 : dblink_res_error(conn, conname, res1, fail,
1010 ECB : "while executing query");
1011 : /* if fail isn't set, we'll return an empty query result */
1012 : }
1013 CBC 19 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1014 ECB : {
1015 : /*
1016 : * storeRow didn't get called, so we need to convert the command
1017 EUB : * status string to a tuple manually
1018 : */
1019 : TupleDesc tupdesc;
1020 : AttInMetadata *attinmeta;
1021 : Tuplestorestate *tupstore;
1022 : HeapTuple tuple;
1023 : char *values[1];
1024 : MemoryContext oldcontext;
1025 :
1026 : /*
1027 : * need a tuple descriptor representing one TEXT column to return
1028 ECB : * the command status string as our result tuple
1029 : */
1030 UIC 0 : tupdesc = CreateTemplateTupleDesc(1);
1031 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1032 : TEXTOID, -1, 0);
1033 0 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1034 :
1035 LBC 0 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1036 UIC 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
1037 LBC 0 : rsinfo->setResult = tupstore;
1038 0 : rsinfo->setDesc = tupdesc;
1039 UIC 0 : MemoryContextSwitchTo(oldcontext);
1040 :
1041 LBC 0 : values[0] = PQcmdStatus(res);
1042 EUB :
1043 : /* build the tuple and put it into the tuplestore. */
1044 LBC 0 : tuple = BuildTupleFromCStrings(attinmeta, values);
1045 UBC 0 : tuplestore_puttuple(tupstore, tuple);
1046 :
1047 UIC 0 : PQclear(res);
1048 0 : res = NULL;
1049 ECB : }
1050 : else
1051 : {
1052 CBC 19 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1053 ECB : /* storeRow should have created a tuplestore */
1054 GIC 19 : Assert(rsinfo->setResult != NULL);
1055 ECB :
1056 GIC 19 : PQclear(res);
1057 19 : res = NULL;
1058 : }
1059 :
1060 : /* clean up data conversion short-lived memory context */
1061 21 : if (sinfo.tmpcontext != NULL)
1062 21 : MemoryContextDelete(sinfo.tmpcontext);
1063 21 : sinfo.tmpcontext = NULL;
1064 ECB :
1065 CBC 21 : PQclear(sinfo.last_res);
1066 GIC 21 : sinfo.last_res = NULL;
1067 CBC 21 : PQclear(sinfo.cur_res);
1068 GIC 21 : sinfo.cur_res = NULL;
1069 ECB : }
1070 LBC 0 : PG_CATCH();
1071 ECB : {
1072 : /* be sure to release any libpq result we collected */
1073 UIC 0 : PQclear(res);
1074 0 : PQclear(sinfo.last_res);
1075 0 : PQclear(sinfo.cur_res);
1076 ECB : /* and clear out any pending data in libpq */
1077 UBC 0 : while ((res = PQgetResult(conn)) != NULL)
1078 UIC 0 : PQclear(res);
1079 0 : PG_RE_THROW();
1080 ECB : }
1081 CBC 21 : PG_END_TRY();
1082 21 : }
1083 ECB :
1084 : /*
1085 : * Execute query, and send any result rows to sinfo->tuplestore.
1086 : */
1087 : static PGresult *
1088 CBC 21 : storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1089 : {
1090 GIC 21 : bool first = true;
1091 CBC 21 : int nestlevel = -1;
1092 ECB : PGresult *res;
1093 :
1094 GIC 21 : if (!PQsendQuery(conn, sql))
1095 UIC 0 : elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1096 :
1097 GIC 21 : if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1098 UIC 0 : elog(ERROR, "failed to set single-row mode for dblink query");
1099 :
1100 : for (;;)
1101 : {
1102 GIC 174 : CHECK_FOR_INTERRUPTS();
1103 ECB :
1104 GIC 174 : sinfo->cur_res = PQgetResult(conn);
1105 CBC 174 : if (!sinfo->cur_res)
1106 GIC 21 : break;
1107 :
1108 153 : if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1109 : {
1110 ECB : /* got one row from possibly-bigger resultset */
1111 :
1112 : /*
1113 : * Set GUCs to ensure we read GUC-sensitive data types correctly.
1114 : * We shouldn't do this until we have a row in hand, to ensure
1115 : * libpq has seen any earlier ParameterStatus protocol messages.
1116 : */
1117 GIC 132 : if (first && nestlevel < 0)
1118 19 : nestlevel = applyRemoteGucs(conn);
1119 :
1120 132 : storeRow(sinfo, sinfo->cur_res, first);
1121 ECB :
1122 GBC 132 : PQclear(sinfo->cur_res);
1123 CBC 132 : sinfo->cur_res = NULL;
1124 GIC 132 : first = false;
1125 : }
1126 ECB : else
1127 : {
1128 : /* if empty resultset, fill tuplestore header */
1129 GIC 21 : if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1130 LBC 0 : storeRow(sinfo, sinfo->cur_res, first);
1131 EUB :
1132 : /* store completed result at last_res */
1133 GBC 21 : PQclear(sinfo->last_res);
1134 GIC 21 : sinfo->last_res = sinfo->cur_res;
1135 21 : sinfo->cur_res = NULL;
1136 21 : first = true;
1137 : }
1138 EUB : }
1139 :
1140 : /* clean up GUC settings, if we changed any */
1141 GIC 21 : restoreLocalGucs(nestlevel);
1142 :
1143 : /* return last_res */
1144 21 : res = sinfo->last_res;
1145 CBC 21 : sinfo->last_res = NULL;
1146 GIC 21 : return res;
1147 : }
1148 ECB :
1149 EUB : /*
1150 : * Send single row to sinfo->tuplestore.
1151 : *
1152 : * If "first" is true, create the tuplestore using PGresult's metadata
1153 : * (in this case the PGresult might contain either zero or one row).
1154 : */
1155 ECB : static void
1156 GIC 132 : storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1157 : {
1158 CBC 132 : int nfields = PQnfields(res);
1159 ECB : HeapTuple tuple;
1160 : int i;
1161 : MemoryContext oldcontext;
1162 :
1163 GIC 132 : if (first)
1164 : {
1165 ECB : /* Prepare for new result set */
1166 GBC 19 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1167 : TupleDesc tupdesc;
1168 :
1169 : /*
1170 : * It's possible to get more than one result set if the query string
1171 : * contained multiple SQL commands. In that case, we follow PQexec's
1172 ECB : * traditional behavior of throwing away all but the last result.
1173 EUB : */
1174 CBC 19 : if (sinfo->tuplestore)
1175 UIC 0 : tuplestore_end(sinfo->tuplestore);
1176 GIC 19 : sinfo->tuplestore = NULL;
1177 :
1178 ECB : /* get a tuple descriptor for our result type */
1179 GIC 19 : switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1180 : {
1181 19 : case TYPEFUNC_COMPOSITE:
1182 : /* success */
1183 19 : break;
1184 UIC 0 : case TYPEFUNC_RECORD:
1185 ECB : /* failed to determine actual type of RECORD */
1186 UIC 0 : ereport(ERROR,
1187 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1188 : errmsg("function returning record called in context "
1189 : "that cannot accept type record")));
1190 ECB : break;
1191 UIC 0 : default:
1192 ECB : /* result type isn't composite */
1193 UBC 0 : elog(ERROR, "return type must be a row type");
1194 : break;
1195 ECB : }
1196 :
1197 : /* make sure we have a persistent copy of the tupdesc */
1198 GIC 19 : tupdesc = CreateTupleDescCopy(tupdesc);
1199 ECB :
1200 : /* check result and tuple descriptor have the same number of columns */
1201 CBC 19 : if (nfields != tupdesc->natts)
1202 UIC 0 : ereport(ERROR,
1203 : (errcode(ERRCODE_DATATYPE_MISMATCH),
1204 ECB : errmsg("remote query result rowtype does not match "
1205 : "the specified FROM clause rowtype")));
1206 :
1207 : /* Prepare attinmeta for later data conversions */
1208 GIC 19 : sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1209 :
1210 : /* Create a new, empty tuplestore */
1211 19 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1212 19 : sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1213 CBC 19 : rsinfo->setResult = sinfo->tuplestore;
1214 GIC 19 : rsinfo->setDesc = tupdesc;
1215 CBC 19 : MemoryContextSwitchTo(oldcontext);
1216 :
1217 : /* Done if empty resultset */
1218 GIC 19 : if (PQntuples(res) == 0)
1219 LBC 0 : return;
1220 :
1221 ECB : /*
1222 : * Set up sufficiently-wide string pointers array; this won't change
1223 : * in size so it's easy to preallocate.
1224 : */
1225 GIC 19 : if (sinfo->cstrs)
1226 UIC 0 : pfree(sinfo->cstrs);
1227 GNC 19 : sinfo->cstrs = palloc_array(char *, nfields);
1228 ECB : }
1229 :
1230 : /* Should have a single-row result if we get here */
1231 GIC 132 : Assert(PQntuples(res) == 1);
1232 :
1233 ECB : /*
1234 : * Do the following work in a temp context that we reset after each tuple.
1235 : * This cleans up not only the data we have direct access to, but any
1236 : * cruft the I/O functions might leak.
1237 EUB : */
1238 GIC 132 : oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1239 :
1240 : /*
1241 : * Fill cstrs with null-terminated strings of column values.
1242 : */
1243 510 : for (i = 0; i < nfields; i++)
1244 : {
1245 378 : if (PQgetisnull(res, 0, i))
1246 UIC 0 : sinfo->cstrs[i] = NULL;
1247 : else
1248 CBC 378 : sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1249 : }
1250 ECB :
1251 : /* Convert row to a tuple, and add it to the tuplestore */
1252 GIC 132 : tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1253 :
1254 CBC 132 : tuplestore_puttuple(sinfo->tuplestore, tuple);
1255 ECB :
1256 : /* Clean up */
1257 CBC 132 : MemoryContextSwitchTo(oldcontext);
1258 132 : MemoryContextReset(sinfo->tmpcontext);
1259 : }
1260 :
1261 : /*
1262 : * List all open dblink connections by name.
1263 : * Returns an array of all connection names.
1264 : * Takes no params
1265 : */
1266 GIC 2 : PG_FUNCTION_INFO_V1(dblink_get_connections);
1267 : Datum
1268 1 : dblink_get_connections(PG_FUNCTION_ARGS)
1269 : {
1270 : HASH_SEQ_STATUS status;
1271 : remoteConnHashEnt *hentry;
1272 CBC 1 : ArrayBuildState *astate = NULL;
1273 :
1274 1 : if (remoteConnHash)
1275 : {
1276 GIC 1 : hash_seq_init(&status, remoteConnHash);
1277 4 : while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1278 : {
1279 : /* stash away current value */
1280 3 : astate = accumArrayResult(astate,
1281 CBC 3 : CStringGetTextDatum(hentry->name),
1282 ECB : false, TEXTOID, CurrentMemoryContext);
1283 : }
1284 : }
1285 :
1286 CBC 1 : if (astate)
1287 GNC 1 : PG_RETURN_DATUM(makeArrayResult(astate,
1288 ECB : CurrentMemoryContext));
1289 : else
1290 UIC 0 : PG_RETURN_NULL();
1291 EUB : }
1292 :
1293 : /*
1294 : * Checks if a given remote connection is busy
1295 : *
1296 : * Returns 1 if the connection is busy, 0 otherwise
1297 : * Params:
1298 : * text connection_name - name of the connection to check
1299 : *
1300 : */
1301 GIC 2 : PG_FUNCTION_INFO_V1(dblink_is_busy);
1302 : Datum
1303 1 : dblink_is_busy(PG_FUNCTION_ARGS)
1304 : {
1305 ECB : PGconn *conn;
1306 :
1307 CBC 1 : dblink_init();
1308 GIC 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1309 :
1310 1 : PQconsumeInput(conn);
1311 1 : PG_RETURN_INT32(PQisBusy(conn));
1312 ECB : }
1313 :
1314 : /*
1315 : * Cancels a running request on a connection
1316 : *
1317 : * Returns text:
1318 : * "OK" if the cancel request has been sent correctly,
1319 EUB : * an error message otherwise
1320 : *
1321 : * Params:
1322 : * text connection_name - name of the connection to check
1323 : *
1324 : */
1325 CBC 2 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
1326 : Datum
1327 1 : dblink_cancel_query(PG_FUNCTION_ARGS)
1328 : {
1329 ECB : int res;
1330 : PGconn *conn;
1331 : PGcancel *cancel;
1332 : char errbuf[256];
1333 :
1334 GIC 1 : dblink_init();
1335 CBC 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1336 GIC 1 : cancel = PQgetCancel(conn);
1337 ECB :
1338 CBC 1 : res = PQcancel(cancel, errbuf, 256);
1339 1 : PQfreeCancel(cancel);
1340 ECB :
1341 GIC 1 : if (res == 1)
1342 CBC 1 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
1343 : else
1344 UIC 0 : PG_RETURN_TEXT_P(cstring_to_text(errbuf));
1345 EUB : }
1346 :
1347 :
1348 : /*
1349 : * Get error message from a connection
1350 ECB : *
1351 : * Returns text:
1352 : * "OK" if no error, an error message otherwise
1353 : *
1354 : * Params:
1355 : * text connection_name - name of the connection to check
1356 : *
1357 : */
1358 GIC 2 : PG_FUNCTION_INFO_V1(dblink_error_message);
1359 : Datum
1360 1 : dblink_error_message(PG_FUNCTION_ARGS)
1361 ECB : {
1362 : char *msg;
1363 : PGconn *conn;
1364 :
1365 GIC 1 : dblink_init();
1366 CBC 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1367 :
1368 GIC 1 : msg = PQerrorMessage(conn);
1369 CBC 1 : if (msg == NULL || msg[0] == '\0')
1370 1 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
1371 : else
1372 UIC 0 : PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1373 : }
1374 EUB :
1375 : /*
1376 ECB : * Execute an SQL non-SELECT command
1377 EUB : */
1378 GIC 9 : PG_FUNCTION_INFO_V1(dblink_exec);
1379 ECB : Datum
1380 CBC 26 : dblink_exec(PG_FUNCTION_ARGS)
1381 ECB : {
1382 CBC 26 : text *volatile sql_cmd_status = NULL;
1383 GIC 26 : PGconn *volatile conn = NULL;
1384 CBC 26 : volatile bool freeconn = false;
1385 :
1386 GIC 26 : dblink_init();
1387 :
1388 26 : PG_TRY();
1389 : {
1390 26 : PGresult *res = NULL;
1391 CBC 26 : char *sql = NULL;
1392 GIC 26 : char *conname = NULL;
1393 CBC 26 : bool fail = true; /* default to backward compatible behavior */
1394 :
1395 GIC 26 : if (PG_NARGS() == 3)
1396 : {
1397 : /* must be text,text,bool */
1398 UIC 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1399 LBC 0 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1400 0 : fail = PG_GETARG_BOOL(2);
1401 UIC 0 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1402 : }
1403 GIC 26 : else if (PG_NARGS() == 2)
1404 EUB : {
1405 : /* might be text,text or text,bool */
1406 GIC 17 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1407 : {
1408 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1409 1 : fail = PG_GETARG_BOOL(1);
1410 CBC 1 : conn = pconn->conn;
1411 : }
1412 : else
1413 ECB : {
1414 CBC 16 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1415 GIC 16 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1416 CBC 16 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1417 : }
1418 ECB : }
1419 GIC 9 : else if (PG_NARGS() == 1)
1420 : {
1421 : /* must be single text argument */
1422 9 : conn = pconn->conn;
1423 9 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1424 : }
1425 : else
1426 : /* shouldn't happen */
1427 UIC 0 : elog(ERROR, "wrong number of arguments");
1428 ECB :
1429 GIC 26 : if (!conn)
1430 LBC 0 : dblink_conn_not_avail(conname);
1431 :
1432 GIC 26 : res = PQexec(conn, sql);
1433 26 : if (!res ||
1434 26 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1435 2 : PQresultStatus(res) != PGRES_TUPLES_OK))
1436 : {
1437 2 : dblink_res_error(conn, conname, res, fail,
1438 : "while executing command");
1439 :
1440 : /*
1441 ECB : * and save a copy of the command status string to return as our
1442 : * result tuple
1443 : */
1444 GIC 1 : sql_cmd_status = cstring_to_text("ERROR");
1445 : }
1446 24 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1447 ECB : {
1448 : /*
1449 : * and save a copy of the command status string to return as our
1450 : * result tuple
1451 : */
1452 CBC 24 : sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1453 GIC 24 : PQclear(res);
1454 : }
1455 ECB : else
1456 : {
1457 UIC 0 : PQclear(res);
1458 LBC 0 : ereport(ERROR,
1459 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1460 ECB : errmsg("statement returning results not allowed")));
1461 : }
1462 : }
1463 GIC 1 : PG_FINALLY();
1464 : {
1465 ECB : /* if needed, close the connection to the database */
1466 CBC 26 : if (freeconn)
1467 GNC 1 : libpqsrv_disconnect(conn);
1468 : }
1469 GIC 26 : PG_END_TRY();
1470 :
1471 25 : PG_RETURN_TEXT_P(sql_cmd_status);
1472 ECB : }
1473 :
1474 :
1475 : /*
1476 : * dblink_get_pkey
1477 : *
1478 : * Return list of primary key fields for the supplied relation,
1479 : * or NULL if none exists.
1480 : */
1481 GIC 2 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
1482 : Datum
1483 9 : dblink_get_pkey(PG_FUNCTION_ARGS)
1484 : {
1485 EUB : int16 indnkeyatts;
1486 : char **results;
1487 : FuncCallContext *funcctx;
1488 : int32 call_cntr;
1489 ECB : int32 max_calls;
1490 : AttInMetadata *attinmeta;
1491 : MemoryContext oldcontext;
1492 :
1493 : /* stuff done only on the first call of the function */
1494 GIC 9 : if (SRF_IS_FIRSTCALL())
1495 : {
1496 : Relation rel;
1497 : TupleDesc tupdesc;
1498 ECB :
1499 : /* create a function context for cross-call persistence */
1500 GIC 3 : funcctx = SRF_FIRSTCALL_INIT();
1501 ECB :
1502 : /*
1503 : * switch to memory context appropriate for multiple function calls
1504 : */
1505 GIC 3 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1506 :
1507 : /* open target relation */
1508 3 : rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1509 :
1510 ECB : /* get the array of attnums */
1511 CBC 3 : results = get_pkey_attnames(rel, &indnkeyatts);
1512 ECB :
1513 GIC 3 : relation_close(rel, AccessShareLock);
1514 :
1515 ECB : /*
1516 : * need a tuple descriptor representing one INT and one TEXT column
1517 : */
1518 CBC 3 : tupdesc = CreateTemplateTupleDesc(2);
1519 GIC 3 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1520 ECB : INT4OID, -1, 0);
1521 GIC 3 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1522 : TEXTOID, -1, 0);
1523 :
1524 : /*
1525 ECB : * Generate attribute metadata needed later to produce tuples from raw
1526 : * C strings
1527 : */
1528 GIC 3 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1529 3 : funcctx->attinmeta = attinmeta;
1530 :
1531 3 : if ((results != NULL) && (indnkeyatts > 0))
1532 : {
1533 3 : funcctx->max_calls = indnkeyatts;
1534 :
1535 : /* got results, keep track of them */
1536 3 : funcctx->user_fctx = results;
1537 : }
1538 : else
1539 : {
1540 : /* fast track when no results */
1541 UIC 0 : MemoryContextSwitchTo(oldcontext);
1542 0 : SRF_RETURN_DONE(funcctx);
1543 : }
1544 :
1545 GIC 3 : MemoryContextSwitchTo(oldcontext);
1546 : }
1547 :
1548 : /* stuff done on every call of the function */
1549 CBC 9 : funcctx = SRF_PERCALL_SETUP();
1550 :
1551 ECB : /*
1552 : * initialize per-call variables
1553 : */
1554 CBC 9 : call_cntr = funcctx->call_cntr;
1555 9 : max_calls = funcctx->max_calls;
1556 ECB :
1557 CBC 9 : results = (char **) funcctx->user_fctx;
1558 GIC 9 : attinmeta = funcctx->attinmeta;
1559 :
1560 9 : if (call_cntr < max_calls) /* do when there is more left to send */
1561 : {
1562 : char **values;
1563 : HeapTuple tuple;
1564 : Datum result;
1565 :
1566 GNC 6 : values = palloc_array(char *, 2);
1567 GIC 6 : values[0] = psprintf("%d", call_cntr + 1);
1568 6 : values[1] = results[call_cntr];
1569 :
1570 ECB : /* build the tuple */
1571 GIC 6 : tuple = BuildTupleFromCStrings(attinmeta, values);
1572 :
1573 : /* make the tuple into a datum */
1574 6 : result = HeapTupleGetDatum(tuple);
1575 ECB :
1576 GIC 6 : SRF_RETURN_NEXT(funcctx, result);
1577 : }
1578 : else
1579 : {
1580 : /* do when there is no more left */
1581 3 : SRF_RETURN_DONE(funcctx);
1582 ECB : }
1583 : }
1584 :
1585 :
1586 : /*
1587 : * dblink_build_sql_insert
1588 EUB : *
1589 : * Used to generate an SQL insert statement
1590 : * based on an existing tuple in a local relation.
1591 : * This is useful for selectively replicating data
1592 : * to another server via dblink.
1593 : *
1594 : * API:
1595 : * <relname> - name of local table of interest
1596 ECB : * <pkattnums> - an int2vector of attnums which will be used
1597 : * to identify the local tuple of interest
1598 : * <pknumatts> - number of attnums in pkattnums
1599 : * <src_pkattvals_arry> - text array of key values which will be used
1600 : * to identify the local tuple of interest
1601 : * <tgt_pkattvals_arry> - text array of key values which will be used
1602 EUB : * to build the string for execution remotely. These are substituted
1603 : * for their counterparts in src_pkattvals_arry
1604 : */
1605 GIC 3 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1606 : Datum
1607 6 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
1608 : {
1609 CBC 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
1610 GIC 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1611 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1612 6 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1613 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1614 ECB : Relation rel;
1615 : int *pkattnums;
1616 : int pknumatts;
1617 : char **src_pkattvals;
1618 : char **tgt_pkattvals;
1619 : int src_nitems;
1620 : int tgt_nitems;
1621 : char *sql;
1622 :
1623 : /*
1624 : * Open target relation.
1625 : */
1626 GIC 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1627 :
1628 : /*
1629 : * Process pkattnums argument.
1630 : */
1631 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1632 : &pkattnums, &pknumatts);
1633 :
1634 : /*
1635 : * Source array is made up of key values that will be used to locate the
1636 : * tuple of interest from the local system.
1637 : */
1638 CBC 4 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1639 :
1640 ECB : /*
1641 : * There should be one source array key value for each key attnum
1642 : */
1643 CBC 4 : if (src_nitems != pknumatts)
1644 LBC 0 : ereport(ERROR,
1645 ECB : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1646 : errmsg("source key array length must match number of key attributes")));
1647 :
1648 : /*
1649 : * Target array is made up of key values that will be used to build the
1650 : * SQL string for use on the remote system.
1651 : */
1652 GIC 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1653 :
1654 : /*
1655 : * There should be one target array key value for each key attnum
1656 ECB : */
1657 GIC 4 : if (tgt_nitems != pknumatts)
1658 UIC 0 : ereport(ERROR,
1659 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1660 : errmsg("target key array length must match number of key attributes")));
1661 ECB :
1662 : /*
1663 : * Prep work is finally done. Go get the SQL string.
1664 : */
1665 GIC 4 : sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1666 :
1667 : /*
1668 ECB : * Now we can close the relation.
1669 : */
1670 GIC 4 : relation_close(rel, AccessShareLock);
1671 :
1672 : /*
1673 ECB : * And send it
1674 EUB : */
1675 GIC 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1676 : }
1677 :
1678 :
1679 : /*
1680 : * dblink_build_sql_delete
1681 ECB : *
1682 : * Used to generate an SQL delete statement.
1683 : * This is useful for selectively replicating a
1684 : * delete to another server via dblink.
1685 : *
1686 : * API:
1687 : * <relname> - name of remote table of interest
1688 : * <pkattnums> - an int2vector of attnums which will be used
1689 : * to identify the remote tuple of interest
1690 : * <pknumatts> - number of attnums in pkattnums
1691 : * <tgt_pkattvals_arry> - text array of key values which will be used
1692 : * to build the string for execution remotely.
1693 : */
1694 GIC 3 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1695 : Datum
1696 6 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
1697 : {
1698 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
1699 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1700 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1701 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1702 : Relation rel;
1703 : int *pkattnums;
1704 : int pknumatts;
1705 : char **tgt_pkattvals;
1706 : int tgt_nitems;
1707 : char *sql;
1708 :
1709 : /*
1710 : * Open target relation.
1711 : */
1712 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1713 :
1714 ECB : /*
1715 : * Process pkattnums argument.
1716 : */
1717 GIC 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1718 ECB : &pkattnums, &pknumatts);
1719 :
1720 : /*
1721 : * Target array is made up of key values that will be used to build the
1722 : * SQL string for use on the remote system.
1723 : */
1724 GIC 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1725 :
1726 : /*
1727 : * There should be one target array key value for each key attnum
1728 : */
1729 4 : if (tgt_nitems != pknumatts)
1730 UIC 0 : ereport(ERROR,
1731 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1732 : errmsg("target key array length must match number of key attributes")));
1733 :
1734 : /*
1735 ECB : * Prep work is finally done. Go get the SQL string.
1736 : */
1737 GIC 4 : sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1738 :
1739 : /*
1740 ECB : * Now we can close the relation.
1741 : */
1742 GIC 4 : relation_close(rel, AccessShareLock);
1743 :
1744 : /*
1745 : * And send it
1746 : */
1747 CBC 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1748 : }
1749 :
1750 :
1751 : /*
1752 ECB : * dblink_build_sql_update
1753 EUB : *
1754 : * Used to generate an SQL update statement
1755 : * based on an existing tuple in a local relation.
1756 : * This is useful for selectively replicating data
1757 : * to another server via dblink.
1758 : *
1759 : * API:
1760 : * <relname> - name of local table of interest
1761 ECB : * <pkattnums> - an int2vector of attnums which will be used
1762 : * to identify the local tuple of interest
1763 : * <pknumatts> - number of attnums in pkattnums
1764 : * <src_pkattvals_arry> - text array of key values which will be used
1765 : * to identify the local tuple of interest
1766 : * <tgt_pkattvals_arry> - text array of key values which will be used
1767 EUB : * to build the string for execution remotely. These are substituted
1768 : * for their counterparts in src_pkattvals_arry
1769 : */
1770 GIC 3 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1771 : Datum
1772 6 : dblink_build_sql_update(PG_FUNCTION_ARGS)
1773 : {
1774 CBC 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
1775 GIC 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1776 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1777 6 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1778 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1779 ECB : Relation rel;
1780 : int *pkattnums;
1781 : int pknumatts;
1782 : char **src_pkattvals;
1783 : char **tgt_pkattvals;
1784 : int src_nitems;
1785 : int tgt_nitems;
1786 : char *sql;
1787 :
1788 : /*
1789 : * Open target relation.
1790 : */
1791 GIC 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1792 :
1793 ECB : /*
1794 : * Process pkattnums argument.
1795 EUB : */
1796 GIC 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1797 : &pkattnums, &pknumatts);
1798 EUB :
1799 : /*
1800 : * Source array is made up of key values that will be used to locate the
1801 : * tuple of interest from the local system.
1802 : */
1803 GIC 4 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1804 :
1805 : /*
1806 : * There should be one source array key value for each key attnum
1807 : */
1808 4 : if (src_nitems != pknumatts)
1809 UIC 0 : ereport(ERROR,
1810 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1811 ECB : errmsg("source key array length must match number of key attributes")));
1812 :
1813 : /*
1814 : * Target array is made up of key values that will be used to build the
1815 : * SQL string for use on the remote system.
1816 : */
1817 CBC 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1818 :
1819 ECB : /*
1820 : * There should be one target array key value for each key attnum
1821 EUB : */
1822 GIC 4 : if (tgt_nitems != pknumatts)
1823 LBC 0 : ereport(ERROR,
1824 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1825 ECB : errmsg("target key array length must match number of key attributes")));
1826 :
1827 : /*
1828 : * Prep work is finally done. Go get the SQL string.
1829 : */
1830 GIC 4 : sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1831 :
1832 : /*
1833 ECB : * Now we can close the relation.
1834 : */
1835 GIC 4 : relation_close(rel, AccessShareLock);
1836 ECB :
1837 : /*
1838 : * And send it
1839 EUB : */
1840 GIC 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1841 ECB : }
1842 :
1843 : /*
1844 : * dblink_current_query
1845 : * return the current query string
1846 EUB : * to allow its use in (among other things)
1847 : * rewrite rules
1848 ECB : */
1849 GIC 1 : PG_FUNCTION_INFO_V1(dblink_current_query);
1850 ECB : Datum
1851 LBC 0 : dblink_current_query(PG_FUNCTION_ARGS)
1852 : {
1853 : /* This is now just an alias for the built-in function current_query() */
1854 0 : PG_RETURN_DATUM(current_query(fcinfo));
1855 : }
1856 :
1857 : /*
1858 : * Retrieve async notifications for a connection.
1859 : *
1860 : * Returns a setof record of notifications, or an empty set if none received.
1861 : * Can optionally take a named connection as parameter, but uses the unnamed
1862 : * connection per default.
1863 : *
1864 ECB : */
1865 : #define DBLINK_NOTIFY_COLS 3
1866 :
1867 GIC 3 : PG_FUNCTION_INFO_V1(dblink_get_notify);
1868 ECB : Datum
1869 CBC 2 : dblink_get_notify(PG_FUNCTION_ARGS)
1870 : {
1871 : PGconn *conn;
1872 : PGnotify *notify;
1873 GIC 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1874 :
1875 2 : dblink_init();
1876 2 : if (PG_NARGS() == 1)
1877 UIC 0 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1878 : else
1879 GIC 2 : conn = pconn->conn;
1880 :
1881 CBC 2 : InitMaterializedSRF(fcinfo, 0);
1882 :
1883 2 : PQconsumeInput(conn);
1884 4 : while ((notify = PQnotifies(conn)) != NULL)
1885 EUB : {
1886 : Datum values[DBLINK_NOTIFY_COLS];
1887 : bool nulls[DBLINK_NOTIFY_COLS];
1888 :
1889 GIC 2 : memset(values, 0, sizeof(values));
1890 2 : memset(nulls, 0, sizeof(nulls));
1891 :
1892 CBC 2 : if (notify->relname != NULL)
1893 GIC 2 : values[0] = CStringGetTextDatum(notify->relname);
1894 ECB : else
1895 UIC 0 : nulls[0] = true;
1896 ECB :
1897 GIC 2 : values[1] = Int32GetDatum(notify->be_pid);
1898 :
1899 2 : if (notify->extra != NULL)
1900 2 : values[2] = CStringGetTextDatum(notify->extra);
1901 : else
1902 UIC 0 : nulls[2] = true;
1903 :
1904 GIC 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1905 :
1906 CBC 2 : PQfreemem(notify);
1907 GIC 2 : PQconsumeInput(conn);
1908 ECB : }
1909 :
1910 GIC 2 : return (Datum) 0;
1911 ECB : }
1912 :
1913 : /*
1914 : * Validate the options given to a dblink foreign server or user mapping.
1915 : * Raise an error if any option is invalid.
1916 : *
1917 : * We just check the names of options here, so semantic errors in options,
1918 : * such as invalid numeric format, will be detected at the attempt to connect.
1919 : */
1920 GIC 3 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
1921 : Datum
1922 5 : dblink_fdw_validator(PG_FUNCTION_ARGS)
1923 : {
1924 5 : List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1925 5 : Oid context = PG_GETARG_OID(1);
1926 : ListCell *cell;
1927 :
1928 : static const PQconninfoOption *options = NULL;
1929 ECB :
1930 : /*
1931 : * Get list of valid libpq options.
1932 : *
1933 : * To avoid unnecessary work, we get the list once and use it throughout
1934 : * the lifetime of this backend process. We don't need to care about
1935 : * memory context issues, because PQconndefaults allocates with malloc.
1936 : */
1937 GIC 5 : if (!options)
1938 : {
1939 2 : options = PQconndefaults();
1940 2 : if (!options) /* assume reason for failure is OOM */
1941 UIC 0 : ereport(ERROR,
1942 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1943 : errmsg("out of memory"),
1944 : errdetail("Could not get libpq's default connection options.")));
1945 ECB : }
1946 :
1947 : /* Validate each supplied option. */
1948 GIC 8 : foreach(cell, options_list)
1949 : {
1950 5 : DefElem *def = (DefElem *) lfirst(cell);
1951 :
1952 CBC 5 : if (!is_valid_dblink_option(options, def->defname, context))
1953 : {
1954 : /*
1955 : * Unknown option, or invalid option for the context specified, so
1956 : * complain about it. Provide a hint with a valid option that
1957 : * looks similar, if there is one.
1958 ECB : */
1959 : const PQconninfoOption *opt;
1960 : const char *closest_match;
1961 : ClosestMatchState match_state;
1962 GNC 2 : bool has_valid_options = false;
1963 ECB :
1964 GNC 2 : initClosestMatch(&match_state, def->defname, 4);
1965 GIC 80 : for (opt = options; opt->keyword; opt++)
1966 : {
1967 78 : if (is_valid_dblink_option(options, opt->keyword, context))
1968 : {
1969 GNC 3 : has_valid_options = true;
1970 3 : updateClosestMatch(&match_state, opt->keyword);
1971 : }
1972 : }
1973 :
1974 2 : closest_match = getClosestMatch(&match_state);
1975 CBC 2 : ereport(ERROR,
1976 : (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
1977 ECB : errmsg("invalid option \"%s\"", def->defname),
1978 : has_valid_options ? closest_match ?
1979 : errhint("Perhaps you meant the option \"%s\".",
1980 : closest_match) : 0 :
1981 : errhint("There are no valid options in this context.")));
1982 : }
1983 : }
1984 :
1985 CBC 3 : PG_RETURN_VOID();
1986 : }
1987 ECB :
1988 :
1989 : /*************************************************************
1990 : * internal functions
1991 : */
1992 :
1993 :
1994 : /*
1995 : * get_pkey_attnames
1996 : *
1997 : * Get the primary key attnames for the given relation.
1998 : * Return NULL, and set indnkeyatts = 0, if no primary key exists.
1999 : */
2000 : static char **
2001 GIC 3 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2002 : {
2003 : Relation indexRelation;
2004 : ScanKeyData skey;
2005 ECB : SysScanDesc scan;
2006 : HeapTuple indexTuple;
2007 : int i;
2008 CBC 3 : char **result = NULL;
2009 : TupleDesc tupdesc;
2010 :
2011 : /* initialize indnkeyatts to 0 in case no primary key exists */
2012 GIC 3 : *indnkeyatts = 0;
2013 :
2014 3 : tupdesc = rel->rd_att;
2015 :
2016 : /* Prepare to scan pg_index for entries having indrelid = this rel. */
2017 3 : indexRelation = table_open(IndexRelationId, AccessShareLock);
2018 3 : ScanKeyInit(&skey,
2019 ECB : Anum_pg_index_indrelid,
2020 : BTEqualStrategyNumber, F_OIDEQ,
2021 : ObjectIdGetDatum(RelationGetRelid(rel)));
2022 :
2023 CBC 3 : scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2024 : NULL, 1, &skey);
2025 :
2026 3 : while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2027 : {
2028 3 : Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2029 ECB :
2030 : /* we're only interested if it is the primary key */
2031 GIC 3 : if (index->indisprimary)
2032 ECB : {
2033 GIC 3 : *indnkeyatts = index->indnkeyatts;
2034 CBC 3 : if (*indnkeyatts > 0)
2035 : {
2036 GNC 3 : result = palloc_array(char *, *indnkeyatts);
2037 :
2038 GIC 9 : for (i = 0; i < *indnkeyatts; i++)
2039 6 : result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2040 ECB : }
2041 CBC 3 : break;
2042 ECB : }
2043 : }
2044 :
2045 GIC 3 : systable_endscan(scan);
2046 CBC 3 : table_close(indexRelation, AccessShareLock);
2047 :
2048 GBC 3 : return result;
2049 EUB : }
2050 :
2051 : /*
2052 : * Deconstruct a text[] into C-strings (note any NULL elements will be
2053 : * returned as NULL pointers)
2054 : */
2055 : static char **
2056 GIC 20 : get_text_array_contents(ArrayType *array, int *numitems)
2057 ECB : {
2058 GIC 20 : int ndim = ARR_NDIM(array);
2059 20 : int *dims = ARR_DIMS(array);
2060 : int nitems;
2061 ECB : int16 typlen;
2062 : bool typbyval;
2063 : char typalign;
2064 : char **values;
2065 : char *ptr;
2066 : bits8 *bitmap;
2067 : int bitmask;
2068 : int i;
2069 :
2070 GIC 20 : Assert(ARR_ELEMTYPE(array) == TEXTOID);
2071 :
2072 20 : *numitems = nitems = ArrayGetNItems(ndim, dims);
2073 ECB :
2074 GIC 20 : get_typlenbyvalalign(ARR_ELEMTYPE(array),
2075 : &typlen, &typbyval, &typalign);
2076 ECB :
2077 GNC 20 : values = palloc_array(char *, nitems);
2078 ECB :
2079 CBC 20 : ptr = ARR_DATA_PTR(array);
2080 GIC 20 : bitmap = ARR_NULLBITMAP(array);
2081 CBC 20 : bitmask = 1;
2082 ECB :
2083 GBC 55 : for (i = 0; i < nitems; i++)
2084 : {
2085 GIC 35 : if (bitmap && (*bitmap & bitmask) == 0)
2086 : {
2087 LBC 0 : values[i] = NULL;
2088 : }
2089 ECB : else
2090 : {
2091 GIC 35 : values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2092 CBC 35 : ptr = att_addlength_pointer(ptr, typlen, ptr);
2093 GIC 35 : ptr = (char *) att_align_nominal(ptr, typalign);
2094 ECB : }
2095 :
2096 : /* advance bitmap pointer if any */
2097 CBC 35 : if (bitmap)
2098 ECB : {
2099 UIC 0 : bitmask <<= 1;
2100 LBC 0 : if (bitmask == 0x100)
2101 ECB : {
2102 LBC 0 : bitmap++;
2103 UIC 0 : bitmask = 1;
2104 : }
2105 ECB : }
2106 : }
2107 :
2108 GIC 20 : return values;
2109 : }
2110 ECB :
2111 : static char *
2112 GIC 4 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2113 ECB : {
2114 : char *relname;
2115 : HeapTuple tuple;
2116 : TupleDesc tupdesc;
2117 : int natts;
2118 : StringInfoData buf;
2119 : char *val;
2120 : int key;
2121 : int i;
2122 : bool needComma;
2123 :
2124 CBC 4 : initStringInfo(&buf);
2125 :
2126 ECB : /* get relation name including any needed schema prefix and quoting */
2127 GIC 4 : relname = generate_relation_name(rel);
2128 ECB :
2129 CBC 4 : tupdesc = rel->rd_att;
2130 GIC 4 : natts = tupdesc->natts;
2131 :
2132 GBC 4 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2133 CBC 4 : if (!tuple)
2134 UIC 0 : ereport(ERROR,
2135 ECB : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2136 : errmsg("source row not found")));
2137 :
2138 GIC 4 : appendStringInfo(&buf, "INSERT INTO %s(", relname);
2139 :
2140 4 : needComma = false;
2141 CBC 19 : for (i = 0; i < natts; i++)
2142 : {
2143 GIC 15 : Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2144 :
2145 15 : if (att->attisdropped)
2146 2 : continue;
2147 :
2148 CBC 13 : if (needComma)
2149 GIC 9 : appendStringInfoChar(&buf, ',');
2150 :
2151 CBC 13 : appendStringInfoString(&buf,
2152 GIC 13 : quote_ident_cstr(NameStr(att->attname)));
2153 CBC 13 : needComma = true;
2154 : }
2155 ECB :
2156 CBC 4 : appendStringInfoString(&buf, ") VALUES(");
2157 :
2158 ECB : /*
2159 : * Note: i is physical column number (counting from 0).
2160 : */
2161 CBC 4 : needComma = false;
2162 19 : for (i = 0; i < natts; i++)
2163 : {
2164 15 : if (TupleDescAttr(tupdesc, i)->attisdropped)
2165 2 : continue;
2166 :
2167 13 : if (needComma)
2168 9 : appendStringInfoChar(&buf, ',');
2169 ECB :
2170 GIC 13 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2171 EUB :
2172 GIC 13 : if (key >= 0)
2173 7 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2174 ECB : else
2175 GIC 6 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2176 :
2177 13 : if (val != NULL)
2178 ECB : {
2179 GIC 13 : appendStringInfoString(&buf, quote_literal_cstr(val));
2180 13 : pfree(val);
2181 : }
2182 : else
2183 UIC 0 : appendStringInfoString(&buf, "NULL");
2184 GIC 13 : needComma = true;
2185 : }
2186 4 : appendStringInfoChar(&buf, ')');
2187 :
2188 4 : return buf.data;
2189 : }
2190 ECB :
2191 : static char *
2192 GIC 4 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2193 ECB : {
2194 : char *relname;
2195 : TupleDesc tupdesc;
2196 : StringInfoData buf;
2197 : int i;
2198 :
2199 CBC 4 : initStringInfo(&buf);
2200 EUB :
2201 : /* get relation name including any needed schema prefix and quoting */
2202 GIC 4 : relname = generate_relation_name(rel);
2203 :
2204 CBC 4 : tupdesc = rel->rd_att;
2205 :
2206 GIC 4 : appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2207 11 : for (i = 0; i < pknumatts; i++)
2208 : {
2209 CBC 7 : int pkattnum = pkattnums[i];
2210 7 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2211 :
2212 7 : if (i > 0)
2213 GIC 3 : appendStringInfoString(&buf, " AND ");
2214 ECB :
2215 CBC 7 : appendStringInfoString(&buf,
2216 GIC 7 : quote_ident_cstr(NameStr(attr->attname)));
2217 ECB :
2218 CBC 7 : if (tgt_pkattvals[i] != NULL)
2219 GIC 7 : appendStringInfo(&buf, " = %s",
2220 CBC 7 : quote_literal_cstr(tgt_pkattvals[i]));
2221 ECB : else
2222 UIC 0 : appendStringInfoString(&buf, " IS NULL");
2223 ECB : }
2224 :
2225 CBC 4 : return buf.data;
2226 ECB : }
2227 :
2228 : static char *
2229 GIC 4 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2230 ECB : {
2231 : char *relname;
2232 : HeapTuple tuple;
2233 : TupleDesc tupdesc;
2234 : int natts;
2235 : StringInfoData buf;
2236 EUB : char *val;
2237 ECB : int key;
2238 : int i;
2239 : bool needComma;
2240 :
2241 GIC 4 : initStringInfo(&buf);
2242 ECB :
2243 : /* get relation name including any needed schema prefix and quoting */
2244 CBC 4 : relname = generate_relation_name(rel);
2245 ECB :
2246 GIC 4 : tupdesc = rel->rd_att;
2247 CBC 4 : natts = tupdesc->natts;
2248 ECB :
2249 GIC 4 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2250 CBC 4 : if (!tuple)
2251 LBC 0 : ereport(ERROR,
2252 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2253 ECB : errmsg("source row not found")));
2254 :
2255 CBC 4 : appendStringInfo(&buf, "UPDATE %s SET ", relname);
2256 ECB :
2257 : /*
2258 EUB : * Note: i is physical column number (counting from 0).
2259 : */
2260 GIC 4 : needComma = false;
2261 CBC 19 : for (i = 0; i < natts; i++)
2262 : {
2263 GIC 15 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2264 :
2265 15 : if (attr->attisdropped)
2266 2 : continue;
2267 :
2268 13 : if (needComma)
2269 CBC 9 : appendStringInfoString(&buf, ", ");
2270 :
2271 GIC 13 : appendStringInfo(&buf, "%s = ",
2272 13 : quote_ident_cstr(NameStr(attr->attname)));
2273 :
2274 13 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2275 ECB :
2276 CBC 13 : if (key >= 0)
2277 GIC 7 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2278 ECB : else
2279 GIC 6 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2280 ECB :
2281 GIC 13 : if (val != NULL)
2282 : {
2283 13 : appendStringInfoString(&buf, quote_literal_cstr(val));
2284 CBC 13 : pfree(val);
2285 : }
2286 : else
2287 UIC 0 : appendStringInfoString(&buf, "NULL");
2288 GIC 13 : needComma = true;
2289 : }
2290 :
2291 CBC 4 : appendStringInfoString(&buf, " WHERE ");
2292 ECB :
2293 CBC 11 : for (i = 0; i < pknumatts; i++)
2294 : {
2295 7 : int pkattnum = pkattnums[i];
2296 GIC 7 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2297 :
2298 7 : if (i > 0)
2299 CBC 3 : appendStringInfoString(&buf, " AND ");
2300 :
2301 GIC 7 : appendStringInfoString(&buf,
2302 7 : quote_ident_cstr(NameStr(attr->attname)));
2303 :
2304 7 : val = tgt_pkattvals[i];
2305 :
2306 7 : if (val != NULL)
2307 7 : appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2308 : else
2309 UIC 0 : appendStringInfoString(&buf, " IS NULL");
2310 : }
2311 :
2312 CBC 4 : return buf.data;
2313 : }
2314 EUB :
2315 : /*
2316 ECB : * Return a properly quoted identifier.
2317 : * Uses quote_ident in quote.c
2318 : */
2319 : static char *
2320 GIC 80 : quote_ident_cstr(char *rawstr)
2321 ECB : {
2322 : text *rawstr_text;
2323 : text *result_text;
2324 : char *result;
2325 :
2326 GIC 80 : rawstr_text = cstring_to_text(rawstr);
2327 80 : result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2328 : PointerGetDatum(rawstr_text)));
2329 80 : result = text_to_cstring(result_text);
2330 :
2331 CBC 80 : return result;
2332 : }
2333 ECB :
/*
 * get_attnum_pk_pos
 *
 * Return the index of the first entry of pkattnums[0 .. pknumatts-1] that
 * equals key, or -1 if key does not occur in the array.
 */
static int
get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
{
	int			pos = 0;

	/* a plain linear search; the list is not expected to be long */
	while (pos < pknumatts)
	{
		if (pkattnums[pos] == key)
			return pos;
		pos++;
	}

	return -1;
}
2348 :
2349 : static HeapTuple
2350 GIC 8 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2351 ECB : {
2352 : char *relname;
2353 : TupleDesc tupdesc;
2354 : int natts;
2355 : StringInfoData buf;
2356 : int ret;
2357 : HeapTuple tuple;
2358 : int i;
2359 :
2360 : /*
2361 : * Connect to SPI manager
2362 : */
2363 GIC 8 : if ((ret = SPI_connect()) < 0)
2364 EUB : /* internal error */
2365 UIC 0 : elog(ERROR, "SPI connect failure - returned %d", ret);
2366 :
2367 GIC 8 : initStringInfo(&buf);
2368 :
2369 : /* get relation name including any needed schema prefix and quoting */
2370 CBC 8 : relname = generate_relation_name(rel);
2371 ECB :
2372 GIC 8 : tupdesc = rel->rd_att;
2373 8 : natts = tupdesc->natts;
2374 :
2375 : /*
2376 ECB : * Build sql statement to look up tuple of interest, ie, the one matching
2377 EUB : * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2378 : * generate a result tuple that matches the table's physical structure,
2379 : * with NULLs for any dropped columns. Otherwise we have to deal with two
2380 : * different tupdescs and everything's very confusing.
2381 ECB : */
2382 GIC 8 : appendStringInfoString(&buf, "SELECT ");
2383 ECB :
2384 GIC 38 : for (i = 0; i < natts; i++)
2385 ECB : {
2386 CBC 30 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2387 :
2388 30 : if (i > 0)
2389 GIC 22 : appendStringInfoString(&buf, ", ");
2390 :
2391 30 : if (attr->attisdropped)
2392 4 : appendStringInfoString(&buf, "NULL");
2393 : else
2394 26 : appendStringInfoString(&buf,
2395 GBC 26 : quote_ident_cstr(NameStr(attr->attname)));
2396 : }
2397 EUB :
2398 GIC 8 : appendStringInfo(&buf, " FROM %s WHERE ", relname);
2399 :
2400 22 : for (i = 0; i < pknumatts; i++)
2401 : {
2402 14 : int pkattnum = pkattnums[i];
2403 14 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2404 :
2405 14 : if (i > 0)
2406 6 : appendStringInfoString(&buf, " AND ");
2407 :
2408 14 : appendStringInfoString(&buf,
2409 14 : quote_ident_cstr(NameStr(attr->attname)));
2410 :
2411 14 : if (src_pkattvals[i] != NULL)
2412 CBC 14 : appendStringInfo(&buf, " = %s",
2413 GIC 14 : quote_literal_cstr(src_pkattvals[i]));
2414 : else
2415 UIC 0 : appendStringInfoString(&buf, " IS NULL");
2416 : }
2417 :
2418 ECB : /*
2419 : * Retrieve the desired tuple
2420 : */
2421 CBC 8 : ret = SPI_exec(buf.data, 0);
2422 GIC 8 : pfree(buf.data);
2423 ECB :
2424 EUB : /*
2425 : * Only allow one qualifying tuple
2426 : */
2427 CBC 8 : if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2428 UIC 0 : ereport(ERROR,
2429 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2430 : errmsg("source criteria matched more than one record")));
2431 :
2432 GIC 8 : else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2433 : {
2434 8 : SPITupleTable *tuptable = SPI_tuptable;
2435 :
2436 8 : tuple = SPI_copytuple(tuptable->vals[0]);
2437 CBC 8 : SPI_finish();
2438 :
2439 GIC 8 : return tuple;
2440 : }
2441 : else
2442 : {
2443 ECB : /*
2444 : * no qualifying tuples
2445 : */
2446 LBC 0 : SPI_finish();
2447 :
2448 0 : return NULL;
2449 : }
2450 ECB :
2451 : /*
2452 : * never reached, but keep compiler quiet
2453 : */
2454 : return NULL;
2455 : }
2456 :
2457 : /*
2458 : * Open the relation named by relname_text, acquire specified type of lock,
2459 : * verify we have specified permissions.
2460 : * Caller must close rel when done with it.
2461 : */
2462 : static Relation
2463 CBC 21 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2464 ECB : {
2465 : RangeVar *relvar;
2466 : Relation rel;
2467 : AclResult aclresult;
2468 :
2469 CBC 21 : relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2470 GIC 21 : rel = table_openrv(relvar, lockmode);
2471 ECB :
2472 GIC 21 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2473 : aclmode);
2474 21 : if (aclresult != ACLCHECK_OK)
2475 LBC 0 : aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2476 UIC 0 : RelationGetRelationName(rel));
2477 :
2478 GIC 21 : return rel;
2479 ECB : }
2480 :
2481 : /*
2482 : * generate_relation_name - copied from ruleutils.c
2483 : * Compute the name to display for a relation
2484 : *
2485 : * The result includes all necessary quoting and schema-prefixing.
2486 : */
2487 : static char *
2488 GIC 20 : generate_relation_name(Relation rel)
2489 : {
2490 : char *nspname;
2491 : char *result;
2492 :
2493 ECB : /* Qualify the name if not visible in search path */
2494 CBC 20 : if (RelationIsVisible(RelationGetRelid(rel)))
2495 GIC 15 : nspname = NULL;
2496 ECB : else
2497 CBC 5 : nspname = get_namespace_name(rel->rd_rel->relnamespace);
2498 ECB :
2499 GIC 20 : result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2500 :
2501 CBC 20 : return result;
2502 : }
2503 ECB :
2504 :
2505 : static remoteConn *
2506 CBC 75 : getConnectionByName(const char *name)
2507 : {
2508 : remoteConnHashEnt *hentry;
2509 : char *key;
2510 :
2511 75 : if (!remoteConnHash)
2512 2 : remoteConnHash = createConnHash();
2513 ECB :
2514 GIC 75 : key = pstrdup(name);
2515 75 : truncate_identifier(key, strlen(key), false);
2516 CBC 75 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2517 : key, HASH_FIND, NULL);
2518 :
2519 GIC 75 : if (hentry)
2520 68 : return hentry->rconn;
2521 :
2522 CBC 7 : return NULL;
2523 EUB : }
2524 :
2525 ECB : static HTAB *
2526 CBC 3 : createConnHash(void)
2527 ECB : {
2528 : HASHCTL ctl;
2529 :
2530 CBC 3 : ctl.keysize = NAMEDATALEN;
2531 GBC 3 : ctl.entrysize = sizeof(remoteConnHashEnt);
2532 :
2533 GIC 3 : return hash_create("Remote Con hash", NUMCONN, &ctl,
2534 ECB : HASH_ELEM | HASH_STRINGS);
2535 : }
2536 :
2537 : static void
2538 GIC 10 : createNewConnection(const char *name, remoteConn *rconn)
2539 ECB : {
2540 : remoteConnHashEnt *hentry;
2541 EUB : bool found;
2542 : char *key;
2543 :
2544 GBC 10 : if (!remoteConnHash)
2545 1 : remoteConnHash = createConnHash();
2546 :
2547 10 : key = pstrdup(name);
2548 GIC 10 : truncate_identifier(key, strlen(key), true);
2549 10 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2550 : HASH_ENTER, &found);
2551 :
2552 10 : if (found)
2553 : {
2554 GNC 1 : libpqsrv_disconnect(rconn->conn);
2555 GIC 1 : pfree(rconn);
2556 :
2557 1 : ereport(ERROR,
2558 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2559 : errmsg("duplicate connection name")));
2560 : }
2561 :
2562 CBC 9 : hentry->rconn = rconn;
2563 GIC 9 : strlcpy(hentry->name, name, sizeof(hentry->name));
2564 CBC 9 : }
2565 :
2566 : static void
2567 GIC 8 : deleteConnection(const char *name)
2568 ECB : {
2569 : remoteConnHashEnt *hentry;
2570 : bool found;
2571 : char *key;
2572 :
2573 CBC 8 : if (!remoteConnHash)
2574 UIC 0 : remoteConnHash = createConnHash();
2575 ECB :
2576 GIC 8 : key = pstrdup(name);
2577 CBC 8 : truncate_identifier(key, strlen(key), false);
2578 GIC 8 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2579 EUB : key, HASH_REMOVE, &found);
2580 :
2581 GIC 8 : if (!hentry)
2582 UIC 0 : ereport(ERROR,
2583 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2584 ECB : errmsg("undefined connection name")));
2585 GIC 8 : }
2586 :
2587 ECB : static void
2588 CBC 19 : dblink_security_check(PGconn *conn, remoteConn *rconn)
2589 : {
2590 GIC 19 : if (!superuser())
2591 : {
2592 UIC 0 : if (!PQconnectionUsedPassword(conn))
2593 ECB : {
2594 UNC 0 : libpqsrv_disconnect(conn);
2595 UIC 0 : if (rconn)
2596 0 : pfree(rconn);
2597 :
2598 0 : ereport(ERROR,
2599 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2600 : errmsg("password is required"),
2601 : errdetail("Non-superuser cannot connect if the server does not request a password."),
2602 : errhint("Target server's authentication method must be changed.")));
2603 ECB : }
2604 : }
2605 GIC 19 : }
2606 :
2607 ECB : /*
2608 : * For non-superusers, insist that the connstr specify a password. This
2609 : * prevents a password from being picked up from .pgpass, a service file,
2610 : * the environment, etc. We don't want the postgres user's passwords
2611 : * to be accessible to non-superusers.
2612 : */
2613 : static void
2614 GIC 22 : dblink_connstr_check(const char *connstr)
2615 : {
2616 22 : if (!superuser())
2617 : {
2618 : PQconninfoOption *options;
2619 : PQconninfoOption *option;
2620 CBC 1 : bool connstr_gives_password = false;
2621 ECB :
2622 GIC 1 : options = PQconninfoParse(connstr, NULL);
2623 CBC 1 : if (options)
2624 : {
2625 40 : for (option = options; option->keyword != NULL; option++)
2626 ECB : {
2627 GIC 39 : if (strcmp(option->keyword, "password") == 0)
2628 : {
2629 1 : if (option->val != NULL && option->val[0] != '\0')
2630 : {
2631 UIC 0 : connstr_gives_password = true;
2632 UBC 0 : break;
2633 : }
2634 ECB : }
2635 : }
2636 CBC 1 : PQconninfoFree(options);
2637 ECB : }
2638 :
2639 GIC 1 : if (!connstr_gives_password)
2640 1 : ereport(ERROR,
2641 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2642 : errmsg("password is required"),
2643 : errdetail("Non-superusers must provide a password in the connection string.")));
2644 ECB : }
2645 GBC 21 : }
2646 :
2647 : /*
2648 : * Report an error received from the remote server
2649 : *
2650 : * res: the received error result (will be freed)
2651 : * fail: true for ERROR ereport, false for NOTICE
2652 : * fmt and following args: sprintf-style format and values for errcontext;
2653 ECB : * the resulting string should be worded like "while <some action>"
2654 : */
2655 : static void
2656 GIC 12 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2657 : bool fail, const char *fmt,...)
2658 : {
2659 : int level;
2660 12 : char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2661 12 : char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2662 CBC 12 : char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2663 12 : char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2664 12 : char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2665 : int sqlstate;
2666 ECB : char *message_primary;
2667 : char *message_detail;
2668 : char *message_hint;
2669 : char *message_context;
2670 : va_list ap;
2671 : char dblink_context_msg[512];
2672 :
2673 GIC 12 : if (fail)
2674 3 : level = ERROR;
2675 : else
2676 9 : level = NOTICE;
2677 :
2678 12 : if (pg_diag_sqlstate)
2679 CBC 12 : sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2680 : pg_diag_sqlstate[1],
2681 : pg_diag_sqlstate[2],
2682 : pg_diag_sqlstate[3],
2683 : pg_diag_sqlstate[4]);
2684 : else
2685 LBC 0 : sqlstate = ERRCODE_CONNECTION_FAILURE;
2686 :
2687 CBC 12 : message_primary = xpstrdup(pg_diag_message_primary);
2688 GIC 12 : message_detail = xpstrdup(pg_diag_message_detail);
2689 12 : message_hint = xpstrdup(pg_diag_message_hint);
2690 12 : message_context = xpstrdup(pg_diag_context);
2691 :
2692 : /*
2693 : * If we don't get a message from the PGresult, try the PGconn. This is
2694 : * needed because for connection-level failures, PQexec may just return
2695 : * NULL, not a PGresult at all.
2696 : */
2697 CBC 12 : if (message_primary == NULL)
2698 UIC 0 : message_primary = pchomp(PQerrorMessage(conn));
2699 :
2700 : /*
2701 : * Now that we've copied all the data we need out of the PGresult, it's
2702 : * safe to free it. We must do this to avoid PGresult leakage. We're
2703 : * leaking all the strings too, but those are in palloc'd memory that will
2704 : * get cleaned up eventually.
2705 : */
2706 GNC 12 : PQclear(res);
2707 ECB :
2708 : /*
2709 EUB : * Format the basic errcontext string. Below, we'll add on something
2710 : * about the connection name. That's a violation of the translatability
2711 : * guidelines about constructing error messages out of parts, but since
2712 : * there's no translation support for dblink, there's no need to worry
2713 : * about that (yet).
2714 : */
2715 GIC 12 : va_start(ap, fmt);
2716 CBC 12 : vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2717 12 : va_end(ap);
2718 ECB :
2719 GIC 12 : ereport(level,
2720 ECB : (errcode(sqlstate),
2721 : (message_primary != NULL && message_primary[0] != '\0') ?
2722 : errmsg_internal("%s", message_primary) :
2723 : errmsg("could not obtain message string for remote error"),
2724 : message_detail ? errdetail_internal("%s", message_detail) : 0,
2725 : message_hint ? errhint("%s", message_hint) : 0,
2726 : message_context ? (errcontext("%s", message_context)) : 0,
2727 : conname ?
2728 : (errcontext("%s on dblink connection named \"%s\"",
2729 : dblink_context_msg, conname)) :
2730 : (errcontext("%s on unnamed dblink connection",
2731 : dblink_context_msg))));
2732 GBC 9 : }
2733 :
2734 ECB : /*
2735 : * Obtain connection string for a foreign server
2736 EUB : */
2737 : static char *
2738 GBC 22 : get_connect_string(const char *servername)
2739 EUB : {
2740 GBC 22 : ForeignServer *foreign_server = NULL;
2741 : UserMapping *user_mapping;
2742 : ListCell *cell;
2743 ECB : StringInfoData buf;
2744 : ForeignDataWrapper *fdw;
2745 : AclResult aclresult;
2746 : char *srvname;
2747 :
2748 : static const PQconninfoOption *options = NULL;
2749 :
2750 GIC 22 : initStringInfo(&buf);
2751 :
2752 ECB : /*
2753 : * Get list of valid libpq options.
2754 : *
2755 : * To avoid unnecessary work, we get the list once and use it throughout
2756 : * the lifetime of this backend process. We don't need to care about
2757 : * memory context issues, because PQconndefaults allocates with malloc.
2758 : */
2759 CBC 22 : if (!options)
2760 : {
2761 GIC 3 : options = PQconndefaults();
2762 CBC 3 : if (!options) /* assume reason for failure is OOM */
2763 UIC 0 : ereport(ERROR,
2764 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2765 ECB : errmsg("out of memory"),
2766 : errdetail("Could not get libpq's default connection options.")));
2767 : }
2768 :
2769 : /* first gather the server connstr options */
2770 GIC 22 : srvname = pstrdup(servername);
2771 22 : truncate_identifier(srvname, strlen(srvname), false);
2772 22 : foreign_server = GetForeignServerByName(srvname, true);
2773 :
2774 CBC 22 : if (foreign_server)
2775 : {
2776 GIC 2 : Oid serverid = foreign_server->serverid;
2777 2 : Oid fdwid = foreign_server->fdwid;
2778 2 : Oid userid = GetUserId();
2779 ECB :
2780 GIC 2 : user_mapping = GetUserMapping(userid, serverid);
2781 CBC 2 : fdw = GetForeignDataWrapper(fdwid);
2782 :
2783 ECB : /* Check permissions, user must have usage on the server. */
2784 GNC 2 : aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
2785 CBC 2 : if (aclresult != ACLCHECK_OK)
2786 UIC 0 : aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2787 :
2788 CBC 2 : foreach(cell, fdw->options)
2789 : {
2790 UIC 0 : DefElem *def = lfirst(cell);
2791 :
2792 0 : if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2793 0 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2794 0 : escape_param_str(strVal(def->arg)));
2795 : }
2796 :
2797 GIC 6 : foreach(cell, foreign_server->options)
2798 : {
2799 4 : DefElem *def = lfirst(cell);
2800 :
2801 4 : if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2802 4 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2803 4 : escape_param_str(strVal(def->arg)));
2804 : }
2805 :
2806 4 : foreach(cell, user_mapping->options)
2807 ECB : {
2808 :
2809 GIC 2 : DefElem *def = lfirst(cell);
2810 :
2811 CBC 2 : if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2812 2 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2813 GIC 2 : escape_param_str(strVal(def->arg)));
2814 : }
2815 :
2816 CBC 2 : return buf.data;
2817 : }
2818 : else
2819 20 : return NULL;
2820 EUB : }
2821 :
2822 : /*
2823 : * Escaping libpq connect parameter strings.
2824 : *
2825 ECB : * Replaces "'" with "\'" and "\" with "\\".
2826 : */
2827 : static char *
2828 GIC 6 : escape_param_str(const char *str)
2829 ECB : {
2830 : const char *cp;
2831 : StringInfoData buf;
2832 :
2833 GIC 6 : initStringInfo(&buf);
2834 :
2835 64 : for (cp = str; *cp; cp++)
2836 ECB : {
2837 CBC 58 : if (*cp == '\\' || *cp == '\'')
2838 UIC 0 : appendStringInfoChar(&buf, '\\');
2839 GIC 58 : appendStringInfoChar(&buf, *cp);
2840 : }
2841 :
2842 CBC 6 : return buf.data;
2843 ECB : }
2844 :
2845 : /*
2846 : * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2847 : * functions, and translate to the internal representation.
2848 : *
2849 : * The user supplies an int2vector of 1-based logical attnums, plus a count
2850 : * argument (the need for the separate count argument is historical, but we
2851 : * still check it). We check that each attnum corresponds to a valid,
2852 : * non-dropped attribute of the rel. We do *not* prevent attnums from being
2853 : * listed twice, though the actual use-case for such things is dubious.
2854 : * Note that before Postgres 9.0, the user's attnums were interpreted as
2855 : * physical not logical column numbers; this was changed for future-proofing.
2856 EUB : *
2857 : * The internal representation is a palloc'd int array of 0-based physical
2858 : * attnums.
2859 : */
2860 ECB : static void
2861 GIC 18 : validate_pkattnums(Relation rel,
2862 : int2vector *pkattnums_arg, int32 pknumatts_arg,
2863 : int **pkattnums, int *pknumatts)
2864 : {
2865 18 : TupleDesc tupdesc = rel->rd_att;
2866 18 : int natts = tupdesc->natts;
2867 : int i;
2868 :
2869 : /* Don't take more array elements than there are */
2870 18 : pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2871 :
2872 : /* Must have at least one pk attnum selected */
2873 18 : if (pknumatts_arg <= 0)
2874 UIC 0 : ereport(ERROR,
2875 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2876 : errmsg("number of key attributes must be > 0")));
2877 :
2878 ECB : /* Allocate output array */
2879 GNC 18 : *pkattnums = palloc_array(int, pknumatts_arg);
2880 GIC 18 : *pknumatts = pknumatts_arg;
2881 :
2882 : /* Validate attnums and convert to internal form */
2883 57 : for (i = 0; i < pknumatts_arg; i++)
2884 ECB : {
2885 GIC 45 : int pkattnum = pkattnums_arg->values[i];
2886 ECB : int lnum;
2887 : int j;
2888 :
2889 : /* Can throw error immediately if out of range */
2890 CBC 45 : if (pkattnum <= 0 || pkattnum > natts)
2891 GIC 6 : ereport(ERROR,
2892 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2893 ECB : errmsg("invalid attribute number %d", pkattnum)));
2894 :
2895 : /* Identify which physical column has this logical number */
2896 GIC 39 : lnum = 0;
2897 CBC 69 : for (j = 0; j < natts; j++)
2898 ECB : {
2899 : /* dropped columns don't count */
2900 GIC 69 : if (TupleDescAttr(tupdesc, j)->attisdropped)
2901 3 : continue;
2902 :
2903 66 : if (++lnum == pkattnum)
2904 CBC 39 : break;
2905 : }
2906 ECB :
2907 CBC 39 : if (j < natts)
2908 GIC 39 : (*pkattnums)[i] = j;
2909 : else
2910 UIC 0 : ereport(ERROR,
2911 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2912 : errmsg("invalid attribute number %d", pkattnum)));
2913 : }
2914 GIC 12 : }
2915 ECB :
2916 : /*
2917 : * Check if the specified connection option is valid.
2918 : *
2919 : * We basically allow whatever libpq thinks is an option, with these
2920 : * restrictions:
2921 : * debug options: disallowed
2922 : * "client_encoding": disallowed
2923 : * "user": valid only in USER MAPPING options
2924 : * secure options (eg password): valid only in USER MAPPING options
2925 : * others: valid only in FOREIGN SERVER options
2926 : *
2927 : * We disallow client_encoding because it would be overridden anyway via
2928 : * PQclientEncoding; allowing it to be specified would merely promote
2929 : * confusion.
2930 : */
2931 : static bool
2932 GIC 89 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
2933 : Oid context)
2934 : {
2935 : const PQconninfoOption *opt;
2936 ECB :
2937 : /* Look up the option in libpq result */
2938 GIC 1697 : for (opt = options; opt->keyword; opt++)
2939 ECB : {
2940 GIC 1695 : if (strcmp(opt->keyword, option) == 0)
2941 CBC 87 : break;
2942 ECB : }
2943 GIC 89 : if (opt->keyword == NULL)
2944 2 : return false;
2945 :
2946 : /* Disallow debug options (particularly "replication") */
2947 87 : if (strchr(opt->dispchar, 'D'))
2948 2 : return false;
2949 :
2950 : /* Disallow "client_encoding" */
2951 CBC 85 : if (strcmp(opt->keyword, "client_encoding") == 0)
2952 GBC 2 : return false;
2953 :
2954 : /*
2955 : * If the option is "user" or marked secure, it should be specified only
2956 : * in USER MAPPING. Others should be specified only in SERVER.
2957 : */
2958 CBC 83 : if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
2959 ECB : {
2960 GIC 9 : if (context != UserMappingRelationId)
2961 CBC 3 : return false;
2962 ECB : }
2963 : else
2964 : {
2965 CBC 74 : if (context != ForeignServerRelationId)
2966 68 : return false;
2967 : }
2968 :
2969 12 : return true;
2970 : }
2971 :
2972 : /*
2973 : * Copy the remote session's values of GUCs that affect datatype I/O
2974 ECB : * and apply them locally in a new GUC nesting level. Returns the new
2975 : * nestlevel (which is needed by restoreLocalGucs to undo the settings),
2976 : * or -1 if no new nestlevel was needed.
2977 : *
2978 : * We use the equivalent of a function SET option to allow the settings to
2979 : * persist only until the caller calls restoreLocalGucs. If an error is
2980 : * thrown in between, guc.c will take care of undoing the settings.
2981 : */
2982 : static int
2983 GIC 32 : applyRemoteGucs(PGconn *conn)
2984 ECB : {
2985 : static const char *const GUCsAffectingIO[] = {
2986 : "DateStyle",
2987 : "IntervalStyle"
2988 : };
2989 :
2990 GIC 32 : int nestlevel = -1;
2991 : int i;
2992 :
2993 96 : for (i = 0; i < lengthof(GUCsAffectingIO); i++)
2994 : {
2995 64 : const char *gucName = GUCsAffectingIO[i];
2996 64 : const char *remoteVal = PQparameterStatus(conn, gucName);
2997 : const char *localVal;
2998 :
2999 : /*
3000 : * If the remote server is pre-8.4, it won't have IntervalStyle, but
3001 : * that's okay because its output format won't be ambiguous. So just
3002 : * skip the GUC if we don't get a value for it. (We might eventually
3003 : * need more complicated logic with remote-version checks here.)
3004 : */
3005 64 : if (remoteVal == NULL)
3006 UIC 0 : continue;
3007 :
3008 : /*
3009 : * Avoid GUC-setting overhead if the remote and local GUCs already
3010 : * have the same value.
3011 : */
3012 GIC 64 : localVal = GetConfigOption(gucName, false, false);
3013 64 : Assert(localVal != NULL);
3014 :
3015 64 : if (strcmp(remoteVal, localVal) == 0)
3016 47 : continue;
3017 :
3018 : /* Create new GUC nest level if we didn't already */
3019 17 : if (nestlevel < 0)
3020 9 : nestlevel = NewGUCNestLevel();
3021 :
3022 : /* Apply the option (this will throw error on failure) */
3023 17 : (void) set_config_option(gucName, remoteVal,
3024 : PGC_USERSET, PGC_S_SESSION,
3025 : GUC_ACTION_SAVE, true, 0, false);
3026 : }
3027 :
3028 32 : return nestlevel;
3029 : }
3030 :
3031 : /*
3032 : * Restore local GUCs after they have been overlaid with remote settings.
3033 : */
3034 : static void
3035 33 : restoreLocalGucs(int nestlevel)
3036 : {
3037 : /* Do nothing if no new nestlevel was created */
3038 33 : if (nestlevel > 0)
3039 8 : AtEOXact_GUC(true, nestlevel);
3040 33 : }
|