Age Owner TLA Line data Source code
1 : /*
2 : * dblink.c
3 : *
4 : * Functions returning results from a remote database
5 : *
6 : * Joe Conway <mail@joeconway.com>
7 : * And contributors:
8 : * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 : * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10 : *
11 : * contrib/dblink/dblink.c
12 : * Copyright (c) 2001-2023, PostgreSQL Global Development Group
13 : * ALL RIGHTS RESERVED;
14 : *
15 : * Permission to use, copy, modify, and distribute this software and its
16 : * documentation for any purpose, without fee, and without a written agreement
17 : * is hereby granted, provided that the above copyright notice and this
18 : * paragraph and the following two paragraphs appear in all copies.
19 : *
20 : * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 : * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 : * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 : * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 : * POSSIBILITY OF SUCH DAMAGE.
25 : *
26 : * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 : * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 : * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 : * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 : * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 : *
32 : */
33 : #include "postgres.h"
34 :
35 : #include <limits.h>
36 :
37 : #include "access/htup_details.h"
38 : #include "access/relation.h"
39 : #include "access/reloptions.h"
40 : #include "access/table.h"
41 : #include "catalog/namespace.h"
42 : #include "catalog/pg_foreign_data_wrapper.h"
43 : #include "catalog/pg_foreign_server.h"
44 : #include "catalog/pg_type.h"
45 : #include "catalog/pg_user_mapping.h"
46 : #include "executor/spi.h"
47 : #include "foreign/foreign.h"
48 : #include "funcapi.h"
49 : #include "lib/stringinfo.h"
50 : #include "libpq-fe.h"
51 : #include "libpq/libpq-be-fe-helpers.h"
52 : #include "mb/pg_wchar.h"
53 : #include "miscadmin.h"
54 : #include "parser/scansup.h"
55 : #include "utils/acl.h"
56 : #include "utils/builtins.h"
57 : #include "utils/fmgroids.h"
58 : #include "utils/guc.h"
59 : #include "utils/lsyscache.h"
60 : #include "utils/memutils.h"
61 : #include "utils/rel.h"
62 : #include "utils/varlena.h"
63 :
/* Module magic block; must appear exactly once in a loadable module. */
PG_MODULE_MAGIC;
6158 tgl 65 ECB :
66 : typedef struct remoteConn
67 : {
68 : PGconn *conn; /* Hold the remote connection */
69 : int openCursorCount; /* The number of open cursors */
70 : bool newXactForCursor; /* Opened a transaction for a cursor */
71 : } remoteConn;
72 :
73 : typedef struct storeInfo
74 : {
75 : FunctionCallInfo fcinfo;
76 : Tuplestorestate *tuplestore;
77 : AttInMetadata *attinmeta;
78 : MemoryContext tmpcontext;
79 : char **cstrs;
80 : /* temp storage for results to avoid leaks on exception */
81 : PGresult *last_res;
82 : PGresult *cur_res;
83 : } storeInfo;
84 :
85 : /*
86 : * Internal declarations
87 : */
88 : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
89 : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
90 : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
91 : PGresult *res);
92 : static void materializeQueryResult(FunctionCallInfo fcinfo,
93 : PGconn *conn,
94 : const char *conname,
95 : const char *sql,
96 : bool fail);
97 : static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
98 : static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
99 : static remoteConn *getConnectionByName(const char *name);
100 : static HTAB *createConnHash(void);
101 : static void createNewConnection(const char *name, remoteConn *rconn);
102 : static void deleteConnection(const char *name);
103 : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
104 : static char **get_text_array_contents(ArrayType *array, int *numitems);
105 : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
106 : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
107 : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
108 : static char *quote_ident_cstr(char *rawstr);
109 : static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
110 : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
111 : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
112 : static char *generate_relation_name(Relation rel);
113 : static void dblink_connstr_check(const char *connstr);
114 : static void dblink_security_check(PGconn *conn, remoteConn *rconn);
115 : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
116 : bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
117 : static char *get_connect_string(const char *servername);
118 : static char *escape_param_str(const char *str);
119 : static void validate_pkattnums(Relation rel,
120 : int2vector *pkattnums_arg, int32 pknumatts_arg,
121 : int **pkattnums, int *pknumatts);
122 : static bool is_valid_dblink_option(const PQconninfoOption *options,
123 : const char *option, Oid context);
124 : static int applyRemoteGucs(PGconn *conn);
125 : static void restoreLocalGucs(int nestlevel);
126 :
127 : /* Global */
128 : static remoteConn *pconn = NULL;
129 : static HTAB *remoteConnHash = NULL;
130 :
131 : /*
132 : * Following is list that holds multiple remote connections.
133 : * Calling convention of each dblink function changes to accept
134 : * connection name as the first parameter. The connection list is
135 : * much like ecpg e.g. a mapping between a name and a PGconn object.
136 : */
137 :
138 : typedef struct remoteConnHashEnt
139 : {
140 : char name[NAMEDATALEN];
141 : remoteConn *rconn;
142 : } remoteConnHashEnt;
143 :
/* initial number of entries to size the connection hash table for */
#define NUMCONN 16
146 :
147 : static char *
2296 peter_e 148 GIC 48 : xpstrdup(const char *in)
2296 peter_e 149 ECB : {
2296 peter_e 150 GIC 48 : if (in == NULL)
2296 peter_e 151 CBC 36 : return NULL;
152 12 : return pstrdup(in);
2296 peter_e 153 ECB : }
154 :
155 : static void
156 : pg_attribute_noreturn()
2296 peter_e 157 UIC 0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
2296 peter_e 158 EUB : {
2296 peter_e 159 UIC 0 : char *msg = pchomp(PQerrorMessage(conn));
2153 bruce 160 EUB :
280 peter 161 UNC 0 : PQclear(res);
2296 peter_e 162 UBC 0 : elog(ERROR, "%s: %s", p2, msg);
163 : }
164 :
165 : static void
166 : pg_attribute_noreturn()
2296 peter_e 167 CBC 3 : dblink_conn_not_avail(const char *conname)
168 : {
169 3 : if (conname)
170 1 : ereport(ERROR,
171 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
172 : errmsg("connection \"%s\" not available", conname)));
173 : else
174 2 : ereport(ERROR,
175 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
176 : errmsg("connection not available")));
177 : }
178 :
179 : static void
180 33 : dblink_get_conn(char *conname_or_str,
181 : PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
182 : {
183 33 : remoteConn *rconn = getConnectionByName(conname_or_str);
184 : PGconn *conn;
185 : char *conname;
186 : bool freeconn;
187 :
188 33 : if (rconn)
189 : {
190 27 : conn = rconn->conn;
191 27 : conname = conname_or_str;
192 27 : freeconn = false;
193 : }
194 : else
195 : {
196 : const char *connstr;
197 :
198 6 : connstr = get_connect_string(conname_or_str);
199 6 : if (connstr == NULL)
200 6 : connstr = conname_or_str;
201 6 : dblink_connstr_check(connstr);
202 :
1140 tgl 203 ECB : /* OK to make connection */
76 andres 204 GNC 6 : conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
205 :
2296 peter_e 206 GIC 6 : if (PQstatus(conn) == CONNECTION_BAD)
2296 peter_e 207 ECB : {
2296 peter_e 208 GIC 2 : char *msg = pchomp(PQerrorMessage(conn));
2153 bruce 209 ECB :
76 andres 210 GNC 2 : libpqsrv_disconnect(conn);
2296 peter_e 211 CBC 2 : ereport(ERROR,
212 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
2118 tgl 213 EUB : errmsg("could not establish connection"),
214 : errdetail_internal("%s", msg)));
215 : }
1 sfrost 216 GIC 4 : dblink_security_check(conn, rconn);
2296 peter_e 217 4 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
2296 peter_e 218 LBC 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
2296 peter_e 219 GIC 4 : freeconn = true;
2296 peter_e 220 CBC 4 : conname = NULL;
221 : }
2296 peter_e 222 ECB :
2296 peter_e 223 CBC 31 : *conn_p = conn;
224 31 : *conname_p = conname;
225 31 : *freeconn_p = freeconn;
2296 peter_e 226 GIC 31 : }
2296 peter_e 227 ECB :
228 : static PGconn *
2296 peter_e 229 GIC 17 : dblink_get_named_conn(const char *conname)
230 : {
231 17 : remoteConn *rconn = getConnectionByName(conname);
2153 bruce 232 ECB :
2296 peter_e 233 GIC 17 : if (rconn)
2296 peter_e 234 CBC 17 : return rconn->conn;
235 :
2218 peter_e 236 LBC 0 : dblink_conn_not_avail(conname);
2153 bruce 237 ECB : return NULL; /* keep compiler quiet */
2296 peter_e 238 : }
239 :
240 : static void
2296 peter_e 241 CBC 120 : dblink_init(void)
242 : {
243 120 : if (!pconn)
244 : {
245 3 : pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
2296 peter_e 246 GIC 3 : pconn->conn = NULL;
2296 peter_e 247 CBC 3 : pconn->openCursorCount = 0;
2062 248 3 : pconn->newXactForCursor = false;
249 : }
2296 250 120 : }
7524 bruce 251 ECB :
252 : /*
253 : * Create a persistent connection to another database
254 : */
7524 bruce 255 CBC 9 : PG_FUNCTION_INFO_V1(dblink_connect);
256 : Datum
257 16 : dblink_connect(PG_FUNCTION_ARGS)
7524 bruce 258 ECB : {
5055 mail 259 CBC 16 : char *conname_or_str = NULL;
7228 bruce 260 GIC 16 : char *connstr = NULL;
261 16 : char *connname = NULL;
262 : char *msg;
7228 bruce 263 CBC 16 : PGconn *conn = NULL;
6392 264 16 : remoteConn *rconn = NULL;
7524 bruce 265 ECB :
2296 peter_e 266 GIC 16 : dblink_init();
267 :
7188 bruce 268 CBC 16 : if (PG_NARGS() == 2)
269 : {
5055 mail 270 GIC 11 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
5493 tgl 271 CBC 11 : connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
272 : }
7188 bruce 273 5 : else if (PG_NARGS() == 1)
5055 mail 274 GIC 5 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
7524 bruce 275 EUB :
7188 bruce 276 GBC 16 : if (connname)
1046 mail 277 EUB : {
5243 tgl 278 GBC 11 : rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
279 : sizeof(remoteConn));
1046 mail 280 11 : rconn->conn = NULL;
1046 mail 281 GIC 11 : rconn->openCursorCount = 0;
282 11 : rconn->newXactForCursor = false;
283 : }
284 :
285 : /* first check for valid foreign data server */
5055 286 16 : connstr = get_connect_string(conname_or_str);
5055 mail 287 CBC 16 : if (connstr == NULL)
5055 mail 288 GIC 14 : connstr = conname_or_str;
289 :
5312 tgl 290 ECB : /* check password in connection string if not superuser */
5312 tgl 291 GBC 16 : dblink_connstr_check(connstr);
292 :
1140 tgl 293 ECB : /* OK to make connection */
76 andres 294 GNC 15 : conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
7228 bruce 295 ECB :
7228 bruce 296 GIC 15 : if (PQstatus(conn) == CONNECTION_BAD)
7524 bruce 297 ECB : {
2232 peter_e 298 UIC 0 : msg = pchomp(PQerrorMessage(conn));
76 andres 299 UNC 0 : libpqsrv_disconnect(conn);
6392 bruce 300 LBC 0 : if (rconn)
301 0 : pfree(rconn);
7199 tgl 302 ECB :
7199 tgl 303 LBC 0 : ereport(ERROR,
304 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
305 : errmsg("could not establish connection"),
4285 tgl 306 ECB : errdetail_internal("%s", msg)));
307 : }
7524 bruce 308 :
5312 tgl 309 : /* check password actually used if not superuser */
1 sfrost 310 GIC 15 : dblink_security_check(conn, rconn);
5754 mail 311 ECB :
3410 312 : /* attempt to set client encoding to match server encoding, if needed */
3410 mail 313 GIC 15 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
3410 mail 314 LBC 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
5052 mail 315 ECB :
7188 bruce 316 GIC 15 : if (connname)
317 : {
6392 bruce 318 CBC 10 : rconn->conn = conn;
6392 bruce 319 GIC 10 : createNewConnection(connname, rconn);
7228 bruce 320 ECB : }
321 : else
322 : {
2220 mail 323 GIC 5 : if (pconn->conn)
68 andres 324 GNC 1 : libpqsrv_disconnect(pconn->conn);
6382 mail 325 CBC 5 : pconn->conn = conn;
326 : }
7228 bruce 327 ECB :
5493 tgl 328 GIC 14 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
7524 bruce 329 ECB : }
330 :
331 : /*
332 : * Clear a persistent connection to another database
333 : */
7524 bruce 334 CBC 6 : PG_FUNCTION_INFO_V1(dblink_disconnect);
335 : Datum
336 13 : dblink_disconnect(PG_FUNCTION_ARGS)
7524 bruce 337 ECB : {
7199 tgl 338 GIC 13 : char *conname = NULL;
6392 bruce 339 CBC 13 : remoteConn *rconn = NULL;
7228 bruce 340 GIC 13 : PGconn *conn = NULL;
341 :
2296 peter_e 342 CBC 13 : dblink_init();
6382 mail 343 ECB :
7188 bruce 344 CBC 13 : if (PG_NARGS() == 1)
345 : {
5493 tgl 346 9 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
6392 bruce 347 GIC 9 : rconn = getConnectionByName(conname);
348 9 : if (rconn)
6392 bruce 349 CBC 8 : conn = rconn->conn;
350 : }
7228 bruce 351 ECB : else
6382 mail 352 CBC 4 : conn = pconn->conn;
7524 bruce 353 ECB :
7228 bruce 354 CBC 13 : if (!conn)
2296 peter_e 355 GIC 1 : dblink_conn_not_avail(conname);
356 :
76 andres 357 GNC 12 : libpqsrv_disconnect(conn);
6392 bruce 358 CBC 12 : if (rconn)
7228 bruce 359 ECB : {
7199 tgl 360 CBC 8 : deleteConnection(conname);
6392 bruce 361 GIC 8 : pfree(rconn);
362 : }
7072 mail 363 ECB : else
6382 mail 364 GIC 4 : pconn->conn = NULL;
365 :
5493 tgl 366 CBC 12 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
7524 bruce 367 ECB : }
368 :
369 : /*
370 : * opens a cursor using a persistent connection
371 : */
7524 bruce 372 GIC 9 : PG_FUNCTION_INFO_V1(dblink_open);
7524 bruce 373 ECB : Datum
7524 bruce 374 GBC 9 : dblink_open(PG_FUNCTION_ARGS)
375 : {
7522 bruce 376 CBC 9 : PGresult *res = NULL;
377 : PGconn *conn;
7228 bruce 378 GIC 9 : char *curname = NULL;
7228 bruce 379 CBC 9 : char *sql = NULL;
7228 bruce 380 GIC 9 : char *conname = NULL;
6248 neilc 381 ECB : StringInfoData buf;
6392 bruce 382 CBC 9 : remoteConn *rconn = NULL;
6972 mail 383 GBC 9 : bool fail = true; /* default to backward compatible behavior */
7524 bruce 384 ECB :
2296 peter_e 385 CBC 9 : dblink_init();
6248 neilc 386 GIC 9 : initStringInfo(&buf);
387 :
7188 bruce 388 9 : if (PG_NARGS() == 2)
389 : {
390 : /* text,text */
5493 tgl 391 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
5493 tgl 392 CBC 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
6382 mail 393 GIC 2 : rconn = pconn;
394 : }
7188 bruce 395 7 : else if (PG_NARGS() == 3)
7228 bruce 396 ECB : {
6972 mail 397 : /* might be text,text,text or text,text,bool */
6972 mail 398 GIC 6 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
6972 mail 399 ECB : {
5493 tgl 400 CBC 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
401 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
6972 mail 402 GIC 1 : fail = PG_GETARG_BOOL(2);
6382 mail 403 CBC 1 : rconn = pconn;
404 : }
6972 mail 405 ECB : else
406 : {
5493 tgl 407 GIC 5 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
5493 tgl 408 CBC 5 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
409 5 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
6392 bruce 410 GIC 5 : rconn = getConnectionByName(conname);
411 : }
412 : }
6972 mail 413 1 : else if (PG_NARGS() == 4)
414 : {
6972 mail 415 ECB : /* text,text,text,bool */
5493 tgl 416 GIC 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
5493 tgl 417 CBC 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
5493 tgl 418 GIC 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
6972 mail 419 1 : fail = PG_GETARG_BOOL(3);
6392 bruce 420 CBC 1 : rconn = getConnectionByName(conname);
7228 bruce 421 ECB : }
422 :
6382 mail 423 GIC 9 : if (!rconn || !rconn->conn)
2296 peter_e 424 LBC 0 : dblink_conn_not_avail(conname);
2195 peter_e 425 ECB :
2195 peter_e 426 GIC 9 : conn = rconn->conn;
7524 bruce 427 ECB :
6347 428 : /* If we are not in a transaction, start one */
6382 mail 429 GIC 9 : if (PQtransactionStatus(conn) == PQTRANS_IDLE)
6382 mail 430 ECB : {
6382 mail 431 GIC 7 : res = PQexec(conn, "BEGIN");
432 7 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2296 peter_e 433 UBC 0 : dblink_res_internalerror(conn, res, "begin error");
6382 mail 434 GBC 7 : PQclear(res);
2062 peter_e 435 GIC 7 : rconn->newXactForCursor = true;
6031 bruce 436 ECB :
437 : /*
438 : * Since transaction state was IDLE, we force cursor count to
439 : * initially be 0. This is needed as a previous ABORT might have wiped
440 : * out our transaction without maintaining the cursor count for us.
6136 mail 441 : */
6136 mail 442 CBC 7 : rconn->openCursorCount = 0;
6382 mail 443 ECB : }
444 :
445 : /* if we started a transaction, increment cursor count */
6382 mail 446 GIC 9 : if (rconn->newXactForCursor)
6382 mail 447 CBC 9 : (rconn->openCursorCount)++;
7524 bruce 448 ECB :
6248 neilc 449 CBC 9 : appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
6248 neilc 450 GIC 9 : res = PQexec(conn, buf.data);
6972 mail 451 9 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
6972 mail 452 ECB : {
1844 tgl 453 GIC 2 : dblink_res_error(conn, conname, res, fail,
454 : "while opening cursor \"%s\"", curname);
5393 mail 455 GBC 2 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
6972 mail 456 EUB : }
7524 bruce 457 :
7228 bruce 458 GBC 7 : PQclear(res);
5493 tgl 459 GIC 7 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
460 : }
7655 bruce 461 ECB :
7524 bruce 462 EUB : /*
463 : * closes a cursor
7524 bruce 464 ECB : */
7524 bruce 465 GIC 6 : PG_FUNCTION_INFO_V1(dblink_close);
7524 bruce 466 ECB : Datum
7524 bruce 467 GIC 5 : dblink_close(PG_FUNCTION_ARGS)
468 : {
2195 peter_e 469 ECB : PGconn *conn;
7522 bruce 470 CBC 5 : PGresult *res = NULL;
7228 bruce 471 GIC 5 : char *curname = NULL;
7228 bruce 472 CBC 5 : char *conname = NULL;
473 : StringInfoData buf;
6392 474 5 : remoteConn *rconn = NULL;
6972 mail 475 GIC 5 : bool fail = true; /* default to backward compatible behavior */
476 :
2296 peter_e 477 CBC 5 : dblink_init();
6248 neilc 478 GIC 5 : initStringInfo(&buf);
479 :
7228 bruce 480 CBC 5 : if (PG_NARGS() == 1)
481 : {
6972 mail 482 ECB : /* text */
5493 tgl 483 UIC 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
6382 mail 484 0 : rconn = pconn;
7228 bruce 485 ECB : }
7188 bruce 486 GIC 5 : else if (PG_NARGS() == 2)
7228 bruce 487 ECB : {
488 : /* might be text,text or text,bool */
6972 mail 489 CBC 5 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
6972 mail 490 ECB : {
5493 tgl 491 GBC 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
6972 mail 492 CBC 2 : fail = PG_GETARG_BOOL(1);
6382 mail 493 GIC 2 : rconn = pconn;
494 : }
495 : else
6972 mail 496 ECB : {
5493 tgl 497 GIC 3 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
498 3 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
6392 bruce 499 3 : rconn = getConnectionByName(conname);
500 : }
501 : }
6972 mail 502 CBC 5 : if (PG_NARGS() == 3)
503 : {
6972 mail 504 ECB : /* text,text,bool */
5493 tgl 505 UIC 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
5493 tgl 506 LBC 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
6972 mail 507 0 : fail = PG_GETARG_BOOL(2);
6392 bruce 508 0 : rconn = getConnectionByName(conname);
7228 bruce 509 ECB : }
510 :
6382 mail 511 CBC 5 : if (!rconn || !rconn->conn)
2296 peter_e 512 LBC 0 : dblink_conn_not_avail(conname);
2195 peter_e 513 ECB :
2195 peter_e 514 GIC 5 : conn = rconn->conn;
7524 bruce 515 ECB :
6248 neilc 516 GIC 5 : appendStringInfo(&buf, "CLOSE %s", curname);
7524 bruce 517 ECB :
518 : /* close the cursor */
6248 neilc 519 CBC 5 : res = PQexec(conn, buf.data);
7522 bruce 520 GIC 5 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
521 : {
1844 tgl 522 CBC 1 : dblink_res_error(conn, conname, res, fail,
1844 tgl 523 ECB : "while closing cursor \"%s\"", curname);
5393 mail 524 CBC 1 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
6972 mail 525 ECB : }
526 :
7524 bruce 527 CBC 4 : PQclear(res);
7524 bruce 528 ECB :
6382 mail 529 : /* if we started a transaction, decrement cursor count */
6382 mail 530 GIC 4 : if (rconn->newXactForCursor)
6382 mail 531 ECB : {
6382 mail 532 GIC 4 : (rconn->openCursorCount)--;
533 :
6382 mail 534 ECB : /* if count is zero, commit the transaction */
6382 mail 535 GIC 4 : if (rconn->openCursorCount == 0)
6382 mail 536 ECB : {
2062 peter_e 537 CBC 2 : rconn->newXactForCursor = false;
6382 mail 538 ECB :
6382 mail 539 CBC 2 : res = PQexec(conn, "COMMIT");
6382 mail 540 GIC 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
2296 peter_e 541 UIC 0 : dblink_res_internalerror(conn, res, "commit error");
6382 mail 542 GIC 2 : PQclear(res);
6382 mail 543 ECB : }
544 : }
7524 bruce 545 :
5493 tgl 546 GIC 4 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
7524 bruce 547 ECB : }
548 :
549 : /*
550 : * Fetch results from an open cursor
551 : */
7524 bruce 552 CBC 9 : PG_FUNCTION_INFO_V1(dblink_fetch);
553 : Datum
7524 bruce 554 GIC 13 : dblink_fetch(PG_FUNCTION_ARGS)
7524 bruce 555 ECB : {
4790 bruce 556 CBC 13 : PGresult *res = NULL;
557 13 : char *conname = NULL;
4790 bruce 558 GIC 13 : remoteConn *rconn = NULL;
559 13 : PGconn *conn = NULL;
4790 bruce 560 ECB : StringInfoData buf;
4790 bruce 561 GBC 13 : char *curname = NULL;
4790 bruce 562 GIC 13 : int howmany = 0;
4790 bruce 563 CBC 13 : bool fail = true; /* default to backward compatible */
7524 bruce 564 ECB :
4023 tgl 565 GIC 13 : prepTuplestoreResult(fcinfo);
566 :
2296 peter_e 567 13 : dblink_init();
568 :
4823 mail 569 13 : if (PG_NARGS() == 4)
570 : {
4823 mail 571 ECB : /* text,text,int,bool */
4823 mail 572 CBC 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
573 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
574 1 : howmany = PG_GETARG_INT32(2);
4823 mail 575 GIC 1 : fail = PG_GETARG_BOOL(3);
7228 bruce 576 ECB :
4823 mail 577 GIC 1 : rconn = getConnectionByName(conname);
4823 mail 578 CBC 1 : if (rconn)
4823 mail 579 GIC 1 : conn = rconn->conn;
4823 mail 580 ECB : }
4823 mail 581 GIC 12 : else if (PG_NARGS() == 3)
582 : {
4823 mail 583 EUB : /* text,text,int or text,int,bool */
4823 mail 584 GBC 8 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
585 : {
4823 mail 586 GIC 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
587 2 : howmany = PG_GETARG_INT32(1);
588 2 : fail = PG_GETARG_BOOL(2);
4823 mail 589 CBC 2 : conn = pconn->conn;
4823 mail 590 ECB : }
591 : else
592 : {
5493 tgl 593 GIC 6 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
594 6 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
7228 bruce 595 6 : howmany = PG_GETARG_INT32(2);
7228 bruce 596 ECB :
6392 bruce 597 GIC 6 : rconn = getConnectionByName(conname);
6392 bruce 598 CBC 6 : if (rconn)
6392 bruce 599 GIC 6 : conn = rconn->conn;
7228 bruce 600 ECB : }
601 : }
4823 mail 602 GIC 4 : else if (PG_NARGS() == 2)
4823 mail 603 ECB : {
604 : /* text,int */
4823 mail 605 CBC 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
4823 mail 606 GIC 4 : howmany = PG_GETARG_INT32(1);
607 4 : conn = pconn->conn;
608 : }
609 :
610 13 : if (!conn)
2296 peter_e 611 LBC 0 : dblink_conn_not_avail(conname);
612 :
4823 mail 613 CBC 13 : initStringInfo(&buf);
614 13 : appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
615 :
616 : /*
617 : * Try to execute the query. Note that since libpq uses malloc, the
4790 bruce 618 EUB : * PGresult will be long-lived even though we are still in a short-lived
619 : * memory context.
620 : */
4823 mail 621 CBC 13 : res = PQexec(conn, buf.data);
622 26 : if (!res ||
4823 mail 623 GBC 26 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
4823 mail 624 GIC 13 : PQresultStatus(res) != PGRES_TUPLES_OK))
7522 bruce 625 ECB : {
1844 tgl 626 GIC 5 : dblink_res_error(conn, conname, res, fail,
627 : "while fetching from cursor \"%s\"", curname);
4823 mail 628 CBC 3 : return (Datum) 0;
629 : }
630 8 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
631 : {
4823 mail 632 ECB : /* cursor does not exist - closed already or bad name */
7524 bruce 633 UIC 0 : PQclear(res);
4823 mail 634 0 : ereport(ERROR,
635 : (errcode(ERRCODE_INVALID_CURSOR_NAME),
4823 mail 636 ECB : errmsg("cursor \"%s\" does not exist", curname)));
637 : }
638 :
3670 tgl 639 CBC 8 : materializeResult(fcinfo, conn, res);
4823 mail 640 GIC 7 : return (Datum) 0;
7524 bruce 641 ECB : }
642 :
643 : /*
644 : * Note: this is the new preferred version of dblink
645 : */
7524 bruce 646 GIC 11 : PG_FUNCTION_INFO_V1(dblink_record);
7524 bruce 647 ECB : Datum
7524 bruce 648 CBC 25 : dblink_record(PG_FUNCTION_ARGS)
6063 mail 649 ECB : {
5059 mail 650 GIC 25 : return dblink_record_internal(fcinfo, false);
6063 mail 651 ECB : }
652 :
6063 mail 653 CBC 3 : PG_FUNCTION_INFO_V1(dblink_send_query);
654 : Datum
6063 mail 655 GIC 6 : dblink_send_query(PG_FUNCTION_ARGS)
6063 mail 656 ECB : {
2296 peter_e 657 : PGconn *conn;
658 : char *sql;
5059 mail 659 : int retval;
660 :
5059 mail 661 CBC 6 : if (PG_NARGS() == 2)
662 : {
2296 peter_e 663 GIC 6 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
5059 mail 664 CBC 6 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
665 : }
5059 mail 666 ECB : else
667 : /* shouldn't happen */
5059 mail 668 LBC 0 : elog(ERROR, "wrong number of arguments");
669 :
670 : /* async query send */
5059 mail 671 GIC 6 : retval = PQsendQuery(conn, sql);
5059 mail 672 CBC 6 : if (retval != 1)
2232 peter_e 673 LBC 0 : elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
5059 mail 674 ECB :
5059 mail 675 GIC 6 : PG_RETURN_INT32(retval);
676 : }
6063 mail 677 ECB :
6063 mail 678 GIC 4 : PG_FUNCTION_INFO_V1(dblink_get_result);
679 : Datum
6063 mail 680 CBC 8 : dblink_get_result(PG_FUNCTION_ARGS)
6063 mail 681 ECB : {
5059 mail 682 GIC 8 : return dblink_record_internal(fcinfo, true);
683 : }
684 :
6063 mail 685 EUB : static Datum
5059 mail 686 GIC 33 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
687 : {
4022 tgl 688 33 : PGconn *volatile conn = NULL;
689 33 : volatile bool freeconn = false;
4823 mail 690 ECB :
4023 tgl 691 GIC 33 : prepTuplestoreResult(fcinfo);
7524 bruce 692 ECB :
2296 peter_e 693 GIC 33 : dblink_init();
694 :
4022 tgl 695 GBC 33 : PG_TRY();
7522 bruce 696 EUB : {
4022 tgl 697 GIC 33 : char *sql = NULL;
4022 tgl 698 CBC 33 : char *conname = NULL;
4022 tgl 699 GIC 33 : bool fail = true; /* default to backward compatible */
700 :
4022 tgl 701 CBC 33 : if (!is_async)
702 : {
4022 tgl 703 GIC 25 : if (PG_NARGS() == 3)
704 : {
4022 tgl 705 EUB : /* text,text,bool */
2203 peter_e 706 GIC 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
4022 tgl 707 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
4022 tgl 708 CBC 1 : fail = PG_GETARG_BOOL(2);
2203 peter_e 709 1 : dblink_get_conn(conname, &conn, &conname, &freeconn);
710 : }
4022 tgl 711 24 : else if (PG_NARGS() == 2)
712 : {
713 : /* text,text or text,bool */
714 17 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
715 : {
4022 tgl 716 GIC 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
717 1 : fail = PG_GETARG_BOOL(1);
2203 peter_e 718 1 : conn = pconn->conn;
4022 tgl 719 ECB : }
720 : else
721 : {
2203 peter_e 722 CBC 16 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
4022 tgl 723 GIC 16 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
2203 peter_e 724 CBC 16 : dblink_get_conn(conname, &conn, &conname, &freeconn);
4022 tgl 725 ECB : }
726 : }
4022 tgl 727 GBC 7 : else if (PG_NARGS() == 1)
728 : {
729 : /* text */
4823 mail 730 GIC 7 : conn = pconn->conn;
731 7 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
732 : }
4823 mail 733 ECB : else
734 : /* shouldn't happen */
4022 tgl 735 UIC 0 : elog(ERROR, "wrong number of arguments");
736 : }
737 : else /* is_async */
4022 tgl 738 ECB : {
739 : /* get async result */
2203 peter_e 740 GIC 8 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
2203 peter_e 741 ECB :
4022 tgl 742 CBC 8 : if (PG_NARGS() == 2)
743 : {
4022 tgl 744 ECB : /* text,bool */
4022 tgl 745 UIC 0 : fail = PG_GETARG_BOOL(1);
2203 peter_e 746 LBC 0 : conn = dblink_get_named_conn(conname);
747 : }
4022 tgl 748 GIC 8 : else if (PG_NARGS() == 1)
749 : {
750 : /* text */
2203 peter_e 751 8 : conn = dblink_get_named_conn(conname);
752 : }
753 : else
754 : /* shouldn't happen */
4022 tgl 755 UIC 0 : elog(ERROR, "wrong number of arguments");
7524 bruce 756 ECB : }
757 :
4022 tgl 758 CBC 31 : if (!conn)
2296 peter_e 759 GIC 2 : dblink_conn_not_avail(conname);
760 :
4022 tgl 761 CBC 29 : if (!is_async)
4823 mail 762 EUB : {
763 : /* synchronous query, use efficient tuple collection method */
4022 tgl 764 GIC 21 : materializeQueryResult(fcinfo, conn, conname, sql, fail);
4823 mail 765 ECB : }
5050 bruce 766 EUB : else
767 : {
768 : /* async result retrieval, do it the old way */
4022 tgl 769 GIC 8 : PGresult *res = PQgetResult(conn);
770 :
4022 tgl 771 ECB : /* NULL means we're all done with the async results */
4022 tgl 772 GIC 8 : if (res)
773 : {
4022 tgl 774 CBC 5 : if (PQresultStatus(res) != PGRES_COMMAND_OK &&
775 5 : PQresultStatus(res) != PGRES_TUPLES_OK)
4022 tgl 776 ECB : {
1844 tgl 777 UIC 0 : dblink_res_error(conn, conname, res, fail,
778 : "while executing query");
779 : /* if fail isn't set, we'll return an empty query result */
780 : }
781 : else
782 : {
3670 tgl 783 GIC 5 : materializeResult(fcinfo, conn, res);
4022 tgl 784 ECB : }
785 : }
5050 bruce 786 : }
787 : }
1255 peter 788 GIC 4 : PG_FINALLY();
4823 mail 789 ECB : {
790 : /* if needed, close the connection to the database */
4022 tgl 791 CBC 33 : if (freeconn)
76 andres 792 GNC 3 : libpqsrv_disconnect(conn);
793 : }
4022 tgl 794 GIC 33 : PG_END_TRY();
4823 mail 795 ECB :
4823 mail 796 GIC 29 : return (Datum) 0;
4823 mail 797 EUB : }
798 :
799 : /*
800 : * Verify function caller can handle a tuplestore result, and set up for that.
801 : *
802 : * Note: if the caller returns without actually creating a tuplestore, the
4023 tgl 803 : * executor will treat the function result as an empty set.
804 : */
805 : static void
4023 tgl 806 GBC 46 : prepTuplestoreResult(FunctionCallInfo fcinfo)
4023 tgl 807 EUB : {
4023 tgl 808 GIC 46 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
809 :
810 : /* check to see if query supports us returning a tuplestore */
4023 tgl 811 CBC 46 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
4023 tgl 812 UIC 0 : ereport(ERROR,
4023 tgl 813 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
814 : errmsg("set-valued function called in context that cannot accept a set")));
4023 tgl 815 GIC 46 : if (!(rsinfo->allowedModes & SFRM_Materialize))
4023 tgl 816 LBC 0 : ereport(ERROR,
817 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4023 tgl 818 ECB : errmsg("materialize mode required, but it is not allowed in this context")));
819 :
820 : /* let the executor know we're sending back a tuplestore */
4023 tgl 821 GBC 46 : rsinfo->returnMode = SFRM_Materialize;
822 :
4023 tgl 823 EUB : /* caller must fill these to return a non-empty result */
4023 tgl 824 GIC 46 : rsinfo->setResult = NULL;
825 46 : rsinfo->setDesc = NULL;
826 46 : }
827 :
4023 tgl 828 EUB : /*
829 : * Copy the contents of the PGresult into a tuplestore to be returned
830 : * as the result of the current function.
831 : * The PGresult will be released in this function.
832 : */
833 : static void
3670 tgl 834 GIC 13 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
4823 mail 835 ECB : {
4790 bruce 836 CBC 13 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
4823 mail 837 ECB :
838 : /* prepTuplestoreResult must have been called previously */
4823 mail 839 GIC 13 : Assert(rsinfo->returnMode == SFRM_Materialize);
840 :
841 13 : PG_TRY();
842 : {
4823 mail 843 ECB : TupleDesc tupdesc;
3670 tgl 844 EUB : bool is_sql_cmd;
845 : int ntuples;
846 : int nfields;
847 :
5050 bruce 848 GIC 13 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
5050 bruce 849 ECB : {
5050 bruce 850 UIC 0 : is_sql_cmd = true;
851 :
4823 mail 852 ECB : /*
853 : * need a tuple descriptor representing one TEXT column to return
854 : * the command status string as our result tuple
855 : */
1601 andres 856 UIC 0 : tupdesc = CreateTemplateTupleDesc(1);
5050 bruce 857 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
5050 bruce 858 ECB : TEXTOID, -1, 0);
4823 mail 859 UIC 0 : ntuples = 1;
860 0 : nfields = 1;
5050 bruce 861 ECB : }
862 : else
863 : {
4823 mail 864 CBC 13 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
5050 bruce 865 ECB :
4823 mail 866 CBC 13 : is_sql_cmd = false;
5050 bruce 867 ECB :
868 : /* get a tuple descriptor for our result type */
5050 bruce 869 GIC 13 : switch (get_call_result_type(fcinfo, NULL, &tupdesc))
6063 mail 870 ECB : {
5050 bruce 871 GIC 13 : case TYPEFUNC_COMPOSITE:
872 : /* success */
5050 bruce 873 CBC 13 : break;
5050 bruce 874 UIC 0 : case TYPEFUNC_RECORD:
875 : /* failed to determine actual type of RECORD */
876 0 : ereport(ERROR,
5050 bruce 877 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
878 : errmsg("function returning record called in context "
879 : "that cannot accept type record")));
880 : break;
5050 bruce 881 LBC 0 : default:
882 : /* result type isn't composite */
883 0 : elog(ERROR, "return type must be a row type");
5050 bruce 884 EUB : break;
885 : }
6031 bruce 886 ECB :
887 : /* make sure we have a persistent copy of the tupdesc */
5050 bruce 888 GIC 13 : tupdesc = CreateTupleDescCopy(tupdesc);
4823 mail 889 13 : ntuples = PQntuples(res);
890 13 : nfields = PQnfields(res);
5050 bruce 891 EUB : }
892 :
893 : /*
894 : * check result and tuple descriptor have the same number of columns
5050 bruce 895 ECB : */
4823 mail 896 CBC 13 : if (nfields != tupdesc->natts)
5050 bruce 897 UIC 0 : ereport(ERROR,
898 : (errcode(ERRCODE_DATATYPE_MISMATCH),
899 : errmsg("remote query result rowtype does not match "
5050 bruce 900 ECB : "the specified FROM clause rowtype")));
901 :
4823 mail 902 GIC 13 : if (ntuples > 0)
5050 bruce 903 ECB : {
904 : AttInMetadata *attinmeta;
3670 tgl 905 GIC 13 : int nestlevel = -1;
4790 bruce 906 ECB : Tuplestorestate *tupstore;
907 : MemoryContext oldcontext;
908 : int row;
909 : char **values;
910 :
4823 mail 911 GIC 13 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
912 :
913 : /* Set GUCs to ensure we read GUC-sensitive data types correctly */
3670 tgl 914 13 : if (!is_sql_cmd)
915 13 : nestlevel = applyRemoteGucs(conn);
916 :
1165 alvherre 917 13 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
4823 mail 918 13 : tupstore = tuplestore_begin_heap(true, false, work_mem);
919 13 : rsinfo->setResult = tupstore;
4823 mail 920 CBC 13 : rsinfo->setDesc = tupdesc;
6063 mail 921 GIC 13 : MemoryContextSwitchTo(oldcontext);
922 :
209 peter 923 GNC 13 : values = palloc_array(char *, nfields);
924 :
925 : /* put all tuples into the tuplestore */
4823 mail 926 CBC 49 : for (row = 0; row < ntuples; row++)
7524 bruce 927 ECB : {
4823 mail 928 : HeapTuple tuple;
929 :
4823 mail 930 GIC 37 : if (!is_sql_cmd)
4823 mail 931 ECB : {
932 : int i;
933 :
4823 mail 934 GIC 138 : for (i = 0; i < nfields; i++)
4823 mail 935 ECB : {
4823 mail 936 GIC 101 : if (PQgetisnull(res, row, i))
4823 mail 937 UIC 0 : values[i] = NULL;
4823 mail 938 ECB : else
4823 mail 939 GIC 101 : values[i] = PQgetvalue(res, row, i);
940 : }
941 : }
942 : else
4823 mail 943 ECB : {
4823 mail 944 UIC 0 : values[0] = PQcmdStatus(res);
4823 mail 945 ECB : }
7524 bruce 946 :
4823 mail 947 : /* build the tuple and put it into the tuplestore. */
4823 mail 948 CBC 37 : tuple = BuildTupleFromCStrings(attinmeta, values);
4823 mail 949 GIC 36 : tuplestore_puttuple(tupstore, tuple);
950 : }
951 :
952 : /* clean up GUC settings, if we changed any */
3670 tgl 953 CBC 12 : restoreLocalGucs(nestlevel);
954 : }
7524 bruce 955 ECB : }
1255 peter 956 CBC 1 : PG_FINALLY();
957 : {
958 : /* be sure to release the libpq result */
7524 bruce 959 GIC 13 : PQclear(res);
7524 bruce 960 ECB : }
4823 mail 961 GIC 13 : PG_END_TRY();
7524 bruce 962 12 : }
963 :
964 : /*
965 : * Execute the given SQL command and store its results into a tuplestore
966 : * to be returned as the result of the current function.
967 : *
968 : * This is equivalent to PQexec followed by materializeResult, but we make
969 : * use of libpq's single-row mode to avoid accumulating the whole result
970 : * inside libpq before it gets transferred to the tuplestore.
971 : */
972 : static void
4022 tgl 973 21 : materializeQueryResult(FunctionCallInfo fcinfo,
974 : PGconn *conn,
975 : const char *conname,
976 : const char *sql,
4022 tgl 977 EUB : bool fail)
978 : {
4022 tgl 979 GIC 21 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
4022 tgl 980 GBC 21 : PGresult *volatile res = NULL;
1476 peter 981 GIC 21 : volatile storeInfo sinfo = {0};
4022 tgl 982 EUB :
983 : /* prepTuplestoreResult must have been called previously */
4022 tgl 984 GBC 21 : Assert(rsinfo->returnMode == SFRM_Materialize);
4022 tgl 985 EUB :
3902 tgl 986 GBC 21 : sinfo.fcinfo = fcinfo;
987 :
4022 988 21 : PG_TRY();
989 : {
990 : /* Create short-lived memory context for data conversions */
3215 mail 991 21 : sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
3215 mail 992 EUB : "dblink temporary context",
993 : ALLOCSET_DEFAULT_SIZES);
994 :
3902 tgl 995 : /* execute query, collecting any tuples into the tuplestore */
3902 tgl 996 GIC 21 : res = storeQueryResult(&sinfo, conn, sql);
997 :
4022 998 21 : if (!res ||
4022 tgl 999 CBC 21 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
4022 tgl 1000 GIC 21 : PQresultStatus(res) != PGRES_TUPLES_OK))
4022 tgl 1001 CBC 2 : {
1002 : /*
4022 tgl 1003 ECB : * dblink_res_error will clear the passed PGresult, so we need
1004 : * this ugly dance to avoid doing so twice during error exit
1005 : */
4022 tgl 1006 GIC 2 : PGresult *res1 = res;
1007 :
4022 tgl 1008 CBC 2 : res = NULL;
1844 1009 2 : dblink_res_error(conn, conname, res1, fail,
1844 tgl 1010 ECB : "while executing query");
1011 : /* if fail isn't set, we'll return an empty query result */
4022 1012 : }
4022 tgl 1013 CBC 19 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
4022 tgl 1014 ECB : {
1015 : /*
1016 : * storeRow didn't get called, so we need to convert the command
3902 tgl 1017 EUB : * status string to a tuple manually
1018 : */
1019 : TupleDesc tupdesc;
4022 1020 : AttInMetadata *attinmeta;
1021 : Tuplestorestate *tupstore;
1022 : HeapTuple tuple;
1023 : char *values[1];
1024 : MemoryContext oldcontext;
1025 :
1026 : /*
1027 : * need a tuple descriptor representing one TEXT column to return
4022 tgl 1028 ECB : * the command status string as our result tuple
1029 : */
1601 andres 1030 UIC 0 : tupdesc = CreateTemplateTupleDesc(1);
4022 tgl 1031 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1032 : TEXTOID, -1, 0);
1033 0 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1034 :
1165 alvherre 1035 LBC 0 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
4022 tgl 1036 UIC 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
4022 tgl 1037 LBC 0 : rsinfo->setResult = tupstore;
1038 0 : rsinfo->setDesc = tupdesc;
4022 tgl 1039 UIC 0 : MemoryContextSwitchTo(oldcontext);
1040 :
4022 tgl 1041 LBC 0 : values[0] = PQcmdStatus(res);
4022 tgl 1042 EUB :
1043 : /* build the tuple and put it into the tuplestore. */
4022 tgl 1044 LBC 0 : tuple = BuildTupleFromCStrings(attinmeta, values);
4022 tgl 1045 UBC 0 : tuplestore_puttuple(tupstore, tuple);
1046 :
4022 tgl 1047 UIC 0 : PQclear(res);
3902 1048 0 : res = NULL;
4022 tgl 1049 ECB : }
1050 : else
1051 : {
4022 tgl 1052 CBC 19 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
3902 tgl 1053 ECB : /* storeRow should have created a tuplestore */
4022 tgl 1054 GIC 19 : Assert(rsinfo->setResult != NULL);
4022 tgl 1055 ECB :
4022 tgl 1056 GIC 19 : PQclear(res);
3902 1057 19 : res = NULL;
1058 : }
1059 :
1060 : /* clean up data conversion short-lived memory context */
3215 mail 1061 21 : if (sinfo.tmpcontext != NULL)
1062 21 : MemoryContextDelete(sinfo.tmpcontext);
1063 21 : sinfo.tmpcontext = NULL;
3215 mail 1064 ECB :
3902 tgl 1065 CBC 21 : PQclear(sinfo.last_res);
3902 tgl 1066 GIC 21 : sinfo.last_res = NULL;
3902 tgl 1067 CBC 21 : PQclear(sinfo.cur_res);
3902 tgl 1068 GIC 21 : sinfo.cur_res = NULL;
4022 tgl 1069 ECB : }
4022 tgl 1070 LBC 0 : PG_CATCH();
4022 tgl 1071 ECB : {
1072 : /* be sure to release any libpq result we collected */
3902 tgl 1073 UIC 0 : PQclear(res);
1074 0 : PQclear(sinfo.last_res);
1075 0 : PQclear(sinfo.cur_res);
4022 tgl 1076 ECB : /* and clear out any pending data in libpq */
3902 tgl 1077 UBC 0 : while ((res = PQgetResult(conn)) != NULL)
4022 tgl 1078 UIC 0 : PQclear(res);
1079 0 : PG_RE_THROW();
4022 tgl 1080 ECB : }
4022 tgl 1081 CBC 21 : PG_END_TRY();
1082 21 : }
4022 tgl 1083 ECB :
1084 : /*
1085 : * Execute query, and send any result rows to sinfo->tuplestore.
1086 : */
1087 : static PGresult *
2995 tgl 1088 CBC 21 : storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1089 : {
3902 tgl 1090 GIC 21 : bool first = true;
3670 tgl 1091 CBC 21 : int nestlevel = -1;
3902 tgl 1092 ECB : PGresult *res;
1093 :
3902 tgl 1094 GIC 21 : if (!PQsendQuery(conn, sql))
2232 peter_e 1095 UIC 0 : elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1096 :
2118 tgl 1097 GIC 21 : if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
3902 tgl 1098 UIC 0 : elog(ERROR, "failed to set single-row mode for dblink query");
1099 :
1100 : for (;;)
1101 : {
3902 tgl 1102 GIC 174 : CHECK_FOR_INTERRUPTS();
3902 tgl 1103 ECB :
3902 tgl 1104 GIC 174 : sinfo->cur_res = PQgetResult(conn);
3902 tgl 1105 CBC 174 : if (!sinfo->cur_res)
3902 tgl 1106 GIC 21 : break;
1107 :
1108 153 : if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1109 : {
3902 tgl 1110 ECB : /* got one row from possibly-bigger resultset */
1111 :
1112 : /*
3670 1113 : * Set GUCs to ensure we read GUC-sensitive data types correctly.
1114 : * We shouldn't do this until we have a row in hand, to ensure
1115 : * libpq has seen any earlier ParameterStatus protocol messages.
1116 : */
3670 tgl 1117 GIC 132 : if (first && nestlevel < 0)
1118 19 : nestlevel = applyRemoteGucs(conn);
1119 :
3902 1120 132 : storeRow(sinfo, sinfo->cur_res, first);
3902 tgl 1121 ECB :
3902 tgl 1122 GBC 132 : PQclear(sinfo->cur_res);
3902 tgl 1123 CBC 132 : sinfo->cur_res = NULL;
3902 tgl 1124 GIC 132 : first = false;
1125 : }
3902 tgl 1126 ECB : else
1127 : {
1128 : /* if empty resultset, fill tuplestore header */
3902 tgl 1129 GIC 21 : if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
3902 tgl 1130 LBC 0 : storeRow(sinfo, sinfo->cur_res, first);
3902 tgl 1131 EUB :
1132 : /* store completed result at last_res */
3902 tgl 1133 GBC 21 : PQclear(sinfo->last_res);
3902 tgl 1134 GIC 21 : sinfo->last_res = sinfo->cur_res;
1135 21 : sinfo->cur_res = NULL;
1136 21 : first = true;
1137 : }
3902 tgl 1138 EUB : }
1139 :
3670 1140 : /* clean up GUC settings, if we changed any */
3670 tgl 1141 GIC 21 : restoreLocalGucs(nestlevel);
1142 :
1143 : /* return last_res */
3902 1144 21 : res = sinfo->last_res;
3902 tgl 1145 CBC 21 : sinfo->last_res = NULL;
3902 tgl 1146 GIC 21 : return res;
1147 : }
3902 tgl 1148 ECB :
3902 tgl 1149 EUB : /*
1150 : * Send single row to sinfo->tuplestore.
1151 : *
1152 : * If "first" is true, create the tuplestore using PGresult's metadata
1153 : * (in this case the PGresult might contain either zero or one row).
1154 : */
3902 tgl 1155 ECB : static void
2995 tgl 1156 GIC 132 : storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1157 : {
4022 tgl 1158 CBC 132 : int nfields = PQnfields(res);
4022 tgl 1159 ECB : HeapTuple tuple;
1160 : int i;
1161 : MemoryContext oldcontext;
1162 :
3902 tgl 1163 GIC 132 : if (first)
1164 : {
4022 tgl 1165 ECB : /* Prepare for new result set */
4022 tgl 1166 GBC 19 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1167 : TupleDesc tupdesc;
1168 :
1169 : /*
1170 : * It's possible to get more than one result set if the query string
1171 : * contained multiple SQL commands. In that case, we follow PQexec's
4022 tgl 1172 ECB : * traditional behavior of throwing away all but the last result.
4022 tgl 1173 EUB : */
4022 tgl 1174 CBC 19 : if (sinfo->tuplestore)
4022 tgl 1175 UIC 0 : tuplestore_end(sinfo->tuplestore);
4022 tgl 1176 GIC 19 : sinfo->tuplestore = NULL;
1177 :
4022 tgl 1178 ECB : /* get a tuple descriptor for our result type */
4022 tgl 1179 GIC 19 : switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1180 : {
1181 19 : case TYPEFUNC_COMPOSITE:
1182 : /* success */
1183 19 : break;
4022 tgl 1184 UIC 0 : case TYPEFUNC_RECORD:
4022 tgl 1185 ECB : /* failed to determine actual type of RECORD */
4022 tgl 1186 UIC 0 : ereport(ERROR,
1187 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1188 : errmsg("function returning record called in context "
1189 : "that cannot accept type record")));
4022 tgl 1190 ECB : break;
4022 tgl 1191 UIC 0 : default:
4022 tgl 1192 ECB : /* result type isn't composite */
4022 tgl 1193 UBC 0 : elog(ERROR, "return type must be a row type");
1194 : break;
4022 tgl 1195 ECB : }
1196 :
1197 : /* make sure we have a persistent copy of the tupdesc */
4022 tgl 1198 GIC 19 : tupdesc = CreateTupleDescCopy(tupdesc);
4022 tgl 1199 ECB :
1200 : /* check result and tuple descriptor have the same number of columns */
4022 tgl 1201 CBC 19 : if (nfields != tupdesc->natts)
4022 tgl 1202 UIC 0 : ereport(ERROR,
1203 : (errcode(ERRCODE_DATATYPE_MISMATCH),
4022 tgl 1204 ECB : errmsg("remote query result rowtype does not match "
1205 : "the specified FROM clause rowtype")));
1206 :
1207 : /* Prepare attinmeta for later data conversions */
4022 tgl 1208 GIC 19 : sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1209 :
1210 : /* Create a new, empty tuplestore */
3902 1211 19 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
4022 1212 19 : sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
4022 tgl 1213 CBC 19 : rsinfo->setResult = sinfo->tuplestore;
4022 tgl 1214 GIC 19 : rsinfo->setDesc = tupdesc;
4022 tgl 1215 CBC 19 : MemoryContextSwitchTo(oldcontext);
1216 :
1217 : /* Done if empty resultset */
3902 tgl 1218 GIC 19 : if (PQntuples(res) == 0)
3902 tgl 1219 LBC 0 : return;
1220 :
4022 tgl 1221 ECB : /*
1222 : * Set up sufficiently-wide string pointers array; this won't change
1223 : * in size so it's easy to preallocate.
1224 : */
4022 tgl 1225 GIC 19 : if (sinfo->cstrs)
4022 tgl 1226 UIC 0 : pfree(sinfo->cstrs);
209 peter 1227 GNC 19 : sinfo->cstrs = palloc_array(char *, nfields);
4022 tgl 1228 ECB : }
1229 :
1230 : /* Should have a single-row result if we get here */
3902 tgl 1231 GIC 132 : Assert(PQntuples(res) == 1);
1232 :
4022 tgl 1233 ECB : /*
1234 : * Do the following work in a temp context that we reset after each tuple.
1235 : * This cleans up not only the data we have direct access to, but any
1236 : * cruft the I/O functions might leak.
4022 tgl 1237 EUB : */
4022 tgl 1238 GIC 132 : oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1239 :
1240 : /*
1241 : * Fill cstrs with null-terminated strings of column values.
1242 : */
1243 510 : for (i = 0; i < nfields; i++)
1244 : {
3902 1245 378 : if (PQgetisnull(res, 0, i))
3902 tgl 1246 UIC 0 : sinfo->cstrs[i] = NULL;
1247 : else
3902 tgl 1248 CBC 378 : sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1249 : }
4022 tgl 1250 ECB :
1251 : /* Convert row to a tuple, and add it to the tuplestore */
3902 tgl 1252 GIC 132 : tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1253 :
4022 tgl 1254 CBC 132 : tuplestore_puttuple(sinfo->tuplestore, tuple);
4022 tgl 1255 ECB :
1256 : /* Clean up */
4022 tgl 1257 CBC 132 : MemoryContextSwitchTo(oldcontext);
1258 132 : MemoryContextReset(sinfo->tmpcontext);
1259 : }
1260 :
1261 : /*
1262 : * List all open dblink connections by name.
1263 : * Returns an array of all connection names.
1264 : * Takes no params
1265 : */
6063 mail 1266 GIC 2 : PG_FUNCTION_INFO_V1(dblink_get_connections);
1267 : Datum
1268 1 : dblink_get_connections(PG_FUNCTION_ARGS)
1269 : {
1270 : HASH_SEQ_STATUS status;
1271 : remoteConnHashEnt *hentry;
6031 bruce 1272 CBC 1 : ArrayBuildState *astate = NULL;
1273 :
6063 mail 1274 1 : if (remoteConnHash)
1275 : {
6063 mail 1276 GIC 1 : hash_seq_init(&status, remoteConnHash);
1277 4 : while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1278 : {
1279 : /* stash away current value */
1280 3 : astate = accumArrayResult(astate,
5493 tgl 1281 CBC 3 : CStringGetTextDatum(hentry->name),
6063 mail 1282 ECB : false, TEXTOID, CurrentMemoryContext);
1283 : }
1284 : }
1285 :
6063 mail 1286 CBC 1 : if (astate)
224 peter 1287 GNC 1 : PG_RETURN_DATUM(makeArrayResult(astate,
6063 mail 1288 ECB : CurrentMemoryContext));
1289 : else
6063 mail 1290 UIC 0 : PG_RETURN_NULL();
6063 mail 1291 EUB : }
1292 :
1293 : /*
1294 : * Checks if a given remote connection is busy
1295 : *
1296 : * Returns 1 if the connection is busy, 0 otherwise
1297 : * Params:
1298 : * text connection_name - name of the connection to check
1299 : *
1300 : */
6063 mail 1301 GIC 2 : PG_FUNCTION_INFO_V1(dblink_is_busy);
1302 : Datum
1303 1 : dblink_is_busy(PG_FUNCTION_ARGS)
1304 : {
2296 peter_e 1305 ECB : PGconn *conn;
1306 :
2296 peter_e 1307 CBC 1 : dblink_init();
2296 peter_e 1308 GIC 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1309 :
6063 mail 1310 1 : PQconsumeInput(conn);
1311 1 : PG_RETURN_INT32(PQisBusy(conn));
6063 mail 1312 ECB : }
1313 :
1314 : /*
1315 : * Cancels a running request on a connection
1316 : *
6031 bruce 1317 : * Returns text:
1318 : * "OK" if the cancel request has been sent correctly,
6031 bruce 1319 EUB : * an error message otherwise
1320 : *
1321 : * Params:
1322 : * text connection_name - name of the connection to check
1323 : *
1324 : */
6063 mail 1325 CBC 2 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
1326 : Datum
1327 1 : dblink_cancel_query(PG_FUNCTION_ARGS)
1328 : {
2296 peter_e 1329 ECB : int res;
1330 : PGconn *conn;
6031 bruce 1331 : PGcancel *cancel;
1332 : char errbuf[256];
6063 mail 1333 :
2296 peter_e 1334 GIC 1 : dblink_init();
2296 peter_e 1335 CBC 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
6063 mail 1336 GIC 1 : cancel = PQgetCancel(conn);
6063 mail 1337 ECB :
6063 mail 1338 CBC 1 : res = PQcancel(cancel, errbuf, 256);
1339 1 : PQfreeCancel(cancel);
6063 mail 1340 ECB :
5575 tgl 1341 GIC 1 : if (res == 1)
5493 tgl 1342 CBC 1 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
1343 : else
5493 tgl 1344 UIC 0 : PG_RETURN_TEXT_P(cstring_to_text(errbuf));
6063 mail 1345 EUB : }
1346 :
1347 :
1348 : /*
1349 : * Get error message from a connection
6063 mail 1350 ECB : *
1351 : * Returns text:
1352 : * "OK" if no error, an error message otherwise
6031 bruce 1353 : *
1354 : * Params:
1355 : * text connection_name - name of the connection to check
1356 : *
6063 mail 1357 : */
6063 mail 1358 GIC 2 : PG_FUNCTION_INFO_V1(dblink_error_message);
1359 : Datum
1360 1 : dblink_error_message(PG_FUNCTION_ARGS)
6063 mail 1361 ECB : {
6031 bruce 1362 : char *msg;
2296 peter_e 1363 : PGconn *conn;
1364 :
2296 peter_e 1365 GIC 1 : dblink_init();
2296 peter_e 1366 CBC 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1367 :
6063 mail 1368 GIC 1 : msg = PQerrorMessage(conn);
5575 tgl 1369 CBC 1 : if (msg == NULL || msg[0] == '\0')
5493 1370 1 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
1371 : else
2232 peter_e 1372 UIC 0 : PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1373 : }
6063 mail 1374 EUB :
1375 : /*
7524 bruce 1376 ECB : * Execute an SQL non-SELECT command
7524 bruce 1377 EUB : */
7524 bruce 1378 GIC 9 : PG_FUNCTION_INFO_V1(dblink_exec);
7524 bruce 1379 ECB : Datum
7524 bruce 1380 CBC 26 : dblink_exec(PG_FUNCTION_ARGS)
7524 bruce 1381 ECB : {
4023 tgl 1382 CBC 26 : text *volatile sql_cmd_status = NULL;
4023 tgl 1383 GIC 26 : PGconn *volatile conn = NULL;
4023 tgl 1384 CBC 26 : volatile bool freeconn = false;
1385 :
2296 peter_e 1386 GIC 26 : dblink_init();
1387 :
4023 tgl 1388 26 : PG_TRY();
1389 : {
1390 26 : PGresult *res = NULL;
4023 tgl 1391 CBC 26 : char *sql = NULL;
4023 tgl 1392 GIC 26 : char *conname = NULL;
4023 tgl 1393 CBC 26 : bool fail = true; /* default to backward compatible behavior */
1394 :
4023 tgl 1395 GIC 26 : if (PG_NARGS() == 3)
1396 : {
1397 : /* must be text,text,bool */
2203 peter_e 1398 UIC 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
4023 tgl 1399 LBC 0 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1400 0 : fail = PG_GETARG_BOOL(2);
2203 peter_e 1401 UIC 0 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1402 : }
4023 tgl 1403 GIC 26 : else if (PG_NARGS() == 2)
4023 tgl 1404 EUB : {
1405 : /* might be text,text or text,bool */
4023 tgl 1406 GIC 17 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1407 : {
1408 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1409 1 : fail = PG_GETARG_BOOL(1);
2203 peter_e 1410 CBC 1 : conn = pconn->conn;
1411 : }
1412 : else
4023 tgl 1413 ECB : {
2203 peter_e 1414 CBC 16 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
4023 tgl 1415 GIC 16 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
2203 peter_e 1416 CBC 16 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1417 : }
4023 tgl 1418 ECB : }
4023 tgl 1419 GIC 9 : else if (PG_NARGS() == 1)
1420 : {
1421 : /* must be single text argument */
6382 mail 1422 9 : conn = pconn->conn;
5493 tgl 1423 9 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1424 : }
1425 : else
1426 : /* shouldn't happen */
4023 tgl 1427 UIC 0 : elog(ERROR, "wrong number of arguments");
7524 bruce 1428 ECB :
4023 tgl 1429 GIC 26 : if (!conn)
2296 peter_e 1430 LBC 0 : dblink_conn_not_avail(conname);
1431 :
4023 tgl 1432 GIC 26 : res = PQexec(conn, sql);
1433 26 : if (!res ||
1434 26 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1435 2 : PQresultStatus(res) != PGRES_TUPLES_OK))
1436 : {
1844 1437 2 : dblink_res_error(conn, conname, res, fail,
1438 : "while executing command");
1439 :
1440 : /*
4023 tgl 1441 ECB : * and save a copy of the command status string to return as our
1442 : * result tuple
1443 : */
4023 tgl 1444 GIC 1 : sql_cmd_status = cstring_to_text("ERROR");
1445 : }
1446 24 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
4023 tgl 1447 ECB : {
1448 : /*
1449 : * and save a copy of the command status string to return as our
1450 : * result tuple
1451 : */
4023 tgl 1452 CBC 24 : sql_cmd_status = cstring_to_text(PQcmdStatus(res));
4023 tgl 1453 GIC 24 : PQclear(res);
1454 : }
4023 tgl 1455 ECB : else
1456 : {
4023 tgl 1457 UIC 0 : PQclear(res);
4023 tgl 1458 LBC 0 : ereport(ERROR,
1459 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2118 tgl 1460 ECB : errmsg("statement returning results not allowed")));
1461 : }
1462 : }
1255 peter 1463 GIC 1 : PG_FINALLY();
1464 : {
4023 tgl 1465 ECB : /* if needed, close the connection to the database */
4023 tgl 1466 CBC 26 : if (freeconn)
76 andres 1467 GNC 1 : libpqsrv_disconnect(conn);
1468 : }
4023 tgl 1469 GIC 26 : PG_END_TRY();
1470 :
7228 bruce 1471 25 : PG_RETURN_TEXT_P(sql_cmd_status);
7524 bruce 1472 ECB : }
1473 :
1474 :
7655 1475 : /*
1476 : * dblink_get_pkey
7522 1477 : *
1478 : * Return list of primary key fields for the supplied relation,
1479 : * or NULL if none exists.
7655 1480 : */
7655 bruce 1481 GIC 2 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
1482 : Datum
1483 9 : dblink_get_pkey(PG_FUNCTION_ARGS)
1484 : {
1828 teodor 1485 EUB : int16 indnkeyatts;
7522 bruce 1486 : char **results;
1487 : FuncCallContext *funcctx;
1488 : int32 call_cntr;
7522 bruce 1489 ECB : int32 max_calls;
1490 : AttInMetadata *attinmeta;
1491 : MemoryContext oldcontext;
1492 :
7524 1493 : /* stuff done only on the first call of the function */
7522 bruce 1494 GIC 9 : if (SRF_IS_FIRSTCALL())
1495 : {
1496 : Relation rel;
1497 : TupleDesc tupdesc;
7524 bruce 1498 ECB :
1499 : /* create a function context for cross-call persistence */
7522 bruce 1500 GIC 3 : funcctx = SRF_FIRSTCALL_INIT();
7524 bruce 1501 ECB :
7522 1502 : /*
1503 : * switch to memory context appropriate for multiple function calls
1504 : */
7524 bruce 1505 GIC 3 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1506 :
1507 : /* open target relation */
2219 noah 1508 3 : rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1509 :
4682 tgl 1510 ECB : /* get the array of attnums */
1828 teodor 1511 CBC 3 : results = get_pkey_attnames(rel, &indnkeyatts);
4682 tgl 1512 ECB :
4682 tgl 1513 GIC 3 : relation_close(rel, AccessShareLock);
1514 :
7522 bruce 1515 ECB : /*
1516 : * need a tuple descriptor representing one INT and one TEXT column
1517 : */
1601 andres 1518 CBC 3 : tupdesc = CreateTemplateTupleDesc(2);
7524 bruce 1519 GIC 3 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
6947 tgl 1520 ECB : INT4OID, -1, 0);
7524 bruce 1521 GIC 3 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1522 : TEXTOID, -1, 0);
1523 :
1524 : /*
6385 bruce 1525 ECB : * Generate attribute metadata needed later to produce tuples from raw
1526 : * C strings
1527 : */
7524 bruce 1528 GIC 3 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1529 3 : funcctx->attinmeta = attinmeta;
1530 :
1828 teodor 1531 3 : if ((results != NULL) && (indnkeyatts > 0))
1532 : {
1533 3 : funcctx->max_calls = indnkeyatts;
1534 :
1535 : /* got results, keep track of them */
7524 bruce 1536 3 : funcctx->user_fctx = results;
1537 : }
1538 : else
1539 : {
1540 : /* fast track when no results */
5243 tgl 1541 UIC 0 : MemoryContextSwitchTo(oldcontext);
7522 bruce 1542 0 : SRF_RETURN_DONE(funcctx);
1543 : }
1544 :
7524 bruce 1545 GIC 3 : MemoryContextSwitchTo(oldcontext);
1546 : }
1547 :
1548 : /* stuff done on every call of the function */
7522 bruce 1549 CBC 9 : funcctx = SRF_PERCALL_SETUP();
1550 :
7524 bruce 1551 ECB : /*
1552 : * initialize per-call variables
1553 : */
7524 bruce 1554 CBC 9 : call_cntr = funcctx->call_cntr;
1555 9 : max_calls = funcctx->max_calls;
7655 bruce 1556 ECB :
7524 bruce 1557 CBC 9 : results = (char **) funcctx->user_fctx;
7524 bruce 1558 GIC 9 : attinmeta = funcctx->attinmeta;
1559 :
1560 9 : if (call_cntr < max_calls) /* do when there is more left to send */
1561 : {
1562 : char **values;
1563 : HeapTuple tuple;
1564 : Datum result;
1565 :
209 peter 1566 GNC 6 : values = palloc_array(char *, 2);
3380 peter_e 1567 GIC 6 : values[0] = psprintf("%d", call_cntr + 1);
7524 bruce 1568 6 : values[1] = results[call_cntr];
1569 :
7524 bruce 1570 ECB : /* build the tuple */
7524 bruce 1571 GIC 6 : tuple = BuildTupleFromCStrings(attinmeta, values);
1572 :
1573 : /* make the tuple into a datum */
6947 tgl 1574 6 : result = HeapTupleGetDatum(tuple);
7524 bruce 1575 ECB :
7522 bruce 1576 GIC 6 : SRF_RETURN_NEXT(funcctx, result);
1577 : }
1578 : else
1579 : {
1580 : /* do when there is no more left */
7228 1581 3 : SRF_RETURN_DONE(funcctx);
7655 bruce 1582 ECB : }
1583 : }
1584 :
1585 :
1586 : /*
1587 : * dblink_build_sql_insert
7522 bruce 1588 EUB : *
1589 : * Used to generate an SQL insert statement
1590 : * based on an existing tuple in a local relation.
1591 : * This is useful for selectively replicating data
1592 : * to another server via dblink.
1593 : *
1594 : * API:
1595 : * <relname> - name of local table of interest
7655 bruce 1596 ECB : * <pkattnums> - an int2vector of attnums which will be used
1597 : * to identify the local tuple of interest
1598 : * <pknumatts> - number of attnums in pkattnums
1599 : * <src_pkattvals_arry> - text array of key values which will be used
1600 : * to identify the local tuple of interest
1601 : * <tgt_pkattvals_arry> - text array of key values which will be used
7655 bruce 1602 EUB : * to build the string for execution remotely. These are substituted
1603 : * for their counterparts in src_pkattvals_arry
1604 : */
7655 bruce 1605 GIC 3 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1606 : Datum
1607 6 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
1608 : {
2219 noah 1609 CBC 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
4681 tgl 1610 GIC 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1611 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
6351 1612 6 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1613 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
4682 tgl 1614 ECB : Relation rel;
1615 : int *pkattnums;
1616 : int pknumatts;
1617 : char **src_pkattvals;
1618 : char **tgt_pkattvals;
7655 bruce 1619 : int src_nitems;
1620 : int tgt_nitems;
1621 : char *sql;
1622 :
1623 : /*
1624 : * Open target relation.
1625 : */
4682 tgl 1626 GIC 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1627 :
1628 : /*
1629 : * Process pkattnums argument.
1630 : */
4681 1631 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1632 : &pkattnums, &pknumatts);
1633 :
1634 : /*
1635 : * Source array is made up of key values that will be used to locate the
1636 : * tuple of interest from the local system.
1637 : */
6351 tgl 1638 CBC 4 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1639 :
7655 bruce 1640 ECB : /*
1641 : * There should be one source array key value for each key attnum
1642 : */
7655 bruce 1643 CBC 4 : if (src_nitems != pknumatts)
7199 tgl 1644 LBC 0 : ereport(ERROR,
7199 tgl 1645 ECB : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1646 : errmsg("source key array length must match number of key attributes")));
1647 :
1648 : /*
1649 : * Target array is made up of key values that will be used to build the
1650 : * SQL string for use on the remote system.
1651 : */
6351 tgl 1652 GIC 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1653 :
1654 : /*
1655 : * There should be one target array key value for each key attnum
7655 bruce 1656 ECB : */
7655 bruce 1657 GIC 4 : if (tgt_nitems != pknumatts)
7199 tgl 1658 UIC 0 : ereport(ERROR,
1659 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1660 : errmsg("target key array length must match number of key attributes")));
7655 bruce 1661 ECB :
1662 : /*
1663 : * Prep work is finally done. Go get the SQL string.
1664 : */
4682 tgl 1665 GIC 4 : sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1666 :
1667 : /*
4682 tgl 1668 ECB : * Now we can close the relation.
1669 : */
4682 tgl 1670 GIC 4 : relation_close(rel, AccessShareLock);
1671 :
1672 : /*
7655 bruce 1673 ECB : * And send it
7655 bruce 1674 EUB : */
5493 tgl 1675 GIC 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1676 : }
1677 :
1678 :
1679 : /*
1680 : * dblink_build_sql_delete
7522 bruce 1681 ECB : *
1682 : * Used to generate an SQL delete statement.
1683 : * This is useful for selectively replicating a
1684 : * delete to another server via dblink.
1685 : *
7655 1686 : * API:
1687 : * <relname> - name of remote table of interest
1688 : * <pkattnums> - an int2vector of attnums which will be used
1689 : * to identify the remote tuple of interest
1690 : * <pknumatts> - number of attnums in pkattnums
1691 : * <tgt_pkattvals_arry> - text array of key values which will be used
1692 : * to build the string for execution remotely.
1693 : */
7655 bruce 1694 GIC 3 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1695 : Datum
1696 6 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
1697 : {
2219 noah 1698 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
4681 tgl 1699 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1700 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
6351 1701 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1702 : Relation rel;
1703 : int *pkattnums;
1704 : int pknumatts;
1705 : char **tgt_pkattvals;
1706 : int tgt_nitems;
1707 : char *sql;
1708 :
1709 : /*
1710 : * Open target relation.
1711 : */
4682 1712 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1713 :
7655 bruce 1714 ECB : /*
1715 : * Process pkattnums argument.
1716 : */
4681 tgl 1717 GIC 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
4681 tgl 1718 ECB : &pkattnums, &pknumatts);
4813 mail 1719 :
7655 bruce 1720 : /*
6385 1721 : * Target array is made up of key values that will be used to build the
1722 : * SQL string for use on the remote system.
1723 : */
6351 tgl 1724 GIC 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1725 :
1726 : /*
1727 : * There should be one target array key value for each key attnum
1728 : */
7655 bruce 1729 4 : if (tgt_nitems != pknumatts)
7199 tgl 1730 UIC 0 : ereport(ERROR,
1731 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1732 : errmsg("target key array length must match number of key attributes")));
1733 :
1734 : /*
7655 bruce 1735 ECB : * Prep work is finally done. Go get the SQL string.
1736 : */
4682 tgl 1737 GIC 4 : sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1738 :
1739 : /*
4682 tgl 1740 ECB : * Now we can close the relation.
1741 : */
4682 tgl 1742 GIC 4 : relation_close(rel, AccessShareLock);
1743 :
1744 : /*
1745 : * And send it
1746 : */
5493 tgl 1747 CBC 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1748 : }
1749 :
1750 :
1751 : /*
7655 bruce 1752 ECB : * dblink_build_sql_update
7522 bruce 1753 EUB : *
1754 : * Used to generate an SQL update statement
1755 : * based on an existing tuple in a local relation.
1756 : * This is useful for selectively replicating data
1757 : * to another server via dblink.
1758 : *
1759 : * API:
1760 : * <relname> - name of local table of interest
7655 bruce 1761 ECB : * <pkattnums> - an int2vector of attnums which will be used
1762 : * to identify the local tuple of interest
1763 : * <pknumatts> - number of attnums in pkattnums
1764 : * <src_pkattvals_arry> - text array of key values which will be used
1765 : * to identify the local tuple of interest
1766 : * <tgt_pkattvals_arry> - text array of key values which will be used
7655 bruce 1767 EUB : * to build the string for execution remotely. These are substituted
1768 : * for their counterparts in src_pkattvals_arry
1769 : */
7655 bruce 1770 GIC 3 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1771 : Datum
1772 6 : dblink_build_sql_update(PG_FUNCTION_ARGS)
1773 : {
2219 noah 1774 CBC 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
4681 tgl 1775 GIC 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1776 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
6351 1777 6 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1778 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
4682 tgl 1779 ECB : Relation rel;
1780 : int *pkattnums;
1781 : int pknumatts;
1782 : char **src_pkattvals;
1783 : char **tgt_pkattvals;
7655 bruce 1784 : int src_nitems;
1785 : int tgt_nitems;
1786 : char *sql;
1787 :
1788 : /*
1789 : * Open target relation.
1790 : */
4682 tgl 1791 GIC 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1792 :
7655 bruce 1793 ECB : /*
1794 : * Process pkattnums argument.
4813 mail 1795 EUB : */
4681 tgl 1796 GIC 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1797 : &pkattnums, &pknumatts);
4813 mail 1798 EUB :
1799 : /*
1800 : * Source array is made up of key values that will be used to locate the
1801 : * tuple of interest from the local system.
1802 : */
6351 tgl 1803 GIC 4 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1804 :
1805 : /*
1806 : * There should be one source array key value for each key attnum
1807 : */
7655 bruce 1808 4 : if (src_nitems != pknumatts)
7199 tgl 1809 UIC 0 : ereport(ERROR,
1810 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1202 alvherre 1811 ECB : errmsg("source key array length must match number of key attributes")));
1812 :
7655 bruce 1813 : /*
1814 : * Target array is made up of key values that will be used to build the
1815 : * SQL string for use on the remote system.
1816 : */
6351 tgl 1817 CBC 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1818 :
7655 bruce 1819 ECB : /*
1820 : * There should be one target array key value for each key attnum
7655 bruce 1821 EUB : */
7655 bruce 1822 GIC 4 : if (tgt_nitems != pknumatts)
7199 tgl 1823 LBC 0 : ereport(ERROR,
1824 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1202 alvherre 1825 ECB : errmsg("target key array length must match number of key attributes")));
1826 :
7655 bruce 1827 : /*
1828 : * Prep work is finally done. Go get the SQL string.
1829 : */
4682 tgl 1830 GIC 4 : sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1831 :
1832 : /*
4682 tgl 1833 ECB : * Now we can close the relation.
1834 : */
4682 tgl 1835 GIC 4 : relation_close(rel, AccessShareLock);
7655 bruce 1836 ECB :
1837 : /*
1838 : * And send it
7655 bruce 1839 EUB : */
5493 tgl 1840 GIC 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
7969 bruce 1841 ECB : }
1842 :
5052 tgl 1843 : /*
1844 : * dblink_current_query
1845 : * return the current query string
5052 tgl 1846 EUB : * to allow its use in (among other things)
1847 : * rewrite rules
5052 tgl 1848 ECB : */
5052 tgl 1849 GIC 1 : PG_FUNCTION_INFO_V1(dblink_current_query);
5052 tgl 1850 ECB : Datum
5052 tgl 1851 LBC 0 : dblink_current_query(PG_FUNCTION_ARGS)
1852 : {
1853 : /* This is now just an alias for the built-in function current_query() */
1854 0 : PG_RETURN_DATUM(current_query(fcinfo));
1855 : }
1856 :
1857 : /*
1858 : * Retrieve async notifications for a connection.
1859 : *
1860 : * Returns a setof record of notifications, or an empty set if none received.
1861 : * Can optionally take a named connection as parameter, but uses the unnamed
1862 : * connection per default.
1863 : *
4995 mail 1864 ECB : */
1865 : #define DBLINK_NOTIFY_COLS 3
1866 :
4995 mail 1867 GIC 3 : PG_FUNCTION_INFO_V1(dblink_get_notify);
4995 mail 1868 ECB : Datum
4995 mail 1869 CBC 2 : dblink_get_notify(PG_FUNCTION_ARGS)
1870 : {
1871 : PGconn *conn;
1872 : PGnotify *notify;
4790 bruce 1873 GIC 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1874 :
2296 peter_e 1875 2 : dblink_init();
4995 mail 1876 2 : if (PG_NARGS() == 1)
2296 peter_e 1877 UIC 0 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1878 : else
4995 mail 1879 GIC 2 : conn = pconn->conn;
1880 :
173 michael 1881 CBC 2 : InitMaterializedSRF(fcinfo, 0);
1882 :
4995 mail 1883 2 : PQconsumeInput(conn);
1884 4 : while ((notify = PQnotifies(conn)) != NULL)
4995 mail 1885 EUB : {
1886 : Datum values[DBLINK_NOTIFY_COLS];
1887 : bool nulls[DBLINK_NOTIFY_COLS];
1888 :
4995 mail 1889 GIC 2 : memset(values, 0, sizeof(values));
1890 2 : memset(nulls, 0, sizeof(nulls));
1891 :
4995 mail 1892 CBC 2 : if (notify->relname != NULL)
4995 mail 1893 GIC 2 : values[0] = CStringGetTextDatum(notify->relname);
4995 mail 1894 ECB : else
4995 mail 1895 UIC 0 : nulls[0] = true;
4995 mail 1896 ECB :
4995 mail 1897 GIC 2 : values[1] = Int32GetDatum(notify->be_pid);
1898 :
1899 2 : if (notify->extra != NULL)
1900 2 : values[2] = CStringGetTextDatum(notify->extra);
1901 : else
4995 mail 1902 UIC 0 : nulls[2] = true;
1903 :
397 michael 1904 GIC 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1905 :
4995 mail 1906 CBC 2 : PQfreemem(notify);
4995 mail 1907 GIC 2 : PQconsumeInput(conn);
4995 mail 1908 ECB : }
1909 :
4995 mail 1910 GIC 2 : return (Datum) 0;
4995 mail 1911 ECB : }
1912 :
3833 tgl 1913 : /*
1914 : * Validate the options given to a dblink foreign server or user mapping.
1915 : * Raise an error if any option is invalid.
1916 : *
1917 : * We just check the names of options here, so semantic errors in options,
1918 : * such as invalid numeric format, will be detected at the attempt to connect.
1919 : */
3833 tgl 1920 GIC 3 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
1921 : Datum
1922 5 : dblink_fdw_validator(PG_FUNCTION_ARGS)
1923 : {
1924 5 : List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1925 5 : Oid context = PG_GETARG_OID(1);
1926 : ListCell *cell;
1927 :
1928 : static const PQconninfoOption *options = NULL;
3833 tgl 1929 ECB :
1930 : /*
1931 : * Get list of valid libpq options.
1932 : *
1933 : * To avoid unnecessary work, we get the list once and use it throughout
1934 : * the lifetime of this backend process. We don't need to care about
1935 : * memory context issues, because PQconndefaults allocates with malloc.
1936 : */
3833 tgl 1937 GIC 5 : if (!options)
1938 : {
1939 2 : options = PQconndefaults();
1940 2 : if (!options) /* assume reason for failure is OOM */
3833 tgl 1941 UIC 0 : ereport(ERROR,
1942 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1943 : errmsg("out of memory"),
1944 : errdetail("Could not get libpq's default connection options.")));
3833 tgl 1945 ECB : }
1946 :
1947 : /* Validate each supplied option. */
3833 tgl 1948 GIC 8 : foreach(cell, options_list)
1949 : {
1950 5 : DefElem *def = (DefElem *) lfirst(cell);
1951 :
3833 tgl 1952 CBC 5 : if (!is_valid_dblink_option(options, def->defname, context))
1953 : {
1954 : /*
1955 : * Unknown option, or invalid option for the context specified, so
1956 : * complain about it. Provide a hint with a valid option that
1957 : * looks similar, if there is one.
3833 tgl 1958 ECB : */
1959 : const PQconninfoOption *opt;
1960 : const char *closest_match;
1961 : ClosestMatchState match_state;
205 peter 1962 GNC 2 : bool has_valid_options = false;
3833 tgl 1963 ECB :
205 peter 1964 GNC 2 : initClosestMatch(&match_state, def->defname, 4);
3833 tgl 1965 GIC 80 : for (opt = options; opt->keyword; opt++)
1966 : {
1967 78 : if (is_valid_dblink_option(options, opt->keyword, context))
1968 : {
205 peter 1969 GNC 3 : has_valid_options = true;
1970 3 : updateClosestMatch(&match_state, opt->keyword);
1971 : }
1972 : }
1973 :
1974 2 : closest_match = getClosestMatch(&match_state);
3833 tgl 1975 CBC 2 : ereport(ERROR,
1976 : (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
3833 tgl 1977 ECB : errmsg("invalid option \"%s\"", def->defname),
1978 : has_valid_options ? closest_match ?
1979 : errhint("Perhaps you meant the option \"%s\".",
1980 : closest_match) : 0 :
1981 : errhint("There are no valid options in this context.")));
1982 : }
1983 : }
1984 :
3833 tgl 1985 CBC 3 : PG_RETURN_VOID();
1986 : }
3833 tgl 1987 ECB :
1988 :
1989 : /*************************************************************
7969 bruce 1990 : * internal functions
1991 : */
1992 :
1993 :
7655 1994 : /*
1995 : * get_pkey_attnames
1996 : *
1997 : * Get the primary key attnames for the given relation.
1998 : * Return NULL, and set indnkeyatts = 0, if no primary key exists.
1999 : */
2000 : static char **
1828 teodor 2001 GIC 3 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2002 : {
2003 : Relation indexRelation;
2004 : ScanKeyData skey;
5564 tgl 2005 ECB : SysScanDesc scan;
2006 : HeapTuple indexTuple;
7522 bruce 2007 : int i;
7522 bruce 2008 CBC 3 : char **result = NULL;
2009 : TupleDesc tupdesc;
2010 :
2011 : /* initialize indnkeyatts to 0 in case no primary key exists */
1828 teodor 2012 GIC 3 : *indnkeyatts = 0;
2013 :
7655 bruce 2014 3 : tupdesc = rel->rd_att;
2015 :
2016 : /* Prepare to scan pg_index for entries having indrelid = this rel. */
1539 andres 2017 3 : indexRelation = table_open(IndexRelationId, AccessShareLock);
5564 tgl 2018 3 : ScanKeyInit(&skey,
7088 tgl 2019 ECB : Anum_pg_index_indrelid,
2020 : BTEqualStrategyNumber, F_OIDEQ,
4682 2021 : ObjectIdGetDatum(RelationGetRelid(rel)));
2022 :
5564 tgl 2023 CBC 3 : scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2024 : NULL, 1, &skey);
2025 :
2026 3 : while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2027 : {
7522 bruce 2028 3 : Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
7655 bruce 2029 ECB :
7524 2030 : /* we're only interested if it is the primary key */
5564 tgl 2031 GIC 3 : if (index->indisprimary)
7655 bruce 2032 ECB : {
1828 teodor 2033 GIC 3 : *indnkeyatts = index->indnkeyatts;
1828 teodor 2034 CBC 3 : if (*indnkeyatts > 0)
2035 : {
209 peter 2036 GNC 3 : result = palloc_array(char *, *indnkeyatts);
2037 :
1828 teodor 2038 GIC 9 : for (i = 0; i < *indnkeyatts; i++)
6585 tgl 2039 6 : result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
7655 bruce 2040 ECB : }
7655 bruce 2041 CBC 3 : break;
7655 bruce 2042 ECB : }
2043 : }
2044 :
5564 tgl 2045 GIC 3 : systable_endscan(scan);
1539 andres 2046 CBC 3 : table_close(indexRelation, AccessShareLock);
2047 :
7655 bruce 2048 GBC 3 : return result;
7655 bruce 2049 EUB : }
2050 :
6351 tgl 2051 : /*
2052 : * Deconstruct a text[] into C-strings (note any NULL elements will be
2053 : * returned as NULL pointers)
2054 : */
2055 : static char **
6351 tgl 2056 GIC 20 : get_text_array_contents(ArrayType *array, int *numitems)
6351 tgl 2057 ECB : {
6351 tgl 2058 GIC 20 : int ndim = ARR_NDIM(array);
2059 20 : int *dims = ARR_DIMS(array);
2060 : int nitems;
6351 tgl 2061 ECB : int16 typlen;
2062 : bool typbyval;
2063 : char typalign;
2064 : char **values;
2065 : char *ptr;
2066 : bits8 *bitmap;
2067 : int bitmask;
2068 : int i;
2069 :
6351 tgl 2070 GIC 20 : Assert(ARR_ELEMTYPE(array) == TEXTOID);
2071 :
2072 20 : *numitems = nitems = ArrayGetNItems(ndim, dims);
6351 tgl 2073 ECB :
6351 tgl 2074 GIC 20 : get_typlenbyvalalign(ARR_ELEMTYPE(array),
2075 : &typlen, &typbyval, &typalign);
6351 tgl 2076 ECB :
209 peter 2077 GNC 20 : values = palloc_array(char *, nitems);
6351 tgl 2078 ECB :
6351 tgl 2079 CBC 20 : ptr = ARR_DATA_PTR(array);
6351 tgl 2080 GIC 20 : bitmap = ARR_NULLBITMAP(array);
6351 tgl 2081 CBC 20 : bitmask = 1;
6351 tgl 2082 ECB :
6351 tgl 2083 GBC 55 : for (i = 0; i < nitems; i++)
2084 : {
6351 tgl 2085 GIC 35 : if (bitmap && (*bitmap & bitmask) == 0)
2086 : {
6351 tgl 2087 LBC 0 : values[i] = NULL;
2088 : }
6351 tgl 2089 ECB : else
2090 : {
5493 tgl 2091 GIC 35 : values[i] = TextDatumGetCString(PointerGetDatum(ptr));
5847 tgl 2092 CBC 35 : ptr = att_addlength_pointer(ptr, typlen, ptr);
5847 tgl 2093 GIC 35 : ptr = (char *) att_align_nominal(ptr, typalign);
6351 tgl 2094 ECB : }
2095 :
2096 : /* advance bitmap pointer if any */
6351 tgl 2097 CBC 35 : if (bitmap)
6351 tgl 2098 ECB : {
6351 tgl 2099 UIC 0 : bitmask <<= 1;
6351 tgl 2100 LBC 0 : if (bitmask == 0x100)
6351 tgl 2101 ECB : {
6351 tgl 2102 LBC 0 : bitmap++;
6351 tgl 2103 UIC 0 : bitmask = 1;
2104 : }
6351 tgl 2105 ECB : }
2106 : }
2107 :
6351 tgl 2108 GIC 20 : return values;
2109 : }
6351 tgl 2110 ECB :
7622 2111 : static char *
4681 tgl 2112 GIC 4 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
7655 bruce 2113 ECB : {
7522 2114 : char *relname;
2115 : HeapTuple tuple;
2116 : TupleDesc tupdesc;
2117 : int natts;
2118 : StringInfoData buf;
2119 : char *val;
2120 : int key;
2121 : int i;
2122 : bool needComma;
2123 :
6248 neilc 2124 CBC 4 : initStringInfo(&buf);
2125 :
7442 tgl 2126 ECB : /* get relation name including any needed schema prefix and quoting */
4682 tgl 2127 GIC 4 : relname = generate_relation_name(rel);
7442 tgl 2128 ECB :
7655 bruce 2129 CBC 4 : tupdesc = rel->rd_att;
7655 bruce 2130 GIC 4 : natts = tupdesc->natts;
2131 :
4682 tgl 2132 GBC 4 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
7524 bruce 2133 CBC 4 : if (!tuple)
7199 tgl 2134 UIC 0 : ereport(ERROR,
7199 tgl 2135 ECB : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2136 : errmsg("source row not found")));
7655 bruce 2137 :
6248 neilc 2138 GIC 4 : appendStringInfo(&buf, "INSERT INTO %s(", relname);
2139 :
7555 tgl 2140 4 : needComma = false;
7655 bruce 2141 CBC 19 : for (i = 0; i < natts; i++)
2142 : {
2058 andres 2143 GIC 15 : Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2144 :
2145 15 : if (att->attisdropped)
7555 tgl 2146 2 : continue;
2147 :
7555 tgl 2148 CBC 13 : if (needComma)
3447 rhaas 2149 GIC 9 : appendStringInfoChar(&buf, ',');
2150 :
6248 neilc 2151 CBC 13 : appendStringInfoString(&buf,
2058 andres 2152 GIC 13 : quote_ident_cstr(NameStr(att->attname)));
7555 tgl 2153 CBC 13 : needComma = true;
2154 : }
7655 bruce 2155 ECB :
3447 rhaas 2156 CBC 4 : appendStringInfoString(&buf, ") VALUES(");
2157 :
7655 bruce 2158 ECB : /*
4681 tgl 2159 : * Note: i is physical column number (counting from 0).
2160 : */
7555 tgl 2161 CBC 4 : needComma = false;
7655 bruce 2162 19 : for (i = 0; i < natts; i++)
2163 : {
2058 andres 2164 15 : if (TupleDescAttr(tupdesc, i)->attisdropped)
7555 tgl 2165 2 : continue;
2166 :
2167 13 : if (needComma)
3447 rhaas 2168 9 : appendStringInfoChar(&buf, ',');
7655 bruce 2169 ECB :
4681 tgl 2170 GIC 13 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
7655 bruce 2171 EUB :
4681 tgl 2172 GIC 13 : if (key >= 0)
6351 2173 7 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
7655 bruce 2174 ECB : else
7655 bruce 2175 GIC 6 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2176 :
2177 13 : if (val != NULL)
7655 bruce 2178 ECB : {
6248 neilc 2179 GIC 13 : appendStringInfoString(&buf, quote_literal_cstr(val));
7655 bruce 2180 13 : pfree(val);
2181 : }
2182 : else
3447 rhaas 2183 UIC 0 : appendStringInfoString(&buf, "NULL");
7555 tgl 2184 GIC 13 : needComma = true;
2185 : }
3447 rhaas 2186 4 : appendStringInfoChar(&buf, ')');
2187 :
2061 peter_e 2188 4 : return buf.data;
2189 : }
7655 bruce 2190 ECB :
2191 : static char *
4681 tgl 2192 GIC 4 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
7655 bruce 2193 ECB : {
2194 : char *relname;
7522 2195 : TupleDesc tupdesc;
6031 2196 : StringInfoData buf;
2197 : int i;
7655 2198 :
6248 neilc 2199 CBC 4 : initStringInfo(&buf);
6248 neilc 2200 EUB :
2201 : /* get relation name including any needed schema prefix and quoting */
4682 tgl 2202 GIC 4 : relname = generate_relation_name(rel);
2203 :
7655 bruce 2204 CBC 4 : tupdesc = rel->rd_att;
2205 :
6248 neilc 2206 GIC 4 : appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
7655 bruce 2207 11 : for (i = 0; i < pknumatts; i++)
2208 : {
4681 tgl 2209 CBC 7 : int pkattnum = pkattnums[i];
2058 andres 2210 7 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2211 :
7655 bruce 2212 7 : if (i > 0)
3447 rhaas 2213 GIC 3 : appendStringInfoString(&buf, " AND ");
7655 bruce 2214 ECB :
6248 neilc 2215 CBC 7 : appendStringInfoString(&buf,
2058 andres 2216 GIC 7 : quote_ident_cstr(NameStr(attr->attname)));
7655 bruce 2217 ECB :
6351 tgl 2218 CBC 7 : if (tgt_pkattvals[i] != NULL)
6248 neilc 2219 GIC 7 : appendStringInfo(&buf, " = %s",
6351 tgl 2220 CBC 7 : quote_literal_cstr(tgt_pkattvals[i]));
7655 bruce 2221 ECB : else
3447 rhaas 2222 UIC 0 : appendStringInfoString(&buf, " IS NULL");
7655 bruce 2223 ECB : }
2224 :
2061 peter_e 2225 CBC 4 : return buf.data;
7655 bruce 2226 ECB : }
2227 :
7622 tgl 2228 : static char *
4681 tgl 2229 GIC 4 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
7655 bruce 2230 ECB : {
2231 : char *relname;
7522 2232 : HeapTuple tuple;
2233 : TupleDesc tupdesc;
2234 : int natts;
2235 : StringInfoData buf;
7522 bruce 2236 EUB : char *val;
4681 tgl 2237 ECB : int key;
2238 : int i;
2239 : bool needComma;
7655 bruce 2240 :
6248 neilc 2241 GIC 4 : initStringInfo(&buf);
6248 neilc 2242 ECB :
2243 : /* get relation name including any needed schema prefix and quoting */
4682 tgl 2244 CBC 4 : relname = generate_relation_name(rel);
7442 tgl 2245 ECB :
7655 bruce 2246 GIC 4 : tupdesc = rel->rd_att;
7655 bruce 2247 CBC 4 : natts = tupdesc->natts;
7655 bruce 2248 ECB :
4682 tgl 2249 GIC 4 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
7524 bruce 2250 CBC 4 : if (!tuple)
7199 tgl 2251 LBC 0 : ereport(ERROR,
2252 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
7199 tgl 2253 ECB : errmsg("source row not found")));
2254 :
6248 neilc 2255 CBC 4 : appendStringInfo(&buf, "UPDATE %s SET ", relname);
7655 bruce 2256 ECB :
2257 : /*
4681 tgl 2258 EUB : * Note: i is physical column number (counting from 0).
2259 : */
7555 tgl 2260 GIC 4 : needComma = false;
7655 bruce 2261 CBC 19 : for (i = 0; i < natts; i++)
2262 : {
2058 andres 2263 GIC 15 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2264 :
2265 15 : if (attr->attisdropped)
7555 tgl 2266 2 : continue;
2267 :
2268 13 : if (needComma)
3447 rhaas 2269 CBC 9 : appendStringInfoString(&buf, ", ");
2270 :
6248 neilc 2271 GIC 13 : appendStringInfo(&buf, "%s = ",
2058 andres 2272 13 : quote_ident_cstr(NameStr(attr->attname)));
2273 :
4681 tgl 2274 13 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
7655 bruce 2275 ECB :
4681 tgl 2276 CBC 13 : if (key >= 0)
6347 bruce 2277 GIC 7 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
7655 bruce 2278 ECB : else
7655 bruce 2279 GIC 6 : val = SPI_getvalue(tuple, tupdesc, i + 1);
7655 bruce 2280 ECB :
7655 bruce 2281 GIC 13 : if (val != NULL)
2282 : {
6248 neilc 2283 13 : appendStringInfoString(&buf, quote_literal_cstr(val));
7655 bruce 2284 CBC 13 : pfree(val);
2285 : }
2286 : else
6248 neilc 2287 UIC 0 : appendStringInfoString(&buf, "NULL");
7555 tgl 2288 GIC 13 : needComma = true;
2289 : }
2290 :
3447 rhaas 2291 CBC 4 : appendStringInfoString(&buf, " WHERE ");
7655 bruce 2292 ECB :
7655 bruce 2293 CBC 11 : for (i = 0; i < pknumatts; i++)
2294 : {
4681 tgl 2295 7 : int pkattnum = pkattnums[i];
2058 andres 2296 GIC 7 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2297 :
7655 bruce 2298 7 : if (i > 0)
3447 rhaas 2299 CBC 3 : appendStringInfoString(&buf, " AND ");
2300 :
3447 rhaas 2301 GIC 7 : appendStringInfoString(&buf,
2058 andres 2302 7 : quote_ident_cstr(NameStr(attr->attname)));
2303 :
4681 tgl 2304 7 : val = tgt_pkattvals[i];
2305 :
7655 bruce 2306 7 : if (val != NULL)
6248 neilc 2307 7 : appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2308 : else
3447 rhaas 2309 UIC 0 : appendStringInfoString(&buf, " IS NULL");
2310 : }
2311 :
2061 peter_e 2312 CBC 4 : return buf.data;
2313 : }
7655 bruce 2314 EUB :
2315 : /*
7655 bruce 2316 ECB : * Return a properly quoted identifier.
2317 : * Uses quote_ident in quote.c
2318 : */
2319 : static char *
7655 bruce 2320 GIC 80 : quote_ident_cstr(char *rawstr)
7655 bruce 2321 ECB : {
7522 2322 : text *rawstr_text;
2323 : text *result_text;
2324 : char *result;
2325 :
5493 tgl 2326 GIC 80 : rawstr_text = cstring_to_text(rawstr);
2219 noah 2327 80 : result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2328 : PointerGetDatum(rawstr_text)));
5493 tgl 2329 80 : result = text_to_cstring(result_text);
2330 :
7655 bruce 2331 CBC 80 : return result;
2332 : }
7655 bruce 2333 ECB :
/*
 * get_attnum_pk_pos
 *
 * Return the index of the first entry of pkattnums[0 .. pknumatts-1] that
 * equals key, or -1 if there is none.
 */
static int
get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
{
	int			pos = 0;

	/* The list is expected to be short, so a linear search is enough. */
	while (pos < pknumatts)
	{
		if (pkattnums[pos] == key)
			return pos;
		pos++;
	}

	return -1;
}
2348 :
7622 tgl 2349 : static HeapTuple
4681 tgl 2350 GIC 8 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
7655 bruce 2351 ECB : {
7522 2352 : char *relname;
2353 : TupleDesc tupdesc;
4681 tgl 2354 : int natts;
6248 neilc 2355 : StringInfoData buf;
2356 : int ret;
7522 bruce 2357 : HeapTuple tuple;
2358 : int i;
2359 :
4681 tgl 2360 : /*
2361 : * Connect to SPI manager
2362 : */
4681 tgl 2363 GIC 8 : if ((ret = SPI_connect()) < 0)
4681 tgl 2364 EUB : /* internal error */
4681 tgl 2365 UIC 0 : elog(ERROR, "SPI connect failure - returned %d", ret);
2366 :
6248 neilc 2367 GIC 8 : initStringInfo(&buf);
2368 :
2369 : /* get relation name including any needed schema prefix and quoting */
4682 tgl 2370 CBC 8 : relname = generate_relation_name(rel);
7442 tgl 2371 ECB :
4682 tgl 2372 GIC 8 : tupdesc = rel->rd_att;
4681 2373 8 : natts = tupdesc->natts;
2374 :
2375 : /*
4681 tgl 2376 ECB : * Build sql statement to look up tuple of interest, ie, the one matching
4681 tgl 2377 EUB : * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2378 : * generate a result tuple that matches the table's physical structure,
2379 : * with NULLs for any dropped columns. Otherwise we have to deal with two
2380 : * different tupdescs and everything's very confusing.
7655 bruce 2381 ECB : */
4681 tgl 2382 GIC 8 : appendStringInfoString(&buf, "SELECT ");
7655 bruce 2383 ECB :
4681 tgl 2384 GIC 38 : for (i = 0; i < natts; i++)
4681 tgl 2385 ECB : {
2058 andres 2386 CBC 30 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2387 :
4681 tgl 2388 30 : if (i > 0)
4681 tgl 2389 GIC 22 : appendStringInfoString(&buf, ", ");
2390 :
2058 andres 2391 30 : if (attr->attisdropped)
4681 tgl 2392 4 : appendStringInfoString(&buf, "NULL");
2393 : else
2394 26 : appendStringInfoString(&buf,
2058 andres 2395 GBC 26 : quote_ident_cstr(NameStr(attr->attname)));
2396 : }
4681 tgl 2397 EUB :
4681 tgl 2398 GIC 8 : appendStringInfo(&buf, " FROM %s WHERE ", relname);
2399 :
7655 bruce 2400 22 : for (i = 0; i < pknumatts; i++)
2401 : {
4681 tgl 2402 14 : int pkattnum = pkattnums[i];
2058 andres 2403 14 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2404 :
7655 bruce 2405 14 : if (i > 0)
3447 rhaas 2406 6 : appendStringInfoString(&buf, " AND ");
2407 :
6248 neilc 2408 14 : appendStringInfoString(&buf,
2058 andres 2409 14 : quote_ident_cstr(NameStr(attr->attname)));
2410 :
6351 tgl 2411 14 : if (src_pkattvals[i] != NULL)
6248 neilc 2412 CBC 14 : appendStringInfo(&buf, " = %s",
6351 tgl 2413 GIC 14 : quote_literal_cstr(src_pkattvals[i]));
2414 : else
3447 rhaas 2415 UIC 0 : appendStringInfoString(&buf, " IS NULL");
2416 : }
2417 :
7655 bruce 2418 ECB : /*
2419 : * Retrieve the desired tuple
2420 : */
6248 neilc 2421 CBC 8 : ret = SPI_exec(buf.data, 0);
6248 neilc 2422 GIC 8 : pfree(buf.data);
7655 bruce 2423 ECB :
7655 bruce 2424 EUB : /*
2425 : * Only allow one qualifying tuple
2426 : */
7655 bruce 2427 CBC 8 : if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
7199 tgl 2428 UIC 0 : ereport(ERROR,
2429 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2430 : errmsg("source criteria matched more than one record")));
2431 :
7655 bruce 2432 GIC 8 : else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2433 : {
2434 8 : SPITupleTable *tuptable = SPI_tuptable;
2435 :
2436 8 : tuple = SPI_copytuple(tuptable->vals[0]);
7074 mail 2437 CBC 8 : SPI_finish();
2438 :
7655 bruce 2439 GIC 8 : return tuple;
2440 : }
2441 : else
2442 : {
7655 bruce 2443 ECB : /*
2444 : * no qualifying tuples
2445 : */
7074 mail 2446 LBC 0 : SPI_finish();
2447 :
7655 bruce 2448 0 : return NULL;
2449 : }
7655 bruce 2450 ECB :
2451 : /*
2452 : * never reached, but keep compiler quiet
2453 : */
2454 : return NULL;
2455 : }
2456 :
2457 : /*
2458 : * Open the relation named by relname_text, acquire specified type of lock,
2459 : * verify we have specified permissions.
4682 tgl 2460 : * Caller must close rel when done with it.
2461 : */
2462 : static Relation
4682 tgl 2463 CBC 21 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
7655 bruce 2464 ECB : {
7622 tgl 2465 : RangeVar *relvar;
2466 : Relation rel;
2467 : AclResult aclresult;
7655 bruce 2468 :
6526 neilc 2469 CBC 21 : relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
1539 andres 2470 GIC 21 : rel = table_openrv(relvar, lockmode);
7655 bruce 2471 ECB :
4682 tgl 2472 GIC 21 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2473 : aclmode);
2474 21 : if (aclresult != ACLCHECK_OK)
1954 peter_e 2475 LBC 0 : aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
4682 tgl 2476 UIC 0 : RelationGetRelationName(rel));
2477 :
4682 tgl 2478 GIC 21 : return rel;
7655 bruce 2479 ECB : }
2480 :
2481 : /*
7442 tgl 2482 : * generate_relation_name - copied from ruleutils.c
2483 : * Compute the name to display for a relation
2484 : *
2485 : * The result includes all necessary quoting and schema-prefixing.
2486 : */
2487 : static char *
4682 tgl 2488 GIC 20 : generate_relation_name(Relation rel)
2489 : {
2490 : char *nspname;
2491 : char *result;
2492 :
7442 tgl 2493 ECB : /* Qualify the name if not visible in search path */
4682 tgl 2494 CBC 20 : if (RelationIsVisible(RelationGetRelid(rel)))
7442 tgl 2495 GIC 15 : nspname = NULL;
7442 tgl 2496 ECB : else
4682 tgl 2497 CBC 5 : nspname = get_namespace_name(rel->rd_rel->relnamespace);
7442 tgl 2498 ECB :
4682 tgl 2499 GIC 20 : result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2500 :
7442 tgl 2501 CBC 20 : return result;
2502 : }
7228 bruce 2503 ECB :
2504 :
2505 : static remoteConn *
7228 bruce 2506 CBC 75 : getConnectionByName(const char *name)
2507 : {
2508 : remoteConnHashEnt *hentry;
2509 : char *key;
2510 :
7188 2511 75 : if (!remoteConnHash)
2512 2 : remoteConnHash = createConnHash();
7228 bruce 2513 ECB :
4693 itagaki.takahiro 2514 GIC 75 : key = pstrdup(name);
4518 2515 75 : truncate_identifier(key, strlen(key), false);
7188 bruce 2516 CBC 75 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2517 : key, HASH_FIND, NULL);
2518 :
7188 bruce 2519 GIC 75 : if (hentry)
2061 peter_e 2520 68 : return hentry->rconn;
2521 :
2061 peter_e 2522 CBC 7 : return NULL;
7228 bruce 2523 EUB : }
2524 :
7228 bruce 2525 ECB : static HTAB *
7228 bruce 2526 CBC 3 : createConnHash(void)
7228 bruce 2527 ECB : {
2528 : HASHCTL ctl;
2529 :
7228 bruce 2530 CBC 3 : ctl.keysize = NAMEDATALEN;
7228 bruce 2531 GBC 3 : ctl.entrysize = sizeof(remoteConnHashEnt);
2532 :
845 tgl 2533 GIC 3 : return hash_create("Remote Con hash", NUMCONN, &ctl,
845 tgl 2534 ECB : HASH_ELEM | HASH_STRINGS);
2535 : }
2536 :
7199 2537 : static void
5050 bruce 2538 GIC 10 : createNewConnection(const char *name, remoteConn *rconn)
7228 bruce 2539 ECB : {
2540 : remoteConnHashEnt *hentry;
7188 bruce 2541 EUB : bool found;
2542 : char *key;
7228 2543 :
7188 bruce 2544 GBC 10 : if (!remoteConnHash)
7199 tgl 2545 1 : remoteConnHash = createConnHash();
2546 :
4693 itagaki.takahiro 2547 10 : key = pstrdup(name);
4693 itagaki.takahiro 2548 GIC 10 : truncate_identifier(key, strlen(key), true);
7228 bruce 2549 10 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2550 : HASH_ENTER, &found);
2551 :
7188 2552 10 : if (found)
2553 : {
76 andres 2554 GNC 1 : libpqsrv_disconnect(rconn->conn);
4687 itagaki.takahiro 2555 GIC 1 : pfree(rconn);
2556 :
7199 tgl 2557 1 : ereport(ERROR,
2558 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2559 : errmsg("duplicate connection name")));
2560 : }
2561 :
6392 bruce 2562 CBC 9 : hentry->rconn = rconn;
5905 peter_e 2563 GIC 9 : strlcpy(hentry->name, name, sizeof(hentry->name));
7228 bruce 2564 CBC 9 : }
2565 :
2566 : static void
7228 bruce 2567 GIC 8 : deleteConnection(const char *name)
7228 bruce 2568 ECB : {
2569 : remoteConnHashEnt *hentry;
7188 2570 : bool found;
4693 itagaki.takahiro 2571 : char *key;
2572 :
7188 bruce 2573 CBC 8 : if (!remoteConnHash)
7188 bruce 2574 UIC 0 : remoteConnHash = createConnHash();
7228 bruce 2575 ECB :
4693 itagaki.takahiro 2576 GIC 8 : key = pstrdup(name);
4518 itagaki.takahiro 2577 CBC 8 : truncate_identifier(key, strlen(key), false);
7228 bruce 2578 GIC 8 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
7228 bruce 2579 EUB : key, HASH_REMOVE, &found);
2580 :
7188 bruce 2581 GIC 8 : if (!hentry)
7199 tgl 2582 UIC 0 : ereport(ERROR,
2583 : (errcode(ERRCODE_UNDEFINED_OBJECT),
7199 tgl 2584 ECB : errmsg("undefined connection name")));
7228 bruce 2585 GIC 8 : }
2586 :
5575 tgl 2587 ECB : static void
1 sfrost 2588 CBC 19 : dblink_security_check(PGconn *conn, remoteConn *rconn)
2589 : {
1 sfrost 2590 GIC 19 : if (!superuser())
2591 : {
1 sfrost 2592 UIC 0 : if (!PQconnectionUsedPassword(conn))
1 sfrost 2593 ECB : {
1 sfrost 2594 UNC 0 : libpqsrv_disconnect(conn);
1 sfrost 2595 UIC 0 : if (rconn)
2596 0 : pfree(rconn);
2597 :
2598 0 : ereport(ERROR,
2599 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2600 : errmsg("password is required"),
2601 : errdetail("Non-superuser cannot connect if the server does not request a password."),
2602 : errhint("Target server's authentication method must be changed.")));
1 sfrost 2603 ECB : }
2604 : }
5575 tgl 2605 GIC 19 : }
2606 :
5312 tgl 2607 ECB : /*
1 sfrost 2608 : * For non-superusers, insist that the connstr specify a password. This
2609 : * prevents a password from being picked up from .pgpass, a service file,
2610 : * the environment, etc. We don't want the postgres user's passwords
2611 : * to be accessible to non-superusers.
2612 : */
2613 : static void
1 sfrost 2614 GIC 22 : dblink_connstr_check(const char *connstr)
2615 : {
2616 22 : if (!superuser())
2617 : {
2618 : PQconninfoOption *options;
2619 : PQconninfoOption *option;
1 sfrost 2620 CBC 1 : bool connstr_gives_password = false;
1 sfrost 2621 ECB :
1 sfrost 2622 GIC 1 : options = PQconninfoParse(connstr, NULL);
1 sfrost 2623 CBC 1 : if (options)
2624 : {
2625 40 : for (option = options; option->keyword != NULL; option++)
5312 tgl 2626 ECB : {
1 sfrost 2627 GIC 39 : if (strcmp(option->keyword, "password") == 0)
2628 : {
2629 1 : if (option->val != NULL && option->val[0] != '\0')
2630 : {
1 sfrost 2631 UIC 0 : connstr_gives_password = true;
1 sfrost 2632 UBC 0 : break;
2633 : }
5312 tgl 2634 ECB : }
2635 : }
1 sfrost 2636 CBC 1 : PQconninfoFree(options);
5312 tgl 2637 ECB : }
2638 :
1 sfrost 2639 GIC 1 : if (!connstr_gives_password)
2640 1 : ereport(ERROR,
2641 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2642 : errmsg("password is required"),
2643 : errdetail("Non-superusers must provide a password in the connection string.")));
1 sfrost 2644 ECB : }
5312 tgl 2645 GBC 21 : }
2646 :
2647 : /*
2648 : * Report an error received from the remote server
2649 : *
2650 : * res: the received error result (will be freed)
2651 : * fail: true for ERROR ereport, false for NOTICE
2652 : * fmt and following args: sprintf-style format and values for errcontext;
1844 tgl 2653 ECB : * the resulting string should be worded like "while <some action>"
2654 : */
2655 : static void
2299 mail 2656 GIC 12 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2657 : bool fail, const char *fmt,...)
2658 : {
2659 : int level;
5393 2660 12 : char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2661 12 : char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
5393 mail 2662 CBC 12 : char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2663 12 : char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2664 12 : char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2665 : int sqlstate;
5393 mail 2666 ECB : char *message_primary;
2667 : char *message_detail;
2668 : char *message_hint;
2669 : char *message_context;
2670 : va_list ap;
2671 : char dblink_context_msg[512];
2672 :
5393 mail 2673 GIC 12 : if (fail)
2674 3 : level = ERROR;
2675 : else
2676 9 : level = NOTICE;
2677 :
2678 12 : if (pg_diag_sqlstate)
5393 mail 2679 CBC 12 : sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2680 : pg_diag_sqlstate[1],
2681 : pg_diag_sqlstate[2],
2682 : pg_diag_sqlstate[3],
2683 : pg_diag_sqlstate[4]);
2684 : else
5393 mail 2685 LBC 0 : sqlstate = ERRCODE_CONNECTION_FAILURE;
2686 :
2296 peter_e 2687 CBC 12 : message_primary = xpstrdup(pg_diag_message_primary);
2296 peter_e 2688 GIC 12 : message_detail = xpstrdup(pg_diag_message_detail);
2689 12 : message_hint = xpstrdup(pg_diag_message_hint);
2690 12 : message_context = xpstrdup(pg_diag_context);
2691 :
2692 : /*
2693 : * If we don't get a message from the PGresult, try the PGconn. This is
2694 : * needed because for connection-level failures, PQexec may just return
2695 : * NULL, not a PGresult at all.
2696 : */
2299 mail 2697 CBC 12 : if (message_primary == NULL)
2232 peter_e 2698 UIC 0 : message_primary = pchomp(PQerrorMessage(conn));
2699 :
2700 : /*
2701 : * Now that we've copied all the data we need out of the PGresult, it's
2702 : * safe to free it. We must do this to avoid PGresult leakage. We're
2703 : * leaking all the strings too, but those are in palloc'd memory that will
2704 : * get cleaned up eventually.
2705 : */
280 peter 2706 GNC 12 : PQclear(res);
5393 mail 2707 ECB :
1844 tgl 2708 : /*
1844 tgl 2709 EUB : * Format the basic errcontext string. Below, we'll add on something
2710 : * about the connection name. That's a violation of the translatability
2711 : * guidelines about constructing error messages out of parts, but since
2712 : * there's no translation support for dblink, there's no need to worry
2713 : * about that (yet).
2714 : */
1844 tgl 2715 GIC 12 : va_start(ap, fmt);
1844 tgl 2716 CBC 12 : vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2717 12 : va_end(ap);
5393 mail 2718 ECB :
5393 mail 2719 GIC 12 : ereport(level,
5050 bruce 2720 ECB : (errcode(sqlstate),
2721 : (message_primary != NULL && message_primary[0] != '\0') ?
492 fujii 2722 : errmsg_internal("%s", message_primary) :
2300 mail 2723 : errmsg("could not obtain message string for remote error"),
4285 tgl 2724 : message_detail ? errdetail_internal("%s", message_detail) : 0,
2725 : message_hint ? errhint("%s", message_hint) : 0,
1844 2726 : message_context ? (errcontext("%s", message_context)) : 0,
2727 : conname ?
2728 : (errcontext("%s on dblink connection named \"%s\"",
2729 : dblink_context_msg, conname)) :
2730 : (errcontext("%s on unnamed dblink connection",
2731 : dblink_context_msg))));
5393 mail 2732 GBC 9 : }
2733 :
5055 mail 2734 ECB : /*
2735 : * Obtain connection string for a foreign server
5055 mail 2736 EUB : */
2737 : static char *
5055 mail 2738 GBC 22 : get_connect_string(const char *servername)
5055 mail 2739 EUB : {
5050 bruce 2740 GBC 22 : ForeignServer *foreign_server = NULL;
2741 : UserMapping *user_mapping;
2742 : ListCell *cell;
2153 bruce 2743 ECB : StringInfoData buf;
2744 : ForeignDataWrapper *fdw;
5050 2745 : AclResult aclresult;
2746 : char *srvname;
5055 mail 2747 :
2299 2748 : static const PQconninfoOption *options = NULL;
2749 :
2221 peter_e 2750 GIC 22 : initStringInfo(&buf);
2751 :
2299 mail 2752 ECB : /*
2753 : * Get list of valid libpq options.
2754 : *
2755 : * To avoid unnecessary work, we get the list once and use it throughout
2756 : * the lifetime of this backend process. We don't need to care about
2757 : * memory context issues, because PQconndefaults allocates with malloc.
2758 : */
2299 mail 2759 CBC 22 : if (!options)
2760 : {
2299 mail 2761 GIC 3 : options = PQconndefaults();
2299 mail 2762 CBC 3 : if (!options) /* assume reason for failure is OOM */
2299 mail 2763 UIC 0 : ereport(ERROR,
2764 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2299 mail 2765 ECB : errmsg("out of memory"),
2766 : errdetail("Could not get libpq's default connection options.")));
2767 : }
2768 :
2769 : /* first gather the server connstr options */
4693 itagaki.takahiro 2770 GIC 22 : srvname = pstrdup(servername);
4687 2771 22 : truncate_identifier(srvname, strlen(srvname), false);
4693 2772 22 : foreign_server = GetForeignServerByName(srvname, true);
2773 :
5055 mail 2774 CBC 22 : if (foreign_server)
2775 : {
5050 bruce 2776 GIC 2 : Oid serverid = foreign_server->serverid;
2777 2 : Oid fdwid = foreign_server->fdwid;
2778 2 : Oid userid = GetUserId();
5055 mail 2779 ECB :
5055 mail 2780 GIC 2 : user_mapping = GetUserMapping(userid, serverid);
5050 bruce 2781 CBC 2 : fdw = GetForeignDataWrapper(fdwid);
2782 :
5055 mail 2783 ECB : /* Check permissions, user must have usage on the server. */
147 peter 2784 GNC 2 : aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
5055 mail 2785 CBC 2 : if (aclresult != ACLCHECK_OK)
1954 peter_e 2786 UIC 0 : aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2787 :
5050 bruce 2788 CBC 2 : foreach(cell, fdw->options)
2789 : {
5050 bruce 2790 UIC 0 : DefElem *def = lfirst(cell);
2791 :
2299 mail 2792 0 : if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2221 peter_e 2793 0 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2299 mail 2794 0 : escape_param_str(strVal(def->arg)));
2795 : }
2796 :
5050 bruce 2797 GIC 6 : foreach(cell, foreign_server->options)
2798 : {
2799 4 : DefElem *def = lfirst(cell);
2800 :
2299 mail 2801 4 : if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2221 peter_e 2802 4 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2299 mail 2803 4 : escape_param_str(strVal(def->arg)));
2804 : }
2805 :
5050 bruce 2806 4 : foreach(cell, user_mapping->options)
5055 mail 2807 ECB : {
2808 :
5050 bruce 2809 GIC 2 : DefElem *def = lfirst(cell);
2810 :
2299 mail 2811 CBC 2 : if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2221 peter_e 2812 2 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2299 mail 2813 GIC 2 : escape_param_str(strVal(def->arg)));
2814 : }
2815 :
2221 peter_e 2816 CBC 2 : return buf.data;
2817 : }
2818 : else
5055 mail 2819 20 : return NULL;
5055 mail 2820 EUB : }
2821 :
2822 : /*
2823 : * Escaping libpq connect parameter strings.
2824 : *
5055 mail 2825 ECB : * Replaces "'" with "\'" and "\" with "\\".
2826 : */
2827 : static char *
5055 mail 2828 GIC 6 : escape_param_str(const char *str)
5055 mail 2829 ECB : {
2830 : const char *cp;
2153 bruce 2831 : StringInfoData buf;
2832 :
2221 peter_e 2833 GIC 6 : initStringInfo(&buf);
2834 :
5055 mail 2835 64 : for (cp = str; *cp; cp++)
5055 mail 2836 ECB : {
5055 mail 2837 CBC 58 : if (*cp == '\\' || *cp == '\'')
2221 peter_e 2838 UIC 0 : appendStringInfoChar(&buf, '\\');
2221 peter_e 2839 GIC 58 : appendStringInfoChar(&buf, *cp);
2840 : }
2841 :
2221 peter_e 2842 CBC 6 : return buf.data;
5055 mail 2843 ECB : }
2844 :
2845 : /*
4681 tgl 2846 : * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2847 : * functions, and translate to the internal representation.
2848 : *
2849 : * The user supplies an int2vector of 1-based logical attnums, plus a count
2850 : * argument (the need for the separate count argument is historical, but we
2851 : * still check it). We check that each attnum corresponds to a valid,
2852 : * non-dropped attribute of the rel. We do *not* prevent attnums from being
2853 : * listed twice, though the actual use-case for such things is dubious.
2854 : * Note that before Postgres 9.0, the user's attnums were interpreted as
2855 : * physical not logical column numbers; this was changed for future-proofing.
4681 tgl 2856 EUB : *
2857 : * The internal representation is a palloc'd int array of 0-based physical
2858 : * attnums.
2859 : */
4681 tgl 2860 ECB : static void
4681 tgl 2861 GIC 18 : validate_pkattnums(Relation rel,
2862 : int2vector *pkattnums_arg, int32 pknumatts_arg,
2863 : int **pkattnums, int *pknumatts)
2864 : {
2865 18 : TupleDesc tupdesc = rel->rd_att;
2866 18 : int natts = tupdesc->natts;
2867 : int i;
2868 :
2869 : /* Don't take more array elements than there are */
2870 18 : pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2871 :
2872 : /* Must have at least one pk attnum selected */
2873 18 : if (pknumatts_arg <= 0)
4681 tgl 2874 UIC 0 : ereport(ERROR,
2875 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2876 : errmsg("number of key attributes must be > 0")));
2877 :
4681 tgl 2878 ECB : /* Allocate output array */
209 peter 2879 GNC 18 : *pkattnums = palloc_array(int, pknumatts_arg);
4681 tgl 2880 GIC 18 : *pknumatts = pknumatts_arg;
2881 :
2882 : /* Validate attnums and convert to internal form */
2883 57 : for (i = 0; i < pknumatts_arg; i++)
4813 mail 2884 ECB : {
4660 bruce 2885 GIC 45 : int pkattnum = pkattnums_arg->values[i];
4660 bruce 2886 ECB : int lnum;
2887 : int j;
2888 :
4681 tgl 2889 : /* Can throw error immediately if out of range */
4681 tgl 2890 CBC 45 : if (pkattnum <= 0 || pkattnum > natts)
4681 tgl 2891 GIC 6 : ereport(ERROR,
2892 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
4681 tgl 2893 ECB : errmsg("invalid attribute number %d", pkattnum)));
2894 :
2895 : /* Identify which physical column has this logical number */
4681 tgl 2896 GIC 39 : lnum = 0;
4681 tgl 2897 CBC 69 : for (j = 0; j < natts; j++)
4681 tgl 2898 ECB : {
2899 : /* dropped columns don't count */
2058 andres 2900 GIC 69 : if (TupleDescAttr(tupdesc, j)->attisdropped)
4681 tgl 2901 3 : continue;
2902 :
2903 66 : if (++lnum == pkattnum)
4681 tgl 2904 CBC 39 : break;
2905 : }
4681 tgl 2906 ECB :
4681 tgl 2907 CBC 39 : if (j < natts)
4681 tgl 2908 GIC 39 : (*pkattnums)[i] = j;
2909 : else
4681 tgl 2910 UIC 0 : ereport(ERROR,
4681 tgl 2911 ECB : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2912 : errmsg("invalid attribute number %d", pkattnum)));
2913 : }
4813 mail 2914 GIC 12 : }
3833 tgl 2915 ECB :
2916 : /*
2917 : * Check if the specified connection option is valid.
2918 : *
2919 : * We basically allow whatever libpq thinks is an option, with these
2920 : * restrictions:
2921 : * debug options: disallowed
2922 : * "client_encoding": disallowed
2923 : * "user": valid only in USER MAPPING options
2924 : * secure options (eg password): valid only in USER MAPPING options
2925 : * others: valid only in FOREIGN SERVER options
2926 : *
2927 : * We disallow client_encoding because it would be overridden anyway via
2928 : * PQclientEncoding; allowing it to be specified would merely promote
2929 : * confusion.
2930 : */
2931 : static bool
3833 tgl 2932 GIC 89 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
2933 : Oid context)
2934 : {
2935 : const PQconninfoOption *opt;
3833 tgl 2936 ECB :
2937 : /* Look up the option in libpq result */
3833 tgl 2938 GIC 1697 : for (opt = options; opt->keyword; opt++)
3833 tgl 2939 ECB : {
3833 tgl 2940 GIC 1695 : if (strcmp(opt->keyword, option) == 0)
3833 tgl 2941 CBC 87 : break;
3833 tgl 2942 ECB : }
3833 tgl 2943 GIC 89 : if (opt->keyword == NULL)
2944 2 : return false;
2945 :
2946 : /* Disallow debug options (particularly "replication") */
2947 87 : if (strchr(opt->dispchar, 'D'))
2948 2 : return false;
2949 :
2950 : /* Disallow "client_encoding" */
3833 tgl 2951 CBC 85 : if (strcmp(opt->keyword, "client_encoding") == 0)
3833 tgl 2952 GBC 2 : return false;
2953 :
2954 : /*
2955 : * If the option is "user" or marked secure, it should be specified only
2956 : * in USER MAPPING. Others should be specified only in SERVER.
2957 : */
3833 tgl 2958 CBC 83 : if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3833 tgl 2959 ECB : {
3833 tgl 2960 GIC 9 : if (context != UserMappingRelationId)
3833 tgl 2961 CBC 3 : return false;
3833 tgl 2962 ECB : }
2963 : else
2964 : {
3833 tgl 2965 CBC 74 : if (context != ForeignServerRelationId)
2966 68 : return false;
2967 : }
2968 :
2969 12 : return true;
2970 : }
2971 :
2972 : /*
2973 : * Copy the remote session's values of GUCs that affect datatype I/O
3670 tgl 2974 ECB : * and apply them locally in a new GUC nesting level. Returns the new
2975 : * nestlevel (which is needed by restoreLocalGucs to undo the settings),
2976 : * or -1 if no new nestlevel was needed.
2977 : *
2978 : * We use the equivalent of a function SET option to allow the settings to
2979 : * persist only until the caller calls restoreLocalGucs. If an error is
2980 : * thrown in between, guc.c will take care of undoing the settings.
2981 : */
2982 : static int
3670 tgl 2983 GIC 32 : applyRemoteGucs(PGconn *conn)
3670 tgl 2984 ECB : {
2985 : static const char *const GUCsAffectingIO[] = {
2986 : "DateStyle",
2987 : "IntervalStyle"
2988 : };
2989 :
3670 tgl 2990 GIC 32 : int nestlevel = -1;
2991 : int i;
2992 :
2993 96 : for (i = 0; i < lengthof(GUCsAffectingIO); i++)
2994 : {
2995 64 : const char *gucName = GUCsAffectingIO[i];
2996 64 : const char *remoteVal = PQparameterStatus(conn, gucName);
2997 : const char *localVal;
2998 :
2999 : /*
3000 : * If the remote server is pre-8.4, it won't have IntervalStyle, but
3001 : * that's okay because its output format won't be ambiguous. So just
3002 : * skip the GUC if we don't get a value for it. (We might eventually
3003 : * need more complicated logic with remote-version checks here.)
3004 : */
3005 64 : if (remoteVal == NULL)
3670 tgl 3006 UIC 0 : continue;
3007 :
3008 : /*
3009 : * Avoid GUC-setting overhead if the remote and local GUCs already
3010 : * have the same value.
3011 : */
3670 tgl 3012 GIC 64 : localVal = GetConfigOption(gucName, false, false);
3013 64 : Assert(localVal != NULL);
3014 :
3015 64 : if (strcmp(remoteVal, localVal) == 0)
3016 47 : continue;
3017 :
3018 : /* Create new GUC nest level if we didn't already */
3019 17 : if (nestlevel < 0)
3020 9 : nestlevel = NewGUCNestLevel();
3021 :
3022 : /* Apply the option (this will throw error on failure) */
3023 17 : (void) set_config_option(gucName, remoteVal,
3024 : PGC_USERSET, PGC_S_SESSION,
3025 : GUC_ACTION_SAVE, true, 0, false);
3026 : }
3027 :
3028 32 : return nestlevel;
3029 : }
3030 :
/*
 * Undo the GUC changes made when remote settings were overlaid on the local
 * ones.
 *
 * nestlevel is the value returned by applyRemoteGucs; a value that is not
 * positive means no nest level was created, so there is nothing to undo.
 */
static void
restoreLocalGucs(int nestlevel)
{
	if (nestlevel <= 0)
		return;

	AtEOXact_GUC(true, nestlevel);
}
|