LCOV - differential code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 86.9 % 1078 937 3 42 90 6 38 568 22 309 93 552 4 34
Current Date: 2023-04-08 15:15:32 Functions: 97.4 % 76 74 2 72 1 1 2 69 3
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*
       2                 :  * dblink.c
       3                 :  *
       4                 :  * Functions returning results from a remote database
       5                 :  *
       6                 :  * Joe Conway <mail@joeconway.com>
       7                 :  * And contributors:
       8                 :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
       9                 :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
      10                 :  *
      11                 :  * contrib/dblink/dblink.c
      12                 :  * Copyright (c) 2001-2023, PostgreSQL Global Development Group
      13                 :  * ALL RIGHTS RESERVED;
      14                 :  *
      15                 :  * Permission to use, copy, modify, and distribute this software and its
      16                 :  * documentation for any purpose, without fee, and without a written agreement
      17                 :  * is hereby granted, provided that the above copyright notice and this
      18                 :  * paragraph and the following two paragraphs appear in all copies.
      19                 :  *
      20                 :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
      21                 :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
      22                 :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
      23                 :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
      24                 :  * POSSIBILITY OF SUCH DAMAGE.
      25                 :  *
      26                 :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
      27                 :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
      28                 :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
      29                 :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
      30                 :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
      31                 :  *
      32                 :  */
      33                 : #include "postgres.h"
      34                 : 
      35                 : #include <limits.h>
      36                 : 
      37                 : #include "access/htup_details.h"
      38                 : #include "access/relation.h"
      39                 : #include "access/reloptions.h"
      40                 : #include "access/table.h"
      41                 : #include "catalog/namespace.h"
      42                 : #include "catalog/pg_foreign_data_wrapper.h"
      43                 : #include "catalog/pg_foreign_server.h"
      44                 : #include "catalog/pg_type.h"
      45                 : #include "catalog/pg_user_mapping.h"
      46                 : #include "executor/spi.h"
      47                 : #include "foreign/foreign.h"
      48                 : #include "funcapi.h"
      49                 : #include "lib/stringinfo.h"
      50                 : #include "libpq-fe.h"
      51                 : #include "libpq/libpq-be-fe-helpers.h"
      52                 : #include "mb/pg_wchar.h"
      53                 : #include "miscadmin.h"
      54                 : #include "parser/scansup.h"
      55                 : #include "utils/acl.h"
      56                 : #include "utils/builtins.h"
      57                 : #include "utils/fmgroids.h"
      58                 : #include "utils/guc.h"
      59                 : #include "utils/lsyscache.h"
      60                 : #include "utils/memutils.h"
      61                 : #include "utils/rel.h"
      62                 : #include "utils/varlena.h"
      63                 : 
      64 GIC           3 : PG_MODULE_MAGIC;
      65 ECB             : 
      66                 : typedef struct remoteConn
      67                 : {
      68                 :     PGconn     *conn;           /* Hold the remote connection */
      69                 :     int         openCursorCount;    /* The number of open cursors */
      70                 :     bool        newXactForCursor;   /* Opened a transaction for a cursor */
      71                 : } remoteConn;
      72                 : 
      73                 : typedef struct storeInfo
      74                 : {
      75                 :     FunctionCallInfo fcinfo;
      76                 :     Tuplestorestate *tuplestore;
      77                 :     AttInMetadata *attinmeta;
      78                 :     MemoryContext tmpcontext;
      79                 :     char      **cstrs;
      80                 :     /* temp storage for results to avoid leaks on exception */
      81                 :     PGresult   *last_res;
      82                 :     PGresult   *cur_res;
      83                 : } storeInfo;
      84                 : 
      85                 : /*
      86                 :  * Internal declarations
      87                 :  */
      88                 : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
      89                 : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
      90                 : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
      91                 :                               PGresult *res);
      92                 : static void materializeQueryResult(FunctionCallInfo fcinfo,
      93                 :                                    PGconn *conn,
      94                 :                                    const char *conname,
      95                 :                                    const char *sql,
      96                 :                                    bool fail);
      97                 : static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
      98                 : static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
      99                 : static remoteConn *getConnectionByName(const char *name);
     100                 : static HTAB *createConnHash(void);
     101                 : static void createNewConnection(const char *name, remoteConn *rconn);
     102                 : static void deleteConnection(const char *name);
     103                 : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
     104                 : static char **get_text_array_contents(ArrayType *array, int *numitems);
     105                 : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     106                 : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
     107                 : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     108                 : static char *quote_ident_cstr(char *rawstr);
     109                 : static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
     110                 : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
     111                 : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
     112                 : static char *generate_relation_name(Relation rel);
     113                 : static void dblink_connstr_check(const char *connstr);
     114                 : static void dblink_security_check(PGconn *conn, remoteConn *rconn);
     115                 : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
     116                 :                              bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
     117                 : static char *get_connect_string(const char *servername);
     118                 : static char *escape_param_str(const char *str);
     119                 : static void validate_pkattnums(Relation rel,
     120                 :                                int2vector *pkattnums_arg, int32 pknumatts_arg,
     121                 :                                int **pkattnums, int *pknumatts);
     122                 : static bool is_valid_dblink_option(const PQconninfoOption *options,
     123                 :                                    const char *option, Oid context);
     124                 : static int  applyRemoteGucs(PGconn *conn);
     125                 : static void restoreLocalGucs(int nestlevel);
     126                 : 
     127                 : /* Global */
     128                 : static remoteConn *pconn = NULL;
     129                 : static HTAB *remoteConnHash = NULL;
     130                 : 
     131                 : /*
     132                 :  *  Following is list that holds multiple remote connections.
     133                 :  *  Calling convention of each dblink function changes to accept
     134                 :  *  connection name as the first parameter. The connection list is
     135                 :  *  much like ecpg e.g. a mapping between a name and a PGconn object.
     136                 :  */
     137                 : 
     138                 : typedef struct remoteConnHashEnt
     139                 : {
     140                 :     char        name[NAMEDATALEN];
     141                 :     remoteConn *rconn;
     142                 : } remoteConnHashEnt;
     143                 : 
     144                 : /* initial number of connection hashes */
     145                 : #define NUMCONN 16
     146                 : 
     147                 : static char *
     148 GIC          48 : xpstrdup(const char *in)
     149 ECB             : {
     150 GIC          48 :     if (in == NULL)
     151 CBC          36 :         return NULL;
     152              12 :     return pstrdup(in);
     153 ECB             : }
     154                 : 
     155                 : static void
     156                 : pg_attribute_noreturn()
     157 UIC           0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
     158 EUB             : {
     159 UIC           0 :     char       *msg = pchomp(PQerrorMessage(conn));
     160 EUB             : 
     161 UNC           0 :     PQclear(res);
     162 UBC           0 :     elog(ERROR, "%s: %s", p2, msg);
     163                 : }
     164                 : 
     165                 : static void
     166                 : pg_attribute_noreturn()
     167 CBC           3 : dblink_conn_not_avail(const char *conname)
     168                 : {
     169               3 :     if (conname)
     170               1 :         ereport(ERROR,
     171                 :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     172                 :                  errmsg("connection \"%s\" not available", conname)));
     173                 :     else
     174               2 :         ereport(ERROR,
     175                 :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     176                 :                  errmsg("connection not available")));
     177                 : }
     178                 : 
     179                 : static void
     180              33 : dblink_get_conn(char *conname_or_str,
     181                 :                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
     182                 : {
     183              33 :     remoteConn *rconn = getConnectionByName(conname_or_str);
     184                 :     PGconn     *conn;
     185                 :     char       *conname;
     186                 :     bool        freeconn;
     187                 : 
     188              33 :     if (rconn)
     189                 :     {
     190              27 :         conn = rconn->conn;
     191              27 :         conname = conname_or_str;
     192              27 :         freeconn = false;
     193                 :     }
     194                 :     else
     195                 :     {
     196                 :         const char *connstr;
     197                 : 
     198               6 :         connstr = get_connect_string(conname_or_str);
     199               6 :         if (connstr == NULL)
     200               6 :             connstr = conname_or_str;
     201               6 :         dblink_connstr_check(connstr);
     202                 : 
     203 ECB             :         /* OK to make connection */
     204 GNC           6 :         conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
     205                 : 
     206 GIC           6 :         if (PQstatus(conn) == CONNECTION_BAD)
     207 ECB             :         {
     208 GIC           2 :             char       *msg = pchomp(PQerrorMessage(conn));
     209 ECB             : 
     210 GNC           2 :             libpqsrv_disconnect(conn);
     211 CBC           2 :             ereport(ERROR,
     212                 :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     213 EUB             :                      errmsg("could not establish connection"),
     214                 :                      errdetail_internal("%s", msg)));
     215                 :         }
     216 GIC           4 :         dblink_security_check(conn, rconn);
     217               4 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
     218 LBC           0 :             PQsetClientEncoding(conn, GetDatabaseEncodingName());
     219 GIC           4 :         freeconn = true;
     220 CBC           4 :         conname = NULL;
     221                 :     }
     222 ECB             : 
     223 CBC          31 :     *conn_p = conn;
     224              31 :     *conname_p = conname;
     225              31 :     *freeconn_p = freeconn;
     226 GIC          31 : }
     227 ECB             : 
     228                 : static PGconn *
     229 GIC          17 : dblink_get_named_conn(const char *conname)
     230                 : {
     231              17 :     remoteConn *rconn = getConnectionByName(conname);
     232 ECB             : 
     233 GIC          17 :     if (rconn)
     234 CBC          17 :         return rconn->conn;
     235                 : 
     236 LBC           0 :     dblink_conn_not_avail(conname);
     237 ECB             :     return NULL;                /* keep compiler quiet */
     238                 : }
     239                 : 
     240                 : static void
     241 CBC         120 : dblink_init(void)
     242                 : {
     243             120 :     if (!pconn)
     244                 :     {
     245               3 :         pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
     246 GIC           3 :         pconn->conn = NULL;
     247 CBC           3 :         pconn->openCursorCount = 0;
     248               3 :         pconn->newXactForCursor = false;
     249                 :     }
     250             120 : }
     251 ECB             : 
     252                 : /*
     253                 :  * Create a persistent connection to another database
     254                 :  */
     255 CBC           9 : PG_FUNCTION_INFO_V1(dblink_connect);
     256                 : Datum
     257              16 : dblink_connect(PG_FUNCTION_ARGS)
     258 ECB             : {
     259 CBC          16 :     char       *conname_or_str = NULL;
     260 GIC          16 :     char       *connstr = NULL;
     261              16 :     char       *connname = NULL;
     262                 :     char       *msg;
     263 CBC          16 :     PGconn     *conn = NULL;
     264              16 :     remoteConn *rconn = NULL;
     265 ECB             : 
     266 GIC          16 :     dblink_init();
     267                 : 
     268 CBC          16 :     if (PG_NARGS() == 2)
     269                 :     {
     270 GIC          11 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
     271 CBC          11 :         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     272                 :     }
     273               5 :     else if (PG_NARGS() == 1)
     274 GIC           5 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
     275 EUB             : 
     276 GBC          16 :     if (connname)
     277 EUB             :     {
     278 GBC          11 :         rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
     279                 :                                                   sizeof(remoteConn));
     280              11 :         rconn->conn = NULL;
     281 GIC          11 :         rconn->openCursorCount = 0;
     282              11 :         rconn->newXactForCursor = false;
     283                 :     }
     284                 : 
     285                 :     /* first check for valid foreign data server */
     286              16 :     connstr = get_connect_string(conname_or_str);
     287 CBC          16 :     if (connstr == NULL)
     288 GIC          14 :         connstr = conname_or_str;
     289                 : 
     290 ECB             :     /* check password in connection string if not superuser */
     291 GBC          16 :     dblink_connstr_check(connstr);
     292                 : 
     293 ECB             :     /* OK to make connection */
     294 GNC          15 :     conn = libpqsrv_connect(connstr, PG_WAIT_EXTENSION);
     295 ECB             : 
     296 GIC          15 :     if (PQstatus(conn) == CONNECTION_BAD)
     297 ECB             :     {
     298 UIC           0 :         msg = pchomp(PQerrorMessage(conn));
     299 UNC           0 :         libpqsrv_disconnect(conn);
     300 LBC           0 :         if (rconn)
     301               0 :             pfree(rconn);
     302 ECB             : 
     303 LBC           0 :         ereport(ERROR,
     304                 :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     305                 :                  errmsg("could not establish connection"),
     306 ECB             :                  errdetail_internal("%s", msg)));
     307                 :     }
     308                 : 
     309                 :     /* check password actually used if not superuser */
     310 GIC          15 :     dblink_security_check(conn, rconn);
     311 ECB             : 
     312                 :     /* attempt to set client encoding to match server encoding, if needed */
     313 GIC          15 :     if (PQclientEncoding(conn) != GetDatabaseEncoding())
     314 LBC           0 :         PQsetClientEncoding(conn, GetDatabaseEncodingName());
     315 ECB             : 
     316 GIC          15 :     if (connname)
     317                 :     {
     318 CBC          10 :         rconn->conn = conn;
     319 GIC          10 :         createNewConnection(connname, rconn);
     320 ECB             :     }
     321                 :     else
     322                 :     {
     323 GIC           5 :         if (pconn->conn)
     324 GNC           1 :             libpqsrv_disconnect(pconn->conn);
     325 CBC           5 :         pconn->conn = conn;
     326                 :     }
     327 ECB             : 
     328 GIC          14 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     329 ECB             : }
     330                 : 
     331                 : /*
     332                 :  * Clear a persistent connection to another database
     333                 :  */
     334 CBC           6 : PG_FUNCTION_INFO_V1(dblink_disconnect);
     335                 : Datum
     336              13 : dblink_disconnect(PG_FUNCTION_ARGS)
     337 ECB             : {
     338 GIC          13 :     char       *conname = NULL;
     339 CBC          13 :     remoteConn *rconn = NULL;
     340 GIC          13 :     PGconn     *conn = NULL;
     341                 : 
     342 CBC          13 :     dblink_init();
     343 ECB             : 
     344 CBC          13 :     if (PG_NARGS() == 1)
     345                 :     {
     346               9 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     347 GIC           9 :         rconn = getConnectionByName(conname);
     348               9 :         if (rconn)
     349 CBC           8 :             conn = rconn->conn;
     350                 :     }
     351 ECB             :     else
     352 CBC           4 :         conn = pconn->conn;
     353 ECB             : 
     354 CBC          13 :     if (!conn)
     355 GIC           1 :         dblink_conn_not_avail(conname);
     356                 : 
     357 GNC          12 :     libpqsrv_disconnect(conn);
     358 CBC          12 :     if (rconn)
     359 ECB             :     {
     360 CBC           8 :         deleteConnection(conname);
     361 GIC           8 :         pfree(rconn);
     362                 :     }
     363 ECB             :     else
     364 GIC           4 :         pconn->conn = NULL;
     365                 : 
     366 CBC          12 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     367 ECB             : }
     368                 : 
     369                 : /*
     370                 :  * opens a cursor using a persistent connection
     371                 :  */
     372 GIC           9 : PG_FUNCTION_INFO_V1(dblink_open);
     373 ECB             : Datum
     374 GBC           9 : dblink_open(PG_FUNCTION_ARGS)
     375                 : {
     376 CBC           9 :     PGresult   *res = NULL;
     377                 :     PGconn     *conn;
     378 GIC           9 :     char       *curname = NULL;
     379 CBC           9 :     char       *sql = NULL;
     380 GIC           9 :     char       *conname = NULL;
     381 ECB             :     StringInfoData buf;
     382 CBC           9 :     remoteConn *rconn = NULL;
     383 GBC           9 :     bool        fail = true;    /* default to backward compatible behavior */
     384 ECB             : 
     385 CBC           9 :     dblink_init();
     386 GIC           9 :     initStringInfo(&buf);
     387                 : 
     388               9 :     if (PG_NARGS() == 2)
     389                 :     {
     390                 :         /* text,text */
     391               2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     392 CBC           2 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     393 GIC           2 :         rconn = pconn;
     394                 :     }
     395               7 :     else if (PG_NARGS() == 3)
     396 ECB             :     {
     397                 :         /* might be text,text,text or text,text,bool */
     398 GIC           6 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     399 ECB             :         {
     400 CBC           1 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     401               1 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     402 GIC           1 :             fail = PG_GETARG_BOOL(2);
     403 CBC           1 :             rconn = pconn;
     404                 :         }
     405 ECB             :         else
     406                 :         {
     407 GIC           5 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     408 CBC           5 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     409               5 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     410 GIC           5 :             rconn = getConnectionByName(conname);
     411                 :         }
     412                 :     }
     413               1 :     else if (PG_NARGS() == 4)
     414                 :     {
     415 ECB             :         /* text,text,text,bool */
     416 GIC           1 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     417 CBC           1 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     418 GIC           1 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     419               1 :         fail = PG_GETARG_BOOL(3);
     420 CBC           1 :         rconn = getConnectionByName(conname);
     421 ECB             :     }
     422                 : 
     423 GIC           9 :     if (!rconn || !rconn->conn)
     424 LBC           0 :         dblink_conn_not_avail(conname);
     425 ECB             : 
     426 GIC           9 :     conn = rconn->conn;
     427 ECB             : 
     428                 :     /* If we are not in a transaction, start one */
     429 GIC           9 :     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
     430 ECB             :     {
     431 GIC           7 :         res = PQexec(conn, "BEGIN");
     432               7 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     433 UBC           0 :             dblink_res_internalerror(conn, res, "begin error");
     434 GBC           7 :         PQclear(res);
     435 GIC           7 :         rconn->newXactForCursor = true;
     436 ECB             : 
     437                 :         /*
     438                 :          * Since transaction state was IDLE, we force cursor count to
     439                 :          * initially be 0. This is needed as a previous ABORT might have wiped
     440                 :          * out our transaction without maintaining the cursor count for us.
     441                 :          */
     442 CBC           7 :         rconn->openCursorCount = 0;
     443 ECB             :     }
     444                 : 
     445                 :     /* if we started a transaction, increment cursor count */
     446 GIC           9 :     if (rconn->newXactForCursor)
     447 CBC           9 :         (rconn->openCursorCount)++;
     448 ECB             : 
     449 CBC           9 :     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
     450 GIC           9 :     res = PQexec(conn, buf.data);
     451               9 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     452 ECB             :     {
     453 GIC           2 :         dblink_res_error(conn, conname, res, fail,
     454                 :                          "while opening cursor \"%s\"", curname);
     455 GBC           2 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     456 EUB             :     }
     457                 : 
     458 GBC           7 :     PQclear(res);
     459 GIC           7 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     460                 : }
     461 ECB             : 
     462 EUB             : /*
     463                 :  * closes a cursor
     464 ECB             :  */
     465 GIC           6 : PG_FUNCTION_INFO_V1(dblink_close);
     466 ECB             : Datum
     467 GIC           5 : dblink_close(PG_FUNCTION_ARGS)
     468                 : {
     469 ECB             :     PGconn     *conn;
     470 CBC           5 :     PGresult   *res = NULL;
     471 GIC           5 :     char       *curname = NULL;
     472 CBC           5 :     char       *conname = NULL;
     473                 :     StringInfoData buf;
     474               5 :     remoteConn *rconn = NULL;
     475 GIC           5 :     bool        fail = true;    /* default to backward compatible behavior */
     476                 : 
     477 CBC           5 :     dblink_init();
     478 GIC           5 :     initStringInfo(&buf);
     479                 : 
     480 CBC           5 :     if (PG_NARGS() == 1)
     481                 :     {
     482 ECB             :         /* text */
     483 UIC           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     484               0 :         rconn = pconn;
     485 ECB             :     }
     486 GIC           5 :     else if (PG_NARGS() == 2)
     487 ECB             :     {
     488                 :         /* might be text,text or text,bool */
     489 CBC           5 :         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     490 ECB             :         {
     491 GBC           2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     492 CBC           2 :             fail = PG_GETARG_BOOL(1);
     493 GIC           2 :             rconn = pconn;
     494                 :         }
     495                 :         else
     496 ECB             :         {
     497 GIC           3 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     498               3 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     499               3 :             rconn = getConnectionByName(conname);
     500                 :         }
     501                 :     }
     502 CBC           5 :     if (PG_NARGS() == 3)
     503                 :     {
     504 ECB             :         /* text,text,bool */
     505 UIC           0 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     506 LBC           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     507               0 :         fail = PG_GETARG_BOOL(2);
     508               0 :         rconn = getConnectionByName(conname);
     509 ECB             :     }
     510                 : 
     511 CBC           5 :     if (!rconn || !rconn->conn)
     512 LBC           0 :         dblink_conn_not_avail(conname);
     513 ECB             : 
     514 GIC           5 :     conn = rconn->conn;
     515 ECB             : 
     516 GIC           5 :     appendStringInfo(&buf, "CLOSE %s", curname);
     517 ECB             : 
     518                 :     /* close the cursor */
     519 CBC           5 :     res = PQexec(conn, buf.data);
     520 GIC           5 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     521                 :     {
     522 CBC           1 :         dblink_res_error(conn, conname, res, fail,
     523 ECB             :                          "while closing cursor \"%s\"", curname);
     524 CBC           1 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     525 ECB             :     }
     526                 : 
     527 CBC           4 :     PQclear(res);
     528 ECB             : 
     529                 :     /* if we started a transaction, decrement cursor count */
     530 GIC           4 :     if (rconn->newXactForCursor)
     531 ECB             :     {
     532 GIC           4 :         (rconn->openCursorCount)--;
     533                 : 
     534 ECB             :         /* if count is zero, commit the transaction */
     535 GIC           4 :         if (rconn->openCursorCount == 0)
     536 ECB             :         {
     537 CBC           2 :             rconn->newXactForCursor = false;
     538 ECB             : 
     539 CBC           2 :             res = PQexec(conn, "COMMIT");
     540 GIC           2 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     541 UIC           0 :                 dblink_res_internalerror(conn, res, "commit error");
     542 GIC           2 :             PQclear(res);
     543 ECB             :         }
     544                 :     }
     545                 : 
     546 GIC           4 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     547 ECB             : }
     548                 : 
     549                 : /*
     550                 :  * Fetch results from an open cursor
     551                 :  */
     552 CBC           9 : PG_FUNCTION_INFO_V1(dblink_fetch);
     553                 : Datum
     554 GIC          13 : dblink_fetch(PG_FUNCTION_ARGS)
     555 ECB             : {
     556 CBC          13 :     PGresult   *res = NULL;
     557              13 :     char       *conname = NULL;
     558 GIC          13 :     remoteConn *rconn = NULL;
     559              13 :     PGconn     *conn = NULL;
     560 ECB             :     StringInfoData buf;
     561 GBC          13 :     char       *curname = NULL;
     562 GIC          13 :     int         howmany = 0;
     563 CBC          13 :     bool        fail = true;    /* default to backward compatible */
     564 ECB             : 
     565 GIC          13 :     prepTuplestoreResult(fcinfo);
     566                 : 
     567              13 :     dblink_init();
     568                 : 
     569              13 :     if (PG_NARGS() == 4)
     570                 :     {
     571 ECB             :         /* text,text,int,bool */
     572 CBC           1 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     573               1 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     574               1 :         howmany = PG_GETARG_INT32(2);
     575 GIC           1 :         fail = PG_GETARG_BOOL(3);
     576 ECB             : 
     577 GIC           1 :         rconn = getConnectionByName(conname);
     578 CBC           1 :         if (rconn)
     579 GIC           1 :             conn = rconn->conn;
     580 ECB             :     }
     581 GIC          12 :     else if (PG_NARGS() == 3)
     582                 :     {
     583 EUB             :         /* text,text,int or text,int,bool */
     584 GBC           8 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     585                 :         {
     586 GIC           2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     587               2 :             howmany = PG_GETARG_INT32(1);
     588               2 :             fail = PG_GETARG_BOOL(2);
     589 CBC           2 :             conn = pconn->conn;
     590 ECB             :         }
     591                 :         else
     592                 :         {
     593 GIC           6 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     594               6 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     595               6 :             howmany = PG_GETARG_INT32(2);
     596 ECB             : 
     597 GIC           6 :             rconn = getConnectionByName(conname);
     598 CBC           6 :             if (rconn)
     599 GIC           6 :                 conn = rconn->conn;
     600 ECB             :         }
     601                 :     }
     602 GIC           4 :     else if (PG_NARGS() == 2)
     603 ECB             :     {
     604                 :         /* text,int */
     605 CBC           4 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     606 GIC           4 :         howmany = PG_GETARG_INT32(1);
     607               4 :         conn = pconn->conn;
     608                 :     }
     609                 : 
     610              13 :     if (!conn)
     611 LBC           0 :         dblink_conn_not_avail(conname);
     612                 : 
     613 CBC          13 :     initStringInfo(&buf);
     614              13 :     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     615                 : 
     616                 :     /*
     617                 :      * Try to execute the query.  Note that since libpq uses malloc, the
     618 EUB             :      * PGresult will be long-lived even though we are still in a short-lived
     619                 :      * memory context.
     620                 :      */
     621 CBC          13 :     res = PQexec(conn, buf.data);
     622              26 :     if (!res ||
     623 GBC          26 :         (PQresultStatus(res) != PGRES_COMMAND_OK &&
     624 GIC          13 :          PQresultStatus(res) != PGRES_TUPLES_OK))
     625 ECB             :     {
     626 GIC           5 :         dblink_res_error(conn, conname, res, fail,
     627                 :                          "while fetching from cursor \"%s\"", curname);
     628 CBC           3 :         return (Datum) 0;
     629                 :     }
     630               8 :     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     631                 :     {
     632 ECB             :         /* cursor does not exist - closed already or bad name */
     633 UIC           0 :         PQclear(res);
     634               0 :         ereport(ERROR,
     635                 :                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
     636 ECB             :                  errmsg("cursor \"%s\" does not exist", curname)));
     637                 :     }
     638                 : 
     639 CBC           8 :     materializeResult(fcinfo, conn, res);
     640 GIC           7 :     return (Datum) 0;
     641 ECB             : }
     642                 : 
     643                 : /*
     644                 :  * Note: this is the new preferred version of dblink
     645                 :  */
     646 GIC          11 : PG_FUNCTION_INFO_V1(dblink_record);
     647 ECB             : Datum
     648 CBC          25 : dblink_record(PG_FUNCTION_ARGS)
     649 ECB             : {
     650 GIC          25 :     return dblink_record_internal(fcinfo, false);
     651 ECB             : }
     652                 : 
     653 CBC           3 : PG_FUNCTION_INFO_V1(dblink_send_query);
     654                 : Datum
     655 GIC           6 : dblink_send_query(PG_FUNCTION_ARGS)
     656 ECB             : {
     657                 :     PGconn     *conn;
     658                 :     char       *sql;
     659                 :     int         retval;
     660                 : 
     661 CBC           6 :     if (PG_NARGS() == 2)
     662                 :     {
     663 GIC           6 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
     664 CBC           6 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     665                 :     }
     666 ECB             :     else
     667                 :         /* shouldn't happen */
     668 LBC           0 :         elog(ERROR, "wrong number of arguments");
     669                 : 
     670                 :     /* async query send */
     671 GIC           6 :     retval = PQsendQuery(conn, sql);
     672 CBC           6 :     if (retval != 1)
     673 LBC           0 :         elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
     674 ECB             : 
     675 GIC           6 :     PG_RETURN_INT32(retval);
     676                 : }
     677 ECB             : 
     678 GIC           4 : PG_FUNCTION_INFO_V1(dblink_get_result);
     679                 : Datum
     680 CBC           8 : dblink_get_result(PG_FUNCTION_ARGS)
     681 ECB             : {
     682 GIC           8 :     return dblink_record_internal(fcinfo, true);
     683                 : }
     684                 : 
     685 EUB             : static Datum
     686 GIC          33 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     687                 : {
     688              33 :     PGconn     *volatile conn = NULL;
     689              33 :     volatile bool freeconn = false;
     690 ECB             : 
     691 GIC          33 :     prepTuplestoreResult(fcinfo);
     692 ECB             : 
     693 GIC          33 :     dblink_init();
     694                 : 
     695 GBC          33 :     PG_TRY();
     696 EUB             :     {
     697 GIC          33 :         char       *sql = NULL;
     698 CBC          33 :         char       *conname = NULL;
     699 GIC          33 :         bool        fail = true;    /* default to backward compatible */
     700                 : 
     701 CBC          33 :         if (!is_async)
     702                 :         {
     703 GIC          25 :             if (PG_NARGS() == 3)
     704                 :             {
     705 EUB             :                 /* text,text,bool */
     706 GIC           1 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     707               1 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     708 CBC           1 :                 fail = PG_GETARG_BOOL(2);
     709               1 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
     710                 :             }
     711              24 :             else if (PG_NARGS() == 2)
     712                 :             {
     713                 :                 /* text,text or text,bool */
     714              17 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     715                 :                 {
     716 GIC           1 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     717               1 :                     fail = PG_GETARG_BOOL(1);
     718               1 :                     conn = pconn->conn;
     719 ECB             :                 }
     720                 :                 else
     721                 :                 {
     722 CBC          16 :                     conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     723 GIC          16 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     724 CBC          16 :                     dblink_get_conn(conname, &conn, &conname, &freeconn);
     725 ECB             :                 }
     726                 :             }
     727 GBC           7 :             else if (PG_NARGS() == 1)
     728                 :             {
     729                 :                 /* text */
     730 GIC           7 :                 conn = pconn->conn;
     731               7 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     732                 :             }
     733 ECB             :             else
     734                 :                 /* shouldn't happen */
     735 UIC           0 :                 elog(ERROR, "wrong number of arguments");
     736                 :         }
     737                 :         else                    /* is_async */
     738 ECB             :         {
     739                 :             /* get async result */
     740 GIC           8 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     741 ECB             : 
     742 CBC           8 :             if (PG_NARGS() == 2)
     743                 :             {
     744 ECB             :                 /* text,bool */
     745 UIC           0 :                 fail = PG_GETARG_BOOL(1);
     746 LBC           0 :                 conn = dblink_get_named_conn(conname);
     747                 :             }
     748 GIC           8 :             else if (PG_NARGS() == 1)
     749                 :             {
     750                 :                 /* text */
     751               8 :                 conn = dblink_get_named_conn(conname);
     752                 :             }
     753                 :             else
     754                 :                 /* shouldn't happen */
     755 UIC           0 :                 elog(ERROR, "wrong number of arguments");
     756 ECB             :         }
     757                 : 
     758 CBC          31 :         if (!conn)
     759 GIC           2 :             dblink_conn_not_avail(conname);
     760                 : 
     761 CBC          29 :         if (!is_async)
     762 EUB             :         {
     763                 :             /* synchronous query, use efficient tuple collection method */
     764 GIC          21 :             materializeQueryResult(fcinfo, conn, conname, sql, fail);
     765 ECB             :         }
     766 EUB             :         else
     767                 :         {
     768                 :             /* async result retrieval, do it the old way */
     769 GIC           8 :             PGresult   *res = PQgetResult(conn);
     770                 : 
     771 ECB             :             /* NULL means we're all done with the async results */
     772 GIC           8 :             if (res)
     773                 :             {
     774 CBC           5 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
     775               5 :                     PQresultStatus(res) != PGRES_TUPLES_OK)
     776 ECB             :                 {
     777 UIC           0 :                     dblink_res_error(conn, conname, res, fail,
     778                 :                                      "while executing query");
     779                 :                     /* if fail isn't set, we'll return an empty query result */
     780                 :                 }
     781                 :                 else
     782                 :                 {
     783 GIC           5 :                     materializeResult(fcinfo, conn, res);
     784 ECB             :                 }
     785                 :             }
     786                 :         }
     787                 :     }
     788 GIC           4 :     PG_FINALLY();
     789 ECB             :     {
     790                 :         /* if needed, close the connection to the database */
     791 CBC          33 :         if (freeconn)
     792 GNC           3 :             libpqsrv_disconnect(conn);
     793                 :     }
     794 GIC          33 :     PG_END_TRY();
     795 ECB             : 
     796 GIC          29 :     return (Datum) 0;
     797 EUB             : }
     798                 : 
     799                 : /*
     800                 :  * Verify function caller can handle a tuplestore result, and set up for that.
     801                 :  *
     802                 :  * Note: if the caller returns without actually creating a tuplestore, the
     803                 :  * executor will treat the function result as an empty set.
     804                 :  */
     805                 : static void
     806 GBC          46 : prepTuplestoreResult(FunctionCallInfo fcinfo)
     807 EUB             : {
     808 GIC          46 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     809                 : 
     810                 :     /* check to see if query supports us returning a tuplestore */
     811 CBC          46 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     812 UIC           0 :         ereport(ERROR,
     813 ECB             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     814                 :                  errmsg("set-valued function called in context that cannot accept a set")));
     815 GIC          46 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     816 LBC           0 :         ereport(ERROR,
     817                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     818 ECB             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     819                 : 
     820                 :     /* let the executor know we're sending back a tuplestore */
     821 GBC          46 :     rsinfo->returnMode = SFRM_Materialize;
     822                 : 
     823 EUB             :     /* caller must fill these to return a non-empty result */
     824 GIC          46 :     rsinfo->setResult = NULL;
     825              46 :     rsinfo->setDesc = NULL;
     826              46 : }
     827                 : 
     828 EUB             : /*
     829                 :  * Copy the contents of the PGresult into a tuplestore to be returned
     830                 :  * as the result of the current function.
     831                 :  * The PGresult will be released in this function.
     832                 :  */
     833                 : static void
     834 GIC          13 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     835 ECB             : {
     836 CBC          13 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     837 ECB             : 
     838                 :     /* prepTuplestoreResult must have been called previously */
     839 GIC          13 :     Assert(rsinfo->returnMode == SFRM_Materialize);
     840                 : 
     841              13 :     PG_TRY();
     842                 :     {
     843 ECB             :         TupleDesc   tupdesc;
     844 EUB             :         bool        is_sql_cmd;
     845                 :         int         ntuples;
     846                 :         int         nfields;
     847                 : 
     848 GIC          13 :         if (PQresultStatus(res) == PGRES_COMMAND_OK)
     849 ECB             :         {
     850 UIC           0 :             is_sql_cmd = true;
     851                 : 
     852 ECB             :             /*
     853                 :              * need a tuple descriptor representing one TEXT column to return
     854                 :              * the command status string as our result tuple
     855                 :              */
     856 UIC           0 :             tupdesc = CreateTemplateTupleDesc(1);
     857               0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     858 ECB             :                                TEXTOID, -1, 0);
     859 UIC           0 :             ntuples = 1;
     860               0 :             nfields = 1;
     861 ECB             :         }
     862                 :         else
     863                 :         {
     864 CBC          13 :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
     865 ECB             : 
     866 CBC          13 :             is_sql_cmd = false;
     867 ECB             : 
     868                 :             /* get a tuple descriptor for our result type */
     869 GIC          13 :             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     870 ECB             :             {
     871 GIC          13 :                 case TYPEFUNC_COMPOSITE:
     872                 :                     /* success */
     873 CBC          13 :                     break;
     874 UIC           0 :                 case TYPEFUNC_RECORD:
     875                 :                     /* failed to determine actual type of RECORD */
     876               0 :                     ereport(ERROR,
     877 ECB             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     878                 :                              errmsg("function returning record called in context "
     879                 :                                     "that cannot accept type record")));
     880                 :                     break;
     881 LBC           0 :                 default:
     882                 :                     /* result type isn't composite */
     883               0 :                     elog(ERROR, "return type must be a row type");
     884 EUB             :                     break;
     885                 :             }
     886 ECB             : 
     887                 :             /* make sure we have a persistent copy of the tupdesc */
     888 GIC          13 :             tupdesc = CreateTupleDescCopy(tupdesc);
     889              13 :             ntuples = PQntuples(res);
     890              13 :             nfields = PQnfields(res);
     891 EUB             :         }
     892                 : 
     893                 :         /*
     894                 :          * check result and tuple descriptor have the same number of columns
     895 ECB             :          */
     896 CBC          13 :         if (nfields != tupdesc->natts)
     897 UIC           0 :             ereport(ERROR,
     898                 :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
     899                 :                      errmsg("remote query result rowtype does not match "
     900 ECB             :                             "the specified FROM clause rowtype")));
     901                 : 
     902 GIC          13 :         if (ntuples > 0)
     903 ECB             :         {
     904                 :             AttInMetadata *attinmeta;
     905 GIC          13 :             int         nestlevel = -1;
     906 ECB             :             Tuplestorestate *tupstore;
     907                 :             MemoryContext oldcontext;
     908                 :             int         row;
     909                 :             char      **values;
     910                 : 
     911 GIC          13 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
     912                 : 
     913                 :             /* Set GUCs to ensure we read GUC-sensitive data types correctly */
     914              13 :             if (!is_sql_cmd)
     915              13 :                 nestlevel = applyRemoteGucs(conn);
     916                 : 
     917              13 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
     918              13 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
     919              13 :             rsinfo->setResult = tupstore;
     920 CBC          13 :             rsinfo->setDesc = tupdesc;
     921 GIC          13 :             MemoryContextSwitchTo(oldcontext);
     922                 : 
     923 GNC          13 :             values = palloc_array(char *, nfields);
     924                 : 
     925                 :             /* put all tuples into the tuplestore */
     926 CBC          49 :             for (row = 0; row < ntuples; row++)
     927 ECB             :             {
     928                 :                 HeapTuple   tuple;
     929                 : 
     930 GIC          37 :                 if (!is_sql_cmd)
     931 ECB             :                 {
     932                 :                     int         i;
     933                 : 
     934 GIC         138 :                     for (i = 0; i < nfields; i++)
     935 ECB             :                     {
     936 GIC         101 :                         if (PQgetisnull(res, row, i))
     937 UIC           0 :                             values[i] = NULL;
     938 ECB             :                         else
     939 GIC         101 :                             values[i] = PQgetvalue(res, row, i);
     940                 :                     }
     941                 :                 }
     942                 :                 else
     943 ECB             :                 {
     944 UIC           0 :                     values[0] = PQcmdStatus(res);
     945 ECB             :                 }
     946                 : 
     947                 :                 /* build the tuple and put it into the tuplestore. */
     948 CBC          37 :                 tuple = BuildTupleFromCStrings(attinmeta, values);
     949 GIC          36 :                 tuplestore_puttuple(tupstore, tuple);
     950                 :             }
     951                 : 
     952                 :             /* clean up GUC settings, if we changed any */
     953 CBC          12 :             restoreLocalGucs(nestlevel);
     954                 :         }
     955 ECB             :     }
     956 CBC           1 :     PG_FINALLY();
     957                 :     {
     958                 :         /* be sure to release the libpq result */
     959 GIC          13 :         PQclear(res);
     960 ECB             :     }
     961 GIC          13 :     PG_END_TRY();
     962              12 : }
     963                 : 
     964                 : /*
     965                 :  * Execute the given SQL command and store its results into a tuplestore
     966                 :  * to be returned as the result of the current function.
     967                 :  *
     968                 :  * This is equivalent to PQexec followed by materializeResult, but we make
     969                 :  * use of libpq's single-row mode to avoid accumulating the whole result
     970                 :  * inside libpq before it gets transferred to the tuplestore.
     971                 :  */
     972                 : static void
     973              21 : materializeQueryResult(FunctionCallInfo fcinfo,
     974                 :                        PGconn *conn,
     975                 :                        const char *conname,
     976                 :                        const char *sql,
     977 EUB             :                        bool fail)
     978                 : {
     979 GIC          21 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     980 GBC          21 :     PGresult   *volatile res = NULL;
     981 GIC          21 :     volatile storeInfo sinfo = {0};
     982 EUB             : 
     983                 :     /* prepTuplestoreResult must have been called previously */
     984 GBC          21 :     Assert(rsinfo->returnMode == SFRM_Materialize);
     985 EUB             : 
     986 GBC          21 :     sinfo.fcinfo = fcinfo;
     987                 : 
     988              21 :     PG_TRY();
     989                 :     {
     990                 :         /* Create short-lived memory context for data conversions */
     991              21 :         sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
     992 EUB             :                                                  "dblink temporary context",
     993                 :                                                  ALLOCSET_DEFAULT_SIZES);
     994                 : 
     995                 :         /* execute query, collecting any tuples into the tuplestore */
     996 GIC          21 :         res = storeQueryResult(&sinfo, conn, sql);
     997                 : 
     998              21 :         if (!res ||
     999 CBC          21 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1000 GIC          21 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1001 CBC           2 :         {
    1002                 :             /*
    1003 ECB             :              * dblink_res_error will clear the passed PGresult, so we need
    1004                 :              * this ugly dance to avoid doing so twice during error exit
    1005                 :              */
    1006 GIC           2 :             PGresult   *res1 = res;
    1007                 : 
    1008 CBC           2 :             res = NULL;
    1009               2 :             dblink_res_error(conn, conname, res1, fail,
    1010 ECB             :                              "while executing query");
    1011                 :             /* if fail isn't set, we'll return an empty query result */
    1012                 :         }
    1013 CBC          19 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1014 ECB             :         {
    1015                 :             /*
    1016                 :              * storeRow didn't get called, so we need to convert the command
    1017 EUB             :              * status string to a tuple manually
    1018                 :              */
    1019                 :             TupleDesc   tupdesc;
    1020                 :             AttInMetadata *attinmeta;
    1021                 :             Tuplestorestate *tupstore;
    1022                 :             HeapTuple   tuple;
    1023                 :             char       *values[1];
    1024                 :             MemoryContext oldcontext;
    1025                 : 
    1026                 :             /*
    1027                 :              * need a tuple descriptor representing one TEXT column to return
    1028 ECB             :              * the command status string as our result tuple
    1029                 :              */
    1030 UIC           0 :             tupdesc = CreateTemplateTupleDesc(1);
    1031               0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
    1032                 :                                TEXTOID, -1, 0);
    1033               0 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1034                 : 
    1035 LBC           0 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1036 UIC           0 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
    1037 LBC           0 :             rsinfo->setResult = tupstore;
    1038               0 :             rsinfo->setDesc = tupdesc;
    1039 UIC           0 :             MemoryContextSwitchTo(oldcontext);
    1040                 : 
    1041 LBC           0 :             values[0] = PQcmdStatus(res);
    1042 EUB             : 
    1043                 :             /* build the tuple and put it into the tuplestore. */
    1044 LBC           0 :             tuple = BuildTupleFromCStrings(attinmeta, values);
    1045 UBC           0 :             tuplestore_puttuple(tupstore, tuple);
    1046                 : 
    1047 UIC           0 :             PQclear(res);
    1048               0 :             res = NULL;
    1049 ECB             :         }
    1050                 :         else
    1051                 :         {
    1052 CBC          19 :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
    1053 ECB             :             /* storeRow should have created a tuplestore */
    1054 GIC          19 :             Assert(rsinfo->setResult != NULL);
    1055 ECB             : 
    1056 GIC          19 :             PQclear(res);
    1057              19 :             res = NULL;
    1058                 :         }
    1059                 : 
    1060                 :         /* clean up data conversion short-lived memory context */
    1061              21 :         if (sinfo.tmpcontext != NULL)
    1062              21 :             MemoryContextDelete(sinfo.tmpcontext);
    1063              21 :         sinfo.tmpcontext = NULL;
    1064 ECB             : 
    1065 CBC          21 :         PQclear(sinfo.last_res);
    1066 GIC          21 :         sinfo.last_res = NULL;
    1067 CBC          21 :         PQclear(sinfo.cur_res);
    1068 GIC          21 :         sinfo.cur_res = NULL;
    1069 ECB             :     }
    1070 LBC           0 :     PG_CATCH();
    1071 ECB             :     {
    1072                 :         /* be sure to release any libpq result we collected */
    1073 UIC           0 :         PQclear(res);
    1074               0 :         PQclear(sinfo.last_res);
    1075               0 :         PQclear(sinfo.cur_res);
    1076 ECB             :         /* and clear out any pending data in libpq */
    1077 UBC           0 :         while ((res = PQgetResult(conn)) != NULL)
    1078 UIC           0 :             PQclear(res);
    1079               0 :         PG_RE_THROW();
    1080 ECB             :     }
    1081 CBC          21 :     PG_END_TRY();
    1082              21 : }
    1083 ECB             : 
    1084                 : /*
    1085                 :  * Execute query, and send any result rows to sinfo->tuplestore.
    1086                 :  */
    1087                 : static PGresult *
    1088 CBC          21 : storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
    1089                 : {
    1090 GIC          21 :     bool        first = true;
    1091 CBC          21 :     int         nestlevel = -1;
    1092 ECB             :     PGresult   *res;
    1093                 : 
    1094 GIC          21 :     if (!PQsendQuery(conn, sql))
    1095 UIC           0 :         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
    1096                 : 
    1097 GIC          21 :     if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
    1098 UIC           0 :         elog(ERROR, "failed to set single-row mode for dblink query");
    1099                 : 
    1100                 :     for (;;)
    1101                 :     {
    1102 GIC         174 :         CHECK_FOR_INTERRUPTS();
    1103 ECB             : 
    1104 GIC         174 :         sinfo->cur_res = PQgetResult(conn);
    1105 CBC         174 :         if (!sinfo->cur_res)
    1106 GIC          21 :             break;
    1107                 : 
    1108             153 :         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
    1109                 :         {
    1110 ECB             :             /* got one row from possibly-bigger resultset */
    1111                 : 
    1112                 :             /*
    1113                 :              * Set GUCs to ensure we read GUC-sensitive data types correctly.
    1114                 :              * We shouldn't do this until we have a row in hand, to ensure
    1115                 :              * libpq has seen any earlier ParameterStatus protocol messages.
    1116                 :              */
    1117 GIC         132 :             if (first && nestlevel < 0)
    1118              19 :                 nestlevel = applyRemoteGucs(conn);
    1119                 : 
    1120             132 :             storeRow(sinfo, sinfo->cur_res, first);
    1121 ECB             : 
    1122 GBC         132 :             PQclear(sinfo->cur_res);
    1123 CBC         132 :             sinfo->cur_res = NULL;
    1124 GIC         132 :             first = false;
    1125                 :         }
    1126 ECB             :         else
    1127                 :         {
    1128                 :             /* if empty resultset, fill tuplestore header */
    1129 GIC          21 :             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
    1130 LBC           0 :                 storeRow(sinfo, sinfo->cur_res, first);
    1131 EUB             : 
    1132                 :             /* store completed result at last_res */
    1133 GBC          21 :             PQclear(sinfo->last_res);
    1134 GIC          21 :             sinfo->last_res = sinfo->cur_res;
    1135              21 :             sinfo->cur_res = NULL;
    1136              21 :             first = true;
    1137                 :         }
    1138 EUB             :     }
    1139                 : 
    1140                 :     /* clean up GUC settings, if we changed any */
    1141 GIC          21 :     restoreLocalGucs(nestlevel);
    1142                 : 
    1143                 :     /* return last_res */
    1144              21 :     res = sinfo->last_res;
    1145 CBC          21 :     sinfo->last_res = NULL;
    1146 GIC          21 :     return res;
    1147                 : }
    1148 ECB             : 
    1149 EUB             : /*
    1150                 :  * Send single row to sinfo->tuplestore.
    1151                 :  *
    1152                 :  * If "first" is true, create the tuplestore using PGresult's metadata
    1153                 :  * (in this case the PGresult might contain either zero or one row).
    1154                 :  */
    1155 ECB             : static void
    1156 GIC         132 : storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
    1157                 : {
    1158 CBC         132 :     int         nfields = PQnfields(res);
    1159 ECB             :     HeapTuple   tuple;
    1160                 :     int         i;
    1161                 :     MemoryContext oldcontext;
    1162                 : 
    1163 GIC         132 :     if (first)
    1164                 :     {
    1165 ECB             :         /* Prepare for new result set */
    1166 GBC          19 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
    1167                 :         TupleDesc   tupdesc;
    1168                 : 
    1169                 :         /*
    1170                 :          * It's possible to get more than one result set if the query string
    1171                 :          * contained multiple SQL commands.  In that case, we follow PQexec's
    1172 ECB             :          * traditional behavior of throwing away all but the last result.
    1173 EUB             :          */
    1174 CBC          19 :         if (sinfo->tuplestore)
    1175 UIC           0 :             tuplestore_end(sinfo->tuplestore);
    1176 GIC          19 :         sinfo->tuplestore = NULL;
    1177                 : 
    1178 ECB             :         /* get a tuple descriptor for our result type */
    1179 GIC          19 :         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
    1180                 :         {
    1181              19 :             case TYPEFUNC_COMPOSITE:
    1182                 :                 /* success */
    1183              19 :                 break;
    1184 UIC           0 :             case TYPEFUNC_RECORD:
    1185 ECB             :                 /* failed to determine actual type of RECORD */
    1186 UIC           0 :                 ereport(ERROR,
    1187                 :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1188                 :                          errmsg("function returning record called in context "
    1189                 :                                 "that cannot accept type record")));
    1190 ECB             :                 break;
    1191 UIC           0 :             default:
    1192 ECB             :                 /* result type isn't composite */
    1193 UBC           0 :                 elog(ERROR, "return type must be a row type");
    1194                 :                 break;
    1195 ECB             :         }
    1196                 : 
    1197                 :         /* make sure we have a persistent copy of the tupdesc */
    1198 GIC          19 :         tupdesc = CreateTupleDescCopy(tupdesc);
    1199 ECB             : 
    1200                 :         /* check result and tuple descriptor have the same number of columns */
    1201 CBC          19 :         if (nfields != tupdesc->natts)
    1202 UIC           0 :             ereport(ERROR,
    1203                 :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
    1204 ECB             :                      errmsg("remote query result rowtype does not match "
    1205                 :                             "the specified FROM clause rowtype")));
    1206                 : 
    1207                 :         /* Prepare attinmeta for later data conversions */
    1208 GIC          19 :         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1209                 : 
    1210                 :         /* Create a new, empty tuplestore */
    1211              19 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1212              19 :         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1213 CBC          19 :         rsinfo->setResult = sinfo->tuplestore;
    1214 GIC          19 :         rsinfo->setDesc = tupdesc;
    1215 CBC          19 :         MemoryContextSwitchTo(oldcontext);
    1216                 : 
    1217                 :         /* Done if empty resultset */
    1218 GIC          19 :         if (PQntuples(res) == 0)
    1219 LBC           0 :             return;
    1220                 : 
    1221 ECB             :         /*
    1222                 :          * Set up sufficiently-wide string pointers array; this won't change
    1223                 :          * in size so it's easy to preallocate.
    1224                 :          */
    1225 GIC          19 :         if (sinfo->cstrs)
    1226 UIC           0 :             pfree(sinfo->cstrs);
    1227 GNC          19 :         sinfo->cstrs = palloc_array(char *, nfields);
    1228 ECB             :     }
    1229                 : 
    1230                 :     /* Should have a single-row result if we get here */
    1231 GIC         132 :     Assert(PQntuples(res) == 1);
    1232                 : 
    1233 ECB             :     /*
    1234                 :      * Do the following work in a temp context that we reset after each tuple.
    1235                 :      * This cleans up not only the data we have direct access to, but any
    1236                 :      * cruft the I/O functions might leak.
    1237 EUB             :      */
    1238 GIC         132 :     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
    1239                 : 
    1240                 :     /*
    1241                 :      * Fill cstrs with null-terminated strings of column values.
    1242                 :      */
    1243             510 :     for (i = 0; i < nfields; i++)
    1244                 :     {
    1245             378 :         if (PQgetisnull(res, 0, i))
    1246 UIC           0 :             sinfo->cstrs[i] = NULL;
    1247                 :         else
    1248 CBC         378 :             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
    1249                 :     }
    1250 ECB             : 
    1251                 :     /* Convert row to a tuple, and add it to the tuplestore */
    1252 GIC         132 :     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
    1253                 : 
    1254 CBC         132 :     tuplestore_puttuple(sinfo->tuplestore, tuple);
    1255 ECB             : 
    1256                 :     /* Clean up */
    1257 CBC         132 :     MemoryContextSwitchTo(oldcontext);
    1258             132 :     MemoryContextReset(sinfo->tmpcontext);
    1259                 : }
    1260                 : 
    1261                 : /*
    1262                 :  * List all open dblink connections by name.
    1263                 :  * Returns an array of all connection names.
    1264                 :  * Takes no params
    1265                 :  */
    1266 GIC           2 : PG_FUNCTION_INFO_V1(dblink_get_connections);
    1267                 : Datum
    1268               1 : dblink_get_connections(PG_FUNCTION_ARGS)
    1269                 : {
    1270                 :     HASH_SEQ_STATUS status;
    1271                 :     remoteConnHashEnt *hentry;
    1272 CBC           1 :     ArrayBuildState *astate = NULL;
    1273                 : 
    1274               1 :     if (remoteConnHash)
    1275                 :     {
    1276 GIC           1 :         hash_seq_init(&status, remoteConnHash);
    1277               4 :         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
    1278                 :         {
    1279                 :             /* stash away current value */
    1280               3 :             astate = accumArrayResult(astate,
    1281 CBC           3 :                                       CStringGetTextDatum(hentry->name),
    1282 ECB             :                                       false, TEXTOID, CurrentMemoryContext);
    1283                 :         }
    1284                 :     }
    1285                 : 
    1286 CBC           1 :     if (astate)
    1287 GNC           1 :         PG_RETURN_DATUM(makeArrayResult(astate,
    1288 ECB             :                                               CurrentMemoryContext));
    1289                 :     else
    1290 UIC           0 :         PG_RETURN_NULL();
    1291 EUB             : }
    1292                 : 
    1293                 : /*
    1294                 :  * Checks if a given remote connection is busy
    1295                 :  *
    1296                 :  * Returns 1 if the connection is busy, 0 otherwise
    1297                 :  * Params:
    1298                 :  *  text connection_name - name of the connection to check
    1299                 :  *
    1300                 :  */
    1301 GIC           2 : PG_FUNCTION_INFO_V1(dblink_is_busy);
    1302                 : Datum
    1303               1 : dblink_is_busy(PG_FUNCTION_ARGS)
    1304                 : {
    1305 ECB             :     PGconn     *conn;
    1306                 : 
    1307 CBC           1 :     dblink_init();
    1308 GIC           1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1309                 : 
    1310               1 :     PQconsumeInput(conn);
    1311               1 :     PG_RETURN_INT32(PQisBusy(conn));
    1312 ECB             : }
    1313                 : 
    1314                 : /*
    1315                 :  * Cancels a running request on a connection
    1316                 :  *
    1317                 :  * Returns text:
    1318                 :  *  "OK" if the cancel request has been sent correctly,
    1319 EUB             :  *      an error message otherwise
    1320                 :  *
    1321                 :  * Params:
    1322                 :  *  text connection_name - name of the connection to check
    1323                 :  *
    1324                 :  */
    1325 CBC           2 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
    1326                 : Datum
    1327               1 : dblink_cancel_query(PG_FUNCTION_ARGS)
    1328                 : {
    1329 ECB             :     int         res;
    1330                 :     PGconn     *conn;
    1331                 :     PGcancel   *cancel;
    1332                 :     char        errbuf[256];
    1333                 : 
    1334 GIC           1 :     dblink_init();
    1335 CBC           1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1336 GIC           1 :     cancel = PQgetCancel(conn);
    1337 ECB             : 
    1338 CBC           1 :     res = PQcancel(cancel, errbuf, 256);
    1339               1 :     PQfreeCancel(cancel);
    1340 ECB             : 
    1341 GIC           1 :     if (res == 1)
    1342 CBC           1 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1343                 :     else
    1344 UIC           0 :         PG_RETURN_TEXT_P(cstring_to_text(errbuf));
    1345 EUB             : }
    1346                 : 
    1347                 : 
    1348                 : /*
    1349                 :  * Get error message from a connection
    1350 ECB             :  *
    1351                 :  * Returns text:
    1352                 :  *  "OK" if no error, an error message otherwise
    1353                 :  *
    1354                 :  * Params:
    1355                 :  *  text connection_name - name of the connection to check
    1356                 :  *
    1357                 :  */
    1358 GIC           2 : PG_FUNCTION_INFO_V1(dblink_error_message);
    1359                 : Datum
    1360               1 : dblink_error_message(PG_FUNCTION_ARGS)
    1361 ECB             : {
    1362                 :     char       *msg;
    1363                 :     PGconn     *conn;
    1364                 : 
    1365 GIC           1 :     dblink_init();
    1366 CBC           1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1367                 : 
    1368 GIC           1 :     msg = PQerrorMessage(conn);
    1369 CBC           1 :     if (msg == NULL || msg[0] == '\0')
    1370               1 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1371                 :     else
    1372 UIC           0 :         PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
    1373                 : }
    1374 EUB             : 
    1375                 : /*
    1376 ECB             :  * Execute an SQL non-SELECT command
    1377 EUB             :  */
    1378 GIC           9 : PG_FUNCTION_INFO_V1(dblink_exec);
    1379 ECB             : Datum
    1380 CBC          26 : dblink_exec(PG_FUNCTION_ARGS)
    1381 ECB             : {
    1382 CBC          26 :     text       *volatile sql_cmd_status = NULL;
    1383 GIC          26 :     PGconn     *volatile conn = NULL;
    1384 CBC          26 :     volatile bool freeconn = false;
    1385                 : 
    1386 GIC          26 :     dblink_init();
    1387                 : 
    1388              26 :     PG_TRY();
    1389                 :     {
    1390              26 :         PGresult   *res = NULL;
    1391 CBC          26 :         char       *sql = NULL;
    1392 GIC          26 :         char       *conname = NULL;
    1393 CBC          26 :         bool        fail = true;    /* default to backward compatible behavior */
    1394                 : 
    1395 GIC          26 :         if (PG_NARGS() == 3)
    1396                 :         {
    1397                 :             /* must be text,text,bool */
    1398 UIC           0 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1399 LBC           0 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1400               0 :             fail = PG_GETARG_BOOL(2);
    1401 UIC           0 :             dblink_get_conn(conname, &conn, &conname, &freeconn);
    1402                 :         }
    1403 GIC          26 :         else if (PG_NARGS() == 2)
    1404 EUB             :         {
    1405                 :             /* might be text,text or text,bool */
    1406 GIC          17 :             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
    1407                 :             {
    1408               1 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1409               1 :                 fail = PG_GETARG_BOOL(1);
    1410 CBC           1 :                 conn = pconn->conn;
    1411                 :             }
    1412                 :             else
    1413 ECB             :             {
    1414 CBC          16 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1415 GIC          16 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1416 CBC          16 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
    1417                 :             }
    1418 ECB             :         }
    1419 GIC           9 :         else if (PG_NARGS() == 1)
    1420                 :         {
    1421                 :             /* must be single text argument */
    1422               9 :             conn = pconn->conn;
    1423               9 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1424                 :         }
    1425                 :         else
    1426                 :             /* shouldn't happen */
    1427 UIC           0 :             elog(ERROR, "wrong number of arguments");
    1428 ECB             : 
    1429 GIC          26 :         if (!conn)
    1430 LBC           0 :             dblink_conn_not_avail(conname);
    1431                 : 
    1432 GIC          26 :         res = PQexec(conn, sql);
    1433              26 :         if (!res ||
    1434              26 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1435               2 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1436                 :         {
    1437               2 :             dblink_res_error(conn, conname, res, fail,
    1438                 :                              "while executing command");
    1439                 : 
    1440                 :             /*
    1441 ECB             :              * and save a copy of the command status string to return as our
    1442                 :              * result tuple
    1443                 :              */
    1444 GIC           1 :             sql_cmd_status = cstring_to_text("ERROR");
    1445                 :         }
    1446              24 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1447 ECB             :         {
    1448                 :             /*
    1449                 :              * and save a copy of the command status string to return as our
    1450                 :              * result tuple
    1451                 :              */
    1452 CBC          24 :             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
    1453 GIC          24 :             PQclear(res);
    1454                 :         }
    1455 ECB             :         else
    1456                 :         {
    1457 UIC           0 :             PQclear(res);
    1458 LBC           0 :             ereport(ERROR,
    1459                 :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    1460 ECB             :                      errmsg("statement returning results not allowed")));
    1461                 :         }
    1462                 :     }
    1463 GIC           1 :     PG_FINALLY();
    1464                 :     {
    1465 ECB             :         /* if needed, close the connection to the database */
    1466 CBC          26 :         if (freeconn)
    1467 GNC           1 :             libpqsrv_disconnect(conn);
    1468                 :     }
    1469 GIC          26 :     PG_END_TRY();
    1470                 : 
    1471              25 :     PG_RETURN_TEXT_P(sql_cmd_status);
    1472 ECB             : }
    1473                 : 
    1474                 : 
    1475                 : /*
    1476                 :  * dblink_get_pkey
    1477                 :  *
    1478                 :  * Return list of primary key fields for the supplied relation,
    1479                 :  * or NULL if none exists.
    1480                 :  */
    1481 GIC           2 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
    1482                 : Datum
    1483               9 : dblink_get_pkey(PG_FUNCTION_ARGS)
    1484                 : {
    1485 EUB             :     int16       indnkeyatts;
    1486                 :     char      **results;
    1487                 :     FuncCallContext *funcctx;
    1488                 :     int32       call_cntr;
    1489 ECB             :     int32       max_calls;
    1490                 :     AttInMetadata *attinmeta;
    1491                 :     MemoryContext oldcontext;
    1492                 : 
    1493                 :     /* stuff done only on the first call of the function */
    1494 GIC           9 :     if (SRF_IS_FIRSTCALL())
    1495                 :     {
    1496                 :         Relation    rel;
    1497                 :         TupleDesc   tupdesc;
    1498 ECB             : 
    1499                 :         /* create a function context for cross-call persistence */
    1500 GIC           3 :         funcctx = SRF_FIRSTCALL_INIT();
    1501 ECB             : 
    1502                 :         /*
    1503                 :          * switch to memory context appropriate for multiple function calls
    1504                 :          */
    1505 GIC           3 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1506                 : 
    1507                 :         /* open target relation */
    1508               3 :         rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
    1509                 : 
    1510 ECB             :         /* get the array of attnums */
    1511 CBC           3 :         results = get_pkey_attnames(rel, &indnkeyatts);
    1512 ECB             : 
    1513 GIC           3 :         relation_close(rel, AccessShareLock);
    1514                 : 
    1515 ECB             :         /*
    1516                 :          * need a tuple descriptor representing one INT and one TEXT column
    1517                 :          */
    1518 CBC           3 :         tupdesc = CreateTemplateTupleDesc(2);
    1519 GIC           3 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
    1520 ECB             :                            INT4OID, -1, 0);
    1521 GIC           3 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
    1522                 :                            TEXTOID, -1, 0);
    1523                 : 
    1524                 :         /*
    1525 ECB             :          * Generate attribute metadata needed later to produce tuples from raw
    1526                 :          * C strings
    1527                 :          */
    1528 GIC           3 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1529               3 :         funcctx->attinmeta = attinmeta;
    1530                 : 
    1531               3 :         if ((results != NULL) && (indnkeyatts > 0))
    1532                 :         {
    1533               3 :             funcctx->max_calls = indnkeyatts;
    1534                 : 
    1535                 :             /* got results, keep track of them */
    1536               3 :             funcctx->user_fctx = results;
    1537                 :         }
    1538                 :         else
    1539                 :         {
    1540                 :             /* fast track when no results */
    1541 UIC           0 :             MemoryContextSwitchTo(oldcontext);
    1542               0 :             SRF_RETURN_DONE(funcctx);
    1543                 :         }
    1544                 : 
    1545 GIC           3 :         MemoryContextSwitchTo(oldcontext);
    1546                 :     }
    1547                 : 
    1548                 :     /* stuff done on every call of the function */
    1549 CBC           9 :     funcctx = SRF_PERCALL_SETUP();
    1550                 : 
    1551 ECB             :     /*
    1552                 :      * initialize per-call variables
    1553                 :      */
    1554 CBC           9 :     call_cntr = funcctx->call_cntr;
    1555               9 :     max_calls = funcctx->max_calls;
    1556 ECB             : 
    1557 CBC           9 :     results = (char **) funcctx->user_fctx;
    1558 GIC           9 :     attinmeta = funcctx->attinmeta;
    1559                 : 
    1560               9 :     if (call_cntr < max_calls)   /* do when there is more left to send */
    1561                 :     {
    1562                 :         char      **values;
    1563                 :         HeapTuple   tuple;
    1564                 :         Datum       result;
    1565                 : 
    1566 GNC           6 :         values = palloc_array(char *, 2);
    1567 GIC           6 :         values[0] = psprintf("%d", call_cntr + 1);
    1568               6 :         values[1] = results[call_cntr];
    1569                 : 
    1570 ECB             :         /* build the tuple */
    1571 GIC           6 :         tuple = BuildTupleFromCStrings(attinmeta, values);
    1572                 : 
    1573                 :         /* make the tuple into a datum */
    1574               6 :         result = HeapTupleGetDatum(tuple);
    1575 ECB             : 
    1576 GIC           6 :         SRF_RETURN_NEXT(funcctx, result);
    1577                 :     }
    1578                 :     else
    1579                 :     {
    1580                 :         /* do when there is no more left */
    1581               3 :         SRF_RETURN_DONE(funcctx);
    1582 ECB             :     }
    1583                 : }
    1584                 : 
    1585                 : 
    1586                 : /*
    1587                 :  * dblink_build_sql_insert
    1588 EUB             :  *
    1589                 :  * Used to generate an SQL insert statement
    1590                 :  * based on an existing tuple in a local relation.
    1591                 :  * This is useful for selectively replicating data
    1592                 :  * to another server via dblink.
    1593                 :  *
    1594                 :  * API:
    1595                 :  * <relname> - name of local table of interest
    1596 ECB             :  * <pkattnums> - an int2vector of attnums which will be used
    1597                 :  * to identify the local tuple of interest
    1598                 :  * <pknumatts> - number of attnums in pkattnums
    1599                 :  * <src_pkattvals_arry> - text array of key values which will be used
    1600                 :  * to identify the local tuple of interest
    1601                 :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1602 EUB             :  * to build the string for execution remotely. These are substituted
    1603                 :  * for their counterparts in src_pkattvals_arry
    1604                 :  */
    1605 GIC           3 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
    1606                 : Datum
    1607               6 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
    1608                 : {
    1609 CBC           6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1610 GIC           6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1611               6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1612               6 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1613               6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1614 ECB             :     Relation    rel;
    1615                 :     int        *pkattnums;
    1616                 :     int         pknumatts;
    1617                 :     char      **src_pkattvals;
    1618                 :     char      **tgt_pkattvals;
    1619                 :     int         src_nitems;
    1620                 :     int         tgt_nitems;
    1621                 :     char       *sql;
    1622                 : 
    1623                 :     /*
    1624                 :      * Open target relation.
    1625                 :      */
    1626 GIC           6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1627                 : 
    1628                 :     /*
    1629                 :      * Process pkattnums argument.
    1630                 :      */
    1631               6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1632                 :                        &pkattnums, &pknumatts);
    1633                 : 
    1634                 :     /*
    1635                 :      * Source array is made up of key values that will be used to locate the
    1636                 :      * tuple of interest from the local system.
    1637                 :      */
    1638 CBC           4 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1639                 : 
    1640 ECB             :     /*
    1641                 :      * There should be one source array key value for each key attnum
    1642                 :      */
    1643 CBC           4 :     if (src_nitems != pknumatts)
    1644 LBC           0 :         ereport(ERROR,
    1645 ECB             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1646                 :                  errmsg("source key array length must match number of key attributes")));
    1647                 : 
    1648                 :     /*
    1649                 :      * Target array is made up of key values that will be used to build the
    1650                 :      * SQL string for use on the remote system.
    1651                 :      */
    1652 GIC           4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1653                 : 
    1654                 :     /*
    1655                 :      * There should be one target array key value for each key attnum
    1656 ECB             :      */
    1657 GIC           4 :     if (tgt_nitems != pknumatts)
    1658 UIC           0 :         ereport(ERROR,
    1659                 :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1660                 :                  errmsg("target key array length must match number of key attributes")));
    1661 ECB             : 
    1662                 :     /*
    1663                 :      * Prep work is finally done. Go get the SQL string.
    1664                 :      */
    1665 GIC           4 :     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1666                 : 
    1667                 :     /*
    1668 ECB             :      * Now we can close the relation.
    1669                 :      */
    1670 GIC           4 :     relation_close(rel, AccessShareLock);
    1671                 : 
    1672                 :     /*
    1673 ECB             :      * And send it
    1674 EUB             :      */
    1675 GIC           4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1676                 : }
    1677                 : 
    1678                 : 
    1679                 : /*
    1680                 :  * dblink_build_sql_delete
    1681 ECB             :  *
    1682                 :  * Used to generate an SQL delete statement.
    1683                 :  * This is useful for selectively replicating a
    1684                 :  * delete to another server via dblink.
    1685                 :  *
    1686                 :  * API:
    1687                 :  * <relname> - name of remote table of interest
    1688                 :  * <pkattnums> - an int2vector of attnums which will be used
    1689                 :  * to identify the remote tuple of interest
    1690                 :  * <pknumatts> - number of attnums in pkattnums
    1691                 :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1692                 :  * to build the string for execution remotely.
    1693                 :  */
    1694 GIC           3 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
    1695                 : Datum
    1696               6 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
    1697                 : {
    1698               6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1699               6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1700               6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1701               6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1702                 :     Relation    rel;
    1703                 :     int        *pkattnums;
    1704                 :     int         pknumatts;
    1705                 :     char      **tgt_pkattvals;
    1706                 :     int         tgt_nitems;
    1707                 :     char       *sql;
    1708                 : 
    1709                 :     /*
    1710                 :      * Open target relation.
    1711                 :      */
    1712               6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1713                 : 
    1714 ECB             :     /*
    1715                 :      * Process pkattnums argument.
    1716                 :      */
    1717 GIC           6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1718 ECB             :                        &pkattnums, &pknumatts);
    1719                 : 
    1720                 :     /*
    1721                 :      * Target array is made up of key values that will be used to build the
    1722                 :      * SQL string for use on the remote system.
    1723                 :      */
    1724 GIC           4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1725                 : 
    1726                 :     /*
    1727                 :      * There should be one target array key value for each key attnum
    1728                 :      */
    1729               4 :     if (tgt_nitems != pknumatts)
    1730 UIC           0 :         ereport(ERROR,
    1731                 :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1732                 :                  errmsg("target key array length must match number of key attributes")));
    1733                 : 
    1734                 :     /*
    1735 ECB             :      * Prep work is finally done. Go get the SQL string.
    1736                 :      */
    1737 GIC           4 :     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
    1738                 : 
    1739                 :     /*
    1740 ECB             :      * Now we can close the relation.
    1741                 :      */
    1742 GIC           4 :     relation_close(rel, AccessShareLock);
    1743                 : 
    1744                 :     /*
    1745                 :      * And send it
    1746                 :      */
    1747 CBC           4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1748                 : }
    1749                 : 
    1750                 : 
    1751                 : /*
    1752 ECB             :  * dblink_build_sql_update
    1753 EUB             :  *
    1754                 :  * Used to generate an SQL update statement
    1755                 :  * based on an existing tuple in a local relation.
    1756                 :  * This is useful for selectively replicating data
    1757                 :  * to another server via dblink.
    1758                 :  *
    1759                 :  * API:
    1760                 :  * <relname> - name of local table of interest
    1761 ECB             :  * <pkattnums> - an int2vector of attnums which will be used
    1762                 :  * to identify the local tuple of interest
    1763                 :  * <pknumatts> - number of attnums in pkattnums
    1764                 :  * <src_pkattvals_arry> - text array of key values which will be used
    1765                 :  * to identify the local tuple of interest
    1766                 :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1767 EUB             :  * to build the string for execution remotely. These are substituted
    1768                 :  * for their counterparts in src_pkattvals_arry
    1769                 :  */
    1770 GIC           3 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
    1771                 : Datum
    1772               6 : dblink_build_sql_update(PG_FUNCTION_ARGS)
    1773                 : {
    1774 CBC           6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1775 GIC           6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1776               6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1777               6 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1778               6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1779 ECB             :     Relation    rel;
    1780                 :     int        *pkattnums;
    1781                 :     int         pknumatts;
    1782                 :     char      **src_pkattvals;
    1783                 :     char      **tgt_pkattvals;
    1784                 :     int         src_nitems;
    1785                 :     int         tgt_nitems;
    1786                 :     char       *sql;
    1787                 : 
    1788                 :     /*
    1789                 :      * Open target relation.
    1790                 :      */
    1791 GIC           6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1792                 : 
    1793 ECB             :     /*
    1794                 :      * Process pkattnums argument.
    1795 EUB             :      */
    1796 GIC           6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1797                 :                        &pkattnums, &pknumatts);
    1798 EUB             : 
    1799                 :     /*
    1800                 :      * Source array is made up of key values that will be used to locate the
    1801                 :      * tuple of interest from the local system.
    1802                 :      */
    1803 GIC           4 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1804                 : 
    1805                 :     /*
    1806                 :      * There should be one source array key value for each key attnum
    1807                 :      */
    1808               4 :     if (src_nitems != pknumatts)
    1809 UIC           0 :         ereport(ERROR,
    1810                 :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1811 ECB             :                  errmsg("source key array length must match number of key attributes")));
    1812                 : 
    1813                 :     /*
    1814                 :      * Target array is made up of key values that will be used to build the
    1815                 :      * SQL string for use on the remote system.
    1816                 :      */
    1817 CBC           4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1818                 : 
    1819 ECB             :     /*
    1820                 :      * There should be one target array key value for each key attnum
    1821 EUB             :      */
    1822 GIC           4 :     if (tgt_nitems != pknumatts)
    1823 LBC           0 :         ereport(ERROR,
    1824                 :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1825 ECB             :                  errmsg("target key array length must match number of key attributes")));
    1826                 : 
    1827                 :     /*
    1828                 :      * Prep work is finally done. Go get the SQL string.
    1829                 :      */
    1830 GIC           4 :     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1831                 : 
    1832                 :     /*
    1833 ECB             :      * Now we can close the relation.
    1834                 :      */
    1835 GIC           4 :     relation_close(rel, AccessShareLock);
    1836 ECB             : 
    1837                 :     /*
    1838                 :      * And send it
    1839 EUB             :      */
    1840 GIC           4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1841 ECB             : }
    1842                 : 
    1843                 : /*
    1844                 :  * dblink_current_query
    1845                 :  * return the current query string
    1846 EUB             :  * to allow its use in (among other things)
    1847                 :  * rewrite rules
    1848 ECB             :  */
    1849 GIC           1 : PG_FUNCTION_INFO_V1(dblink_current_query);
    1850 ECB             : Datum
    1851 LBC           0 : dblink_current_query(PG_FUNCTION_ARGS)
    1852                 : {
    1853                 :     /* This is now just an alias for the built-in function current_query() */
    1854               0 :     PG_RETURN_DATUM(current_query(fcinfo));
    1855                 : }
    1856                 : 
    1857                 : /*
    1858                 :  * Retrieve async notifications for a connection.
    1859                 :  *
    1860                 :  * Returns a setof record of notifications, or an empty set if none received.
    1861                 :  * Can optionally take a named connection as parameter, but uses the unnamed
    1862                 :  * connection per default.
    1863                 :  *
    1864 ECB             :  */
    1865                 : #define DBLINK_NOTIFY_COLS      3
    1866                 : 
    1867 GIC           3 : PG_FUNCTION_INFO_V1(dblink_get_notify);
    1868 ECB             : Datum
    1869 CBC           2 : dblink_get_notify(PG_FUNCTION_ARGS)
    1870                 : {
    1871                 :     PGconn     *conn;
    1872                 :     PGnotify   *notify;
    1873 GIC           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1874                 : 
    1875               2 :     dblink_init();
    1876               2 :     if (PG_NARGS() == 1)
    1877 UIC           0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1878                 :     else
    1879 GIC           2 :         conn = pconn->conn;
    1880                 : 
    1881 CBC           2 :     InitMaterializedSRF(fcinfo, 0);
    1882                 : 
    1883               2 :     PQconsumeInput(conn);
    1884               4 :     while ((notify = PQnotifies(conn)) != NULL)
    1885 EUB             :     {
    1886                 :         Datum       values[DBLINK_NOTIFY_COLS];
    1887                 :         bool        nulls[DBLINK_NOTIFY_COLS];
    1888                 : 
    1889 GIC           2 :         memset(values, 0, sizeof(values));
    1890               2 :         memset(nulls, 0, sizeof(nulls));
    1891                 : 
    1892 CBC           2 :         if (notify->relname != NULL)
    1893 GIC           2 :             values[0] = CStringGetTextDatum(notify->relname);
    1894 ECB             :         else
    1895 UIC           0 :             nulls[0] = true;
    1896 ECB             : 
    1897 GIC           2 :         values[1] = Int32GetDatum(notify->be_pid);
    1898                 : 
    1899               2 :         if (notify->extra != NULL)
    1900               2 :             values[2] = CStringGetTextDatum(notify->extra);
    1901                 :         else
    1902 UIC           0 :             nulls[2] = true;
    1903                 : 
    1904 GIC           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    1905                 : 
    1906 CBC           2 :         PQfreemem(notify);
    1907 GIC           2 :         PQconsumeInput(conn);
    1908 ECB             :     }
    1909                 : 
    1910 GIC           2 :     return (Datum) 0;
    1911 ECB             : }
    1912                 : 
    1913                 : /*
    1914                 :  * Validate the options given to a dblink foreign server or user mapping.
    1915                 :  * Raise an error if any option is invalid.
    1916                 :  *
    1917                 :  * We just check the names of options here, so semantic errors in options,
    1918                 :  * such as invalid numeric format, will be detected at the attempt to connect.
    1919                 :  */
    1920 GIC           3 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
    1921                 : Datum
    1922               5 : dblink_fdw_validator(PG_FUNCTION_ARGS)
    1923                 : {
    1924               5 :     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    1925               5 :     Oid         context = PG_GETARG_OID(1);
    1926                 :     ListCell   *cell;
    1927                 : 
    1928                 :     static const PQconninfoOption *options = NULL;
    1929 ECB             : 
    1930                 :     /*
    1931                 :      * Get list of valid libpq options.
    1932                 :      *
    1933                 :      * To avoid unnecessary work, we get the list once and use it throughout
    1934                 :      * the lifetime of this backend process.  We don't need to care about
    1935                 :      * memory context issues, because PQconndefaults allocates with malloc.
    1936                 :      */
    1937 GIC           5 :     if (!options)
    1938                 :     {
    1939               2 :         options = PQconndefaults();
    1940               2 :         if (!options)           /* assume reason for failure is OOM */
    1941 UIC           0 :             ereport(ERROR,
    1942                 :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    1943                 :                      errmsg("out of memory"),
    1944                 :                      errdetail("Could not get libpq's default connection options.")));
    1945 ECB             :     }
    1946                 : 
    1947                 :     /* Validate each supplied option. */
    1948 GIC           8 :     foreach(cell, options_list)
    1949                 :     {
    1950               5 :         DefElem    *def = (DefElem *) lfirst(cell);
    1951                 : 
    1952 CBC           5 :         if (!is_valid_dblink_option(options, def->defname, context))
    1953                 :         {
    1954                 :             /*
    1955                 :              * Unknown option, or invalid option for the context specified, so
    1956                 :              * complain about it.  Provide a hint with a valid option that
    1957                 :              * looks similar, if there is one.
    1958 ECB             :              */
    1959                 :             const PQconninfoOption *opt;
    1960                 :             const char *closest_match;
    1961                 :             ClosestMatchState match_state;
    1962 GNC           2 :             bool        has_valid_options = false;
    1963 ECB             : 
    1964 GNC           2 :             initClosestMatch(&match_state, def->defname, 4);
    1965 GIC          80 :             for (opt = options; opt->keyword; opt++)
    1966                 :             {
    1967              78 :                 if (is_valid_dblink_option(options, opt->keyword, context))
    1968                 :                 {
    1969 GNC           3 :                     has_valid_options = true;
    1970               3 :                     updateClosestMatch(&match_state, opt->keyword);
    1971                 :                 }
    1972                 :             }
    1973                 : 
    1974               2 :             closest_match = getClosestMatch(&match_state);
    1975 CBC           2 :             ereport(ERROR,
    1976                 :                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
    1977 ECB             :                      errmsg("invalid option \"%s\"", def->defname),
    1978                 :                      has_valid_options ? closest_match ?
    1979                 :                      errhint("Perhaps you meant the option \"%s\".",
    1980                 :                              closest_match) : 0 :
    1981                 :                      errhint("There are no valid options in this context.")));
    1982                 :         }
    1983                 :     }
    1984                 : 
    1985 CBC           3 :     PG_RETURN_VOID();
    1986                 : }
    1987 ECB             : 
    1988                 : 
    1989                 : /*************************************************************
    1990                 :  * internal functions
    1991                 :  */
    1992                 : 
    1993                 : 
    1994                 : /*
    1995                 :  * get_pkey_attnames
    1996                 :  *
    1997                 :  * Get the primary key attnames for the given relation.
    1998                 :  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
    1999                 :  */
    2000                 : static char **
    2001 GIC           3 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
    2002                 : {
    2003                 :     Relation    indexRelation;
    2004                 :     ScanKeyData skey;
    2005 ECB             :     SysScanDesc scan;
    2006                 :     HeapTuple   indexTuple;
    2007                 :     int         i;
    2008 CBC           3 :     char      **result = NULL;
    2009                 :     TupleDesc   tupdesc;
    2010                 : 
    2011                 :     /* initialize indnkeyatts to 0 in case no primary key exists */
    2012 GIC           3 :     *indnkeyatts = 0;
    2013                 : 
    2014               3 :     tupdesc = rel->rd_att;
    2015                 : 
    2016                 :     /* Prepare to scan pg_index for entries having indrelid = this rel. */
    2017               3 :     indexRelation = table_open(IndexRelationId, AccessShareLock);
    2018               3 :     ScanKeyInit(&skey,
    2019 ECB             :                 Anum_pg_index_indrelid,
    2020                 :                 BTEqualStrategyNumber, F_OIDEQ,
    2021                 :                 ObjectIdGetDatum(RelationGetRelid(rel)));
    2022                 : 
    2023 CBC           3 :     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
    2024                 :                               NULL, 1, &skey);
    2025                 : 
    2026               3 :     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
    2027                 :     {
    2028               3 :         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
    2029 ECB             : 
    2030                 :         /* we're only interested if it is the primary key */
    2031 GIC           3 :         if (index->indisprimary)
    2032 ECB             :         {
    2033 GIC           3 :             *indnkeyatts = index->indnkeyatts;
    2034 CBC           3 :             if (*indnkeyatts > 0)
    2035                 :             {
    2036 GNC           3 :                 result = palloc_array(char *, *indnkeyatts);
    2037                 : 
    2038 GIC           9 :                 for (i = 0; i < *indnkeyatts; i++)
    2039               6 :                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
    2040 ECB             :             }
    2041 CBC           3 :             break;
    2042 ECB             :         }
    2043                 :     }
    2044                 : 
    2045 GIC           3 :     systable_endscan(scan);
    2046 CBC           3 :     table_close(indexRelation, AccessShareLock);
    2047                 : 
    2048 GBC           3 :     return result;
    2049 EUB             : }
    2050                 : 
    2051                 : /*
    2052                 :  * Deconstruct a text[] into C-strings (note any NULL elements will be
    2053                 :  * returned as NULL pointers)
    2054                 :  */
    2055                 : static char **
    2056 GIC          20 : get_text_array_contents(ArrayType *array, int *numitems)
    2057 ECB             : {
    2058 GIC          20 :     int         ndim = ARR_NDIM(array);
    2059              20 :     int        *dims = ARR_DIMS(array);
    2060                 :     int         nitems;
    2061 ECB             :     int16       typlen;
    2062                 :     bool        typbyval;
    2063                 :     char        typalign;
    2064                 :     char      **values;
    2065                 :     char       *ptr;
    2066                 :     bits8      *bitmap;
    2067                 :     int         bitmask;
    2068                 :     int         i;
    2069                 : 
    2070 GIC          20 :     Assert(ARR_ELEMTYPE(array) == TEXTOID);
    2071                 : 
    2072              20 :     *numitems = nitems = ArrayGetNItems(ndim, dims);
    2073 ECB             : 
    2074 GIC          20 :     get_typlenbyvalalign(ARR_ELEMTYPE(array),
    2075                 :                          &typlen, &typbyval, &typalign);
    2076 ECB             : 
    2077 GNC          20 :     values = palloc_array(char *, nitems);
    2078 ECB             : 
    2079 CBC          20 :     ptr = ARR_DATA_PTR(array);
    2080 GIC          20 :     bitmap = ARR_NULLBITMAP(array);
    2081 CBC          20 :     bitmask = 1;
    2082 ECB             : 
    2083 GBC          55 :     for (i = 0; i < nitems; i++)
    2084                 :     {
    2085 GIC          35 :         if (bitmap && (*bitmap & bitmask) == 0)
    2086                 :         {
    2087 LBC           0 :             values[i] = NULL;
    2088                 :         }
    2089 ECB             :         else
    2090                 :         {
    2091 GIC          35 :             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
    2092 CBC          35 :             ptr = att_addlength_pointer(ptr, typlen, ptr);
    2093 GIC          35 :             ptr = (char *) att_align_nominal(ptr, typalign);
    2094 ECB             :         }
    2095                 : 
    2096                 :         /* advance bitmap pointer if any */
    2097 CBC          35 :         if (bitmap)
    2098 ECB             :         {
    2099 UIC           0 :             bitmask <<= 1;
    2100 LBC           0 :             if (bitmask == 0x100)
    2101 ECB             :             {
    2102 LBC           0 :                 bitmap++;
    2103 UIC           0 :                 bitmask = 1;
    2104                 :             }
    2105 ECB             :         }
    2106                 :     }
    2107                 : 
    2108 GIC          20 :     return values;
    2109                 : }
    2110 ECB             : 
    2111                 : static char *
    2112 GIC           4 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2113 ECB             : {
    2114                 :     char       *relname;
    2115                 :     HeapTuple   tuple;
    2116                 :     TupleDesc   tupdesc;
    2117                 :     int         natts;
    2118                 :     StringInfoData buf;
    2119                 :     char       *val;
    2120                 :     int         key;
    2121                 :     int         i;
    2122                 :     bool        needComma;
    2123                 : 
    2124 CBC           4 :     initStringInfo(&buf);
    2125                 : 
    2126 ECB             :     /* get relation name including any needed schema prefix and quoting */
    2127 GIC           4 :     relname = generate_relation_name(rel);
    2128 ECB             : 
    2129 CBC           4 :     tupdesc = rel->rd_att;
    2130 GIC           4 :     natts = tupdesc->natts;
    2131                 : 
    2132 GBC           4 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2133 CBC           4 :     if (!tuple)
    2134 UIC           0 :         ereport(ERROR,
    2135 ECB             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2136                 :                  errmsg("source row not found")));
    2137                 : 
    2138 GIC           4 :     appendStringInfo(&buf, "INSERT INTO %s(", relname);
    2139                 : 
    2140               4 :     needComma = false;
    2141 CBC          19 :     for (i = 0; i < natts; i++)
    2142                 :     {
    2143 GIC          15 :         Form_pg_attribute att = TupleDescAttr(tupdesc, i);
    2144                 : 
    2145              15 :         if (att->attisdropped)
    2146               2 :             continue;
    2147                 : 
    2148 CBC          13 :         if (needComma)
    2149 GIC           9 :             appendStringInfoChar(&buf, ',');
    2150                 : 
    2151 CBC          13 :         appendStringInfoString(&buf,
    2152 GIC          13 :                                quote_ident_cstr(NameStr(att->attname)));
    2153 CBC          13 :         needComma = true;
    2154                 :     }
    2155 ECB             : 
    2156 CBC           4 :     appendStringInfoString(&buf, ") VALUES(");
    2157                 : 
    2158 ECB             :     /*
    2159                 :      * Note: i is physical column number (counting from 0).
    2160                 :      */
    2161 CBC           4 :     needComma = false;
    2162              19 :     for (i = 0; i < natts; i++)
    2163                 :     {
    2164              15 :         if (TupleDescAttr(tupdesc, i)->attisdropped)
    2165               2 :             continue;
    2166                 : 
    2167              13 :         if (needComma)
    2168               9 :             appendStringInfoChar(&buf, ',');
    2169 ECB             : 
    2170 GIC          13 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2171 EUB             : 
    2172 GIC          13 :         if (key >= 0)
    2173               7 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2174 ECB             :         else
    2175 GIC           6 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2176                 : 
    2177              13 :         if (val != NULL)
    2178 ECB             :         {
    2179 GIC          13 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2180              13 :             pfree(val);
    2181                 :         }
    2182                 :         else
    2183 UIC           0 :             appendStringInfoString(&buf, "NULL");
    2184 GIC          13 :         needComma = true;
    2185                 :     }
    2186               4 :     appendStringInfoChar(&buf, ')');
    2187                 : 
    2188               4 :     return buf.data;
    2189                 : }
    2190 ECB             : 
    2191                 : static char *
    2192 GIC           4 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
    2193 ECB             : {
    2194                 :     char       *relname;
    2195                 :     TupleDesc   tupdesc;
    2196                 :     StringInfoData buf;
    2197                 :     int         i;
    2198                 : 
    2199 CBC           4 :     initStringInfo(&buf);
    2200 EUB             : 
    2201                 :     /* get relation name including any needed schema prefix and quoting */
    2202 GIC           4 :     relname = generate_relation_name(rel);
    2203                 : 
    2204 CBC           4 :     tupdesc = rel->rd_att;
    2205                 : 
    2206 GIC           4 :     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
    2207              11 :     for (i = 0; i < pknumatts; i++)
    2208                 :     {
    2209 CBC           7 :         int         pkattnum = pkattnums[i];
    2210               7 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2211                 : 
    2212               7 :         if (i > 0)
    2213 GIC           3 :             appendStringInfoString(&buf, " AND ");
    2214 ECB             : 
    2215 CBC           7 :         appendStringInfoString(&buf,
    2216 GIC           7 :                                quote_ident_cstr(NameStr(attr->attname)));
    2217 ECB             : 
    2218 CBC           7 :         if (tgt_pkattvals[i] != NULL)
    2219 GIC           7 :             appendStringInfo(&buf, " = %s",
    2220 CBC           7 :                              quote_literal_cstr(tgt_pkattvals[i]));
    2221 ECB             :         else
    2222 UIC           0 :             appendStringInfoString(&buf, " IS NULL");
    2223 ECB             :     }
    2224                 : 
    2225 CBC           4 :     return buf.data;
    2226 ECB             : }
    2227                 : 
    2228                 : static char *
    2229 GIC           4 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2230 ECB             : {
    2231                 :     char       *relname;
    2232                 :     HeapTuple   tuple;
    2233                 :     TupleDesc   tupdesc;
    2234                 :     int         natts;
    2235                 :     StringInfoData buf;
    2236 EUB             :     char       *val;
    2237 ECB             :     int         key;
    2238                 :     int         i;
    2239                 :     bool        needComma;
    2240                 : 
    2241 GIC           4 :     initStringInfo(&buf);
    2242 ECB             : 
    2243                 :     /* get relation name including any needed schema prefix and quoting */
    2244 CBC           4 :     relname = generate_relation_name(rel);
    2245 ECB             : 
    2246 GIC           4 :     tupdesc = rel->rd_att;
    2247 CBC           4 :     natts = tupdesc->natts;
    2248 ECB             : 
    2249 GIC           4 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2250 CBC           4 :     if (!tuple)
    2251 LBC           0 :         ereport(ERROR,
    2252                 :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2253 ECB             :                  errmsg("source row not found")));
    2254                 : 
    2255 CBC           4 :     appendStringInfo(&buf, "UPDATE %s SET ", relname);
    2256 ECB             : 
    2257                 :     /*
    2258 EUB             :      * Note: i is physical column number (counting from 0).
    2259                 :      */
    2260 GIC           4 :     needComma = false;
    2261 CBC          19 :     for (i = 0; i < natts; i++)
    2262                 :     {
    2263 GIC          15 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2264                 : 
    2265              15 :         if (attr->attisdropped)
    2266               2 :             continue;
    2267                 : 
    2268              13 :         if (needComma)
    2269 CBC           9 :             appendStringInfoString(&buf, ", ");
    2270                 : 
    2271 GIC          13 :         appendStringInfo(&buf, "%s = ",
    2272              13 :                          quote_ident_cstr(NameStr(attr->attname)));
    2273                 : 
    2274              13 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2275 ECB             : 
    2276 CBC          13 :         if (key >= 0)
    2277 GIC           7 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2278 ECB             :         else
    2279 GIC           6 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2280 ECB             : 
    2281 GIC          13 :         if (val != NULL)
    2282                 :         {
    2283              13 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2284 CBC          13 :             pfree(val);
    2285                 :         }
    2286                 :         else
    2287 UIC           0 :             appendStringInfoString(&buf, "NULL");
    2288 GIC          13 :         needComma = true;
    2289                 :     }
    2290                 : 
    2291 CBC           4 :     appendStringInfoString(&buf, " WHERE ");
    2292 ECB             : 
    2293 CBC          11 :     for (i = 0; i < pknumatts; i++)
    2294                 :     {
    2295               7 :         int         pkattnum = pkattnums[i];
    2296 GIC           7 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2297                 : 
    2298               7 :         if (i > 0)
    2299 CBC           3 :             appendStringInfoString(&buf, " AND ");
    2300                 : 
    2301 GIC           7 :         appendStringInfoString(&buf,
    2302               7 :                                quote_ident_cstr(NameStr(attr->attname)));
    2303                 : 
    2304               7 :         val = tgt_pkattvals[i];
    2305                 : 
    2306               7 :         if (val != NULL)
    2307               7 :             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
    2308                 :         else
    2309 UIC           0 :             appendStringInfoString(&buf, " IS NULL");
    2310                 :     }
    2311                 : 
    2312 CBC           4 :     return buf.data;
    2313                 : }
    2314 EUB             : 
    2315                 : /*
    2316 ECB             :  * Return a properly quoted identifier.
    2317                 :  * Uses quote_ident in quote.c
    2318                 :  */
    2319                 : static char *
    2320 GIC          80 : quote_ident_cstr(char *rawstr)
    2321 ECB             : {
    2322                 :     text       *rawstr_text;
    2323                 :     text       *result_text;
    2324                 :     char       *result;
    2325                 : 
    2326 GIC          80 :     rawstr_text = cstring_to_text(rawstr);
    2327              80 :     result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
    2328                 :                                                      PointerGetDatum(rawstr_text)));
    2329              80 :     result = text_to_cstring(result_text);
    2330                 : 
    2331 CBC          80 :     return result;
    2332                 : }
    2333 ECB             : 
    2334                 : static int
    2335 CBC          26 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
    2336                 : {
    2337 ECB             :     int         i;
    2338                 : 
    2339                 :     /*
    2340                 :      * Not likely a long list anyway, so just scan for the value
    2341                 :      */
    2342 GIC          50 :     for (i = 0; i < pknumatts; i++)
    2343 CBC          38 :         if (key == pkattnums[i])
    2344              14 :             return i;
    2345                 : 
    2346 GIC          12 :     return -1;
    2347 ECB             : }
    2348                 : 
    2349                 : static HeapTuple
    2350 GIC           8 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
    2351 ECB             : {
    2352                 :     char       *relname;
    2353                 :     TupleDesc   tupdesc;
    2354                 :     int         natts;
    2355                 :     StringInfoData buf;
    2356                 :     int         ret;
    2357                 :     HeapTuple   tuple;
    2358                 :     int         i;
    2359                 : 
    2360                 :     /*
    2361                 :      * Connect to SPI manager
    2362                 :      */
    2363 GIC           8 :     if ((ret = SPI_connect()) < 0)
    2364 EUB             :         /* internal error */
    2365 UIC           0 :         elog(ERROR, "SPI connect failure - returned %d", ret);
    2366                 : 
    2367 GIC           8 :     initStringInfo(&buf);
    2368                 : 
    2369                 :     /* get relation name including any needed schema prefix and quoting */
    2370 CBC           8 :     relname = generate_relation_name(rel);
    2371 ECB             : 
    2372 GIC           8 :     tupdesc = rel->rd_att;
    2373               8 :     natts = tupdesc->natts;
    2374                 : 
    2375                 :     /*
    2376 ECB             :      * Build sql statement to look up tuple of interest, ie, the one matching
    2377 EUB             :      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
    2378                 :      * generate a result tuple that matches the table's physical structure,
    2379                 :      * with NULLs for any dropped columns.  Otherwise we have to deal with two
    2380                 :      * different tupdescs and everything's very confusing.
    2381 ECB             :      */
    2382 GIC           8 :     appendStringInfoString(&buf, "SELECT ");
    2383 ECB             : 
    2384 GIC          38 :     for (i = 0; i < natts; i++)
    2385 ECB             :     {
    2386 CBC          30 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2387                 : 
    2388              30 :         if (i > 0)
    2389 GIC          22 :             appendStringInfoString(&buf, ", ");
    2390                 : 
    2391              30 :         if (attr->attisdropped)
    2392               4 :             appendStringInfoString(&buf, "NULL");
    2393                 :         else
    2394              26 :             appendStringInfoString(&buf,
    2395 GBC          26 :                                    quote_ident_cstr(NameStr(attr->attname)));
    2396                 :     }
    2397 EUB             : 
    2398 GIC           8 :     appendStringInfo(&buf, " FROM %s WHERE ", relname);
    2399                 : 
    2400              22 :     for (i = 0; i < pknumatts; i++)
    2401                 :     {
    2402              14 :         int         pkattnum = pkattnums[i];
    2403              14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2404                 : 
    2405              14 :         if (i > 0)
    2406               6 :             appendStringInfoString(&buf, " AND ");
    2407                 : 
    2408              14 :         appendStringInfoString(&buf,
    2409              14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2410                 : 
    2411              14 :         if (src_pkattvals[i] != NULL)
    2412 CBC          14 :             appendStringInfo(&buf, " = %s",
    2413 GIC          14 :                              quote_literal_cstr(src_pkattvals[i]));
    2414                 :         else
    2415 UIC           0 :             appendStringInfoString(&buf, " IS NULL");
    2416                 :     }
    2417                 : 
    2418 ECB             :     /*
    2419                 :      * Retrieve the desired tuple
    2420                 :      */
    2421 CBC           8 :     ret = SPI_exec(buf.data, 0);
    2422 GIC           8 :     pfree(buf.data);
    2423 ECB             : 
    2424 EUB             :     /*
    2425                 :      * Only allow one qualifying tuple
    2426                 :      */
    2427 CBC           8 :     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
    2428 UIC           0 :         ereport(ERROR,
    2429                 :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2430                 :                  errmsg("source criteria matched more than one record")));
    2431                 : 
    2432 GIC           8 :     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
    2433                 :     {
    2434               8 :         SPITupleTable *tuptable = SPI_tuptable;
    2435                 : 
    2436               8 :         tuple = SPI_copytuple(tuptable->vals[0]);
    2437 CBC           8 :         SPI_finish();
    2438                 : 
    2439 GIC           8 :         return tuple;
    2440                 :     }
    2441                 :     else
    2442                 :     {
    2443 ECB             :         /*
    2444                 :          * no qualifying tuples
    2445                 :          */
    2446 LBC           0 :         SPI_finish();
    2447                 : 
    2448               0 :         return NULL;
    2449                 :     }
    2450 ECB             : 
    2451                 :     /*
    2452                 :      * never reached, but keep compiler quiet
    2453                 :      */
    2454                 :     return NULL;
    2455                 : }
    2456                 : 
    2457                 : /*
    2458                 :  * Open the relation named by relname_text, acquire specified type of lock,
    2459                 :  * verify we have specified permissions.
    2460                 :  * Caller must close rel when done with it.
    2461                 :  */
    2462                 : static Relation
    2463 CBC          21 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
    2464 ECB             : {
    2465                 :     RangeVar   *relvar;
    2466                 :     Relation    rel;
    2467                 :     AclResult   aclresult;
    2468                 : 
    2469 CBC          21 :     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
    2470 GIC          21 :     rel = table_openrv(relvar, lockmode);
    2471 ECB             : 
    2472 GIC          21 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
    2473                 :                                   aclmode);
    2474              21 :     if (aclresult != ACLCHECK_OK)
    2475 LBC           0 :         aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
    2476 UIC           0 :                        RelationGetRelationName(rel));
    2477                 : 
    2478 GIC          21 :     return rel;
    2479 ECB             : }
    2480                 : 
    2481                 : /*
    2482                 :  * generate_relation_name - copied from ruleutils.c
    2483                 :  *      Compute the name to display for a relation
    2484                 :  *
    2485                 :  * The result includes all necessary quoting and schema-prefixing.
    2486                 :  */
    2487                 : static char *
    2488 GIC          20 : generate_relation_name(Relation rel)
    2489                 : {
    2490                 :     char       *nspname;
    2491                 :     char       *result;
    2492                 : 
    2493 ECB             :     /* Qualify the name if not visible in search path */
    2494 CBC          20 :     if (RelationIsVisible(RelationGetRelid(rel)))
    2495 GIC          15 :         nspname = NULL;
    2496 ECB             :     else
    2497 CBC           5 :         nspname = get_namespace_name(rel->rd_rel->relnamespace);
    2498 ECB             : 
    2499 GIC          20 :     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
    2500                 : 
    2501 CBC          20 :     return result;
    2502                 : }
    2503 ECB             : 
    2504                 : 
    2505                 : static remoteConn *
    2506 CBC          75 : getConnectionByName(const char *name)
    2507                 : {
    2508                 :     remoteConnHashEnt *hentry;
    2509                 :     char       *key;
    2510                 : 
    2511              75 :     if (!remoteConnHash)
    2512               2 :         remoteConnHash = createConnHash();
    2513 ECB             : 
    2514 GIC          75 :     key = pstrdup(name);
    2515              75 :     truncate_identifier(key, strlen(key), false);
    2516 CBC          75 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2517                 :                                                key, HASH_FIND, NULL);
    2518                 : 
    2519 GIC          75 :     if (hentry)
    2520              68 :         return hentry->rconn;
    2521                 : 
    2522 CBC           7 :     return NULL;
    2523 EUB             : }
    2524                 : 
    2525 ECB             : static HTAB *
    2526 CBC           3 : createConnHash(void)
    2527 ECB             : {
    2528                 :     HASHCTL     ctl;
    2529                 : 
    2530 CBC           3 :     ctl.keysize = NAMEDATALEN;
    2531 GBC           3 :     ctl.entrysize = sizeof(remoteConnHashEnt);
    2532                 : 
    2533 GIC           3 :     return hash_create("Remote Con hash", NUMCONN, &ctl,
    2534 ECB             :                        HASH_ELEM | HASH_STRINGS);
    2535                 : }
    2536                 : 
    2537                 : static void
    2538 GIC          10 : createNewConnection(const char *name, remoteConn *rconn)
    2539 ECB             : {
    2540                 :     remoteConnHashEnt *hentry;
    2541 EUB             :     bool        found;
    2542                 :     char       *key;
    2543                 : 
    2544 GBC          10 :     if (!remoteConnHash)
    2545               1 :         remoteConnHash = createConnHash();
    2546                 : 
    2547              10 :     key = pstrdup(name);
    2548 GIC          10 :     truncate_identifier(key, strlen(key), true);
    2549              10 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
    2550                 :                                                HASH_ENTER, &found);
    2551                 : 
    2552              10 :     if (found)
    2553                 :     {
    2554 GNC           1 :         libpqsrv_disconnect(rconn->conn);
    2555 GIC           1 :         pfree(rconn);
    2556                 : 
    2557               1 :         ereport(ERROR,
    2558                 :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2559                 :                  errmsg("duplicate connection name")));
    2560                 :     }
    2561                 : 
    2562 CBC           9 :     hentry->rconn = rconn;
    2563 GIC           9 :     strlcpy(hentry->name, name, sizeof(hentry->name));
    2564 CBC           9 : }
    2565                 : 
    2566                 : static void
    2567 GIC           8 : deleteConnection(const char *name)
    2568 ECB             : {
    2569                 :     remoteConnHashEnt *hentry;
    2570                 :     bool        found;
    2571                 :     char       *key;
    2572                 : 
    2573 CBC           8 :     if (!remoteConnHash)
    2574 UIC           0 :         remoteConnHash = createConnHash();
    2575 ECB             : 
    2576 GIC           8 :     key = pstrdup(name);
    2577 CBC           8 :     truncate_identifier(key, strlen(key), false);
    2578 GIC           8 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2579 EUB             :                                                key, HASH_REMOVE, &found);
    2580                 : 
    2581 GIC           8 :     if (!hentry)
    2582 UIC           0 :         ereport(ERROR,
    2583                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2584 ECB             :                  errmsg("undefined connection name")));
    2585 GIC           8 : }
    2586                 : 
    2587 ECB             : static void
    2588 CBC          19 : dblink_security_check(PGconn *conn, remoteConn *rconn)
    2589                 : {
    2590 GIC          19 :     if (!superuser())
    2591                 :     {
    2592 UIC           0 :         if (!PQconnectionUsedPassword(conn))
    2593 ECB             :         {
    2594 UNC           0 :             libpqsrv_disconnect(conn);
    2595 UIC           0 :             if (rconn)
    2596               0 :                 pfree(rconn);
    2597                 : 
    2598               0 :             ereport(ERROR,
    2599                 :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2600                 :                      errmsg("password is required"),
    2601                 :                      errdetail("Non-superuser cannot connect if the server does not request a password."),
    2602                 :                      errhint("Target server's authentication method must be changed.")));
    2603 ECB             :         }
    2604                 :     }
    2605 GIC          19 : }
    2606                 : 
    2607 ECB             : /*
    2608                 :  * For non-superusers, insist that the connstr specify a password.  This
    2609                 :  * prevents a password from being picked up from .pgpass, a service file,
    2610                 :  * the environment, etc.  We don't want the postgres user's passwords
    2611                 :  * to be accessible to non-superusers.
    2612                 :  */
    2613                 : static void
    2614 GIC          22 : dblink_connstr_check(const char *connstr)
    2615                 : {
    2616              22 :     if (!superuser())
    2617                 :     {
    2618                 :         PQconninfoOption *options;
    2619                 :         PQconninfoOption *option;
    2620 CBC           1 :         bool        connstr_gives_password = false;
    2621 ECB             : 
    2622 GIC           1 :         options = PQconninfoParse(connstr, NULL);
    2623 CBC           1 :         if (options)
    2624                 :         {
    2625              40 :             for (option = options; option->keyword != NULL; option++)
    2626 ECB             :             {
    2627 GIC          39 :                 if (strcmp(option->keyword, "password") == 0)
    2628                 :                 {
    2629               1 :                     if (option->val != NULL && option->val[0] != '\0')
    2630                 :                     {
    2631 UIC           0 :                         connstr_gives_password = true;
    2632 UBC           0 :                         break;
    2633                 :                     }
    2634 ECB             :                 }
    2635                 :             }
    2636 CBC           1 :             PQconninfoFree(options);
    2637 ECB             :         }
    2638                 : 
    2639 GIC           1 :         if (!connstr_gives_password)
    2640               1 :             ereport(ERROR,
    2641                 :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2642                 :                      errmsg("password is required"),
    2643                 :                      errdetail("Non-superusers must provide a password in the connection string.")));
    2644 ECB             :     }
    2645 GBC          21 : }
    2646                 : 
    2647                 : /*
    2648                 :  * Report an error received from the remote server
    2649                 :  *
    2650                 :  * res: the received error result (will be freed)
    2651                 :  * fail: true for ERROR ereport, false for NOTICE
    2652                 :  * fmt and following args: sprintf-style format and values for errcontext;
    2653 ECB             :  * the resulting string should be worded like "while <some action>"
    2654                 :  */
    2655                 : static void
    2656 GIC          12 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
    2657                 :                  bool fail, const char *fmt,...)
    2658                 : {
    2659                 :     int         level;
    2660              12 :     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    2661              12 :     char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    2662 CBC          12 :     char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    2663              12 :     char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    2664              12 :     char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    2665                 :     int         sqlstate;
    2666 ECB             :     char       *message_primary;
    2667                 :     char       *message_detail;
    2668                 :     char       *message_hint;
    2669                 :     char       *message_context;
    2670                 :     va_list     ap;
    2671                 :     char        dblink_context_msg[512];
    2672                 : 
    2673 GIC          12 :     if (fail)
    2674               3 :         level = ERROR;
    2675                 :     else
    2676               9 :         level = NOTICE;
    2677                 : 
    2678              12 :     if (pg_diag_sqlstate)
    2679 CBC          12 :         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
    2680                 :                                  pg_diag_sqlstate[1],
    2681                 :                                  pg_diag_sqlstate[2],
    2682                 :                                  pg_diag_sqlstate[3],
    2683                 :                                  pg_diag_sqlstate[4]);
    2684                 :     else
    2685 LBC           0 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
    2686                 : 
    2687 CBC          12 :     message_primary = xpstrdup(pg_diag_message_primary);
    2688 GIC          12 :     message_detail = xpstrdup(pg_diag_message_detail);
    2689              12 :     message_hint = xpstrdup(pg_diag_message_hint);
    2690              12 :     message_context = xpstrdup(pg_diag_context);
    2691                 : 
    2692                 :     /*
    2693                 :      * If we don't get a message from the PGresult, try the PGconn.  This is
    2694                 :      * needed because for connection-level failures, PQexec may just return
    2695                 :      * NULL, not a PGresult at all.
    2696                 :      */
    2697 CBC          12 :     if (message_primary == NULL)
    2698 UIC           0 :         message_primary = pchomp(PQerrorMessage(conn));
    2699                 : 
    2700                 :     /*
    2701                 :      * Now that we've copied all the data we need out of the PGresult, it's
    2702                 :      * safe to free it.  We must do this to avoid PGresult leakage.  We're
    2703                 :      * leaking all the strings too, but those are in palloc'd memory that will
    2704                 :      * get cleaned up eventually.
    2705                 :      */
    2706 GNC          12 :     PQclear(res);
    2707 ECB             : 
    2708                 :     /*
    2709 EUB             :      * Format the basic errcontext string.  Below, we'll add on something
    2710                 :      * about the connection name.  That's a violation of the translatability
    2711                 :      * guidelines about constructing error messages out of parts, but since
    2712                 :      * there's no translation support for dblink, there's no need to worry
    2713                 :      * about that (yet).
    2714                 :      */
    2715 GIC          12 :     va_start(ap, fmt);
    2716 CBC          12 :     vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
    2717              12 :     va_end(ap);
    2718 ECB             : 
    2719 GIC          12 :     ereport(level,
    2720 ECB             :             (errcode(sqlstate),
    2721                 :              (message_primary != NULL && message_primary[0] != '\0') ?
    2722                 :              errmsg_internal("%s", message_primary) :
    2723                 :              errmsg("could not obtain message string for remote error"),
    2724                 :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    2725                 :              message_hint ? errhint("%s", message_hint) : 0,
    2726                 :              message_context ? (errcontext("%s", message_context)) : 0,
    2727                 :              conname ?
    2728                 :              (errcontext("%s on dblink connection named \"%s\"",
    2729                 :                          dblink_context_msg, conname)) :
    2730                 :              (errcontext("%s on unnamed dblink connection",
    2731                 :                          dblink_context_msg))));
    2732 GBC           9 : }
    2733                 : 
    2734 ECB             : /*
    2735                 :  * Obtain connection string for a foreign server
    2736 EUB             :  */
    2737                 : static char *
    2738 GBC          22 : get_connect_string(const char *servername)
    2739 EUB             : {
    2740 GBC          22 :     ForeignServer *foreign_server = NULL;
    2741                 :     UserMapping *user_mapping;
    2742                 :     ListCell   *cell;
    2743 ECB             :     StringInfoData buf;
    2744                 :     ForeignDataWrapper *fdw;
    2745                 :     AclResult   aclresult;
    2746                 :     char       *srvname;
    2747                 : 
    2748                 :     static const PQconninfoOption *options = NULL;
    2749                 : 
    2750 GIC          22 :     initStringInfo(&buf);
    2751                 : 
    2752 ECB             :     /*
    2753                 :      * Get list of valid libpq options.
    2754                 :      *
    2755                 :      * To avoid unnecessary work, we get the list once and use it throughout
    2756                 :      * the lifetime of this backend process.  We don't need to care about
    2757                 :      * memory context issues, because PQconndefaults allocates with malloc.
    2758                 :      */
    2759 CBC          22 :     if (!options)
    2760                 :     {
    2761 GIC           3 :         options = PQconndefaults();
    2762 CBC           3 :         if (!options)           /* assume reason for failure is OOM */
    2763 UIC           0 :             ereport(ERROR,
    2764                 :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2765 ECB             :                      errmsg("out of memory"),
    2766                 :                      errdetail("Could not get libpq's default connection options.")));
    2767                 :     }
    2768                 : 
    2769                 :     /* first gather the server connstr options */
    2770 GIC          22 :     srvname = pstrdup(servername);
    2771              22 :     truncate_identifier(srvname, strlen(srvname), false);
    2772              22 :     foreign_server = GetForeignServerByName(srvname, true);
    2773                 : 
    2774 CBC          22 :     if (foreign_server)
    2775                 :     {
    2776 GIC           2 :         Oid         serverid = foreign_server->serverid;
    2777               2 :         Oid         fdwid = foreign_server->fdwid;
    2778               2 :         Oid         userid = GetUserId();
    2779 ECB             : 
    2780 GIC           2 :         user_mapping = GetUserMapping(userid, serverid);
    2781 CBC           2 :         fdw = GetForeignDataWrapper(fdwid);
    2782                 : 
    2783 ECB             :         /* Check permissions, user must have usage on the server. */
    2784 GNC           2 :         aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
    2785 CBC           2 :         if (aclresult != ACLCHECK_OK)
    2786 UIC           0 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
    2787                 : 
    2788 CBC           2 :         foreach(cell, fdw->options)
    2789                 :         {
    2790 UIC           0 :             DefElem    *def = lfirst(cell);
    2791                 : 
    2792               0 :             if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
    2793               0 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2794               0 :                                  escape_param_str(strVal(def->arg)));
    2795                 :         }
    2796                 : 
    2797 GIC           6 :         foreach(cell, foreign_server->options)
    2798                 :         {
    2799               4 :             DefElem    *def = lfirst(cell);
    2800                 : 
    2801               4 :             if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
    2802               4 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2803               4 :                                  escape_param_str(strVal(def->arg)));
    2804                 :         }
    2805                 : 
    2806               4 :         foreach(cell, user_mapping->options)
    2807 ECB             :         {
    2808                 : 
    2809 GIC           2 :             DefElem    *def = lfirst(cell);
    2810                 : 
    2811 CBC           2 :             if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
    2812               2 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2813 GIC           2 :                                  escape_param_str(strVal(def->arg)));
    2814                 :         }
    2815                 : 
    2816 CBC           2 :         return buf.data;
    2817                 :     }
    2818                 :     else
    2819              20 :         return NULL;
    2820 EUB             : }
    2821                 : 
    2822                 : /*
    2823                 :  * Escaping libpq connect parameter strings.
    2824                 :  *
    2825 ECB             :  * Replaces "'" with "\'" and "\" with "\\".
    2826                 :  */
    2827                 : static char *
    2828 GIC           6 : escape_param_str(const char *str)
    2829 ECB             : {
    2830                 :     const char *cp;
    2831                 :     StringInfoData buf;
    2832                 : 
    2833 GIC           6 :     initStringInfo(&buf);
    2834                 : 
    2835              64 :     for (cp = str; *cp; cp++)
    2836 ECB             :     {
    2837 CBC          58 :         if (*cp == '\\' || *cp == '\'')
    2838 UIC           0 :             appendStringInfoChar(&buf, '\\');
    2839 GIC          58 :         appendStringInfoChar(&buf, *cp);
    2840                 :     }
    2841                 : 
    2842 CBC           6 :     return buf.data;
    2843 ECB             : }
    2844                 : 
    2845                 : /*
    2846                 :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
    2847                 :  * functions, and translate to the internal representation.
    2848                 :  *
    2849                 :  * The user supplies an int2vector of 1-based logical attnums, plus a count
    2850                 :  * argument (the need for the separate count argument is historical, but we
    2851                 :  * still check it).  We check that each attnum corresponds to a valid,
    2852                 :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
    2853                 :  * listed twice, though the actual use-case for such things is dubious.
    2854                 :  * Note that before Postgres 9.0, the user's attnums were interpreted as
    2855                 :  * physical not logical column numbers; this was changed for future-proofing.
    2856 EUB             :  *
    2857                 :  * The internal representation is a palloc'd int array of 0-based physical
    2858                 :  * attnums.
    2859                 :  */
    2860 ECB             : static void
    2861 GIC          18 : validate_pkattnums(Relation rel,
    2862                 :                    int2vector *pkattnums_arg, int32 pknumatts_arg,
    2863                 :                    int **pkattnums, int *pknumatts)
    2864                 : {
    2865              18 :     TupleDesc   tupdesc = rel->rd_att;
    2866              18 :     int         natts = tupdesc->natts;
    2867                 :     int         i;
    2868                 : 
    2869                 :     /* Don't take more array elements than there are */
    2870              18 :     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
    2871                 : 
    2872                 :     /* Must have at least one pk attnum selected */
    2873              18 :     if (pknumatts_arg <= 0)
    2874 UIC           0 :         ereport(ERROR,
    2875                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2876                 :                  errmsg("number of key attributes must be > 0")));
    2877                 : 
    2878 ECB             :     /* Allocate output array */
    2879 GNC          18 :     *pkattnums = palloc_array(int, pknumatts_arg);
    2880 GIC          18 :     *pknumatts = pknumatts_arg;
    2881                 : 
    2882                 :     /* Validate attnums and convert to internal form */
    2883              57 :     for (i = 0; i < pknumatts_arg; i++)
    2884 ECB             :     {
    2885 GIC          45 :         int         pkattnum = pkattnums_arg->values[i];
    2886 ECB             :         int         lnum;
    2887                 :         int         j;
    2888                 : 
    2889                 :         /* Can throw error immediately if out of range */
    2890 CBC          45 :         if (pkattnum <= 0 || pkattnum > natts)
    2891 GIC           6 :             ereport(ERROR,
    2892                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2893 ECB             :                      errmsg("invalid attribute number %d", pkattnum)));
    2894                 : 
    2895                 :         /* Identify which physical column has this logical number */
    2896 GIC          39 :         lnum = 0;
    2897 CBC          69 :         for (j = 0; j < natts; j++)
    2898 ECB             :         {
    2899                 :             /* dropped columns don't count */
    2900 GIC          69 :             if (TupleDescAttr(tupdesc, j)->attisdropped)
    2901               3 :                 continue;
    2902                 : 
    2903              66 :             if (++lnum == pkattnum)
    2904 CBC          39 :                 break;
    2905                 :         }
    2906 ECB             : 
    2907 CBC          39 :         if (j < natts)
    2908 GIC          39 :             (*pkattnums)[i] = j;
    2909                 :         else
    2910 UIC           0 :             ereport(ERROR,
    2911 ECB             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2912                 :                      errmsg("invalid attribute number %d", pkattnum)));
    2913                 :     }
    2914 GIC          12 : }
    2915 ECB             : 
    2916                 : /*
    2917                 :  * Check if the specified connection option is valid.
    2918                 :  *
    2919                 :  * We basically allow whatever libpq thinks is an option, with these
    2920                 :  * restrictions:
    2921                 :  *      debug options: disallowed
    2922                 :  *      "client_encoding": disallowed
    2923                 :  *      "user": valid only in USER MAPPING options
    2924                 :  *      secure options (eg password): valid only in USER MAPPING options
    2925                 :  *      others: valid only in FOREIGN SERVER options
    2926                 :  *
    2927                 :  * We disallow client_encoding because it would be overridden anyway via
    2928                 :  * PQclientEncoding; allowing it to be specified would merely promote
    2929                 :  * confusion.
    2930                 :  */
    2931                 : static bool
    2932 GIC          89 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
    2933                 :                        Oid context)
    2934                 : {
    2935                 :     const PQconninfoOption *opt;
    2936 ECB             : 
    2937                 :     /* Look up the option in libpq result */
    2938 GIC        1697 :     for (opt = options; opt->keyword; opt++)
    2939 ECB             :     {
    2940 GIC        1695 :         if (strcmp(opt->keyword, option) == 0)
    2941 CBC          87 :             break;
    2942 ECB             :     }
    2943 GIC          89 :     if (opt->keyword == NULL)
    2944               2 :         return false;
    2945                 : 
    2946                 :     /* Disallow debug options (particularly "replication") */
    2947              87 :     if (strchr(opt->dispchar, 'D'))
    2948               2 :         return false;
    2949                 : 
    2950                 :     /* Disallow "client_encoding" */
    2951 CBC          85 :     if (strcmp(opt->keyword, "client_encoding") == 0)
    2952 GBC           2 :         return false;
    2953                 : 
    2954                 :     /*
    2955                 :      * If the option is "user" or marked secure, it should be specified only
    2956                 :      * in USER MAPPING.  Others should be specified only in SERVER.
    2957                 :      */
    2958 CBC          83 :     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
    2959 ECB             :     {
    2960 GIC           9 :         if (context != UserMappingRelationId)
    2961 CBC           3 :             return false;
    2962 ECB             :     }
    2963                 :     else
    2964                 :     {
    2965 CBC          74 :         if (context != ForeignServerRelationId)
    2966              68 :             return false;
    2967                 :     }
    2968                 : 
    2969              12 :     return true;
    2970                 : }
    2971                 : 
    2972                 : /*
    2973                 :  * Copy the remote session's values of GUCs that affect datatype I/O
    2974 ECB             :  * and apply them locally in a new GUC nesting level.  Returns the new
    2975                 :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
    2976                 :  * or -1 if no new nestlevel was needed.
    2977                 :  *
    2978                 :  * We use the equivalent of a function SET option to allow the settings to
    2979                 :  * persist only until the caller calls restoreLocalGucs.  If an error is
    2980                 :  * thrown in between, guc.c will take care of undoing the settings.
    2981                 :  */
    2982                 : static int
    2983 GIC          32 : applyRemoteGucs(PGconn *conn)
    2984 ECB             : {
    2985                 :     static const char *const GUCsAffectingIO[] = {
    2986                 :         "DateStyle",
    2987                 :         "IntervalStyle"
    2988                 :     };
    2989                 : 
    2990 GIC          32 :     int         nestlevel = -1;
    2991                 :     int         i;
    2992                 : 
    2993              96 :     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
    2994                 :     {
    2995              64 :         const char *gucName = GUCsAffectingIO[i];
    2996              64 :         const char *remoteVal = PQparameterStatus(conn, gucName);
    2997                 :         const char *localVal;
    2998                 : 
    2999                 :         /*
    3000                 :          * If the remote server is pre-8.4, it won't have IntervalStyle, but
    3001                 :          * that's okay because its output format won't be ambiguous.  So just
    3002                 :          * skip the GUC if we don't get a value for it.  (We might eventually
    3003                 :          * need more complicated logic with remote-version checks here.)
    3004                 :          */
    3005              64 :         if (remoteVal == NULL)
    3006 UIC           0 :             continue;
    3007                 : 
    3008                 :         /*
    3009                 :          * Avoid GUC-setting overhead if the remote and local GUCs already
    3010                 :          * have the same value.
    3011                 :          */
    3012 GIC          64 :         localVal = GetConfigOption(gucName, false, false);
    3013              64 :         Assert(localVal != NULL);
    3014                 : 
    3015              64 :         if (strcmp(remoteVal, localVal) == 0)
    3016              47 :             continue;
    3017                 : 
    3018                 :         /* Create new GUC nest level if we didn't already */
    3019              17 :         if (nestlevel < 0)
    3020               9 :             nestlevel = NewGUCNestLevel();
    3021                 : 
    3022                 :         /* Apply the option (this will throw error on failure) */
    3023              17 :         (void) set_config_option(gucName, remoteVal,
    3024                 :                                  PGC_USERSET, PGC_S_SESSION,
    3025                 :                                  GUC_ACTION_SAVE, true, 0, false);
    3026                 :     }
    3027                 : 
    3028              32 :     return nestlevel;
    3029                 : }
    3030                 : 
    3031                 : /*
    3032                 :  * Restore local GUCs after they have been overlaid with remote settings.
    3033                 :  */
    3034                 : static void
    3035              33 : restoreLocalGucs(int nestlevel)
    3036                 : {
    3037                 :     /* Do nothing if no new nestlevel was created */
    3038              33 :     if (nestlevel > 0)
    3039               8 :         AtEOXact_GUC(true, nestlevel);
    3040              33 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a