LCOV - differential code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 86.9 % 1078 937 3 42 90 6 38 568 22 309 93 552 4 34
Current Date: 2023-04-08 17:13:01 Functions: 97.4 % 76 74 2 72 1 1 2 69 3
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 [..60] days: 68.2 % 22 15 1 5 1 10 5 8
Legend: Lines: hit not hit (60,120] days: 88.9 % 9 8 1 8
(120,180] days: 100.0 % 2 2 1 1
(180,240] days: 100.0 % 12 12 12
(240..) days: 87.1 % 1033 900 1 42 85 5 38 558 1 303 93 544
Function coverage date bins:
[..60] days: 66.7 % 3 2 2 1
(240..) days: 50.0 % 144 72 2 70 1 1 2 68

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*
                                  2                 :  * dblink.c
                                  3                 :  *
                                  4                 :  * Functions returning results from a remote database
                                  5                 :  *
                                  6                 :  * Joe Conway <mail@joeconway.com>
                                  7                 :  * And contributors:
                                  8                 :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
                                  9                 :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
                                 10                 :  *
                                 11                 :  * contrib/dblink/dblink.c
                                 12                 :  * Copyright (c) 2001-2023, PostgreSQL Global Development Group
                                 13                 :  * ALL RIGHTS RESERVED;
                                 14                 :  *
                                 15                 :  * Permission to use, copy, modify, and distribute this software and its
                                 16                 :  * documentation for any purpose, without fee, and without a written agreement
                                 17                 :  * is hereby granted, provided that the above copyright notice and this
                                 18                 :  * paragraph and the following two paragraphs appear in all copies.
                                 19                 :  *
                                 20                 :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
                                 21                 :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
                                 22                 :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
                                 23                 :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
                                 24                 :  * POSSIBILITY OF SUCH DAMAGE.
                                 25                 :  *
                                 26                 :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
                                 27                 :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
                                 28                 :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
                                 29                 :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
                                 30                 :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
                                 31                 :  *
                                 32                 :  */
                                 33                 : #include "postgres.h"
                                 34                 : 
                                 35                 : #include <limits.h>
                                 36                 : 
                                 37                 : #include "access/htup_details.h"
                                 38                 : #include "access/relation.h"
                                 39                 : #include "access/reloptions.h"
                                 40                 : #include "access/table.h"
                                 41                 : #include "catalog/namespace.h"
                                 42                 : #include "catalog/pg_foreign_data_wrapper.h"
                                 43                 : #include "catalog/pg_foreign_server.h"
                                 44                 : #include "catalog/pg_type.h"
                                 45                 : #include "catalog/pg_user_mapping.h"
                                 46                 : #include "executor/spi.h"
                                 47                 : #include "foreign/foreign.h"
                                 48                 : #include "funcapi.h"
                                 49                 : #include "lib/stringinfo.h"
                                 50                 : #include "libpq-fe.h"
                                 51                 : #include "libpq/libpq-be-fe-helpers.h"
                                 52                 : #include "mb/pg_wchar.h"
                                 53                 : #include "miscadmin.h"
                                 54                 : #include "parser/scansup.h"
                                 55                 : #include "utils/acl.h"
                                 56                 : #include "utils/builtins.h"
                                 57                 : #include "utils/fmgroids.h"
                                 58                 : #include "utils/guc.h"
                                 59                 : #include "utils/lsyscache.h"
                                 60                 : #include "utils/memutils.h"
                                 61                 : #include "utils/rel.h"
                                 62                 : #include "utils/varlena.h"
                                 63                 : 
 6158 tgl                        64 GIC           3 : PG_MODULE_MAGIC;
 6158 tgl                        65 ECB             : 
                                 66                 : typedef struct remoteConn
                                 67                 : {
                                 68                 :     PGconn     *conn;           /* Hold the remote connection */
                                 69                 :     int         openCursorCount;    /* The number of open cursors */
                                 70                 :     bool        newXactForCursor;   /* Opened a transaction for a cursor */
                                 71                 : } remoteConn;
                                 72                 : 
                                 73                 : typedef struct storeInfo
                                 74                 : {
                                 75                 :     FunctionCallInfo fcinfo;
                                 76                 :     Tuplestorestate *tuplestore;
                                 77                 :     AttInMetadata *attinmeta;
                                 78                 :     MemoryContext tmpcontext;
                                 79                 :     char      **cstrs;
                                 80                 :     /* temp storage for results to avoid leaks on exception */
                                 81                 :     PGresult   *last_res;
                                 82                 :     PGresult   *cur_res;
                                 83                 : } storeInfo;
                                 84                 : 
                                 85                 : /*
                                 86                 :  * Internal declarations
                                 87                 :  */
                                 88                 : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
                                 89                 : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
                                 90                 : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
                                 91                 :                               PGresult *res);
                                 92                 : static void materializeQueryResult(FunctionCallInfo fcinfo,
                                 93                 :                                    PGconn *conn,
                                 94                 :                                    const char *conname,
                                 95                 :                                    const char *sql,
                                 96                 :                                    bool fail);
                                 97                 : static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
                                 98                 : static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
                                 99                 : static remoteConn *getConnectionByName(const char *name);
                                100                 : static HTAB *createConnHash(void);
                                101                 : static void createNewConnection(const char *name, remoteConn *rconn);
                                102                 : static void deleteConnection(const char *name);
                                103                 : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
                                104                 : static char **get_text_array_contents(ArrayType *array, int *numitems);
                                105                 : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
                                106                 : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
                                107                 : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
                                108                 : static char *quote_ident_cstr(char *rawstr);
                                109                 : static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
                                110                 : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
                                111                 : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
                                112                 : static char *generate_relation_name(Relation rel);
                                113                 : static void dblink_connstr_check(const char *connstr);
                                114                 : static void dblink_security_check(PGconn *conn, remoteConn *rconn);
                                115                 : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
                                116                 :                              bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
                                117                 : static char *get_connect_string(const char *servername);
                                118                 : static char *escape_param_str(const char *str);
                                119                 : static void validate_pkattnums(Relation rel,
                                120                 :                                int2vector *pkattnums_arg, int32 pknumatts_arg,
                                121                 :                                int **pkattnums, int *pknumatts);
                                122                 : static bool is_valid_dblink_option(const PQconninfoOption *options,
                                123                 :                                    const char *option, Oid context);
                                124                 : static int  applyRemoteGucs(PGconn *conn);
                                125                 : static void restoreLocalGucs(int nestlevel);
                                126                 : 
                                127                 : /* Global */
                                128                 : static remoteConn *pconn = NULL;
                                129                 : static HTAB *remoteConnHash = NULL;
                                130                 : 
                                131                 : /*
                                132                 :  *  Following is list that holds multiple remote connections.
                                133                 :  *  Calling convention of each dblink function changes to accept
                                134                 :  *  connection name as the first parameter. The connection list is
                                135                 :  *  much like ecpg e.g. a mapping between a name and a PGconn object.
                                136                 :  */
                                137                 : 
                                138                 : typedef struct remoteConnHashEnt
                                139                 : {
                                140                 :     char        name[NAMEDATALEN];
                                141                 :     remoteConn *rconn;
                                142                 : } remoteConnHashEnt;
                                143                 : 
                                144                 : /* initial number of connection hashes */
                                145                 : #define NUMCONN 16
                                146                 : 
                                147                 : static char *
 2296 peter_e                   148 GIC          48 : xpstrdup(const char *in)
 2296 peter_e                   149 ECB             : {
 2296 peter_e                   150 GIC          48 :     if (in == NULL)
 2296 peter_e                   151 CBC          36 :         return NULL;
                                152              12 :     return pstrdup(in);
 2296 peter_e                   153 ECB             : }
                                154                 : 
                                155                 : static void
                                156                 : pg_attribute_noreturn()
 2296 peter_e                   157 UIC           0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
 2296 peter_e                   158 EUB             : {
 2296 peter_e                   159 UIC           0 :     char       *msg = pchomp(PQerrorMessage(conn));
 2153 bruce                     160 EUB             : 
  280 peter                     161 UNC           0 :     PQclear(res);
 2296 peter_e                   162 UBC           0 :     elog(ERROR, "%s: %s", p2, msg);
                                163                 : }
                                164                 : 
                                165                 : static void
                                166                 : pg_attribute_noreturn()
 2296 peter_e                   167 CBC           3 : dblink_conn_not_avail(const char *conname)
                                168                 : {
                                169               3 :     if (conname)
                                170               1 :         ereport(ERROR,
                                171                 :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
                                172                 :                  errmsg("connection \"%s\" not available", conname)));
                                173                 :     else
                                174               2 :         ereport(ERROR,
                                175                 :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
                                176                 :                  errmsg("connection not available")));
                                177                 : }
                                178                 : 
                                179                 : static void
                                180              33 : dblink_get_conn(char *conname_or_str,
                                181                 :                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
                                182                 : {
                                183              33 :     remoteConn *rconn = getConnectionByName(conname_or_str);
                                184                 :     PGconn     *conn;
                                185                 :     char       *conname;
                                186                 :     bool        freeconn;
                                187                 : 
                                188              33 :     if (rconn)
                                189                 :     {
                                190              27 :         conn = rconn->conn;
                                191              27 :         conname = conname_or_str;
                                192              27 :         freeconn = false;
                                193                 :     }
                                194                 :     else
                                195                 :     {
                                196                 :         const char *connstr;
                                197                 : 
                                198               6 :         connstr = get_connect_string(conname_or_str);
                                199               6 :         if (connstr == NULL)
                                200               6 :             connstr = conname_or_str;
                                201               6 :         dblink_connstr_check(connstr);
                                202                 : 
 1140 tgl                       203 ECB             :         /* OK to make connection */
   76 andres                    204 GNC           6 :         conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
                                205                 : 
 2296 peter_e                   206 GIC           6 :         if (PQstatus(conn) == CONNECTION_BAD)
 2296 peter_e                   207 ECB             :         {
 2296 peter_e                   208 GIC           2 :             char       *msg = pchomp(PQerrorMessage(conn));
 2153 bruce                     209 ECB             : 
   76 andres                    210 GNC           2 :             libpqsrv_disconnect(conn);
 2296 peter_e                   211 CBC           2 :             ereport(ERROR,
                                212                 :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
 2118 tgl                       213 EUB             :                      errmsg("could not establish connection"),
                                214                 :                      errdetail_internal("%s", msg)));
                                215                 :         }
    1 sfrost                    216 GIC           4 :         dblink_security_check(conn, rconn);
 2296 peter_e                   217               4 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
 2296 peter_e                   218 LBC           0 :             PQsetClientEncoding(conn, GetDatabaseEncodingName());
 2296 peter_e                   219 GIC           4 :         freeconn = true;
 2296 peter_e                   220 CBC           4 :         conname = NULL;
                                221                 :     }
 2296 peter_e                   222 ECB             : 
 2296 peter_e                   223 CBC          31 :     *conn_p = conn;
                                224              31 :     *conname_p = conname;
                                225              31 :     *freeconn_p = freeconn;
 2296 peter_e                   226 GIC          31 : }
 2296 peter_e                   227 ECB             : 
                                228                 : static PGconn *
 2296 peter_e                   229 GIC          17 : dblink_get_named_conn(const char *conname)
                                230                 : {
                                231              17 :     remoteConn *rconn = getConnectionByName(conname);
 2153 bruce                     232 ECB             : 
 2296 peter_e                   233 GIC          17 :     if (rconn)
 2296 peter_e                   234 CBC          17 :         return rconn->conn;
                                235                 : 
 2218 peter_e                   236 LBC           0 :     dblink_conn_not_avail(conname);
 2153 bruce                     237 ECB             :     return NULL;                /* keep compiler quiet */
 2296 peter_e                   238                 : }
                                239                 : 
                                240                 : static void
 2296 peter_e                   241 CBC         120 : dblink_init(void)
                                242                 : {
                                243             120 :     if (!pconn)
                                244                 :     {
                                245               3 :         pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
 2296 peter_e                   246 GIC           3 :         pconn->conn = NULL;
 2296 peter_e                   247 CBC           3 :         pconn->openCursorCount = 0;
 2062                           248               3 :         pconn->newXactForCursor = false;
                                249                 :     }
 2296                           250             120 : }
 7524 bruce                     251 ECB             : 
                                252                 : /*
                                253                 :  * Create a persistent connection to another database
                                254                 :  */
 7524 bruce                     255 CBC           9 : PG_FUNCTION_INFO_V1(dblink_connect);
                                256                 : Datum
                                257              16 : dblink_connect(PG_FUNCTION_ARGS)
 7524 bruce                     258 ECB             : {
 5055 mail                      259 CBC          16 :     char       *conname_or_str = NULL;
 7228 bruce                     260 GIC          16 :     char       *connstr = NULL;
                                261              16 :     char       *connname = NULL;
                                262                 :     char       *msg;
 7228 bruce                     263 CBC          16 :     PGconn     *conn = NULL;
 6392                           264              16 :     remoteConn *rconn = NULL;
 7524 bruce                     265 ECB             : 
 2296 peter_e                   266 GIC          16 :     dblink_init();
                                267                 : 
 7188 bruce                     268 CBC          16 :     if (PG_NARGS() == 2)
                                269                 :     {
 5055 mail                      270 GIC          11 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
 5493 tgl                       271 CBC          11 :         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                272                 :     }
 7188 bruce                     273               5 :     else if (PG_NARGS() == 1)
 5055 mail                      274 GIC           5 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
 7524 bruce                     275 EUB             : 
 7188 bruce                     276 GBC          16 :     if (connname)
 1046 mail                      277 EUB             :     {
 5243 tgl                       278 GBC          11 :         rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
                                279                 :                                                   sizeof(remoteConn));
 1046 mail                      280              11 :         rconn->conn = NULL;
 1046 mail                      281 GIC          11 :         rconn->openCursorCount = 0;
                                282              11 :         rconn->newXactForCursor = false;
                                283                 :     }
                                284                 : 
                                285                 :     /* first check for valid foreign data server */
 5055                           286              16 :     connstr = get_connect_string(conname_or_str);
 5055 mail                      287 CBC          16 :     if (connstr == NULL)
 5055 mail                      288 GIC          14 :         connstr = conname_or_str;
                                289                 : 
 5312 tgl                       290 ECB             :     /* check password in connection string if not superuser */
 5312 tgl                       291 GBC          16 :     dblink_connstr_check(connstr);
                                292                 : 
 1140 tgl                       293 ECB             :     /* OK to make connection */
   76 andres                    294 GNC          15 :     conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
 7228 bruce                     295 ECB             : 
 7228 bruce                     296 GIC          15 :     if (PQstatus(conn) == CONNECTION_BAD)
 7524 bruce                     297 ECB             :     {
 2232 peter_e                   298 UIC           0 :         msg = pchomp(PQerrorMessage(conn));
   76 andres                    299 UNC           0 :         libpqsrv_disconnect(conn);
 6392 bruce                     300 LBC           0 :         if (rconn)
                                301               0 :             pfree(rconn);
 7199 tgl                       302 ECB             : 
 7199 tgl                       303 LBC           0 :         ereport(ERROR,
                                304                 :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
                                305                 :                  errmsg("could not establish connection"),
 4285 tgl                       306 ECB             :                  errdetail_internal("%s", msg)));
                                307                 :     }
 7524 bruce                     308                 : 
 5312 tgl                       309                 :     /* check password actually used if not superuser */
    1 sfrost                    310 GIC          15 :     dblink_security_check(conn, rconn);
 5754 mail                      311 ECB             : 
 3410                           312                 :     /* attempt to set client encoding to match server encoding, if needed */
 3410 mail                      313 GIC          15 :     if (PQclientEncoding(conn) != GetDatabaseEncoding())
 3410 mail                      314 LBC           0 :         PQsetClientEncoding(conn, GetDatabaseEncodingName());
 5052 mail                      315 ECB             : 
 7188 bruce                     316 GIC          15 :     if (connname)
                                317                 :     {
 6392 bruce                     318 CBC          10 :         rconn->conn = conn;
 6392 bruce                     319 GIC          10 :         createNewConnection(connname, rconn);
 7228 bruce                     320 ECB             :     }
                                321                 :     else
                                322                 :     {
 2220 mail                      323 GIC           5 :         if (pconn->conn)
   68 andres                    324 GNC           1 :             libpqsrv_disconnect(pconn->conn);
 6382 mail                      325 CBC           5 :         pconn->conn = conn;
                                326                 :     }
 7228 bruce                     327 ECB             : 
 5493 tgl                       328 GIC          14 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
 7524 bruce                     329 ECB             : }
                                330                 : 
                                331                 : /*
                                332                 :  * Clear a persistent connection to another database
                                333                 :  */
 7524 bruce                     334 CBC           6 : PG_FUNCTION_INFO_V1(dblink_disconnect);
                                335                 : Datum
                                336              13 : dblink_disconnect(PG_FUNCTION_ARGS)
 7524 bruce                     337 ECB             : {
 7199 tgl                       338 GIC          13 :     char       *conname = NULL;
 6392 bruce                     339 CBC          13 :     remoteConn *rconn = NULL;
 7228 bruce                     340 GIC          13 :     PGconn     *conn = NULL;
                                341                 : 
 2296 peter_e                   342 CBC          13 :     dblink_init();
 6382 mail                      343 ECB             : 
 7188 bruce                     344 CBC          13 :     if (PG_NARGS() == 1)
                                345                 :     {
 5493 tgl                       346               9 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 6392 bruce                     347 GIC           9 :         rconn = getConnectionByName(conname);
                                348               9 :         if (rconn)
 6392 bruce                     349 CBC           8 :             conn = rconn->conn;
                                350                 :     }
 7228 bruce                     351 ECB             :     else
 6382 mail                      352 CBC           4 :         conn = pconn->conn;
 7524 bruce                     353 ECB             : 
 7228 bruce                     354 CBC          13 :     if (!conn)
 2296 peter_e                   355 GIC           1 :         dblink_conn_not_avail(conname);
                                356                 : 
   76 andres                    357 GNC          12 :     libpqsrv_disconnect(conn);
 6392 bruce                     358 CBC          12 :     if (rconn)
 7228 bruce                     359 ECB             :     {
 7199 tgl                       360 CBC           8 :         deleteConnection(conname);
 6392 bruce                     361 GIC           8 :         pfree(rconn);
                                362                 :     }
 7072 mail                      363 ECB             :     else
 6382 mail                      364 GIC           4 :         pconn->conn = NULL;
                                365                 : 
 5493 tgl                       366 CBC          12 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
 7524 bruce                     367 ECB             : }
                                368                 : 
                                369                 : /*
                                370                 :  * opens a cursor using a persistent connection
                                371                 :  */
 7524 bruce                     372 GIC           9 : PG_FUNCTION_INFO_V1(dblink_open);
 7524 bruce                     373 ECB             : Datum
 7524 bruce                     374 GBC           9 : dblink_open(PG_FUNCTION_ARGS)
                                375                 : {
 7522 bruce                     376 CBC           9 :     PGresult   *res = NULL;
                                377                 :     PGconn     *conn;
 7228 bruce                     378 GIC           9 :     char       *curname = NULL;
 7228 bruce                     379 CBC           9 :     char       *sql = NULL;
 7228 bruce                     380 GIC           9 :     char       *conname = NULL;
 6248 neilc                     381 ECB             :     StringInfoData buf;
 6392 bruce                     382 CBC           9 :     remoteConn *rconn = NULL;
 6972 mail                      383 GBC           9 :     bool        fail = true;    /* default to backward compatible behavior */
 7524 bruce                     384 ECB             : 
 2296 peter_e                   385 CBC           9 :     dblink_init();
 6248 neilc                     386 GIC           9 :     initStringInfo(&buf);
                                387                 : 
 7188 bruce                     388               9 :     if (PG_NARGS() == 2)
                                389                 :     {
                                390                 :         /* text,text */
 5493 tgl                       391               2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 5493 tgl                       392 CBC           2 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 6382 mail                      393 GIC           2 :         rconn = pconn;
                                394                 :     }
 7188 bruce                     395               7 :     else if (PG_NARGS() == 3)
 7228 bruce                     396 ECB             :     {
 6972 mail                      397                 :         /* might be text,text,text or text,text,bool */
 6972 mail                      398 GIC           6 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
 6972 mail                      399 ECB             :         {
 5493 tgl                       400 CBC           1 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                401               1 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 6972 mail                      402 GIC           1 :             fail = PG_GETARG_BOOL(2);
 6382 mail                      403 CBC           1 :             rconn = pconn;
                                404                 :         }
 6972 mail                      405 ECB             :         else
                                406                 :         {
 5493 tgl                       407 GIC           5 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 5493 tgl                       408 CBC           5 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
                                409               5 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
 6392 bruce                     410 GIC           5 :             rconn = getConnectionByName(conname);
                                411                 :         }
                                412                 :     }
 6972 mail                      413               1 :     else if (PG_NARGS() == 4)
                                414                 :     {
 6972 mail                      415 ECB             :         /* text,text,text,bool */
 5493 tgl                       416 GIC           1 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 5493 tgl                       417 CBC           1 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
 5493 tgl                       418 GIC           1 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
 6972 mail                      419               1 :         fail = PG_GETARG_BOOL(3);
 6392 bruce                     420 CBC           1 :         rconn = getConnectionByName(conname);
 7228 bruce                     421 ECB             :     }
                                422                 : 
 6382 mail                      423 GIC           9 :     if (!rconn || !rconn->conn)
 2296 peter_e                   424 LBC           0 :         dblink_conn_not_avail(conname);
 2195 peter_e                   425 ECB             : 
 2195 peter_e                   426 GIC           9 :     conn = rconn->conn;
 7524 bruce                     427 ECB             : 
 6347                           428                 :     /* If we are not in a transaction, start one */
 6382 mail                      429 GIC           9 :     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
 6382 mail                      430 ECB             :     {
 6382 mail                      431 GIC           7 :         res = PQexec(conn, "BEGIN");
                                432               7 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
 2296 peter_e                   433 UBC           0 :             dblink_res_internalerror(conn, res, "begin error");
 6382 mail                      434 GBC           7 :         PQclear(res);
 2062 peter_e                   435 GIC           7 :         rconn->newXactForCursor = true;
 6031 bruce                     436 ECB             : 
                                437                 :         /*
                                438                 :          * Since transaction state was IDLE, we force cursor count to
                                439                 :          * initially be 0. This is needed as a previous ABORT might have wiped
                                440                 :          * out our transaction without maintaining the cursor count for us.
 6136 mail                      441                 :          */
 6136 mail                      442 CBC           7 :         rconn->openCursorCount = 0;
 6382 mail                      443 ECB             :     }
                                444                 : 
                                445                 :     /* if we started a transaction, increment cursor count */
 6382 mail                      446 GIC           9 :     if (rconn->newXactForCursor)
 6382 mail                      447 CBC           9 :         (rconn->openCursorCount)++;
 7524 bruce                     448 ECB             : 
 6248 neilc                     449 CBC           9 :     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
 6248 neilc                     450 GIC           9 :     res = PQexec(conn, buf.data);
 6972 mail                      451               9 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
 6972 mail                      452 ECB             :     {
 1844 tgl                       453 GIC           2 :         dblink_res_error(conn, conname, res, fail,
                                454                 :                          "while opening cursor \"%s\"", curname);
 5393 mail                      455 GBC           2 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
 6972 mail                      456 EUB             :     }
 7524 bruce                     457                 : 
 7228 bruce                     458 GBC           7 :     PQclear(res);
 5493 tgl                       459 GIC           7 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
                                460                 : }
 7655 bruce                     461 ECB             : 
 7524 bruce                     462 EUB             : /*
                                463                 :  * closes a cursor
 7524 bruce                     464 ECB             :  */
 7524 bruce                     465 GIC           6 : PG_FUNCTION_INFO_V1(dblink_close);
 7524 bruce                     466 ECB             : Datum
 7524 bruce                     467 GIC           5 : dblink_close(PG_FUNCTION_ARGS)
                                468                 : {
 2195 peter_e                   469 ECB             :     PGconn     *conn;
 7522 bruce                     470 CBC           5 :     PGresult   *res = NULL;
 7228 bruce                     471 GIC           5 :     char       *curname = NULL;
 7228 bruce                     472 CBC           5 :     char       *conname = NULL;
                                473                 :     StringInfoData buf;
 6392                           474               5 :     remoteConn *rconn = NULL;
 6972 mail                      475 GIC           5 :     bool        fail = true;    /* default to backward compatible behavior */
                                476                 : 
 2296 peter_e                   477 CBC           5 :     dblink_init();
 6248 neilc                     478 GIC           5 :     initStringInfo(&buf);
                                479                 : 
 7228 bruce                     480 CBC           5 :     if (PG_NARGS() == 1)
                                481                 :     {
 6972 mail                      482 ECB             :         /* text */
 5493 tgl                       483 UIC           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 6382 mail                      484               0 :         rconn = pconn;
 7228 bruce                     485 ECB             :     }
 7188 bruce                     486 GIC           5 :     else if (PG_NARGS() == 2)
 7228 bruce                     487 ECB             :     {
                                488                 :         /* might be text,text or text,bool */
 6972 mail                      489 CBC           5 :         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
 6972 mail                      490 ECB             :         {
 5493 tgl                       491 GBC           2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 6972 mail                      492 CBC           2 :             fail = PG_GETARG_BOOL(1);
 6382 mail                      493 GIC           2 :             rconn = pconn;
                                494                 :         }
                                495                 :         else
 6972 mail                      496 ECB             :         {
 5493 tgl                       497 GIC           3 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                498               3 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
 6392 bruce                     499               3 :             rconn = getConnectionByName(conname);
                                500                 :         }
                                501                 :     }
 6972 mail                      502 CBC           5 :     if (PG_NARGS() == 3)
                                503                 :     {
 6972 mail                      504 ECB             :         /* text,text,bool */
 5493 tgl                       505 UIC           0 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 5493 tgl                       506 LBC           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
 6972 mail                      507               0 :         fail = PG_GETARG_BOOL(2);
 6392 bruce                     508               0 :         rconn = getConnectionByName(conname);
 7228 bruce                     509 ECB             :     }
                                510                 : 
 6382 mail                      511 CBC           5 :     if (!rconn || !rconn->conn)
 2296 peter_e                   512 LBC           0 :         dblink_conn_not_avail(conname);
 2195 peter_e                   513 ECB             : 
 2195 peter_e                   514 GIC           5 :     conn = rconn->conn;
 7524 bruce                     515 ECB             : 
 6248 neilc                     516 GIC           5 :     appendStringInfo(&buf, "CLOSE %s", curname);
 7524 bruce                     517 ECB             : 
                                518                 :     /* close the cursor */
 6248 neilc                     519 CBC           5 :     res = PQexec(conn, buf.data);
 7522 bruce                     520 GIC           5 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
                                521                 :     {
 1844 tgl                       522 CBC           1 :         dblink_res_error(conn, conname, res, fail,
 1844 tgl                       523 ECB             :                          "while closing cursor \"%s\"", curname);
 5393 mail                      524 CBC           1 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
 6972 mail                      525 ECB             :     }
                                526                 : 
 7524 bruce                     527 CBC           4 :     PQclear(res);
 7524 bruce                     528 ECB             : 
 6382 mail                      529                 :     /* if we started a transaction, decrement cursor count */
 6382 mail                      530 GIC           4 :     if (rconn->newXactForCursor)
 6382 mail                      531 ECB             :     {
 6382 mail                      532 GIC           4 :         (rconn->openCursorCount)--;
                                533                 : 
 6382 mail                      534 ECB             :         /* if count is zero, commit the transaction */
 6382 mail                      535 GIC           4 :         if (rconn->openCursorCount == 0)
 6382 mail                      536 ECB             :         {
 2062 peter_e                   537 CBC           2 :             rconn->newXactForCursor = false;
 6382 mail                      538 ECB             : 
 6382 mail                      539 CBC           2 :             res = PQexec(conn, "COMMIT");
 6382 mail                      540 GIC           2 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
 2296 peter_e                   541 UIC           0 :                 dblink_res_internalerror(conn, res, "commit error");
 6382 mail                      542 GIC           2 :             PQclear(res);
 6382 mail                      543 ECB             :         }
                                544                 :     }
 7524 bruce                     545                 : 
 5493 tgl                       546 GIC           4 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
 7524 bruce                     547 ECB             : }
                                548                 : 
                                549                 : /*
                                550                 :  * Fetch results from an open cursor
                                551                 :  */
 7524 bruce                     552 CBC           9 : PG_FUNCTION_INFO_V1(dblink_fetch);
                                553                 : Datum
 7524 bruce                     554 GIC          13 : dblink_fetch(PG_FUNCTION_ARGS)
 7524 bruce                     555 ECB             : {
 4790 bruce                     556 CBC          13 :     PGresult   *res = NULL;
                                557              13 :     char       *conname = NULL;
 4790 bruce                     558 GIC          13 :     remoteConn *rconn = NULL;
                                559              13 :     PGconn     *conn = NULL;
 4790 bruce                     560 ECB             :     StringInfoData buf;
 4790 bruce                     561 GBC          13 :     char       *curname = NULL;
 4790 bruce                     562 GIC          13 :     int         howmany = 0;
 4790 bruce                     563 CBC          13 :     bool        fail = true;    /* default to backward compatible */
 7524 bruce                     564 ECB             : 
 4023 tgl                       565 GIC          13 :     prepTuplestoreResult(fcinfo);
                                566                 : 
 2296 peter_e                   567              13 :     dblink_init();
                                568                 : 
 4823 mail                      569              13 :     if (PG_NARGS() == 4)
                                570                 :     {
 4823 mail                      571 ECB             :         /* text,text,int,bool */
 4823 mail                      572 CBC           1 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                573               1 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
                                574               1 :         howmany = PG_GETARG_INT32(2);
 4823 mail                      575 GIC           1 :         fail = PG_GETARG_BOOL(3);
 7228 bruce                     576 ECB             : 
 4823 mail                      577 GIC           1 :         rconn = getConnectionByName(conname);
 4823 mail                      578 CBC           1 :         if (rconn)
 4823 mail                      579 GIC           1 :             conn = rconn->conn;
 4823 mail                      580 ECB             :     }
 4823 mail                      581 GIC          12 :     else if (PG_NARGS() == 3)
                                582                 :     {
 4823 mail                      583 EUB             :         /* text,text,int or text,int,bool */
 4823 mail                      584 GBC           8 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
                                585                 :         {
 4823 mail                      586 GIC           2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                587               2 :             howmany = PG_GETARG_INT32(1);
                                588               2 :             fail = PG_GETARG_BOOL(2);
 4823 mail                      589 CBC           2 :             conn = pconn->conn;
 4823 mail                      590 ECB             :         }
                                591                 :         else
                                592                 :         {
 5493 tgl                       593 GIC           6 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                594               6 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
 7228 bruce                     595               6 :             howmany = PG_GETARG_INT32(2);
 7228 bruce                     596 ECB             : 
 6392 bruce                     597 GIC           6 :             rconn = getConnectionByName(conname);
 6392 bruce                     598 CBC           6 :             if (rconn)
 6392 bruce                     599 GIC           6 :                 conn = rconn->conn;
 7228 bruce                     600 ECB             :         }
                                601                 :     }
 4823 mail                      602 GIC           4 :     else if (PG_NARGS() == 2)
 4823 mail                      603 ECB             :     {
                                604                 :         /* text,int */
 4823 mail                      605 CBC           4 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 4823 mail                      606 GIC           4 :         howmany = PG_GETARG_INT32(1);
                                607               4 :         conn = pconn->conn;
                                608                 :     }
                                609                 : 
                                610              13 :     if (!conn)
 2296 peter_e                   611 LBC           0 :         dblink_conn_not_avail(conname);
                                612                 : 
 4823 mail                      613 CBC          13 :     initStringInfo(&buf);
                                614              13 :     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
                                615                 : 
                                616                 :     /*
                                617                 :      * Try to execute the query.  Note that since libpq uses malloc, the
 4790 bruce                     618 EUB             :      * PGresult will be long-lived even though we are still in a short-lived
                                619                 :      * memory context.
                                620                 :      */
 4823 mail                      621 CBC          13 :     res = PQexec(conn, buf.data);
                                622              26 :     if (!res ||
 4823 mail                      623 GBC          26 :         (PQresultStatus(res) != PGRES_COMMAND_OK &&
 4823 mail                      624 GIC          13 :          PQresultStatus(res) != PGRES_TUPLES_OK))
 7522 bruce                     625 ECB             :     {
 1844 tgl                       626 GIC           5 :         dblink_res_error(conn, conname, res, fail,
                                627                 :                          "while fetching from cursor \"%s\"", curname);
 4823 mail                      628 CBC           3 :         return (Datum) 0;
                                629                 :     }
                                630               8 :     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
                                631                 :     {
 4823 mail                      632 ECB             :         /* cursor does not exist - closed already or bad name */
 7524 bruce                     633 UIC           0 :         PQclear(res);
 4823 mail                      634               0 :         ereport(ERROR,
                                635                 :                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
 4823 mail                      636 ECB             :                  errmsg("cursor \"%s\" does not exist", curname)));
                                637                 :     }
                                638                 : 
 3670 tgl                       639 CBC           8 :     materializeResult(fcinfo, conn, res);
 4823 mail                      640 GIC           7 :     return (Datum) 0;
 7524 bruce                     641 ECB             : }
                                642                 : 
                                643                 : /*
                                644                 :  * Note: this is the new preferred version of dblink
                                645                 :  */
 7524 bruce                     646 GIC          11 : PG_FUNCTION_INFO_V1(dblink_record);
 7524 bruce                     647 ECB             : Datum
 7524 bruce                     648 CBC          25 : dblink_record(PG_FUNCTION_ARGS)
 6063 mail                      649 ECB             : {
 5059 mail                      650 GIC          25 :     return dblink_record_internal(fcinfo, false);
 6063 mail                      651 ECB             : }
                                652                 : 
 6063 mail                      653 CBC           3 : PG_FUNCTION_INFO_V1(dblink_send_query);
                                654                 : Datum
 6063 mail                      655 GIC           6 : dblink_send_query(PG_FUNCTION_ARGS)
 6063 mail                      656 ECB             : {
 2296 peter_e                   657                 :     PGconn     *conn;
                                658                 :     char       *sql;
 5059 mail                      659                 :     int         retval;
                                660                 : 
 5059 mail                      661 CBC           6 :     if (PG_NARGS() == 2)
                                662                 :     {
 2296 peter_e                   663 GIC           6 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 5059 mail                      664 CBC           6 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
                                665                 :     }
 5059 mail                      666 ECB             :     else
                                667                 :         /* shouldn't happen */
 5059 mail                      668 LBC           0 :         elog(ERROR, "wrong number of arguments");
                                669                 : 
                                670                 :     /* async query send */
 5059 mail                      671 GIC           6 :     retval = PQsendQuery(conn, sql);
 5059 mail                      672 CBC           6 :     if (retval != 1)
 2232 peter_e                   673 LBC           0 :         elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
 5059 mail                      674 ECB             : 
 5059 mail                      675 GIC           6 :     PG_RETURN_INT32(retval);
                                676                 : }
 6063 mail                      677 ECB             : 
 6063 mail                      678 GIC           4 : PG_FUNCTION_INFO_V1(dblink_get_result);
                                679                 : Datum
 6063 mail                      680 CBC           8 : dblink_get_result(PG_FUNCTION_ARGS)
 6063 mail                      681 ECB             : {
 5059 mail                      682 GIC           8 :     return dblink_record_internal(fcinfo, true);
                                683                 : }
                                684                 : 
 6063 mail                      685 EUB             : static Datum
 5059 mail                      686 GIC          33 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
                                687                 : {
 4022 tgl                       688              33 :     PGconn     *volatile conn = NULL;
                                689              33 :     volatile bool freeconn = false;
 4823 mail                      690 ECB             : 
 4023 tgl                       691 GIC          33 :     prepTuplestoreResult(fcinfo);
 7524 bruce                     692 ECB             : 
 2296 peter_e                   693 GIC          33 :     dblink_init();
                                694                 : 
 4022 tgl                       695 GBC          33 :     PG_TRY();
 7522 bruce                     696 EUB             :     {
 4022 tgl                       697 GIC          33 :         char       *sql = NULL;
 4022 tgl                       698 CBC          33 :         char       *conname = NULL;
 4022 tgl                       699 GIC          33 :         bool        fail = true;    /* default to backward compatible */
                                700                 : 
 4022 tgl                       701 CBC          33 :         if (!is_async)
                                702                 :         {
 4022 tgl                       703 GIC          25 :             if (PG_NARGS() == 3)
                                704                 :             {
 4022 tgl                       705 EUB             :                 /* text,text,bool */
 2203 peter_e                   706 GIC           1 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 4022 tgl                       707               1 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 4022 tgl                       708 CBC           1 :                 fail = PG_GETARG_BOOL(2);
 2203 peter_e                   709               1 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
                                710                 :             }
 4022 tgl                       711              24 :             else if (PG_NARGS() == 2)
                                712                 :             {
                                713                 :                 /* text,text or text,bool */
                                714              17 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
                                715                 :                 {
 4022 tgl                       716 GIC           1 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                717               1 :                     fail = PG_GETARG_BOOL(1);
 2203 peter_e                   718               1 :                     conn = pconn->conn;
 4022 tgl                       719 ECB             :                 }
                                720                 :                 else
                                721                 :                 {
 2203 peter_e                   722 CBC          16 :                     conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 4022 tgl                       723 GIC          16 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 2203 peter_e                   724 CBC          16 :                     dblink_get_conn(conname, &conn, &conname, &freeconn);
 4022 tgl                       725 ECB             :                 }
                                726                 :             }
 4022 tgl                       727 GBC           7 :             else if (PG_NARGS() == 1)
                                728                 :             {
                                729                 :                 /* text */
 4823 mail                      730 GIC           7 :                 conn = pconn->conn;
                                731               7 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                732                 :             }
 4823 mail                      733 ECB             :             else
                                734                 :                 /* shouldn't happen */
 4022 tgl                       735 UIC           0 :                 elog(ERROR, "wrong number of arguments");
                                736                 :         }
                                737                 :         else                    /* is_async */
 4022 tgl                       738 ECB             :         {
                                739                 :             /* get async result */
 2203 peter_e                   740 GIC           8 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 2203 peter_e                   741 ECB             : 
 4022 tgl                       742 CBC           8 :             if (PG_NARGS() == 2)
                                743                 :             {
 4022 tgl                       744 ECB             :                 /* text,bool */
 4022 tgl                       745 UIC           0 :                 fail = PG_GETARG_BOOL(1);
 2203 peter_e                   746 LBC           0 :                 conn = dblink_get_named_conn(conname);
                                747                 :             }
 4022 tgl                       748 GIC           8 :             else if (PG_NARGS() == 1)
                                749                 :             {
                                750                 :                 /* text */
 2203 peter_e                   751               8 :                 conn = dblink_get_named_conn(conname);
                                752                 :             }
                                753                 :             else
                                754                 :                 /* shouldn't happen */
 4022 tgl                       755 UIC           0 :                 elog(ERROR, "wrong number of arguments");
 7524 bruce                     756 ECB             :         }
                                757                 : 
 4022 tgl                       758 CBC          31 :         if (!conn)
 2296 peter_e                   759 GIC           2 :             dblink_conn_not_avail(conname);
                                760                 : 
 4022 tgl                       761 CBC          29 :         if (!is_async)
 4823 mail                      762 EUB             :         {
                                763                 :             /* synchronous query, use efficient tuple collection method */
 4022 tgl                       764 GIC          21 :             materializeQueryResult(fcinfo, conn, conname, sql, fail);
 4823 mail                      765 ECB             :         }
 5050 bruce                     766 EUB             :         else
                                767                 :         {
                                768                 :             /* async result retrieval, do it the old way */
 4022 tgl                       769 GIC           8 :             PGresult   *res = PQgetResult(conn);
                                770                 : 
 4022 tgl                       771 ECB             :             /* NULL means we're all done with the async results */
 4022 tgl                       772 GIC           8 :             if (res)
                                773                 :             {
 4022 tgl                       774 CBC           5 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
                                775               5 :                     PQresultStatus(res) != PGRES_TUPLES_OK)
 4022 tgl                       776 ECB             :                 {
 1844 tgl                       777 UIC           0 :                     dblink_res_error(conn, conname, res, fail,
                                778                 :                                      "while executing query");
                                779                 :                     /* if fail isn't set, we'll return an empty query result */
                                780                 :                 }
                                781                 :                 else
                                782                 :                 {
 3670 tgl                       783 GIC           5 :                     materializeResult(fcinfo, conn, res);
 4022 tgl                       784 ECB             :                 }
                                785                 :             }
 5050 bruce                     786                 :         }
                                787                 :     }
 1255 peter                     788 GIC           4 :     PG_FINALLY();
 4823 mail                      789 ECB             :     {
                                790                 :         /* if needed, close the connection to the database */
 4022 tgl                       791 CBC          33 :         if (freeconn)
   76 andres                    792 GNC           3 :             libpqsrv_disconnect(conn);
                                793                 :     }
 4022 tgl                       794 GIC          33 :     PG_END_TRY();
 4823 mail                      795 ECB             : 
 4823 mail                      796 GIC          29 :     return (Datum) 0;
 4823 mail                      797 EUB             : }
                                798                 : 
                                799                 : /*
                                800                 :  * Verify function caller can handle a tuplestore result, and set up for that.
                                801                 :  *
                                802                 :  * Note: if the caller returns without actually creating a tuplestore, the
 4023 tgl                       803                 :  * executor will treat the function result as an empty set.
                                804                 :  */
                                805                 : static void
 4023 tgl                       806 GBC          46 : prepTuplestoreResult(FunctionCallInfo fcinfo)
 4023 tgl                       807 EUB             : {
 4023 tgl                       808 GIC          46 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                                809                 : 
                                810                 :     /* check to see if query supports us returning a tuplestore */
 4023 tgl                       811 CBC          46 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 4023 tgl                       812 UIC           0 :         ereport(ERROR,
 4023 tgl                       813 ECB             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                814                 :                  errmsg("set-valued function called in context that cannot accept a set")));
 4023 tgl                       815 GIC          46 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
 4023 tgl                       816 LBC           0 :         ereport(ERROR,
                                817                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 4023 tgl                       818 ECB             :                  errmsg("materialize mode required, but it is not allowed in this context")));
                                819                 : 
                                820                 :     /* let the executor know we're sending back a tuplestore */
 4023 tgl                       821 GBC          46 :     rsinfo->returnMode = SFRM_Materialize;
                                822                 : 
 4023 tgl                       823 EUB             :     /* caller must fill these to return a non-empty result */
 4023 tgl                       824 GIC          46 :     rsinfo->setResult = NULL;
                                825              46 :     rsinfo->setDesc = NULL;
                                826              46 : }
                                827                 : 
 4023 tgl                       828 EUB             : /*
                                829                 :  * Copy the contents of the PGresult into a tuplestore to be returned
                                830                 :  * as the result of the current function.
                                831                 :  * The PGresult will be released in this function.
                                832                 :  */
                                833                 : static void
 3670 tgl                       834 GIC          13 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
 4823 mail                      835 ECB             : {
 4790 bruce                     836 CBC          13 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 4823 mail                      837 ECB             : 
                                838                 :     /* prepTuplestoreResult must have been called previously */
 4823 mail                      839 GIC          13 :     Assert(rsinfo->returnMode == SFRM_Materialize);
                                840                 : 
                                841              13 :     PG_TRY();
                                842                 :     {
 4823 mail                      843 ECB             :         TupleDesc   tupdesc;
 3670 tgl                       844 EUB             :         bool        is_sql_cmd;
                                845                 :         int         ntuples;
                                846                 :         int         nfields;
                                847                 : 
 5050 bruce                     848 GIC          13 :         if (PQresultStatus(res) == PGRES_COMMAND_OK)
 5050 bruce                     849 ECB             :         {
 5050 bruce                     850 UIC           0 :             is_sql_cmd = true;
                                851                 : 
 4823 mail                      852 ECB             :             /*
                                853                 :              * need a tuple descriptor representing one TEXT column to return
                                854                 :              * the command status string as our result tuple
                                855                 :              */
 1601 andres                    856 UIC           0 :             tupdesc = CreateTemplateTupleDesc(1);
 5050 bruce                     857               0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
 5050 bruce                     858 ECB             :                                TEXTOID, -1, 0);
 4823 mail                      859 UIC           0 :             ntuples = 1;
                                860               0 :             nfields = 1;
 5050 bruce                     861 ECB             :         }
                                862                 :         else
                                863                 :         {
 4823 mail                      864 CBC          13 :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
 5050 bruce                     865 ECB             : 
 4823 mail                      866 CBC          13 :             is_sql_cmd = false;
 5050 bruce                     867 ECB             : 
                                868                 :             /* get a tuple descriptor for our result type */
 5050 bruce                     869 GIC          13 :             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
 6063 mail                      870 ECB             :             {
 5050 bruce                     871 GIC          13 :                 case TYPEFUNC_COMPOSITE:
                                872                 :                     /* success */
 5050 bruce                     873 CBC          13 :                     break;
 5050 bruce                     874 UIC           0 :                 case TYPEFUNC_RECORD:
                                875                 :                     /* failed to determine actual type of RECORD */
                                876               0 :                     ereport(ERROR,
 5050 bruce                     877 ECB             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                878                 :                              errmsg("function returning record called in context "
                                879                 :                                     "that cannot accept type record")));
                                880                 :                     break;
 5050 bruce                     881 LBC           0 :                 default:
                                882                 :                     /* result type isn't composite */
                                883               0 :                     elog(ERROR, "return type must be a row type");
 5050 bruce                     884 EUB             :                     break;
                                885                 :             }
 6031 bruce                     886 ECB             : 
                                887                 :             /* make sure we have a persistent copy of the tupdesc */
 5050 bruce                     888 GIC          13 :             tupdesc = CreateTupleDescCopy(tupdesc);
 4823 mail                      889              13 :             ntuples = PQntuples(res);
                                890              13 :             nfields = PQnfields(res);
 5050 bruce                     891 EUB             :         }
                                892                 : 
                                893                 :         /*
                                894                 :          * check result and tuple descriptor have the same number of columns
 5050 bruce                     895 ECB             :          */
 4823 mail                      896 CBC          13 :         if (nfields != tupdesc->natts)
 5050 bruce                     897 UIC           0 :             ereport(ERROR,
                                898                 :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
                                899                 :                      errmsg("remote query result rowtype does not match "
 5050 bruce                     900 ECB             :                             "the specified FROM clause rowtype")));
                                901                 : 
 4823 mail                      902 GIC          13 :         if (ntuples > 0)
 5050 bruce                     903 ECB             :         {
                                904                 :             AttInMetadata *attinmeta;
 3670 tgl                       905 GIC          13 :             int         nestlevel = -1;
 4790 bruce                     906 ECB             :             Tuplestorestate *tupstore;
                                907                 :             MemoryContext oldcontext;
                                908                 :             int         row;
                                909                 :             char      **values;
                                910                 : 
 4823 mail                      911 GIC          13 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
                                912                 : 
                                913                 :             /* Set GUCs to ensure we read GUC-sensitive data types correctly */
 3670 tgl                       914              13 :             if (!is_sql_cmd)
                                915              13 :                 nestlevel = applyRemoteGucs(conn);
                                916                 : 
 1165 alvherre                  917              13 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
 4823 mail                      918              13 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
                                919              13 :             rsinfo->setResult = tupstore;
 4823 mail                      920 CBC          13 :             rsinfo->setDesc = tupdesc;
 6063 mail                      921 GIC          13 :             MemoryContextSwitchTo(oldcontext);
                                922                 : 
  209 peter                     923 GNC          13 :             values = palloc_array(char *, nfields);
                                924                 : 
                                925                 :             /* put all tuples into the tuplestore */
 4823 mail                      926 CBC          49 :             for (row = 0; row < ntuples; row++)
 7524 bruce                     927 ECB             :             {
 4823 mail                      928                 :                 HeapTuple   tuple;
                                929                 : 
 4823 mail                      930 GIC          37 :                 if (!is_sql_cmd)
 4823 mail                      931 ECB             :                 {
                                932                 :                     int         i;
                                933                 : 
 4823 mail                      934 GIC         138 :                     for (i = 0; i < nfields; i++)
 4823 mail                      935 ECB             :                     {
 4823 mail                      936 GIC         101 :                         if (PQgetisnull(res, row, i))
 4823 mail                      937 UIC           0 :                             values[i] = NULL;
 4823 mail                      938 ECB             :                         else
 4823 mail                      939 GIC         101 :                             values[i] = PQgetvalue(res, row, i);
                                940                 :                     }
                                941                 :                 }
                                942                 :                 else
 4823 mail                      943 ECB             :                 {
 4823 mail                      944 UIC           0 :                     values[0] = PQcmdStatus(res);
 4823 mail                      945 ECB             :                 }
 7524 bruce                     946                 : 
 4823 mail                      947                 :                 /* build the tuple and put it into the tuplestore. */
 4823 mail                      948 CBC          37 :                 tuple = BuildTupleFromCStrings(attinmeta, values);
 4823 mail                      949 GIC          36 :                 tuplestore_puttuple(tupstore, tuple);
                                950                 :             }
                                951                 : 
                                952                 :             /* clean up GUC settings, if we changed any */
 3670 tgl                       953 CBC          12 :             restoreLocalGucs(nestlevel);
                                954                 :         }
 7524 bruce                     955 ECB             :     }
 1255 peter                     956 CBC           1 :     PG_FINALLY();
                                957                 :     {
                                958                 :         /* be sure to release the libpq result */
 7524 bruce                     959 GIC          13 :         PQclear(res);
 7524 bruce                     960 ECB             :     }
 4823 mail                      961 GIC          13 :     PG_END_TRY();
 7524 bruce                     962              12 : }
                                963                 : 
                                964                 : /*
                                965                 :  * Execute the given SQL command and store its results into a tuplestore
                                966                 :  * to be returned as the result of the current function.
                                967                 :  *
                                968                 :  * This is equivalent to PQexec followed by materializeResult, but we make
                                969                 :  * use of libpq's single-row mode to avoid accumulating the whole result
                                970                 :  * inside libpq before it gets transferred to the tuplestore.
                                971                 :  */
                                972                 : static void
 4022 tgl                       973              21 : materializeQueryResult(FunctionCallInfo fcinfo,
                                974                 :                        PGconn *conn,
                                975                 :                        const char *conname,
                                976                 :                        const char *sql,
 4022 tgl                       977 EUB             :                        bool fail)
                                978                 : {
 4022 tgl                       979 GIC          21 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 4022 tgl                       980 GBC          21 :     PGresult   *volatile res = NULL;
 1476 peter                     981 GIC          21 :     volatile storeInfo sinfo = {0};
 4022 tgl                       982 EUB             : 
                                983                 :     /* prepTuplestoreResult must have been called previously */
 4022 tgl                       984 GBC          21 :     Assert(rsinfo->returnMode == SFRM_Materialize);
 4022 tgl                       985 EUB             : 
 3902 tgl                       986 GBC          21 :     sinfo.fcinfo = fcinfo;
                                987                 : 
 4022                           988              21 :     PG_TRY();
                                989                 :     {
                                990                 :         /* Create short-lived memory context for data conversions */
 3215 mail                      991              21 :         sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
 3215 mail                      992 EUB             :                                                  "dblink temporary context",
                                993                 :                                                  ALLOCSET_DEFAULT_SIZES);
                                994                 : 
 3902 tgl                       995                 :         /* execute query, collecting any tuples into the tuplestore */
 3902 tgl                       996 GIC          21 :         res = storeQueryResult(&sinfo, conn, sql);
                                997                 : 
 4022                           998              21 :         if (!res ||
 4022 tgl                       999 CBC          21 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
 4022 tgl                      1000 GIC          21 :              PQresultStatus(res) != PGRES_TUPLES_OK))
 4022 tgl                      1001 CBC           2 :         {
                               1002                 :             /*
 4022 tgl                      1003 ECB             :              * dblink_res_error will clear the passed PGresult, so we need
                               1004                 :              * this ugly dance to avoid doing so twice during error exit
                               1005                 :              */
 4022 tgl                      1006 GIC           2 :             PGresult   *res1 = res;
                               1007                 : 
 4022 tgl                      1008 CBC           2 :             res = NULL;
 1844                          1009               2 :             dblink_res_error(conn, conname, res1, fail,
 1844 tgl                      1010 ECB             :                              "while executing query");
                               1011                 :             /* if fail isn't set, we'll return an empty query result */
 4022                          1012                 :         }
 4022 tgl                      1013 CBC          19 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
 4022 tgl                      1014 ECB             :         {
                               1015                 :             /*
                               1016                 :              * storeRow didn't get called, so we need to convert the command
 3902 tgl                      1017 EUB             :              * status string to a tuple manually
                               1018                 :              */
                               1019                 :             TupleDesc   tupdesc;
 4022                          1020                 :             AttInMetadata *attinmeta;
                               1021                 :             Tuplestorestate *tupstore;
                               1022                 :             HeapTuple   tuple;
                               1023                 :             char       *values[1];
                               1024                 :             MemoryContext oldcontext;
                               1025                 : 
                               1026                 :             /*
                               1027                 :              * need a tuple descriptor representing one TEXT column to return
 4022 tgl                      1028 ECB             :              * the command status string as our result tuple
                               1029                 :              */
 1601 andres                   1030 UIC           0 :             tupdesc = CreateTemplateTupleDesc(1);
 4022 tgl                      1031               0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
                               1032                 :                                TEXTOID, -1, 0);
                               1033               0 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
                               1034                 : 
 1165 alvherre                 1035 LBC           0 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
 4022 tgl                      1036 UIC           0 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
 4022 tgl                      1037 LBC           0 :             rsinfo->setResult = tupstore;
                               1038               0 :             rsinfo->setDesc = tupdesc;
 4022 tgl                      1039 UIC           0 :             MemoryContextSwitchTo(oldcontext);
                               1040                 : 
 4022 tgl                      1041 LBC           0 :             values[0] = PQcmdStatus(res);
 4022 tgl                      1042 EUB             : 
                               1043                 :             /* build the tuple and put it into the tuplestore. */
 4022 tgl                      1044 LBC           0 :             tuple = BuildTupleFromCStrings(attinmeta, values);
 4022 tgl                      1045 UBC           0 :             tuplestore_puttuple(tupstore, tuple);
                               1046                 : 
 4022 tgl                      1047 UIC           0 :             PQclear(res);
 3902                          1048               0 :             res = NULL;
 4022 tgl                      1049 ECB             :         }
                               1050                 :         else
                               1051                 :         {
 4022 tgl                      1052 CBC          19 :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
 3902 tgl                      1053 ECB             :             /* storeRow should have created a tuplestore */
 4022 tgl                      1054 GIC          19 :             Assert(rsinfo->setResult != NULL);
 4022 tgl                      1055 ECB             : 
 4022 tgl                      1056 GIC          19 :             PQclear(res);
 3902                          1057              19 :             res = NULL;
                               1058                 :         }
                               1059                 : 
                               1060                 :         /* clean up data conversion short-lived memory context */
 3215 mail                     1061              21 :         if (sinfo.tmpcontext != NULL)
                               1062              21 :             MemoryContextDelete(sinfo.tmpcontext);
                               1063              21 :         sinfo.tmpcontext = NULL;
 3215 mail                     1064 ECB             : 
 3902 tgl                      1065 CBC          21 :         PQclear(sinfo.last_res);
 3902 tgl                      1066 GIC          21 :         sinfo.last_res = NULL;
 3902 tgl                      1067 CBC          21 :         PQclear(sinfo.cur_res);
 3902 tgl                      1068 GIC          21 :         sinfo.cur_res = NULL;
 4022 tgl                      1069 ECB             :     }
 4022 tgl                      1070 LBC           0 :     PG_CATCH();
 4022 tgl                      1071 ECB             :     {
                               1072                 :         /* be sure to release any libpq result we collected */
 3902 tgl                      1073 UIC           0 :         PQclear(res);
                               1074               0 :         PQclear(sinfo.last_res);
                               1075               0 :         PQclear(sinfo.cur_res);
 4022 tgl                      1076 ECB             :         /* and clear out any pending data in libpq */
 3902 tgl                      1077 UBC           0 :         while ((res = PQgetResult(conn)) != NULL)
 4022 tgl                      1078 UIC           0 :             PQclear(res);
                               1079               0 :         PG_RE_THROW();
 4022 tgl                      1080 ECB             :     }
 4022 tgl                      1081 CBC          21 :     PG_END_TRY();
                               1082              21 : }
 4022 tgl                      1083 ECB             : 
                               1084                 : /*
                               1085                 :  * Execute query, and send any result rows to sinfo->tuplestore.
                               1086                 :  */
                               1087                 : static PGresult *
 2995 tgl                      1088 CBC          21 : storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
                               1089                 : {
 3902 tgl                      1090 GIC          21 :     bool        first = true;
 3670 tgl                      1091 CBC          21 :     int         nestlevel = -1;
 3902 tgl                      1092 ECB             :     PGresult   *res;
                               1093                 : 
 3902 tgl                      1094 GIC          21 :     if (!PQsendQuery(conn, sql))
 2232 peter_e                  1095 UIC           0 :         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
                               1096                 : 
 2118 tgl                      1097 GIC          21 :     if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
 3902 tgl                      1098 UIC           0 :         elog(ERROR, "failed to set single-row mode for dblink query");
                               1099                 : 
                               1100                 :     for (;;)
                               1101                 :     {
 3902 tgl                      1102 GIC         174 :         CHECK_FOR_INTERRUPTS();
 3902 tgl                      1103 ECB             : 
 3902 tgl                      1104 GIC         174 :         sinfo->cur_res = PQgetResult(conn);
 3902 tgl                      1105 CBC         174 :         if (!sinfo->cur_res)
 3902 tgl                      1106 GIC          21 :             break;
                               1107                 : 
                               1108             153 :         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
                               1109                 :         {
 3902 tgl                      1110 ECB             :             /* got one row from possibly-bigger resultset */
                               1111                 : 
                               1112                 :             /*
 3670                          1113                 :              * Set GUCs to ensure we read GUC-sensitive data types correctly.
                               1114                 :              * We shouldn't do this until we have a row in hand, to ensure
                               1115                 :              * libpq has seen any earlier ParameterStatus protocol messages.
                               1116                 :              */
 3670 tgl                      1117 GIC         132 :             if (first && nestlevel < 0)
                               1118              19 :                 nestlevel = applyRemoteGucs(conn);
                               1119                 : 
 3902                          1120             132 :             storeRow(sinfo, sinfo->cur_res, first);
 3902 tgl                      1121 ECB             : 
 3902 tgl                      1122 GBC         132 :             PQclear(sinfo->cur_res);
 3902 tgl                      1123 CBC         132 :             sinfo->cur_res = NULL;
 3902 tgl                      1124 GIC         132 :             first = false;
                               1125                 :         }
 3902 tgl                      1126 ECB             :         else
                               1127                 :         {
                               1128                 :             /* if empty resultset, fill tuplestore header */
 3902 tgl                      1129 GIC          21 :             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
 3902 tgl                      1130 LBC           0 :                 storeRow(sinfo, sinfo->cur_res, first);
 3902 tgl                      1131 EUB             : 
                               1132                 :             /* store completed result at last_res */
 3902 tgl                      1133 GBC          21 :             PQclear(sinfo->last_res);
 3902 tgl                      1134 GIC          21 :             sinfo->last_res = sinfo->cur_res;
                               1135              21 :             sinfo->cur_res = NULL;
                               1136              21 :             first = true;
                               1137                 :         }
 3902 tgl                      1138 EUB             :     }
                               1139                 : 
 3670                          1140                 :     /* clean up GUC settings, if we changed any */
 3670 tgl                      1141 GIC          21 :     restoreLocalGucs(nestlevel);
                               1142                 : 
                               1143                 :     /* return last_res */
 3902                          1144              21 :     res = sinfo->last_res;
 3902 tgl                      1145 CBC          21 :     sinfo->last_res = NULL;
 3902 tgl                      1146 GIC          21 :     return res;
                               1147                 : }
 3902 tgl                      1148 ECB             : 
 3902 tgl                      1149 EUB             : /*
                               1150                 :  * Send single row to sinfo->tuplestore.
                               1151                 :  *
                               1152                 :  * If "first" is true, create the tuplestore using PGresult's metadata
                               1153                 :  * (in this case the PGresult might contain either zero or one row).
                               1154                 :  */
 3902 tgl                      1155 ECB             : static void
 2995 tgl                      1156 GIC         132 : storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
                               1157                 : {
 4022 tgl                      1158 CBC         132 :     int         nfields = PQnfields(res);
 4022 tgl                      1159 ECB             :     HeapTuple   tuple;
                               1160                 :     int         i;
                               1161                 :     MemoryContext oldcontext;
                               1162                 : 
 3902 tgl                      1163 GIC         132 :     if (first)
                               1164                 :     {
 4022 tgl                      1165 ECB             :         /* Prepare for new result set */
 4022 tgl                      1166 GBC          19 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
                               1167                 :         TupleDesc   tupdesc;
                               1168                 : 
                               1169                 :         /*
                               1170                 :          * It's possible to get more than one result set if the query string
                               1171                 :          * contained multiple SQL commands.  In that case, we follow PQexec's
 4022 tgl                      1172 ECB             :          * traditional behavior of throwing away all but the last result.
 4022 tgl                      1173 EUB             :          */
 4022 tgl                      1174 CBC          19 :         if (sinfo->tuplestore)
 4022 tgl                      1175 UIC           0 :             tuplestore_end(sinfo->tuplestore);
 4022 tgl                      1176 GIC          19 :         sinfo->tuplestore = NULL;
                               1177                 : 
 4022 tgl                      1178 ECB             :         /* get a tuple descriptor for our result type */
 4022 tgl                      1179 GIC          19 :         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
                               1180                 :         {
                               1181              19 :             case TYPEFUNC_COMPOSITE:
                               1182                 :                 /* success */
                               1183              19 :                 break;
 4022 tgl                      1184 UIC           0 :             case TYPEFUNC_RECORD:
 4022 tgl                      1185 ECB             :                 /* failed to determine actual type of RECORD */
 4022 tgl                      1186 UIC           0 :                 ereport(ERROR,
                               1187                 :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1188                 :                          errmsg("function returning record called in context "
                               1189                 :                                 "that cannot accept type record")));
 4022 tgl                      1190 ECB             :                 break;
 4022 tgl                      1191 UIC           0 :             default:
 4022 tgl                      1192 ECB             :                 /* result type isn't composite */
 4022 tgl                      1193 UBC           0 :                 elog(ERROR, "return type must be a row type");
                               1194                 :                 break;
 4022 tgl                      1195 ECB             :         }
                               1196                 : 
                               1197                 :         /* make sure we have a persistent copy of the tupdesc */
 4022 tgl                      1198 GIC          19 :         tupdesc = CreateTupleDescCopy(tupdesc);
 4022 tgl                      1199 ECB             : 
                               1200                 :         /* check result and tuple descriptor have the same number of columns */
 4022 tgl                      1201 CBC          19 :         if (nfields != tupdesc->natts)
 4022 tgl                      1202 UIC           0 :             ereport(ERROR,
                               1203                 :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
 4022 tgl                      1204 ECB             :                      errmsg("remote query result rowtype does not match "
                               1205                 :                             "the specified FROM clause rowtype")));
                               1206                 : 
                               1207                 :         /* Prepare attinmeta for later data conversions */
 4022 tgl                      1208 GIC          19 :         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
                               1209                 : 
                               1210                 :         /* Create a new, empty tuplestore */
 3902                          1211              19 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
 4022                          1212              19 :         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
 4022 tgl                      1213 CBC          19 :         rsinfo->setResult = sinfo->tuplestore;
 4022 tgl                      1214 GIC          19 :         rsinfo->setDesc = tupdesc;
 4022 tgl                      1215 CBC          19 :         MemoryContextSwitchTo(oldcontext);
                               1216                 : 
                               1217                 :         /* Done if empty resultset */
 3902 tgl                      1218 GIC          19 :         if (PQntuples(res) == 0)
 3902 tgl                      1219 LBC           0 :             return;
                               1220                 : 
 4022 tgl                      1221 ECB             :         /*
                               1222                 :          * Set up sufficiently-wide string pointers array; this won't change
                               1223                 :          * in size so it's easy to preallocate.
                               1224                 :          */
 4022 tgl                      1225 GIC          19 :         if (sinfo->cstrs)
 4022 tgl                      1226 UIC           0 :             pfree(sinfo->cstrs);
  209 peter                    1227 GNC          19 :         sinfo->cstrs = palloc_array(char *, nfields);
 4022 tgl                      1228 ECB             :     }
                               1229                 : 
                               1230                 :     /* Should have a single-row result if we get here */
 3902 tgl                      1231 GIC         132 :     Assert(PQntuples(res) == 1);
                               1232                 : 
 4022 tgl                      1233 ECB             :     /*
                               1234                 :      * Do the following work in a temp context that we reset after each tuple.
                               1235                 :      * This cleans up not only the data we have direct access to, but any
                               1236                 :      * cruft the I/O functions might leak.
 4022 tgl                      1237 EUB             :      */
 4022 tgl                      1238 GIC         132 :     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
                               1239                 : 
                               1240                 :     /*
                               1241                 :      * Fill cstrs with null-terminated strings of column values.
                               1242                 :      */
                               1243             510 :     for (i = 0; i < nfields; i++)
                               1244                 :     {
 3902                          1245             378 :         if (PQgetisnull(res, 0, i))
 3902 tgl                      1246 UIC           0 :             sinfo->cstrs[i] = NULL;
                               1247                 :         else
 3902 tgl                      1248 CBC         378 :             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
                               1249                 :     }
 4022 tgl                      1250 ECB             : 
                               1251                 :     /* Convert row to a tuple, and add it to the tuplestore */
 3902 tgl                      1252 GIC         132 :     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
                               1253                 : 
 4022 tgl                      1254 CBC         132 :     tuplestore_puttuple(sinfo->tuplestore, tuple);
 4022 tgl                      1255 ECB             : 
                               1256                 :     /* Clean up */
 4022 tgl                      1257 CBC         132 :     MemoryContextSwitchTo(oldcontext);
                               1258             132 :     MemoryContextReset(sinfo->tmpcontext);
                               1259                 : }
                               1260                 : 
                               1261                 : /*
                               1262                 :  * List all open dblink connections by name.
                               1263                 :  * Returns an array of all connection names.
                               1264                 :  * Takes no params
                               1265                 :  */
 6063 mail                     1266 GIC           2 : PG_FUNCTION_INFO_V1(dblink_get_connections);
                               1267                 : Datum
                               1268               1 : dblink_get_connections(PG_FUNCTION_ARGS)
                               1269                 : {
                               1270                 :     HASH_SEQ_STATUS status;
                               1271                 :     remoteConnHashEnt *hentry;
 6031 bruce                    1272 CBC           1 :     ArrayBuildState *astate = NULL;
                               1273                 : 
 6063 mail                     1274               1 :     if (remoteConnHash)
                               1275                 :     {
 6063 mail                     1276 GIC           1 :         hash_seq_init(&status, remoteConnHash);
                               1277               4 :         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
                               1278                 :         {
                               1279                 :             /* stash away current value */
                               1280               3 :             astate = accumArrayResult(astate,
 5493 tgl                      1281 CBC           3 :                                       CStringGetTextDatum(hentry->name),
 6063 mail                     1282 ECB             :                                       false, TEXTOID, CurrentMemoryContext);
                               1283                 :         }
                               1284                 :     }
                               1285                 : 
 6063 mail                     1286 CBC           1 :     if (astate)
  224 peter                    1287 GNC           1 :         PG_RETURN_DATUM(makeArrayResult(astate,
 6063 mail                     1288 ECB             :                                               CurrentMemoryContext));
                               1289                 :     else
 6063 mail                     1290 UIC           0 :         PG_RETURN_NULL();
 6063 mail                     1291 EUB             : }
                               1292                 : 
                               1293                 : /*
                               1294                 :  * Checks if a given remote connection is busy
                               1295                 :  *
                               1296                 :  * Returns 1 if the connection is busy, 0 otherwise
                               1297                 :  * Params:
                               1298                 :  *  text connection_name - name of the connection to check
                               1299                 :  *
                               1300                 :  */
 6063 mail                     1301 GIC           2 : PG_FUNCTION_INFO_V1(dblink_is_busy);
                               1302                 : Datum
                               1303               1 : dblink_is_busy(PG_FUNCTION_ARGS)
                               1304                 : {
 2296 peter_e                  1305 ECB             :     PGconn     *conn;
                               1306                 : 
 2296 peter_e                  1307 CBC           1 :     dblink_init();
 2296 peter_e                  1308 GIC           1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
                               1309                 : 
 6063 mail                     1310               1 :     PQconsumeInput(conn);
                               1311               1 :     PG_RETURN_INT32(PQisBusy(conn));
 6063 mail                     1312 ECB             : }
                               1313                 : 
                               1314                 : /*
                               1315                 :  * Cancels a running request on a connection
                               1316                 :  *
 6031 bruce                    1317                 :  * Returns text:
                               1318                 :  *  "OK" if the cancel request has been sent correctly,
 6031 bruce                    1319 EUB             :  *      an error message otherwise
                               1320                 :  *
                               1321                 :  * Params:
                               1322                 :  *  text connection_name - name of the connection to check
                               1323                 :  *
                               1324                 :  */
 6063 mail                     1325 CBC           2 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
                               1326                 : Datum
                               1327               1 : dblink_cancel_query(PG_FUNCTION_ARGS)
                               1328                 : {
 2296 peter_e                  1329 ECB             :     int         res;
                               1330                 :     PGconn     *conn;
 6031 bruce                    1331                 :     PGcancel   *cancel;
                               1332                 :     char        errbuf[256];
 6063 mail                     1333                 : 
 2296 peter_e                  1334 GIC           1 :     dblink_init();
 2296 peter_e                  1335 CBC           1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 6063 mail                     1336 GIC           1 :     cancel = PQgetCancel(conn);
 6063 mail                     1337 ECB             : 
 6063 mail                     1338 CBC           1 :     res = PQcancel(cancel, errbuf, 256);
                               1339               1 :     PQfreeCancel(cancel);
 6063 mail                     1340 ECB             : 
 5575 tgl                      1341 GIC           1 :     if (res == 1)
 5493 tgl                      1342 CBC           1 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
                               1343                 :     else
 5493 tgl                      1344 UIC           0 :         PG_RETURN_TEXT_P(cstring_to_text(errbuf));
 6063 mail                     1345 EUB             : }
                               1346                 : 
                               1347                 : 
                               1348                 : /*
                               1349                 :  * Get error message from a connection
 6063 mail                     1350 ECB             :  *
                               1351                 :  * Returns text:
                               1352                 :  *  "OK" if no error, an error message otherwise
 6031 bruce                    1353                 :  *
                               1354                 :  * Params:
                               1355                 :  *  text connection_name - name of the connection to check
                               1356                 :  *
 6063 mail                     1357                 :  */
 6063 mail                     1358 GIC           2 : PG_FUNCTION_INFO_V1(dblink_error_message);
                               1359                 : Datum
                               1360               1 : dblink_error_message(PG_FUNCTION_ARGS)
 6063 mail                     1361 ECB             : {
 6031 bruce                    1362                 :     char       *msg;
 2296 peter_e                  1363                 :     PGconn     *conn;
                               1364                 : 
 2296 peter_e                  1365 GIC           1 :     dblink_init();
 2296 peter_e                  1366 CBC           1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
                               1367                 : 
 6063 mail                     1368 GIC           1 :     msg = PQerrorMessage(conn);
 5575 tgl                      1369 CBC           1 :     if (msg == NULL || msg[0] == '\0')
 5493                          1370               1 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
                               1371                 :     else
 2232 peter_e                  1372 UIC           0 :         PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
                               1373                 : }
 6063 mail                     1374 EUB             : 
                               1375                 : /*
 7524 bruce                    1376 ECB             :  * Execute an SQL non-SELECT command
 7524 bruce                    1377 EUB             :  */
 7524 bruce                    1378 GIC           9 : PG_FUNCTION_INFO_V1(dblink_exec);
 7524 bruce                    1379 ECB             : Datum
 7524 bruce                    1380 CBC          26 : dblink_exec(PG_FUNCTION_ARGS)
 7524 bruce                    1381 ECB             : {
 4023 tgl                      1382 CBC          26 :     text       *volatile sql_cmd_status = NULL;
 4023 tgl                      1383 GIC          26 :     PGconn     *volatile conn = NULL;
 4023 tgl                      1384 CBC          26 :     volatile bool freeconn = false;
                               1385                 : 
 2296 peter_e                  1386 GIC          26 :     dblink_init();
                               1387                 : 
 4023 tgl                      1388              26 :     PG_TRY();
                               1389                 :     {
                               1390              26 :         PGresult   *res = NULL;
 4023 tgl                      1391 CBC          26 :         char       *sql = NULL;
 4023 tgl                      1392 GIC          26 :         char       *conname = NULL;
 4023 tgl                      1393 CBC          26 :         bool        fail = true;    /* default to backward compatible behavior */
                               1394                 : 
 4023 tgl                      1395 GIC          26 :         if (PG_NARGS() == 3)
                               1396                 :         {
                               1397                 :             /* must be text,text,bool */
 2203 peter_e                  1398 UIC           0 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 4023 tgl                      1399 LBC           0 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
                               1400               0 :             fail = PG_GETARG_BOOL(2);
 2203 peter_e                  1401 UIC           0 :             dblink_get_conn(conname, &conn, &conname, &freeconn);
                               1402                 :         }
 4023 tgl                      1403 GIC          26 :         else if (PG_NARGS() == 2)
 4023 tgl                      1404 EUB             :         {
                               1405                 :             /* might be text,text or text,bool */
 4023 tgl                      1406 GIC          17 :             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
                               1407                 :             {
                               1408               1 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
                               1409               1 :                 fail = PG_GETARG_BOOL(1);
 2203 peter_e                  1410 CBC           1 :                 conn = pconn->conn;
                               1411                 :             }
                               1412                 :             else
 4023 tgl                      1413 ECB             :             {
 2203 peter_e                  1414 CBC          16 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 4023 tgl                      1415 GIC          16 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 2203 peter_e                  1416 CBC          16 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
                               1417                 :             }
 4023 tgl                      1418 ECB             :         }
 4023 tgl                      1419 GIC           9 :         else if (PG_NARGS() == 1)
                               1420                 :         {
                               1421                 :             /* must be single text argument */
 6382 mail                     1422               9 :             conn = pconn->conn;
 5493 tgl                      1423               9 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
                               1424                 :         }
                               1425                 :         else
                               1426                 :             /* shouldn't happen */
 4023 tgl                      1427 UIC           0 :             elog(ERROR, "wrong number of arguments");
 7524 bruce                    1428 ECB             : 
 4023 tgl                      1429 GIC          26 :         if (!conn)
 2296 peter_e                  1430 LBC           0 :             dblink_conn_not_avail(conname);
                               1431                 : 
 4023 tgl                      1432 GIC          26 :         res = PQexec(conn, sql);
                               1433              26 :         if (!res ||
                               1434              26 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
                               1435               2 :              PQresultStatus(res) != PGRES_TUPLES_OK))
                               1436                 :         {
 1844                          1437               2 :             dblink_res_error(conn, conname, res, fail,
                               1438                 :                              "while executing command");
                               1439                 : 
                               1440                 :             /*
 4023 tgl                      1441 ECB             :              * and save a copy of the command status string to return as our
                               1442                 :              * result tuple
                               1443                 :              */
 4023 tgl                      1444 GIC           1 :             sql_cmd_status = cstring_to_text("ERROR");
                               1445                 :         }
                               1446              24 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
 4023 tgl                      1447 ECB             :         {
                               1448                 :             /*
                               1449                 :              * and save a copy of the command status string to return as our
                               1450                 :              * result tuple
                               1451                 :              */
 4023 tgl                      1452 CBC          24 :             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
 4023 tgl                      1453 GIC          24 :             PQclear(res);
                               1454                 :         }
 4023 tgl                      1455 ECB             :         else
                               1456                 :         {
 4023 tgl                      1457 UIC           0 :             PQclear(res);
 4023 tgl                      1458 LBC           0 :             ereport(ERROR,
                               1459                 :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
 2118 tgl                      1460 ECB             :                      errmsg("statement returning results not allowed")));
                               1461                 :         }
                               1462                 :     }
 1255 peter                    1463 GIC           1 :     PG_FINALLY();
                               1464                 :     {
 4023 tgl                      1465 ECB             :         /* if needed, close the connection to the database */
 4023 tgl                      1466 CBC          26 :         if (freeconn)
   76 andres                   1467 GNC           1 :             libpqsrv_disconnect(conn);
                               1468                 :     }
 4023 tgl                      1469 GIC          26 :     PG_END_TRY();
                               1470                 : 
 7228 bruce                    1471              25 :     PG_RETURN_TEXT_P(sql_cmd_status);
 7524 bruce                    1472 ECB             : }
                               1473                 : 
                               1474                 : 
 7655                          1475                 : /*
                               1476                 :  * dblink_get_pkey
 7522                          1477                 :  *
                               1478                 :  * Return list of primary key fields for the supplied relation,
                               1479                 :  * or NULL if none exists.
 7655                          1480                 :  */
 7655 bruce                    1481 GIC           2 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
                               1482                 : Datum
                               1483               9 : dblink_get_pkey(PG_FUNCTION_ARGS)
                               1484                 : {
 1828 teodor                   1485 EUB             :     int16       indnkeyatts;
 7522 bruce                    1486                 :     char      **results;
                               1487                 :     FuncCallContext *funcctx;
                               1488                 :     int32       call_cntr;
 7522 bruce                    1489 ECB             :     int32       max_calls;
                               1490                 :     AttInMetadata *attinmeta;
                               1491                 :     MemoryContext oldcontext;
                               1492                 : 
 7524                          1493                 :     /* stuff done only on the first call of the function */
 7522 bruce                    1494 GIC           9 :     if (SRF_IS_FIRSTCALL())
                               1495                 :     {
                               1496                 :         Relation    rel;
                               1497                 :         TupleDesc   tupdesc;
 7524 bruce                    1498 ECB             : 
                               1499                 :         /* create a function context for cross-call persistence */
 7522 bruce                    1500 GIC           3 :         funcctx = SRF_FIRSTCALL_INIT();
 7524 bruce                    1501 ECB             : 
 7522                          1502                 :         /*
                               1503                 :          * switch to memory context appropriate for multiple function calls
                               1504                 :          */
 7524 bruce                    1505 GIC           3 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
                               1506                 : 
                               1507                 :         /* open target relation */
 2219 noah                     1508               3 :         rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
                               1509                 : 
 4682 tgl                      1510 ECB             :         /* get the array of attnums */
 1828 teodor                   1511 CBC           3 :         results = get_pkey_attnames(rel, &indnkeyatts);
 4682 tgl                      1512 ECB             : 
 4682 tgl                      1513 GIC           3 :         relation_close(rel, AccessShareLock);
                               1514                 : 
 7522 bruce                    1515 ECB             :         /*
                               1516                 :          * need a tuple descriptor representing one INT and one TEXT column
                               1517                 :          */
 1601 andres                   1518 CBC           3 :         tupdesc = CreateTemplateTupleDesc(2);
 7524 bruce                    1519 GIC           3 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
 6947 tgl                      1520 ECB             :                            INT4OID, -1, 0);
 7524 bruce                    1521 GIC           3 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
                               1522                 :                            TEXTOID, -1, 0);
                               1523                 : 
                               1524                 :         /*
 6385 bruce                    1525 ECB             :          * Generate attribute metadata needed later to produce tuples from raw
                               1526                 :          * C strings
                               1527                 :          */
 7524 bruce                    1528 GIC           3 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
                               1529               3 :         funcctx->attinmeta = attinmeta;
                               1530                 : 
 1828 teodor                   1531               3 :         if ((results != NULL) && (indnkeyatts > 0))
                               1532                 :         {
                               1533               3 :             funcctx->max_calls = indnkeyatts;
                               1534                 : 
                               1535                 :             /* got results, keep track of them */
 7524 bruce                    1536               3 :             funcctx->user_fctx = results;
                               1537                 :         }
                               1538                 :         else
                               1539                 :         {
                               1540                 :             /* fast track when no results */
 5243 tgl                      1541 UIC           0 :             MemoryContextSwitchTo(oldcontext);
 7522 bruce                    1542               0 :             SRF_RETURN_DONE(funcctx);
                               1543                 :         }
                               1544                 : 
 7524 bruce                    1545 GIC           3 :         MemoryContextSwitchTo(oldcontext);
                               1546                 :     }
                               1547                 : 
                               1548                 :     /* stuff done on every call of the function */
 7522 bruce                    1549 CBC           9 :     funcctx = SRF_PERCALL_SETUP();
                               1550                 : 
 7524 bruce                    1551 ECB             :     /*
                               1552                 :      * initialize per-call variables
                               1553                 :      */
 7524 bruce                    1554 CBC           9 :     call_cntr = funcctx->call_cntr;
                               1555               9 :     max_calls = funcctx->max_calls;
 7655 bruce                    1556 ECB             : 
 7524 bruce                    1557 CBC           9 :     results = (char **) funcctx->user_fctx;
 7524 bruce                    1558 GIC           9 :     attinmeta = funcctx->attinmeta;
                               1559                 : 
                               1560               9 :     if (call_cntr < max_calls)   /* do when there is more left to send */
                               1561                 :     {
                               1562                 :         char      **values;
                               1563                 :         HeapTuple   tuple;
                               1564                 :         Datum       result;
                               1565                 : 
  209 peter                    1566 GNC           6 :         values = palloc_array(char *, 2);
 3380 peter_e                  1567 GIC           6 :         values[0] = psprintf("%d", call_cntr + 1);
 7524 bruce                    1568               6 :         values[1] = results[call_cntr];
                               1569                 : 
 7524 bruce                    1570 ECB             :         /* build the tuple */
 7524 bruce                    1571 GIC           6 :         tuple = BuildTupleFromCStrings(attinmeta, values);
                               1572                 : 
                               1573                 :         /* make the tuple into a datum */
 6947 tgl                      1574               6 :         result = HeapTupleGetDatum(tuple);
 7524 bruce                    1575 ECB             : 
 7522 bruce                    1576 GIC           6 :         SRF_RETURN_NEXT(funcctx, result);
                               1577                 :     }
                               1578                 :     else
                               1579                 :     {
                               1580                 :         /* do when there is no more left */
 7228                          1581               3 :         SRF_RETURN_DONE(funcctx);
 7655 bruce                    1582 ECB             :     }
                               1583                 : }
                               1584                 : 
                               1585                 : 
                               1586                 : /*
                               1587                 :  * dblink_build_sql_insert
 7522 bruce                    1588 EUB             :  *
                               1589                 :  * Used to generate an SQL insert statement
                               1590                 :  * based on an existing tuple in a local relation.
                               1591                 :  * This is useful for selectively replicating data
                               1592                 :  * to another server via dblink.
                               1593                 :  *
                               1594                 :  * API:
                               1595                 :  * <relname> - name of local table of interest
 7655 bruce                    1596 ECB             :  * <pkattnums> - an int2vector of attnums which will be used
                               1597                 :  * to identify the local tuple of interest
                               1598                 :  * <pknumatts> - number of attnums in pkattnums
                               1599                 :  * <src_pkattvals_arry> - text array of key values which will be used
                               1600                 :  * to identify the local tuple of interest
                               1601                 :  * <tgt_pkattvals_arry> - text array of key values which will be used
 7655 bruce                    1602 EUB             :  * to build the string for execution remotely. These are substituted
                               1603                 :  * for their counterparts in src_pkattvals_arry
                               1604                 :  */
 7655 bruce                    1605 GIC           3 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
                               1606                 : Datum
                               1607               6 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
                               1608                 : {
 2219 noah                     1609 CBC           6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
 4681 tgl                      1610 GIC           6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
                               1611               6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
 6351                          1612               6 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
                               1613               6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
 4682 tgl                      1614 ECB             :     Relation    rel;
                               1615                 :     int        *pkattnums;
                               1616                 :     int         pknumatts;
                               1617                 :     char      **src_pkattvals;
                               1618                 :     char      **tgt_pkattvals;
 7655 bruce                    1619                 :     int         src_nitems;
                               1620                 :     int         tgt_nitems;
                               1621                 :     char       *sql;
                               1622                 : 
                               1623                 :     /*
                               1624                 :      * Open target relation.
                               1625                 :      */
 4682 tgl                      1626 GIC           6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
                               1627                 : 
                               1628                 :     /*
                               1629                 :      * Process pkattnums argument.
                               1630                 :      */
 4681                          1631               6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
                               1632                 :                        &pkattnums, &pknumatts);
                               1633                 : 
                               1634                 :     /*
                               1635                 :      * Source array is made up of key values that will be used to locate the
                               1636                 :      * tuple of interest from the local system.
                               1637                 :      */
 6351 tgl                      1638 CBC           4 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
                               1639                 : 
 7655 bruce                    1640 ECB             :     /*
                               1641                 :      * There should be one source array key value for each key attnum
                               1642                 :      */
 7655 bruce                    1643 CBC           4 :     if (src_nitems != pknumatts)
 7199 tgl                      1644 LBC           0 :         ereport(ERROR,
 7199 tgl                      1645 ECB             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
                               1646                 :                  errmsg("source key array length must match number of key attributes")));
                               1647                 : 
                               1648                 :     /*
                               1649                 :      * Target array is made up of key values that will be used to build the
                               1650                 :      * SQL string for use on the remote system.
                               1651                 :      */
 6351 tgl                      1652 GIC           4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
                               1653                 : 
                               1654                 :     /*
                               1655                 :      * There should be one target array key value for each key attnum
 7655 bruce                    1656 ECB             :      */
 7655 bruce                    1657 GIC           4 :     if (tgt_nitems != pknumatts)
 7199 tgl                      1658 UIC           0 :         ereport(ERROR,
                               1659                 :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
                               1660                 :                  errmsg("target key array length must match number of key attributes")));
 7655 bruce                    1661 ECB             : 
                               1662                 :     /*
                               1663                 :      * Prep work is finally done. Go get the SQL string.
                               1664                 :      */
 4682 tgl                      1665 GIC           4 :     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
                               1666                 : 
                               1667                 :     /*
 4682 tgl                      1668 ECB             :      * Now we can close the relation.
                               1669                 :      */
 4682 tgl                      1670 GIC           4 :     relation_close(rel, AccessShareLock);
                               1671                 : 
                               1672                 :     /*
 7655 bruce                    1673 ECB             :      * And send it
 7655 bruce                    1674 EUB             :      */
 5493 tgl                      1675 GIC           4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
                               1676                 : }
                               1677                 : 
                               1678                 : 
                               1679                 : /*
                               1680                 :  * dblink_build_sql_delete
 7522 bruce                    1681 ECB             :  *
                               1682                 :  * Used to generate an SQL delete statement.
                               1683                 :  * This is useful for selectively replicating a
                               1684                 :  * delete to another server via dblink.
                               1685                 :  *
 7655                          1686                 :  * API:
                               1687                 :  * <relname> - name of remote table of interest
                               1688                 :  * <pkattnums> - an int2vector of attnums which will be used
                               1689                 :  * to identify the remote tuple of interest
                               1690                 :  * <pknumatts> - number of attnums in pkattnums
                               1691                 :  * <tgt_pkattvals_arry> - text array of key values which will be used
                               1692                 :  * to build the string for execution remotely.
                               1693                 :  */
 7655 bruce                    1694 GIC           3 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
                               1695                 : Datum
                               1696               6 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
                               1697                 : {
 2219 noah                     1698               6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
 4681 tgl                      1699               6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
                               1700               6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
 6351                          1701               6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
                               1702                 :     Relation    rel;
                               1703                 :     int        *pkattnums;
                               1704                 :     int         pknumatts;
                               1705                 :     char      **tgt_pkattvals;
                               1706                 :     int         tgt_nitems;
                               1707                 :     char       *sql;
                               1708                 : 
                               1709                 :     /*
                               1710                 :      * Open target relation.
                               1711                 :      */
 4682                          1712               6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
                               1713                 : 
 7655 bruce                    1714 ECB             :     /*
                               1715                 :      * Process pkattnums argument.
                               1716                 :      */
 4681 tgl                      1717 GIC           6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
 4681 tgl                      1718 ECB             :                        &pkattnums, &pknumatts);
 4813 mail                     1719                 : 
 7655 bruce                    1720                 :     /*
 6385                          1721                 :      * Target array is made up of key values that will be used to build the
                               1722                 :      * SQL string for use on the remote system.
                               1723                 :      */
 6351 tgl                      1724 GIC           4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
                               1725                 : 
                               1726                 :     /*
                               1727                 :      * There should be one target array key value for each key attnum
                               1728                 :      */
 7655 bruce                    1729               4 :     if (tgt_nitems != pknumatts)
 7199 tgl                      1730 UIC           0 :         ereport(ERROR,
                               1731                 :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
                               1732                 :                  errmsg("target key array length must match number of key attributes")));
                               1733                 : 
                               1734                 :     /*
 7655 bruce                    1735 ECB             :      * Prep work is finally done. Go get the SQL string.
                               1736                 :      */
 4682 tgl                      1737 GIC           4 :     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
                               1738                 : 
                               1739                 :     /*
 4682 tgl                      1740 ECB             :      * Now we can close the relation.
                               1741                 :      */
 4682 tgl                      1742 GIC           4 :     relation_close(rel, AccessShareLock);
                               1743                 : 
                               1744                 :     /*
                               1745                 :      * And send it
                               1746                 :      */
 5493 tgl                      1747 CBC           4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
                               1748                 : }
                               1749                 : 
                               1750                 : 
                               1751                 : /*
 7655 bruce                    1752 ECB             :  * dblink_build_sql_update
 7522 bruce                    1753 EUB             :  *
                               1754                 :  * Used to generate an SQL update statement
                               1755                 :  * based on an existing tuple in a local relation.
                               1756                 :  * This is useful for selectively replicating data
                               1757                 :  * to another server via dblink.
                               1758                 :  *
                               1759                 :  * API:
                               1760                 :  * <relname> - name of local table of interest
 7655 bruce                    1761 ECB             :  * <pkattnums> - an int2vector of attnums which will be used
                               1762                 :  * to identify the local tuple of interest
                               1763                 :  * <pknumatts> - number of attnums in pkattnums
                               1764                 :  * <src_pkattvals_arry> - text array of key values which will be used
                               1765                 :  * to identify the local tuple of interest
                               1766                 :  * <tgt_pkattvals_arry> - text array of key values which will be used
 7655 bruce                    1767 EUB             :  * to build the string for execution remotely. These are substituted
                               1768                 :  * for their counterparts in src_pkattvals_arry
                               1769                 :  */
 7655 bruce                    1770 GIC           3 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
                               1771                 : Datum
                               1772               6 : dblink_build_sql_update(PG_FUNCTION_ARGS)
                               1773                 : {
 2219 noah                     1774 CBC           6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
 4681 tgl                      1775 GIC           6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
                               1776               6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
 6351                          1777               6 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
                               1778               6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
 4682 tgl                      1779 ECB             :     Relation    rel;
                               1780                 :     int        *pkattnums;
                               1781                 :     int         pknumatts;
                               1782                 :     char      **src_pkattvals;
                               1783                 :     char      **tgt_pkattvals;
 7655 bruce                    1784                 :     int         src_nitems;
                               1785                 :     int         tgt_nitems;
                               1786                 :     char       *sql;
                               1787                 : 
                               1788                 :     /*
                               1789                 :      * Open target relation.
                               1790                 :      */
 4682 tgl                      1791 GIC           6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
                               1792                 : 
 7655 bruce                    1793 ECB             :     /*
                               1794                 :      * Process pkattnums argument.
 4813 mail                     1795 EUB             :      */
 4681 tgl                      1796 GIC           6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
                               1797                 :                        &pkattnums, &pknumatts);
 4813 mail                     1798 EUB             : 
                               1799                 :     /*
                               1800                 :      * Source array is made up of key values that will be used to locate the
                               1801                 :      * tuple of interest from the local system.
                               1802                 :      */
 6351 tgl                      1803 GIC           4 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
                               1804                 : 
                               1805                 :     /*
                               1806                 :      * There should be one source array key value for each key attnum
                               1807                 :      */
 7655 bruce                    1808               4 :     if (src_nitems != pknumatts)
 7199 tgl                      1809 UIC           0 :         ereport(ERROR,
                               1810                 :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
 1202 alvherre                 1811 ECB             :                  errmsg("source key array length must match number of key attributes")));
                               1812                 : 
 7655 bruce                    1813                 :     /*
                               1814                 :      * Target array is made up of key values that will be used to build the
                               1815                 :      * SQL string for use on the remote system.
                               1816                 :      */
 6351 tgl                      1817 CBC           4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
                               1818                 : 
 7655 bruce                    1819 ECB             :     /*
                               1820                 :      * There should be one target array key value for each key attnum
 7655 bruce                    1821 EUB             :      */
 7655 bruce                    1822 GIC           4 :     if (tgt_nitems != pknumatts)
 7199 tgl                      1823 LBC           0 :         ereport(ERROR,
                               1824                 :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
 1202 alvherre                 1825 ECB             :                  errmsg("target key array length must match number of key attributes")));
                               1826                 : 
 7655 bruce                    1827                 :     /*
                               1828                 :      * Prep work is finally done. Go get the SQL string.
                               1829                 :      */
 4682 tgl                      1830 GIC           4 :     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
                               1831                 : 
                               1832                 :     /*
 4682 tgl                      1833 ECB             :      * Now we can close the relation.
                               1834                 :      */
 4682 tgl                      1835 GIC           4 :     relation_close(rel, AccessShareLock);
 7655 bruce                    1836 ECB             : 
                               1837                 :     /*
                               1838                 :      * And send it
 7655 bruce                    1839 EUB             :      */
 5493 tgl                      1840 GIC           4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
 7969 bruce                    1841 ECB             : }
                               1842                 : 
 5052 tgl                      1843                 : /*
                               1844                 :  * dblink_current_query
                               1845                 :  * return the current query string
 5052 tgl                      1846 EUB             :  * to allow its use in (among other things)
                               1847                 :  * rewrite rules
 5052 tgl                      1848 ECB             :  */
 5052 tgl                      1849 GIC           1 : PG_FUNCTION_INFO_V1(dblink_current_query);
 5052 tgl                      1850 ECB             : Datum
 5052 tgl                      1851 LBC           0 : dblink_current_query(PG_FUNCTION_ARGS)
                               1852                 : {
                               1853                 :     /* This is now just an alias for the built-in function current_query() */
                               1854               0 :     PG_RETURN_DATUM(current_query(fcinfo));
                               1855                 : }
                               1856                 : 
                               1857                 : /*
                               1858                 :  * Retrieve async notifications for a connection.
                               1859                 :  *
                               1860                 :  * Returns a setof record of notifications, or an empty set if none received.
                               1861                 :  * Can optionally take a named connection as parameter, but uses the unnamed
                               1862                 :  * connection per default.
                               1863                 :  *
 4995 mail                     1864 ECB             :  */
                               1865                 : #define DBLINK_NOTIFY_COLS      3
                               1866                 : 
 4995 mail                     1867 GIC           3 : PG_FUNCTION_INFO_V1(dblink_get_notify);
 4995 mail                     1868 ECB             : Datum
 4995 mail                     1869 CBC           2 : dblink_get_notify(PG_FUNCTION_ARGS)
                               1870                 : {
                               1871                 :     PGconn     *conn;
                               1872                 :     PGnotify   *notify;
 4790 bruce                    1873 GIC           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                               1874                 : 
 2296 peter_e                  1875               2 :     dblink_init();
 4995 mail                     1876               2 :     if (PG_NARGS() == 1)
 2296 peter_e                  1877 UIC           0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
                               1878                 :     else
 4995 mail                     1879 GIC           2 :         conn = pconn->conn;
                               1880                 : 
  173 michael                  1881 CBC           2 :     InitMaterializedSRF(fcinfo, 0);
                               1882                 : 
 4995 mail                     1883               2 :     PQconsumeInput(conn);
                               1884               4 :     while ((notify = PQnotifies(conn)) != NULL)
 4995 mail                     1885 EUB             :     {
                               1886                 :         Datum       values[DBLINK_NOTIFY_COLS];
                               1887                 :         bool        nulls[DBLINK_NOTIFY_COLS];
                               1888                 : 
 4995 mail                     1889 GIC           2 :         memset(values, 0, sizeof(values));
                               1890               2 :         memset(nulls, 0, sizeof(nulls));
                               1891                 : 
 4995 mail                     1892 CBC           2 :         if (notify->relname != NULL)
 4995 mail                     1893 GIC           2 :             values[0] = CStringGetTextDatum(notify->relname);
 4995 mail                     1894 ECB             :         else
 4995 mail                     1895 UIC           0 :             nulls[0] = true;
 4995 mail                     1896 ECB             : 
 4995 mail                     1897 GIC           2 :         values[1] = Int32GetDatum(notify->be_pid);
                               1898                 : 
                               1899               2 :         if (notify->extra != NULL)
                               1900               2 :             values[2] = CStringGetTextDatum(notify->extra);
                               1901                 :         else
 4995 mail                     1902 UIC           0 :             nulls[2] = true;
                               1903                 : 
  397 michael                  1904 GIC           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
                               1905                 : 
 4995 mail                     1906 CBC           2 :         PQfreemem(notify);
 4995 mail                     1907 GIC           2 :         PQconsumeInput(conn);
 4995 mail                     1908 ECB             :     }
                               1909                 : 
 4995 mail                     1910 GIC           2 :     return (Datum) 0;
 4995 mail                     1911 ECB             : }
                               1912                 : 
 3833 tgl                      1913                 : /*
                               1914                 :  * Validate the options given to a dblink foreign server or user mapping.
                               1915                 :  * Raise an error if any option is invalid.
                               1916                 :  *
                               1917                 :  * We just check the names of options here, so semantic errors in options,
                               1918                 :  * such as invalid numeric format, will be detected at the attempt to connect.
                               1919                 :  */
 3833 tgl                      1920 GIC           3 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
                               1921                 : Datum
                               1922               5 : dblink_fdw_validator(PG_FUNCTION_ARGS)
                               1923                 : {
                               1924               5 :     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
                               1925               5 :     Oid         context = PG_GETARG_OID(1);
                               1926                 :     ListCell   *cell;
                               1927                 : 
                               1928                 :     static const PQconninfoOption *options = NULL;
 3833 tgl                      1929 ECB             : 
                               1930                 :     /*
                               1931                 :      * Get list of valid libpq options.
                               1932                 :      *
                               1933                 :      * To avoid unnecessary work, we get the list once and use it throughout
                               1934                 :      * the lifetime of this backend process.  We don't need to care about
                               1935                 :      * memory context issues, because PQconndefaults allocates with malloc.
                               1936                 :      */
 3833 tgl                      1937 GIC           5 :     if (!options)
                               1938                 :     {
                               1939               2 :         options = PQconndefaults();
                               1940               2 :         if (!options)           /* assume reason for failure is OOM */
 3833 tgl                      1941 UIC           0 :             ereport(ERROR,
                               1942                 :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
                               1943                 :                      errmsg("out of memory"),
                               1944                 :                      errdetail("Could not get libpq's default connection options.")));
 3833 tgl                      1945 ECB             :     }
                               1946                 : 
                               1947                 :     /* Validate each supplied option. */
 3833 tgl                      1948 GIC           8 :     foreach(cell, options_list)
                               1949                 :     {
                               1950               5 :         DefElem    *def = (DefElem *) lfirst(cell);
                               1951                 : 
 3833 tgl                      1952 CBC           5 :         if (!is_valid_dblink_option(options, def->defname, context))
                               1953                 :         {
                               1954                 :             /*
                               1955                 :              * Unknown option, or invalid option for the context specified, so
                               1956                 :              * complain about it.  Provide a hint with a valid option that
                               1957                 :              * looks similar, if there is one.
 3833 tgl                      1958 ECB             :              */
                               1959                 :             const PQconninfoOption *opt;
                               1960                 :             const char *closest_match;
                               1961                 :             ClosestMatchState match_state;
  205 peter                    1962 GNC           2 :             bool        has_valid_options = false;
 3833 tgl                      1963 ECB             : 
  205 peter                    1964 GNC           2 :             initClosestMatch(&match_state, def->defname, 4);
 3833 tgl                      1965 GIC          80 :             for (opt = options; opt->keyword; opt++)
                               1966                 :             {
                               1967              78 :                 if (is_valid_dblink_option(options, opt->keyword, context))
                               1968                 :                 {
  205 peter                    1969 GNC           3 :                     has_valid_options = true;
                               1970               3 :                     updateClosestMatch(&match_state, opt->keyword);
                               1971                 :                 }
                               1972                 :             }
                               1973                 : 
                               1974               2 :             closest_match = getClosestMatch(&match_state);
 3833 tgl                      1975 CBC           2 :             ereport(ERROR,
                               1976                 :                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
 3833 tgl                      1977 ECB             :                      errmsg("invalid option \"%s\"", def->defname),
                               1978                 :                      has_valid_options ? closest_match ?
                               1979                 :                      errhint("Perhaps you meant the option \"%s\".",
                               1980                 :                              closest_match) : 0 :
                               1981                 :                      errhint("There are no valid options in this context.")));
                               1982                 :         }
                               1983                 :     }
                               1984                 : 
 3833 tgl                      1985 CBC           3 :     PG_RETURN_VOID();
                               1986                 : }
 3833 tgl                      1987 ECB             : 
                               1988                 : 
                               1989                 : /*************************************************************
 7969 bruce                    1990                 :  * internal functions
                               1991                 :  */
                               1992                 : 
                               1993                 : 
 7655                          1994                 : /*
                               1995                 :  * get_pkey_attnames
                               1996                 :  *
                               1997                 :  * Get the primary key attnames for the given relation.
                               1998                 :  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
                               1999                 :  */
                               2000                 : static char **
 1828 teodor                   2001 GIC           3 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
                               2002                 : {
                               2003                 :     Relation    indexRelation;
                               2004                 :     ScanKeyData skey;
 5564 tgl                      2005 ECB             :     SysScanDesc scan;
                               2006                 :     HeapTuple   indexTuple;
 7522 bruce                    2007                 :     int         i;
 7522 bruce                    2008 CBC           3 :     char      **result = NULL;
                               2009                 :     TupleDesc   tupdesc;
                               2010                 : 
                               2011                 :     /* initialize indnkeyatts to 0 in case no primary key exists */
 1828 teodor                   2012 GIC           3 :     *indnkeyatts = 0;
                               2013                 : 
 7655 bruce                    2014               3 :     tupdesc = rel->rd_att;
                               2015                 : 
                               2016                 :     /* Prepare to scan pg_index for entries having indrelid = this rel. */
 1539 andres                   2017               3 :     indexRelation = table_open(IndexRelationId, AccessShareLock);
 5564 tgl                      2018               3 :     ScanKeyInit(&skey,
 7088 tgl                      2019 ECB             :                 Anum_pg_index_indrelid,
                               2020                 :                 BTEqualStrategyNumber, F_OIDEQ,
 4682                          2021                 :                 ObjectIdGetDatum(RelationGetRelid(rel)));
                               2022                 : 
 5564 tgl                      2023 CBC           3 :     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
                               2024                 :                               NULL, 1, &skey);
                               2025                 : 
                               2026               3 :     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
                               2027                 :     {
 7522 bruce                    2028               3 :         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
 7655 bruce                    2029 ECB             : 
 7524                          2030                 :         /* we're only interested if it is the primary key */
 5564 tgl                      2031 GIC           3 :         if (index->indisprimary)
 7655 bruce                    2032 ECB             :         {
 1828 teodor                   2033 GIC           3 :             *indnkeyatts = index->indnkeyatts;
 1828 teodor                   2034 CBC           3 :             if (*indnkeyatts > 0)
                               2035                 :             {
  209 peter                    2036 GNC           3 :                 result = palloc_array(char *, *indnkeyatts);
                               2037                 : 
 1828 teodor                   2038 GIC           9 :                 for (i = 0; i < *indnkeyatts; i++)
 6585 tgl                      2039               6 :                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
 7655 bruce                    2040 ECB             :             }
 7655 bruce                    2041 CBC           3 :             break;
 7655 bruce                    2042 ECB             :         }
                               2043                 :     }
                               2044                 : 
 5564 tgl                      2045 GIC           3 :     systable_endscan(scan);
 1539 andres                   2046 CBC           3 :     table_close(indexRelation, AccessShareLock);
                               2047                 : 
 7655 bruce                    2048 GBC           3 :     return result;
 7655 bruce                    2049 EUB             : }
                               2050                 : 
 6351 tgl                      2051                 : /*
                               2052                 :  * Deconstruct a text[] into C-strings (note any NULL elements will be
                               2053                 :  * returned as NULL pointers)
                               2054                 :  */
                               2055                 : static char **
 6351 tgl                      2056 GIC          20 : get_text_array_contents(ArrayType *array, int *numitems)
 6351 tgl                      2057 ECB             : {
 6351 tgl                      2058 GIC          20 :     int         ndim = ARR_NDIM(array);
                               2059              20 :     int        *dims = ARR_DIMS(array);
                               2060                 :     int         nitems;
 6351 tgl                      2061 ECB             :     int16       typlen;
                               2062                 :     bool        typbyval;
                               2063                 :     char        typalign;
                               2064                 :     char      **values;
                               2065                 :     char       *ptr;
                               2066                 :     bits8      *bitmap;
                               2067                 :     int         bitmask;
                               2068                 :     int         i;
                               2069                 : 
 6351 tgl                      2070 GIC          20 :     Assert(ARR_ELEMTYPE(array) == TEXTOID);
                               2071                 : 
                               2072              20 :     *numitems = nitems = ArrayGetNItems(ndim, dims);
 6351 tgl                      2073 ECB             : 
 6351 tgl                      2074 GIC          20 :     get_typlenbyvalalign(ARR_ELEMTYPE(array),
                               2075                 :                          &typlen, &typbyval, &typalign);
 6351 tgl                      2076 ECB             : 
  209 peter                    2077 GNC          20 :     values = palloc_array(char *, nitems);
 6351 tgl                      2078 ECB             : 
 6351 tgl                      2079 CBC          20 :     ptr = ARR_DATA_PTR(array);
 6351 tgl                      2080 GIC          20 :     bitmap = ARR_NULLBITMAP(array);
 6351 tgl                      2081 CBC          20 :     bitmask = 1;
 6351 tgl                      2082 ECB             : 
 6351 tgl                      2083 GBC          55 :     for (i = 0; i < nitems; i++)
                               2084                 :     {
 6351 tgl                      2085 GIC          35 :         if (bitmap && (*bitmap & bitmask) == 0)
                               2086                 :         {
 6351 tgl                      2087 LBC           0 :             values[i] = NULL;
                               2088                 :         }
 6351 tgl                      2089 ECB             :         else
                               2090                 :         {
 5493 tgl                      2091 GIC          35 :             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
 5847 tgl                      2092 CBC          35 :             ptr = att_addlength_pointer(ptr, typlen, ptr);
 5847 tgl                      2093 GIC          35 :             ptr = (char *) att_align_nominal(ptr, typalign);
 6351 tgl                      2094 ECB             :         }
                               2095                 : 
                               2096                 :         /* advance bitmap pointer if any */
 6351 tgl                      2097 CBC          35 :         if (bitmap)
 6351 tgl                      2098 ECB             :         {
 6351 tgl                      2099 UIC           0 :             bitmask <<= 1;
 6351 tgl                      2100 LBC           0 :             if (bitmask == 0x100)
 6351 tgl                      2101 ECB             :             {
 6351 tgl                      2102 LBC           0 :                 bitmap++;
 6351 tgl                      2103 UIC           0 :                 bitmask = 1;
                               2104                 :             }
 6351 tgl                      2105 ECB             :         }
                               2106                 :     }
                               2107                 : 
 6351 tgl                      2108 GIC          20 :     return values;
                               2109                 : }
 6351 tgl                      2110 ECB             : 
 7622                          2111                 : static char *
 4681 tgl                      2112 GIC           4 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
 7655 bruce                    2113 ECB             : {
 7522                          2114                 :     char       *relname;
                               2115                 :     HeapTuple   tuple;
                               2116                 :     TupleDesc   tupdesc;
                               2117                 :     int         natts;
                               2118                 :     StringInfoData buf;
                               2119                 :     char       *val;
                               2120                 :     int         key;
                               2121                 :     int         i;
                               2122                 :     bool        needComma;
                               2123                 : 
 6248 neilc                    2124 CBC           4 :     initStringInfo(&buf);
                               2125                 : 
 7442 tgl                      2126 ECB             :     /* get relation name including any needed schema prefix and quoting */
 4682 tgl                      2127 GIC           4 :     relname = generate_relation_name(rel);
 7442 tgl                      2128 ECB             : 
 7655 bruce                    2129 CBC           4 :     tupdesc = rel->rd_att;
 7655 bruce                    2130 GIC           4 :     natts = tupdesc->natts;
                               2131                 : 
 4682 tgl                      2132 GBC           4 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
 7524 bruce                    2133 CBC           4 :     if (!tuple)
 7199 tgl                      2134 UIC           0 :         ereport(ERROR,
 7199 tgl                      2135 ECB             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
                               2136                 :                  errmsg("source row not found")));
 7655 bruce                    2137                 : 
 6248 neilc                    2138 GIC           4 :     appendStringInfo(&buf, "INSERT INTO %s(", relname);
                               2139                 : 
 7555 tgl                      2140               4 :     needComma = false;
 7655 bruce                    2141 CBC          19 :     for (i = 0; i < natts; i++)
                               2142                 :     {
 2058 andres                   2143 GIC          15 :         Form_pg_attribute att = TupleDescAttr(tupdesc, i);
                               2144                 : 
                               2145              15 :         if (att->attisdropped)
 7555 tgl                      2146               2 :             continue;
                               2147                 : 
 7555 tgl                      2148 CBC          13 :         if (needComma)
 3447 rhaas                    2149 GIC           9 :             appendStringInfoChar(&buf, ',');
                               2150                 : 
 6248 neilc                    2151 CBC          13 :         appendStringInfoString(&buf,
 2058 andres                   2152 GIC          13 :                                quote_ident_cstr(NameStr(att->attname)));
 7555 tgl                      2153 CBC          13 :         needComma = true;
                               2154                 :     }
 7655 bruce                    2155 ECB             : 
 3447 rhaas                    2156 CBC           4 :     appendStringInfoString(&buf, ") VALUES(");
                               2157                 : 
 7655 bruce                    2158 ECB             :     /*
 4681 tgl                      2159                 :      * Note: i is physical column number (counting from 0).
                               2160                 :      */
 7555 tgl                      2161 CBC           4 :     needComma = false;
 7655 bruce                    2162              19 :     for (i = 0; i < natts; i++)
                               2163                 :     {
 2058 andres                   2164              15 :         if (TupleDescAttr(tupdesc, i)->attisdropped)
 7555 tgl                      2165               2 :             continue;
                               2166                 : 
                               2167              13 :         if (needComma)
 3447 rhaas                    2168               9 :             appendStringInfoChar(&buf, ',');
 7655 bruce                    2169 ECB             : 
 4681 tgl                      2170 GIC          13 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
 7655 bruce                    2171 EUB             : 
 4681 tgl                      2172 GIC          13 :         if (key >= 0)
 6351                          2173               7 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
 7655 bruce                    2174 ECB             :         else
 7655 bruce                    2175 GIC           6 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
                               2176                 : 
                               2177              13 :         if (val != NULL)
 7655 bruce                    2178 ECB             :         {
 6248 neilc                    2179 GIC          13 :             appendStringInfoString(&buf, quote_literal_cstr(val));
 7655 bruce                    2180              13 :             pfree(val);
                               2181                 :         }
                               2182                 :         else
 3447 rhaas                    2183 UIC           0 :             appendStringInfoString(&buf, "NULL");
 7555 tgl                      2184 GIC          13 :         needComma = true;
                               2185                 :     }
 3447 rhaas                    2186               4 :     appendStringInfoChar(&buf, ')');
                               2187                 : 
 2061 peter_e                  2188               4 :     return buf.data;
                               2189                 : }
 7655 bruce                    2190 ECB             : 
                               2191                 : static char *
 4681 tgl                      2192 GIC           4 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
 7655 bruce                    2193 ECB             : {
                               2194                 :     char       *relname;
 7522                          2195                 :     TupleDesc   tupdesc;
 6031                          2196                 :     StringInfoData buf;
                               2197                 :     int         i;
 7655                          2198                 : 
 6248 neilc                    2199 CBC           4 :     initStringInfo(&buf);
 6248 neilc                    2200 EUB             : 
                               2201                 :     /* get relation name including any needed schema prefix and quoting */
 4682 tgl                      2202 GIC           4 :     relname = generate_relation_name(rel);
                               2203                 : 
 7655 bruce                    2204 CBC           4 :     tupdesc = rel->rd_att;
                               2205                 : 
 6248 neilc                    2206 GIC           4 :     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
 7655 bruce                    2207              11 :     for (i = 0; i < pknumatts; i++)
                               2208                 :     {
 4681 tgl                      2209 CBC           7 :         int         pkattnum = pkattnums[i];
 2058 andres                   2210               7 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
                               2211                 : 
 7655 bruce                    2212               7 :         if (i > 0)
 3447 rhaas                    2213 GIC           3 :             appendStringInfoString(&buf, " AND ");
 7655 bruce                    2214 ECB             : 
 6248 neilc                    2215 CBC           7 :         appendStringInfoString(&buf,
 2058 andres                   2216 GIC           7 :                                quote_ident_cstr(NameStr(attr->attname)));
 7655 bruce                    2217 ECB             : 
 6351 tgl                      2218 CBC           7 :         if (tgt_pkattvals[i] != NULL)
 6248 neilc                    2219 GIC           7 :             appendStringInfo(&buf, " = %s",
 6351 tgl                      2220 CBC           7 :                              quote_literal_cstr(tgt_pkattvals[i]));
 7655 bruce                    2221 ECB             :         else
 3447 rhaas                    2222 UIC           0 :             appendStringInfoString(&buf, " IS NULL");
 7655 bruce                    2223 ECB             :     }
                               2224                 : 
 2061 peter_e                  2225 CBC           4 :     return buf.data;
 7655 bruce                    2226 ECB             : }
                               2227                 : 
 7622 tgl                      2228                 : static char *
 4681 tgl                      2229 GIC           4 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
 7655 bruce                    2230 ECB             : {
                               2231                 :     char       *relname;
 7522                          2232                 :     HeapTuple   tuple;
                               2233                 :     TupleDesc   tupdesc;
                               2234                 :     int         natts;
                               2235                 :     StringInfoData buf;
 7522 bruce                    2236 EUB             :     char       *val;
 4681 tgl                      2237 ECB             :     int         key;
                               2238                 :     int         i;
                               2239                 :     bool        needComma;
 7655 bruce                    2240                 : 
 6248 neilc                    2241 GIC           4 :     initStringInfo(&buf);
 6248 neilc                    2242 ECB             : 
                               2243                 :     /* get relation name including any needed schema prefix and quoting */
 4682 tgl                      2244 CBC           4 :     relname = generate_relation_name(rel);
 7442 tgl                      2245 ECB             : 
 7655 bruce                    2246 GIC           4 :     tupdesc = rel->rd_att;
 7655 bruce                    2247 CBC           4 :     natts = tupdesc->natts;
 7655 bruce                    2248 ECB             : 
 4682 tgl                      2249 GIC           4 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
 7524 bruce                    2250 CBC           4 :     if (!tuple)
 7199 tgl                      2251 LBC           0 :         ereport(ERROR,
                               2252                 :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
 7199 tgl                      2253 ECB             :                  errmsg("source row not found")));
                               2254                 : 
 6248 neilc                    2255 CBC           4 :     appendStringInfo(&buf, "UPDATE %s SET ", relname);
 7655 bruce                    2256 ECB             : 
                               2257                 :     /*
 4681 tgl                      2258 EUB             :      * Note: i is physical column number (counting from 0).
                               2259                 :      */
 7555 tgl                      2260 GIC           4 :     needComma = false;
 7655 bruce                    2261 CBC          19 :     for (i = 0; i < natts; i++)
                               2262                 :     {
 2058 andres                   2263 GIC          15 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
                               2264                 : 
                               2265              15 :         if (attr->attisdropped)
 7555 tgl                      2266               2 :             continue;
                               2267                 : 
                               2268              13 :         if (needComma)
 3447 rhaas                    2269 CBC           9 :             appendStringInfoString(&buf, ", ");
                               2270                 : 
 6248 neilc                    2271 GIC          13 :         appendStringInfo(&buf, "%s = ",
 2058 andres                   2272              13 :                          quote_ident_cstr(NameStr(attr->attname)));
                               2273                 : 
 4681 tgl                      2274              13 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
 7655 bruce                    2275 ECB             : 
 4681 tgl                      2276 CBC          13 :         if (key >= 0)
 6347 bruce                    2277 GIC           7 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
 7655 bruce                    2278 ECB             :         else
 7655 bruce                    2279 GIC           6 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
 7655 bruce                    2280 ECB             : 
 7655 bruce                    2281 GIC          13 :         if (val != NULL)
                               2282                 :         {
 6248 neilc                    2283              13 :             appendStringInfoString(&buf, quote_literal_cstr(val));
 7655 bruce                    2284 CBC          13 :             pfree(val);
                               2285                 :         }
                               2286                 :         else
 6248 neilc                    2287 UIC           0 :             appendStringInfoString(&buf, "NULL");
 7555 tgl                      2288 GIC          13 :         needComma = true;
                               2289                 :     }
                               2290                 : 
 3447 rhaas                    2291 CBC           4 :     appendStringInfoString(&buf, " WHERE ");
 7655 bruce                    2292 ECB             : 
 7655 bruce                    2293 CBC          11 :     for (i = 0; i < pknumatts; i++)
                               2294                 :     {
 4681 tgl                      2295               7 :         int         pkattnum = pkattnums[i];
 2058 andres                   2296 GIC           7 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
                               2297                 : 
 7655 bruce                    2298               7 :         if (i > 0)
 3447 rhaas                    2299 CBC           3 :             appendStringInfoString(&buf, " AND ");
                               2300                 : 
 3447 rhaas                    2301 GIC           7 :         appendStringInfoString(&buf,
 2058 andres                   2302               7 :                                quote_ident_cstr(NameStr(attr->attname)));
                               2303                 : 
 4681 tgl                      2304               7 :         val = tgt_pkattvals[i];
                               2305                 : 
 7655 bruce                    2306               7 :         if (val != NULL)
 6248 neilc                    2307               7 :             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
                               2308                 :         else
 3447 rhaas                    2309 UIC           0 :             appendStringInfoString(&buf, " IS NULL");
                               2310                 :     }
                               2311                 : 
 2061 peter_e                  2312 CBC           4 :     return buf.data;
                               2313                 : }
 7655 bruce                    2314 EUB             : 
                               2315                 : /*
 7655 bruce                    2316 ECB             :  * Return a properly quoted identifier.
                               2317                 :  * Uses quote_ident in quote.c
                               2318                 :  */
                               2319                 : static char *
 7655 bruce                    2320 GIC          80 : quote_ident_cstr(char *rawstr)
 7655 bruce                    2321 ECB             : {
 7522                          2322                 :     text       *rawstr_text;
                               2323                 :     text       *result_text;
                               2324                 :     char       *result;
                               2325                 : 
 5493 tgl                      2326 GIC          80 :     rawstr_text = cstring_to_text(rawstr);
 2219 noah                     2327              80 :     result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
                               2328                 :                                                      PointerGetDatum(rawstr_text)));
 5493 tgl                      2329              80 :     result = text_to_cstring(result_text);
                               2330                 : 
 7655 bruce                    2331 CBC          80 :     return result;
                               2332                 : }
 7655 bruce                    2333 ECB             : 
                               2334                 : static int
 4681 tgl                      2335 CBC          26 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
                               2336                 : {
 7522 bruce                    2337 ECB             :     int         i;
 7655                          2338                 : 
                               2339                 :     /*
 7522                          2340                 :      * Not likely a long list anyway, so just scan for the value
 7655                          2341                 :      */
 7655 bruce                    2342 GIC          50 :     for (i = 0; i < pknumatts; i++)
 4681 tgl                      2343 CBC          38 :         if (key == pkattnums[i])
 7655 bruce                    2344              14 :             return i;
                               2345                 : 
 7655 bruce                    2346 GIC          12 :     return -1;
 7655 bruce                    2347 ECB             : }
                               2348                 : 
 7622 tgl                      2349                 : static HeapTuple
 4681 tgl                      2350 GIC           8 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
 7655 bruce                    2351 ECB             : {
 7522                          2352                 :     char       *relname;
                               2353                 :     TupleDesc   tupdesc;
 4681 tgl                      2354                 :     int         natts;
 6248 neilc                    2355                 :     StringInfoData buf;
                               2356                 :     int         ret;
 7522 bruce                    2357                 :     HeapTuple   tuple;
                               2358                 :     int         i;
                               2359                 : 
 4681 tgl                      2360                 :     /*
                               2361                 :      * Connect to SPI manager
                               2362                 :      */
 4681 tgl                      2363 GIC           8 :     if ((ret = SPI_connect()) < 0)
 4681 tgl                      2364 EUB             :         /* internal error */
 4681 tgl                      2365 UIC           0 :         elog(ERROR, "SPI connect failure - returned %d", ret);
                               2366                 : 
 6248 neilc                    2367 GIC           8 :     initStringInfo(&buf);
                               2368                 : 
                               2369                 :     /* get relation name including any needed schema prefix and quoting */
 4682 tgl                      2370 CBC           8 :     relname = generate_relation_name(rel);
 7442 tgl                      2371 ECB             : 
 4682 tgl                      2372 GIC           8 :     tupdesc = rel->rd_att;
 4681                          2373               8 :     natts = tupdesc->natts;
                               2374                 : 
                               2375                 :     /*
 4681 tgl                      2376 ECB             :      * Build sql statement to look up tuple of interest, ie, the one matching
 4681 tgl                      2377 EUB             :      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
                               2378                 :      * generate a result tuple that matches the table's physical structure,
                               2379                 :      * with NULLs for any dropped columns.  Otherwise we have to deal with two
                               2380                 :      * different tupdescs and everything's very confusing.
 7655 bruce                    2381 ECB             :      */
 4681 tgl                      2382 GIC           8 :     appendStringInfoString(&buf, "SELECT ");
 7655 bruce                    2383 ECB             : 
 4681 tgl                      2384 GIC          38 :     for (i = 0; i < natts; i++)
 4681 tgl                      2385 ECB             :     {
 2058 andres                   2386 CBC          30 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
                               2387                 : 
 4681 tgl                      2388              30 :         if (i > 0)
 4681 tgl                      2389 GIC          22 :             appendStringInfoString(&buf, ", ");
                               2390                 : 
 2058 andres                   2391              30 :         if (attr->attisdropped)
 4681 tgl                      2392               4 :             appendStringInfoString(&buf, "NULL");
                               2393                 :         else
                               2394              26 :             appendStringInfoString(&buf,
 2058 andres                   2395 GBC          26 :                                    quote_ident_cstr(NameStr(attr->attname)));
                               2396                 :     }
 4681 tgl                      2397 EUB             : 
 4681 tgl                      2398 GIC           8 :     appendStringInfo(&buf, " FROM %s WHERE ", relname);
                               2399                 : 
 7655 bruce                    2400              22 :     for (i = 0; i < pknumatts; i++)
                               2401                 :     {
 4681 tgl                      2402              14 :         int         pkattnum = pkattnums[i];
 2058 andres                   2403              14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
                               2404                 : 
 7655 bruce                    2405              14 :         if (i > 0)
 3447 rhaas                    2406               6 :             appendStringInfoString(&buf, " AND ");
                               2407                 : 
 6248 neilc                    2408              14 :         appendStringInfoString(&buf,
 2058 andres                   2409              14 :                                quote_ident_cstr(NameStr(attr->attname)));
                               2410                 : 
 6351 tgl                      2411              14 :         if (src_pkattvals[i] != NULL)
 6248 neilc                    2412 CBC          14 :             appendStringInfo(&buf, " = %s",
 6351 tgl                      2413 GIC          14 :                              quote_literal_cstr(src_pkattvals[i]));
                               2414                 :         else
 3447 rhaas                    2415 UIC           0 :             appendStringInfoString(&buf, " IS NULL");
                               2416                 :     }
                               2417                 : 
 7655 bruce                    2418 ECB             :     /*
                               2419                 :      * Retrieve the desired tuple
                               2420                 :      */
 6248 neilc                    2421 CBC           8 :     ret = SPI_exec(buf.data, 0);
 6248 neilc                    2422 GIC           8 :     pfree(buf.data);
 7655 bruce                    2423 ECB             : 
 7655 bruce                    2424 EUB             :     /*
                               2425                 :      * Only allow one qualifying tuple
                               2426                 :      */
 7655 bruce                    2427 CBC           8 :     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
 7199 tgl                      2428 UIC           0 :         ereport(ERROR,
                               2429                 :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
                               2430                 :                  errmsg("source criteria matched more than one record")));
                               2431                 : 
 7655 bruce                    2432 GIC           8 :     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
                               2433                 :     {
                               2434               8 :         SPITupleTable *tuptable = SPI_tuptable;
                               2435                 : 
                               2436               8 :         tuple = SPI_copytuple(tuptable->vals[0]);
 7074 mail                     2437 CBC           8 :         SPI_finish();
                               2438                 : 
 7655 bruce                    2439 GIC           8 :         return tuple;
                               2440                 :     }
                               2441                 :     else
                               2442                 :     {
 7655 bruce                    2443 ECB             :         /*
                               2444                 :          * no qualifying tuples
                               2445                 :          */
 7074 mail                     2446 LBC           0 :         SPI_finish();
                               2447                 : 
 7655 bruce                    2448               0 :         return NULL;
                               2449                 :     }
 7655 bruce                    2450 ECB             : 
                               2451                 :     /*
                               2452                 :      * never reached, but keep compiler quiet
                               2453                 :      */
                               2454                 :     return NULL;
                               2455                 : }
                               2456                 : 
                               2457                 : /*
                               2458                 :  * Open the relation named by relname_text, acquire specified type of lock,
                               2459                 :  * verify we have specified permissions.
 4682 tgl                      2460                 :  * Caller must close rel when done with it.
                               2461                 :  */
                               2462                 : static Relation
 4682 tgl                      2463 CBC          21 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
 7655 bruce                    2464 ECB             : {
 7622 tgl                      2465                 :     RangeVar   *relvar;
                               2466                 :     Relation    rel;
                               2467                 :     AclResult   aclresult;
 7655 bruce                    2468                 : 
 6526 neilc                    2469 CBC          21 :     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
 1539 andres                   2470 GIC          21 :     rel = table_openrv(relvar, lockmode);
 7655 bruce                    2471 ECB             : 
 4682 tgl                      2472 GIC          21 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
                               2473                 :                                   aclmode);
                               2474              21 :     if (aclresult != ACLCHECK_OK)
 1954 peter_e                  2475 LBC           0 :         aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
 4682 tgl                      2476 UIC           0 :                        RelationGetRelationName(rel));
                               2477                 : 
 4682 tgl                      2478 GIC          21 :     return rel;
 7655 bruce                    2479 ECB             : }
                               2480                 : 
                               2481                 : /*
 7442 tgl                      2482                 :  * generate_relation_name - copied from ruleutils.c
                               2483                 :  *      Compute the name to display for a relation
                               2484                 :  *
                               2485                 :  * The result includes all necessary quoting and schema-prefixing.
                               2486                 :  */
                               2487                 : static char *
 4682 tgl                      2488 GIC          20 : generate_relation_name(Relation rel)
                               2489                 : {
                               2490                 :     char       *nspname;
                               2491                 :     char       *result;
                               2492                 : 
 7442 tgl                      2493 ECB             :     /* Qualify the name if not visible in search path */
 4682 tgl                      2494 CBC          20 :     if (RelationIsVisible(RelationGetRelid(rel)))
 7442 tgl                      2495 GIC          15 :         nspname = NULL;
 7442 tgl                      2496 ECB             :     else
 4682 tgl                      2497 CBC           5 :         nspname = get_namespace_name(rel->rd_rel->relnamespace);
 7442 tgl                      2498 ECB             : 
 4682 tgl                      2499 GIC          20 :     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
                               2500                 : 
 7442 tgl                      2501 CBC          20 :     return result;
                               2502                 : }
 7228 bruce                    2503 ECB             : 
                               2504                 : 
                               2505                 : static remoteConn *
 7228 bruce                    2506 CBC          75 : getConnectionByName(const char *name)
                               2507                 : {
                               2508                 :     remoteConnHashEnt *hentry;
                               2509                 :     char       *key;
                               2510                 : 
 7188                          2511              75 :     if (!remoteConnHash)
                               2512               2 :         remoteConnHash = createConnHash();
 7228 bruce                    2513 ECB             : 
 4693 itagaki.takahiro         2514 GIC          75 :     key = pstrdup(name);
 4518                          2515              75 :     truncate_identifier(key, strlen(key), false);
 7188 bruce                    2516 CBC          75 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
                               2517                 :                                                key, HASH_FIND, NULL);
                               2518                 : 
 7188 bruce                    2519 GIC          75 :     if (hentry)
 2061 peter_e                  2520              68 :         return hentry->rconn;
                               2521                 : 
 2061 peter_e                  2522 CBC           7 :     return NULL;
 7228 bruce                    2523 EUB             : }
                               2524                 : 
 7228 bruce                    2525 ECB             : static HTAB *
 7228 bruce                    2526 CBC           3 : createConnHash(void)
 7228 bruce                    2527 ECB             : {
                               2528                 :     HASHCTL     ctl;
                               2529                 : 
 7228 bruce                    2530 CBC           3 :     ctl.keysize = NAMEDATALEN;
 7228 bruce                    2531 GBC           3 :     ctl.entrysize = sizeof(remoteConnHashEnt);
                               2532                 : 
  845 tgl                      2533 GIC           3 :     return hash_create("Remote Con hash", NUMCONN, &ctl,
  845 tgl                      2534 ECB             :                        HASH_ELEM | HASH_STRINGS);
                               2535                 : }
                               2536                 : 
 7199                          2537                 : static void
 5050 bruce                    2538 GIC          10 : createNewConnection(const char *name, remoteConn *rconn)
 7228 bruce                    2539 ECB             : {
                               2540                 :     remoteConnHashEnt *hentry;
 7188 bruce                    2541 EUB             :     bool        found;
                               2542                 :     char       *key;
 7228                          2543                 : 
 7188 bruce                    2544 GBC          10 :     if (!remoteConnHash)
 7199 tgl                      2545               1 :         remoteConnHash = createConnHash();
                               2546                 : 
 4693 itagaki.takahiro         2547              10 :     key = pstrdup(name);
 4693 itagaki.takahiro         2548 GIC          10 :     truncate_identifier(key, strlen(key), true);
 7228 bruce                    2549              10 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
                               2550                 :                                                HASH_ENTER, &found);
                               2551                 : 
 7188                          2552              10 :     if (found)
                               2553                 :     {
   76 andres                   2554 GNC           1 :         libpqsrv_disconnect(rconn->conn);
 4687 itagaki.takahiro         2555 GIC           1 :         pfree(rconn);
                               2556                 : 
 7199 tgl                      2557               1 :         ereport(ERROR,
                               2558                 :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                               2559                 :                  errmsg("duplicate connection name")));
                               2560                 :     }
                               2561                 : 
 6392 bruce                    2562 CBC           9 :     hentry->rconn = rconn;
 5905 peter_e                  2563 GIC           9 :     strlcpy(hentry->name, name, sizeof(hentry->name));
 7228 bruce                    2564 CBC           9 : }
                               2565                 : 
                               2566                 : static void
 7228 bruce                    2567 GIC           8 : deleteConnection(const char *name)
 7228 bruce                    2568 ECB             : {
                               2569                 :     remoteConnHashEnt *hentry;
 7188                          2570                 :     bool        found;
 4693 itagaki.takahiro         2571                 :     char       *key;
                               2572                 : 
 7188 bruce                    2573 CBC           8 :     if (!remoteConnHash)
 7188 bruce                    2574 UIC           0 :         remoteConnHash = createConnHash();
 7228 bruce                    2575 ECB             : 
 4693 itagaki.takahiro         2576 GIC           8 :     key = pstrdup(name);
 4518 itagaki.takahiro         2577 CBC           8 :     truncate_identifier(key, strlen(key), false);
 7228 bruce                    2578 GIC           8 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
 7228 bruce                    2579 EUB             :                                                key, HASH_REMOVE, &found);
                               2580                 : 
 7188 bruce                    2581 GIC           8 :     if (!hentry)
 7199 tgl                      2582 UIC           0 :         ereport(ERROR,
                               2583                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
 7199 tgl                      2584 ECB             :                  errmsg("undefined connection name")));
 7228 bruce                    2585 GIC           8 : }
                               2586                 : 
 5575 tgl                      2587 ECB             : static void
    1 sfrost                   2588 CBC          19 : dblink_security_check(PGconn *conn, remoteConn *rconn)
                               2589                 : {
    1 sfrost                   2590 GIC          19 :     if (!superuser())
                               2591                 :     {
    1 sfrost                   2592 UIC           0 :         if (!PQconnectionUsedPassword(conn))
    1 sfrost                   2593 ECB             :         {
    1 sfrost                   2594 UNC           0 :             libpqsrv_disconnect(conn);
    1 sfrost                   2595 UIC           0 :             if (rconn)
                               2596               0 :                 pfree(rconn);
                               2597                 : 
                               2598               0 :             ereport(ERROR,
                               2599                 :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
                               2600                 :                      errmsg("password is required"),
                               2601                 :                      errdetail("Non-superuser cannot connect if the server does not request a password."),
                               2602                 :                      errhint("Target server's authentication method must be changed.")));
    1 sfrost                   2603 ECB             :         }
                               2604                 :     }
 5575 tgl                      2605 GIC          19 : }
                               2606                 : 
 5312 tgl                      2607 ECB             : /*
    1 sfrost                   2608                 :  * For non-superusers, insist that the connstr specify a password.  This
                               2609                 :  * prevents a password from being picked up from .pgpass, a service file,
                               2610                 :  * the environment, etc.  We don't want the postgres user's passwords
                               2611                 :  * to be accessible to non-superusers.
                               2612                 :  */
                               2613                 : static void
    1 sfrost                   2614 GIC          22 : dblink_connstr_check(const char *connstr)
                               2615                 : {
                               2616              22 :     if (!superuser())
                               2617                 :     {
                               2618                 :         PQconninfoOption *options;
                               2619                 :         PQconninfoOption *option;
    1 sfrost                   2620 CBC           1 :         bool        connstr_gives_password = false;
    1 sfrost                   2621 ECB             : 
    1 sfrost                   2622 GIC           1 :         options = PQconninfoParse(connstr, NULL);
    1 sfrost                   2623 CBC           1 :         if (options)
                               2624                 :         {
                               2625              40 :             for (option = options; option->keyword != NULL; option++)
 5312 tgl                      2626 ECB             :             {
    1 sfrost                   2627 GIC          39 :                 if (strcmp(option->keyword, "password") == 0)
                               2628                 :                 {
                               2629               1 :                     if (option->val != NULL && option->val[0] != '\0')
                               2630                 :                     {
    1 sfrost                   2631 UIC           0 :                         connstr_gives_password = true;
    1 sfrost                   2632 UBC           0 :                         break;
                               2633                 :                     }
 5312 tgl                      2634 ECB             :                 }
                               2635                 :             }
    1 sfrost                   2636 CBC           1 :             PQconninfoFree(options);
 5312 tgl                      2637 ECB             :         }
                               2638                 : 
    1 sfrost                   2639 GIC           1 :         if (!connstr_gives_password)
                               2640               1 :             ereport(ERROR,
                               2641                 :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
                               2642                 :                      errmsg("password is required"),
                               2643                 :                      errdetail("Non-superusers must provide a password in the connection string.")));
    1 sfrost                   2644 ECB             :     }
 5312 tgl                      2645 GBC          21 : }
                               2646                 : 
                               2647                 : /*
                               2648                 :  * Report an error received from the remote server
                               2649                 :  *
                               2650                 :  * res: the received error result (will be freed)
                               2651                 :  * fail: true for ERROR ereport, false for NOTICE
                               2652                 :  * fmt and following args: sprintf-style format and values for errcontext;
 1844 tgl                      2653 ECB             :  * the resulting string should be worded like "while <some action>"
                               2654                 :  */
                               2655                 : static void
 2299 mail                     2656 GIC          12 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
                               2657                 :                  bool fail, const char *fmt,...)
                               2658                 : {
                               2659                 :     int         level;
 5393                          2660              12 :     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
                               2661              12 :     char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
 5393 mail                     2662 CBC          12 :     char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
                               2663              12 :     char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
                               2664              12 :     char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
                               2665                 :     int         sqlstate;
 5393 mail                     2666 ECB             :     char       *message_primary;
                               2667                 :     char       *message_detail;
                               2668                 :     char       *message_hint;
                               2669                 :     char       *message_context;
                               2670                 :     va_list     ap;
                               2671                 :     char        dblink_context_msg[512];
                               2672                 : 
 5393 mail                     2673 GIC          12 :     if (fail)
                               2674               3 :         level = ERROR;
                               2675                 :     else
                               2676               9 :         level = NOTICE;
                               2677                 : 
                               2678              12 :     if (pg_diag_sqlstate)
 5393 mail                     2679 CBC          12 :         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
                               2680                 :                                  pg_diag_sqlstate[1],
                               2681                 :                                  pg_diag_sqlstate[2],
                               2682                 :                                  pg_diag_sqlstate[3],
                               2683                 :                                  pg_diag_sqlstate[4]);
                               2684                 :     else
 5393 mail                     2685 LBC           0 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
                               2686                 : 
 2296 peter_e                  2687 CBC          12 :     message_primary = xpstrdup(pg_diag_message_primary);
 2296 peter_e                  2688 GIC          12 :     message_detail = xpstrdup(pg_diag_message_detail);
                               2689              12 :     message_hint = xpstrdup(pg_diag_message_hint);
                               2690              12 :     message_context = xpstrdup(pg_diag_context);
                               2691                 : 
                               2692                 :     /*
                               2693                 :      * If we don't get a message from the PGresult, try the PGconn.  This is
                               2694                 :      * needed because for connection-level failures, PQexec may just return
                               2695                 :      * NULL, not a PGresult at all.
                               2696                 :      */
 2299 mail                     2697 CBC          12 :     if (message_primary == NULL)
 2232 peter_e                  2698 UIC           0 :         message_primary = pchomp(PQerrorMessage(conn));
                               2699                 : 
                               2700                 :     /*
                               2701                 :      * Now that we've copied all the data we need out of the PGresult, it's
                               2702                 :      * safe to free it.  We must do this to avoid PGresult leakage.  We're
                               2703                 :      * leaking all the strings too, but those are in palloc'd memory that will
                               2704                 :      * get cleaned up eventually.
                               2705                 :      */
  280 peter                    2706 GNC          12 :     PQclear(res);
 5393 mail                     2707 ECB             : 
 1844 tgl                      2708                 :     /*
 1844 tgl                      2709 EUB             :      * Format the basic errcontext string.  Below, we'll add on something
                               2710                 :      * about the connection name.  That's a violation of the translatability
                               2711                 :      * guidelines about constructing error messages out of parts, but since
                               2712                 :      * there's no translation support for dblink, there's no need to worry
                               2713                 :      * about that (yet).
                               2714                 :      */
 1844 tgl                      2715 GIC          12 :     va_start(ap, fmt);
 1844 tgl                      2716 CBC          12 :     vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
                               2717              12 :     va_end(ap);
 5393 mail                     2718 ECB             : 
 5393 mail                     2719 GIC          12 :     ereport(level,
 5050 bruce                    2720 ECB             :             (errcode(sqlstate),
                               2721                 :              (message_primary != NULL && message_primary[0] != '\0') ?
  492 fujii                    2722                 :              errmsg_internal("%s", message_primary) :
 2300 mail                     2723                 :              errmsg("could not obtain message string for remote error"),
 4285 tgl                      2724                 :              message_detail ? errdetail_internal("%s", message_detail) : 0,
                               2725                 :              message_hint ? errhint("%s", message_hint) : 0,
 1844                          2726                 :              message_context ? (errcontext("%s", message_context)) : 0,
                               2727                 :              conname ?
                               2728                 :              (errcontext("%s on dblink connection named \"%s\"",
                               2729                 :                          dblink_context_msg, conname)) :
                               2730                 :              (errcontext("%s on unnamed dblink connection",
                               2731                 :                          dblink_context_msg))));
 5393 mail                     2732 GBC           9 : }
                               2733                 : 
 5055 mail                     2734 ECB             : /*
                               2735                 :  * Obtain connection string for a foreign server
 5055 mail                     2736 EUB             :  */
                               2737                 : static char *
 5055 mail                     2738 GBC          22 : get_connect_string(const char *servername)
 5055 mail                     2739 EUB             : {
 5050 bruce                    2740 GBC          22 :     ForeignServer *foreign_server = NULL;
                               2741                 :     UserMapping *user_mapping;
                               2742                 :     ListCell   *cell;
 2153 bruce                    2743 ECB             :     StringInfoData buf;
                               2744                 :     ForeignDataWrapper *fdw;
 5050                          2745                 :     AclResult   aclresult;
                               2746                 :     char       *srvname;
 5055 mail                     2747                 : 
 2299                          2748                 :     static const PQconninfoOption *options = NULL;
                               2749                 : 
 2221 peter_e                  2750 GIC          22 :     initStringInfo(&buf);
                               2751                 : 
 2299 mail                     2752 ECB             :     /*
                               2753                 :      * Get list of valid libpq options.
                               2754                 :      *
                               2755                 :      * To avoid unnecessary work, we get the list once and use it throughout
                               2756                 :      * the lifetime of this backend process.  We don't need to care about
                               2757                 :      * memory context issues, because PQconndefaults allocates with malloc.
                               2758                 :      */
 2299 mail                     2759 CBC          22 :     if (!options)
                               2760                 :     {
 2299 mail                     2761 GIC           3 :         options = PQconndefaults();
 2299 mail                     2762 CBC           3 :         if (!options)           /* assume reason for failure is OOM */
 2299 mail                     2763 UIC           0 :             ereport(ERROR,
                               2764                 :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
 2299 mail                     2765 ECB             :                      errmsg("out of memory"),
                               2766                 :                      errdetail("Could not get libpq's default connection options.")));
                               2767                 :     }
                               2768                 : 
                               2769                 :     /* first gather the server connstr options */
 4693 itagaki.takahiro         2770 GIC          22 :     srvname = pstrdup(servername);
 4687                          2771              22 :     truncate_identifier(srvname, strlen(srvname), false);
 4693                          2772              22 :     foreign_server = GetForeignServerByName(srvname, true);
                               2773                 : 
 5055 mail                     2774 CBC          22 :     if (foreign_server)
                               2775                 :     {
 5050 bruce                    2776 GIC           2 :         Oid         serverid = foreign_server->serverid;
                               2777               2 :         Oid         fdwid = foreign_server->fdwid;
                               2778               2 :         Oid         userid = GetUserId();
 5055 mail                     2779 ECB             : 
 5055 mail                     2780 GIC           2 :         user_mapping = GetUserMapping(userid, serverid);
 5050 bruce                    2781 CBC           2 :         fdw = GetForeignDataWrapper(fdwid);
                               2782                 : 
 5055 mail                     2783 ECB             :         /* Check permissions, user must have usage on the server. */
  147 peter                    2784 GNC           2 :         aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
 5055 mail                     2785 CBC           2 :         if (aclresult != ACLCHECK_OK)
 1954 peter_e                  2786 UIC           0 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
                               2787                 : 
 5050 bruce                    2788 CBC           2 :         foreach(cell, fdw->options)
                               2789                 :         {
 5050 bruce                    2790 UIC           0 :             DefElem    *def = lfirst(cell);
                               2791                 : 
 2299 mail                     2792               0 :             if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
 2221 peter_e                  2793               0 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
 2299 mail                     2794               0 :                                  escape_param_str(strVal(def->arg)));
                               2795                 :         }
                               2796                 : 
 5050 bruce                    2797 GIC           6 :         foreach(cell, foreign_server->options)
                               2798                 :         {
                               2799               4 :             DefElem    *def = lfirst(cell);
                               2800                 : 
 2299 mail                     2801               4 :             if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
 2221 peter_e                  2802               4 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
 2299 mail                     2803               4 :                                  escape_param_str(strVal(def->arg)));
                               2804                 :         }
                               2805                 : 
 5050 bruce                    2806               4 :         foreach(cell, user_mapping->options)
 5055 mail                     2807 ECB             :         {
                               2808                 : 
 5050 bruce                    2809 GIC           2 :             DefElem    *def = lfirst(cell);
                               2810                 : 
 2299 mail                     2811 CBC           2 :             if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
 2221 peter_e                  2812               2 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
 2299 mail                     2813 GIC           2 :                                  escape_param_str(strVal(def->arg)));
                               2814                 :         }
                               2815                 : 
 2221 peter_e                  2816 CBC           2 :         return buf.data;
                               2817                 :     }
                               2818                 :     else
 5055 mail                     2819              20 :         return NULL;
 5055 mail                     2820 EUB             : }
                               2821                 : 
                               2822                 : /*
                               2823                 :  * Escaping libpq connect parameter strings.
                               2824                 :  *
 5055 mail                     2825 ECB             :  * Replaces "'" with "\'" and "\" with "\\".
                               2826                 :  */
                               2827                 : static char *
 5055 mail                     2828 GIC           6 : escape_param_str(const char *str)
 5055 mail                     2829 ECB             : {
                               2830                 :     const char *cp;
 2153 bruce                    2831                 :     StringInfoData buf;
                               2832                 : 
 2221 peter_e                  2833 GIC           6 :     initStringInfo(&buf);
                               2834                 : 
 5055 mail                     2835              64 :     for (cp = str; *cp; cp++)
 5055 mail                     2836 ECB             :     {
 5055 mail                     2837 CBC          58 :         if (*cp == '\\' || *cp == '\'')
 2221 peter_e                  2838 UIC           0 :             appendStringInfoChar(&buf, '\\');
 2221 peter_e                  2839 GIC          58 :         appendStringInfoChar(&buf, *cp);
                               2840                 :     }
                               2841                 : 
 2221 peter_e                  2842 CBC           6 :     return buf.data;
 5055 mail                     2843 ECB             : }
                               2844                 : 
                               2845                 : /*
 4681 tgl                      2846                 :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
                               2847                 :  * functions, and translate to the internal representation.
                               2848                 :  *
                               2849                 :  * The user supplies an int2vector of 1-based logical attnums, plus a count
                               2850                 :  * argument (the need for the separate count argument is historical, but we
                               2851                 :  * still check it).  We check that each attnum corresponds to a valid,
                               2852                 :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
                               2853                 :  * listed twice, though the actual use-case for such things is dubious.
                               2854                 :  * Note that before Postgres 9.0, the user's attnums were interpreted as
                               2855                 :  * physical not logical column numbers; this was changed for future-proofing.
 4681 tgl                      2856 EUB             :  *
                               2857                 :  * The internal representation is a palloc'd int array of 0-based physical
                               2858                 :  * attnums.
                               2859                 :  */
 4681 tgl                      2860 ECB             : static void
 4681 tgl                      2861 GIC          18 : validate_pkattnums(Relation rel,
                               2862                 :                    int2vector *pkattnums_arg, int32 pknumatts_arg,
                               2863                 :                    int **pkattnums, int *pknumatts)
                               2864                 : {
                               2865              18 :     TupleDesc   tupdesc = rel->rd_att;
                               2866              18 :     int         natts = tupdesc->natts;
                               2867                 :     int         i;
                               2868                 : 
                               2869                 :     /* Don't take more array elements than there are */
                               2870              18 :     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
                               2871                 : 
                               2872                 :     /* Must have at least one pk attnum selected */
                               2873              18 :     if (pknumatts_arg <= 0)
 4681 tgl                      2874 UIC           0 :         ereport(ERROR,
                               2875                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                               2876                 :                  errmsg("number of key attributes must be > 0")));
                               2877                 : 
 4681 tgl                      2878 ECB             :     /* Allocate output array */
  209 peter                    2879 GNC          18 :     *pkattnums = palloc_array(int, pknumatts_arg);
 4681 tgl                      2880 GIC          18 :     *pknumatts = pknumatts_arg;
                               2881                 : 
                               2882                 :     /* Validate attnums and convert to internal form */
                               2883              57 :     for (i = 0; i < pknumatts_arg; i++)
 4813 mail                     2884 ECB             :     {
 4660 bruce                    2885 GIC          45 :         int         pkattnum = pkattnums_arg->values[i];
 4660 bruce                    2886 ECB             :         int         lnum;
                               2887                 :         int         j;
                               2888                 : 
 4681 tgl                      2889                 :         /* Can throw error immediately if out of range */
 4681 tgl                      2890 CBC          45 :         if (pkattnum <= 0 || pkattnum > natts)
 4681 tgl                      2891 GIC           6 :             ereport(ERROR,
                               2892                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 4681 tgl                      2893 ECB             :                      errmsg("invalid attribute number %d", pkattnum)));
                               2894                 : 
                               2895                 :         /* Identify which physical column has this logical number */
 4681 tgl                      2896 GIC          39 :         lnum = 0;
 4681 tgl                      2897 CBC          69 :         for (j = 0; j < natts; j++)
 4681 tgl                      2898 ECB             :         {
                               2899                 :             /* dropped columns don't count */
 2058 andres                   2900 GIC          69 :             if (TupleDescAttr(tupdesc, j)->attisdropped)
 4681 tgl                      2901               3 :                 continue;
                               2902                 : 
                               2903              66 :             if (++lnum == pkattnum)
 4681 tgl                      2904 CBC          39 :                 break;
                               2905                 :         }
 4681 tgl                      2906 ECB             : 
 4681 tgl                      2907 CBC          39 :         if (j < natts)
 4681 tgl                      2908 GIC          39 :             (*pkattnums)[i] = j;
                               2909                 :         else
 4681 tgl                      2910 UIC           0 :             ereport(ERROR,
 4681 tgl                      2911 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                               2912                 :                      errmsg("invalid attribute number %d", pkattnum)));
                               2913                 :     }
 4813 mail                     2914 GIC          12 : }
 3833 tgl                      2915 ECB             : 
                               2916                 : /*
                               2917                 :  * Check if the specified connection option is valid.
                               2918                 :  *
                               2919                 :  * We basically allow whatever libpq thinks is an option, with these
                               2920                 :  * restrictions:
                               2921                 :  *      debug options: disallowed
                               2922                 :  *      "client_encoding": disallowed
                               2923                 :  *      "user": valid only in USER MAPPING options
                               2924                 :  *      secure options (eg password): valid only in USER MAPPING options
                               2925                 :  *      others: valid only in FOREIGN SERVER options
                               2926                 :  *
                               2927                 :  * We disallow client_encoding because it would be overridden anyway via
                               2928                 :  * PQclientEncoding; allowing it to be specified would merely promote
                               2929                 :  * confusion.
                               2930                 :  */
                               2931                 : static bool
 3833 tgl                      2932 GIC          89 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
                               2933                 :                        Oid context)
                               2934                 : {
                               2935                 :     const PQconninfoOption *opt;
 3833 tgl                      2936 ECB             : 
                               2937                 :     /* Look up the option in libpq result */
 3833 tgl                      2938 GIC        1697 :     for (opt = options; opt->keyword; opt++)
 3833 tgl                      2939 ECB             :     {
 3833 tgl                      2940 GIC        1695 :         if (strcmp(opt->keyword, option) == 0)
 3833 tgl                      2941 CBC          87 :             break;
 3833 tgl                      2942 ECB             :     }
 3833 tgl                      2943 GIC          89 :     if (opt->keyword == NULL)
                               2944               2 :         return false;
                               2945                 : 
                               2946                 :     /* Disallow debug options (particularly "replication") */
                               2947              87 :     if (strchr(opt->dispchar, 'D'))
                               2948               2 :         return false;
                               2949                 : 
                               2950                 :     /* Disallow "client_encoding" */
 3833 tgl                      2951 CBC          85 :     if (strcmp(opt->keyword, "client_encoding") == 0)
 3833 tgl                      2952 GBC           2 :         return false;
                               2953                 : 
                               2954                 :     /*
                               2955                 :      * If the option is "user" or marked secure, it should be specified only
                               2956                 :      * in USER MAPPING.  Others should be specified only in SERVER.
                               2957                 :      */
 3833 tgl                      2958 CBC          83 :     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
 3833 tgl                      2959 ECB             :     {
 3833 tgl                      2960 GIC           9 :         if (context != UserMappingRelationId)
 3833 tgl                      2961 CBC           3 :             return false;
 3833 tgl                      2962 ECB             :     }
                               2963                 :     else
                               2964                 :     {
 3833 tgl                      2965 CBC          74 :         if (context != ForeignServerRelationId)
                               2966              68 :             return false;
                               2967                 :     }
                               2968                 : 
                               2969              12 :     return true;
                               2970                 : }
                               2971                 : 
                               2972                 : /*
                               2973                 :  * Copy the remote session's values of GUCs that affect datatype I/O
 3670 tgl                      2974 ECB             :  * and apply them locally in a new GUC nesting level.  Returns the new
                               2975                 :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
                               2976                 :  * or -1 if no new nestlevel was needed.
                               2977                 :  *
                               2978                 :  * We use the equivalent of a function SET option to allow the settings to
                               2979                 :  * persist only until the caller calls restoreLocalGucs.  If an error is
                               2980                 :  * thrown in between, guc.c will take care of undoing the settings.
                               2981                 :  */
                               2982                 : static int
 3670 tgl                      2983 GIC          32 : applyRemoteGucs(PGconn *conn)
 3670 tgl                      2984 ECB             : {
                               2985                 :     static const char *const GUCsAffectingIO[] = {
                               2986                 :         "DateStyle",
                               2987                 :         "IntervalStyle"
                               2988                 :     };
                               2989                 : 
 3670 tgl                      2990 GIC          32 :     int         nestlevel = -1;
                               2991                 :     int         i;
                               2992                 : 
                               2993              96 :     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
                               2994                 :     {
                               2995              64 :         const char *gucName = GUCsAffectingIO[i];
                               2996              64 :         const char *remoteVal = PQparameterStatus(conn, gucName);
                               2997                 :         const char *localVal;
                               2998                 : 
                               2999                 :         /*
                               3000                 :          * If the remote server is pre-8.4, it won't have IntervalStyle, but
                               3001                 :          * that's okay because its output format won't be ambiguous.  So just
                               3002                 :          * skip the GUC if we don't get a value for it.  (We might eventually
                               3003                 :          * need more complicated logic with remote-version checks here.)
                               3004                 :          */
                               3005              64 :         if (remoteVal == NULL)
 3670 tgl                      3006 UIC           0 :             continue;
                               3007                 : 
                               3008                 :         /*
                               3009                 :          * Avoid GUC-setting overhead if the remote and local GUCs already
                               3010                 :          * have the same value.
                               3011                 :          */
 3670 tgl                      3012 GIC          64 :         localVal = GetConfigOption(gucName, false, false);
                               3013              64 :         Assert(localVal != NULL);
                               3014                 : 
                               3015              64 :         if (strcmp(remoteVal, localVal) == 0)
                               3016              47 :             continue;
                               3017                 : 
                               3018                 :         /* Create new GUC nest level if we didn't already */
                               3019              17 :         if (nestlevel < 0)
                               3020               9 :             nestlevel = NewGUCNestLevel();
                               3021                 : 
                               3022                 :         /* Apply the option (this will throw error on failure) */
                               3023              17 :         (void) set_config_option(gucName, remoteVal,
                               3024                 :                                  PGC_USERSET, PGC_S_SESSION,
                               3025                 :                                  GUC_ACTION_SAVE, true, 0, false);
                               3026                 :     }
                               3027                 : 
                               3028              32 :     return nestlevel;
                               3029                 : }
                               3030                 : 
                               3031                 : /*
                               3032                 :  * Restore local GUCs after they have been overlaid with remote settings.
                               3033                 :  */
                               3034                 : static void
                               3035              33 : restoreLocalGucs(int nestlevel)
                               3036                 : {
                               3037                 :     /* Do nothing if no new nestlevel was created */
                               3038              33 :     if (nestlevel > 0)
                               3039               8 :         AtEOXact_GUC(true, nestlevel);
                               3040              33 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a