LCOV - differential code coverage report
Current view: top level - contrib/postgres_fdw - connection.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 83.2 % 619 515 40 17 44 3 14 303 84 114 83 368 4 16
Current Date: 2023-04-08 17:13:01 Functions: 92.7 % 41 38 2 1 34 4 3 38
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 [..60] days: 67.2 % 122 82 40 2 79 1 3
Legend: Lines: hit not hit (60,120] days: 100.0 % 3 3 3
(120,180] days: 100.0 % 1 1 1
(240..) days: 87.0 % 493 429 17 44 3 14 300 2 113 70 294
Function coverage date bins:
[..60] days: 66.7 % 6 4 2 4
(240..) days: 47.2 % 72 34 1 34 3 34

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * connection.c
                                  4                 :  *        Connection management functions for postgres_fdw
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group
                                  7                 :  *
                                  8                 :  * IDENTIFICATION
                                  9                 :  *        contrib/postgres_fdw/connection.c
                                 10                 :  *
                                 11                 :  *-------------------------------------------------------------------------
                                 12                 :  */
                                 13                 : #include "postgres.h"
                                 14                 : 
                                 15                 : #include "access/htup_details.h"
                                 16                 : #include "access/xact.h"
                                 17                 : #include "catalog/pg_user_mapping.h"
                                 18                 : #include "commands/defrem.h"
                                 19                 : #include "funcapi.h"
                                 20                 : #include "libpq/libpq-be-fe-helpers.h"
                                 21                 : #include "mb/pg_wchar.h"
                                 22                 : #include "miscadmin.h"
                                 23                 : #include "pgstat.h"
                                 24                 : #include "postgres_fdw.h"
                                 25                 : #include "storage/fd.h"
                                 26                 : #include "storage/latch.h"
                                 27                 : #include "utils/builtins.h"
                                 28                 : #include "utils/datetime.h"
                                 29                 : #include "utils/hsearch.h"
                                 30                 : #include "utils/inval.h"
                                 31                 : #include "utils/memutils.h"
                                 32                 : #include "utils/syscache.h"
                                 33                 : 
                                 34                 : /*
                                 35                 :  * Connection cache hash table entry
                                 36                 :  *
                                 37                 :  * The lookup key in this hash table is the user mapping OID. We use just one
                                 38                 :  * connection per user mapping ID, which ensures that all the scans use the
                                 39                 :  * same snapshot during a query.  Using the user mapping OID rather than
                                 40                 :  * the foreign server OID + user OID avoids creating multiple connections when
                                 41                 :  * the public user mapping applies to all user OIDs.
                                 42                 :  *
                                 43                 :  * The "conn" pointer can be NULL if we don't currently have a live connection.
                                 44                 :  * When we do have a connection, xact_depth tracks the current depth of
                                 45                 :  * transactions and subtransactions open on the remote side.  We need to issue
                                 46                 :  * commands at the same nesting depth on the remote as we're executing at
                                 47                 :  * ourselves, so that rolling back a subtransaction will kill the right
                                 48                 :  * queries and not the wrong ones.
                                 49                 :  */
                                 50                 : typedef Oid ConnCacheKey;
                                 51                 : 
                                 52                 : typedef struct ConnCacheEntry
                                 53                 : {
                                 54                 :     ConnCacheKey key;           /* hash key (must be first) */
                                 55                 :     PGconn     *conn;           /* connection to foreign server, or NULL */
                                 56                 :     /* Remaining fields are invalid when conn is NULL: */
                                 57                 :     int         xact_depth;     /* 0 = no xact open, 1 = main xact open, 2 =
                                 58                 :                                  * one level of subxact open, etc */
                                 59                 :     bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
                                 60                 :     bool        have_error;     /* have any subxacts aborted in this xact? */
                                 61                 :     bool        changing_xact_state;    /* xact state change in process */
                                 62                 :     bool        parallel_commit;    /* do we commit (sub)xacts in parallel? */
                                 63                 :     bool        parallel_abort; /* do we abort (sub)xacts in parallel? */
                                 64                 :     bool        invalidated;    /* true if reconnect is pending */
                                  65                 :     bool        keep_connections;   /* value of the keep_connections
                                 66                 :                                      * server option */
                                 67                 :     Oid         serverid;       /* foreign server OID used to get server name */
                                 68                 :     uint32      server_hashvalue;   /* hash value of foreign server OID */
                                 69                 :     uint32      mapping_hashvalue;  /* hash value of user mapping OID */
                                 70                 :     PgFdwConnState state;       /* extra per-connection state */
                                 71                 : } ConnCacheEntry;
                                 72                 : 
                                 73                 : /*
                                 74                 :  * Connection cache (initialized on first use)
                                 75                 :  */
                                 76                 : static HTAB *ConnectionHash = NULL;
                                 77                 : 
                                 78                 : /* for assigning cursor numbers and prepared statement numbers */
                                 79                 : static unsigned int cursor_number = 0;
                                 80                 : static unsigned int prep_stmt_number = 0;
                                 81                 : 
                                 82                 : /* tracks whether any work is needed in callback functions */
                                 83                 : static bool xact_got_connection = false;
                                 84                 : 
                                 85                 : /*
                                 86                 :  * Milliseconds to wait to cancel an in-progress query or execute a cleanup
                                 87                 :  * query; if it takes longer than 30 seconds to do these, we assume the
                                 88                 :  * connection is dead.
                                 89                 :  */
                                 90                 : #define CONNECTION_CLEANUP_TIMEOUT  30000
                                 91                 : 
                                 92                 : /* Macro for constructing abort command to be sent */
                                 93                 : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
                                 94                 :     do { \
                                 95                 :         if (toplevel) \
                                 96                 :             snprintf((sql), sizeof(sql), \
                                 97                 :                      "ABORT TRANSACTION"); \
                                 98                 :         else \
                                 99                 :             snprintf((sql), sizeof(sql), \
                                100                 :                      "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
                                101                 :                      (entry)->xact_depth, (entry)->xact_depth); \
                                102                 :     } while(0)
                                103                 : 
                                104                 : /*
                                105                 :  * SQL functions
                                106                 :  */
  811 fujii                     107 GIC           2 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
  803                           108               2 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
                                109               2 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
                                110                 : 
                                111                 : /* prototypes of private functions */
                                112                 : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
                                113                 : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
                                114                 : static void disconnect_pg_server(ConnCacheEntry *entry);
                                115                 : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
                                116                 : static void configure_remote_session(PGconn *conn);
                                117                 : static void do_sql_command_begin(PGconn *conn, const char *sql);
                                118                 : static void do_sql_command_end(PGconn *conn, const char *sql,
                                119                 :                                bool consume_input);
                                120                 : static void begin_remote_xact(ConnCacheEntry *entry);
                                121                 : static void pgfdw_xact_callback(XactEvent event, void *arg);
                                122                 : static void pgfdw_subxact_callback(SubXactEvent event,
                                123                 :                                    SubTransactionId mySubid,
                                124                 :                                    SubTransactionId parentSubid,
                                125                 :                                    void *arg);
                                126                 : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
                                127                 : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
  409 efujita                   128 ECB             : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
 2132 rhaas                     129                 : static bool pgfdw_cancel_query(PGconn *conn);
                                130                 : static bool pgfdw_cancel_query_begin(PGconn *conn);
                                131                 : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
                                132                 :                                    bool consume_input);
                                133                 : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
                                134                 :                                      bool ignore_errors);
                                135                 : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
                                136                 : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
                                137                 :                                          TimestampTz endtime,
                                138                 :                                          bool consume_input,
                                139                 :                                          bool ignore_errors);
                                140                 : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
                                141                 :                                      PGresult **result, bool *timed_out);
                                142                 : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
                                143                 : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
                                144                 :                                       List **pending_entries,
                                145                 :                                       List **cancel_requested);
                                146                 : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
                                147                 : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
                                148                 :                                                int curlevel);
                                149                 : static void pgfdw_finish_abort_cleanup(List *pending_entries,
                                150                 :                                        List *cancel_requested,
                                151                 :                                        bool toplevel);
                                152                 : static bool UserMappingPasswordRequired(UserMapping *user);
                                153                 : static bool disconnect_cached_connections(Oid serverid);
                                154                 : 
                                155                 : /*
                                156                 :  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
                                157                 :  * server with the user's authorization.  A new connection is established
                                158                 :  * if we don't already have a suitable one, and a transaction is opened at
                                159                 :  * the right subtransaction nesting depth if we didn't do that already.
                                160                 :  *
                                161                 :  * will_prep_stmt must be true if caller intends to create any prepared
                                162                 :  * statements.  Since those don't go away automatically at transaction end
                                163                 :  * (not even on error), we need this flag to cue manual cleanup.
                                164                 :  *
                                165                 :  * If state is not NULL, *state receives the per-connection state associated
                                166                 :  * with the PGconn.
                                167                 :  */
                                168                 : PGconn *
  739 efujita                   169 GIC        1784 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
                                170                 : {
                                171                 :     bool        found;
  905 fujii                     172            1784 :     bool        retry = false;
                                173                 :     ConnCacheEntry *entry;
                                174                 :     ConnCacheKey key;
                                175            1784 :     MemoryContext ccxt = CurrentMemoryContext;
                                176                 : 
                                177                 :     /* First time through, initialize connection cache hashtable */
 3699 tgl                       178            1784 :     if (ConnectionHash == NULL)
                                179                 :     {
                                180                 :         HASHCTL     ctl;
                                181                 : 
                                182               4 :         ctl.keysize = sizeof(ConnCacheKey);
                                183               4 :         ctl.entrysize = sizeof(ConnCacheEntry);
                                184               4 :         ConnectionHash = hash_create("postgres_fdw connections", 8,
                                185                 :                                      &ctl,
                                186                 :                                      HASH_ELEM | HASH_BLOBS);
                                187                 : 
                                188                 :         /*
                                189                 :          * Register some callback functions that manage connection cleanup.
                                190                 :          * This should be done just once in each backend.
                                191                 :          */
                                192               4 :         RegisterXactCallback(pgfdw_xact_callback, NULL);
                                193               4 :         RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
 2088                           194               4 :         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
                                195                 :                                       pgfdw_inval_callback, (Datum) 0);
                                196               4 :         CacheRegisterSyscacheCallback(USERMAPPINGOID,
                                197                 :                                       pgfdw_inval_callback, (Datum) 0);
                                198                 :     }
                                199                 : 
                                200                 :     /* Set flag that we did GetConnection during the current transaction */
 3699                           201            1784 :     xact_got_connection = true;
                                202                 : 
                                 203                 :     /* Use the user mapping OID as the hash key for the entry */
 2628 rhaas                     204 CBC        1784 :     key = user->umid;
                                205                 : 
                                206                 :     /*
 3699 tgl                       207 ECB             :      * Find or create cached entry for requested connection.
                                208                 :      */
 3699 tgl                       209 GIC        1784 :     entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
 3699 tgl                       210 CBC        1784 :     if (!found)
                                211                 :     {
                                212                 :         /*
 2088 tgl                       213 ECB             :          * We need only clear "conn" here; remaining fields will be filled
                                214                 :          * later when "conn" is set.
                                215                 :          */
 3699 tgl                       216 GIC          11 :         entry->conn = NULL;
 3699 tgl                       217 ECB             :     }
                                218                 : 
 2132 rhaas                     219                 :     /* Reject further use of connections which failed abort cleanup. */
 2132 rhaas                     220 GIC        1784 :     pgfdw_reject_incomplete_xact_state_change(entry);
                                221                 : 
                                222                 :     /*
                                223                 :      * If the connection needs to be remade due to invalidation, disconnect as
                                224                 :      * soon as we're out of all transactions.
                                225                 :      */
  905 fujii                     226            1784 :     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
 2088 tgl                       227 ECB             :     {
  905 fujii                     228 LBC           0 :         elog(DEBUG3, "closing connection %p for option changes to take effect",
  905 fujii                     229 ECB             :              entry->conn);
 2088 tgl                       230 UIC           0 :         disconnect_pg_server(entry);
 2088 tgl                       231 ECB             :     }
                                232                 : 
                                233                 :     /*
                                234                 :      * If cache entry doesn't have a connection, we have to establish a new
                                235                 :      * connection.  (If connect_pg_server throws an error, the cache entry
                                 236                 :      * will remain in a valid empty state, i.e., conn == NULL.)
                                237                 :      */
 3699 tgl                       238 GIC        1784 :     if (entry->conn == NULL)
  905 fujii                     239 CBC          61 :         make_new_connection(entry, user);
                                240                 : 
                                241                 :     /*
                                242                 :      * We check the health of the cached connection here when using it.  In
                                243                 :      * cases where we're out of all transactions, if a broken connection is
  549 efujita                   244 ECB             :      * detected, we try to reestablish a new connection later.
 3699 tgl                       245                 :      */
  915 fujii                     246 GIC        1778 :     PG_TRY();
                                247                 :     {
                                248                 :         /* Process a pending asynchronous request if any. */
  739 efujita                   249            1778 :         if (entry->state.pendingAreq)
  739 efujita                   250 UIC           0 :             process_pending_request(entry->state.pendingAreq);
  915 fujii                     251 ECB             :         /* Start a new transaction or subtransaction if needed. */
  915 fujii                     252 GIC        1778 :         begin_remote_xact(entry);
                                253                 :     }
                                254               2 :     PG_CATCH();
  915 fujii                     255 ECB             :     {
  905 fujii                     256 GIC           2 :         MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
                                257               2 :         ErrorData  *errdata = CopyErrorData();
                                258                 : 
                                259                 :         /*
                                260                 :          * Determine whether to try to reestablish the connection.
  905 fujii                     261 ECB             :          *
                                262                 :          * After a broken connection is detected in libpq, any error other
  905 fujii                     263 EUB             :          * than connection failure (e.g., out-of-memory) can be thrown
                                264                 :          * somewhere between return from libpq and the expected ereport() call
                                265                 :          * in pgfdw_report_error(). In this case, since PQstatus() indicates
                                266                 :          * CONNECTION_BAD, checking only PQstatus() causes the false detection
                                267                 :          * of connection failure. To avoid this, we also verify that the
                                 268                 :          * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that
                                 269                 :          * checking only the sqlstate can likewise cause a false detection
                                270                 :          * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
                                271                 :          * for any libpq-originated error condition.
                                272                 :          */
  905 fujii                     273 CBC           2 :         if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
                                274               2 :             PQstatus(entry->conn) != CONNECTION_BAD ||
  905 fujii                     275 GIC           2 :             entry->xact_depth > 0)
                                276                 :         {
                                277               1 :             MemoryContextSwitchTo(ecxt);
  915                           278               1 :             PG_RE_THROW();
                                279                 :         }
                                280                 : 
  905 fujii                     281 ECB             :         /* Clean up the error state */
  905 fujii                     282 GIC           1 :         FlushErrorState();
                                283               1 :         FreeErrorData(errdata);
  905 fujii                     284 CBC           1 :         errdata = NULL;
  905 fujii                     285 EUB             : 
  905 fujii                     286 GIC           1 :         retry = true;
  915 fujii                     287 ECB             :     }
  915 fujii                     288 GIC        1777 :     PG_END_TRY();
  915 fujii                     289 ECB             : 
                                290                 :     /*
  905                           291                 :      * If a broken connection is detected, disconnect it, reestablish a new
                                292                 :      * connection and retry a new remote transaction. If connection failure is
                                293                 :      * reported again, we give up getting a connection.
                                294                 :      */
  905 fujii                     295 GIC        1777 :     if (retry)
                                296                 :     {
                                297               1 :         Assert(entry->xact_depth == 0);
                                298                 : 
  915                           299               1 :         ereport(DEBUG3,
                                300                 :                 (errmsg_internal("could not start remote transaction on connection %p",
                                301                 :                                  entry->conn)),
                                302                 :                 errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
                                303                 : 
  905                           304               1 :         elog(DEBUG3, "closing connection %p to reestablish a new one",
                                305                 :              entry->conn);
                                306               1 :         disconnect_pg_server(entry);
                                307                 : 
   23 efujita                   308 GNC           1 :         make_new_connection(entry, user);
  905 fujii                     309 ECB             : 
  905 fujii                     310 GIC           1 :         begin_remote_xact(entry);
  915 fujii                     311 ECB             :     }
 3699 tgl                       312                 : 
                                313                 :     /* Remember if caller will prepare statements */
 3682 tgl                       314 GIC        1777 :     entry->have_prep_stmt |= will_prep_stmt;
                                315                 : 
  739 efujita                   316 ECB             :     /* If caller needs access to the per-connection state, return it. */
  739 efujita                   317 CBC        1777 :     if (state)
                                318             704 :         *state = &entry->state;
                                319                 : 
 3699 tgl                       320            1777 :     return entry->conn;
                                321                 : }
 3699 tgl                       322 ECB             : 
                                323                 : /*
                                324                 :  * Reset all transient state fields in the cached connection entry and
                                 325                 :  * establish a new connection to the remote server.
                                326                 :  */
                                327                 : static void
  905 fujii                     328 GIC          62 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
  905 fujii                     329 ECB             : {
  905 fujii                     330 GIC          62 :     ForeignServer *server = GetForeignServer(user->serverid);
  737 fujii                     331 ECB             :     ListCell   *lc;
                                332                 : 
  905 fujii                     333 CBC          62 :     Assert(entry->conn == NULL);
                                334                 : 
                                335                 :     /* Reset all transient state fields, to be sure all are clean */
  905 fujii                     336 GIC          62 :     entry->xact_depth = 0;
                                337              62 :     entry->have_prep_stmt = false;
  905 fujii                     338 CBC          62 :     entry->have_error = false;
  905 fujii                     339 GIC          62 :     entry->changing_xact_state = false;
  905 fujii                     340 CBC          62 :     entry->invalidated = false;
  814 fujii                     341 GIC          62 :     entry->serverid = server->serverid;
  905 fujii                     342 CBC          62 :     entry->server_hashvalue =
  905 fujii                     343 GIC          62 :         GetSysCacheHashValue1(FOREIGNSERVEROID,
  905 fujii                     344 ECB             :                               ObjectIdGetDatum(server->serverid));
  905 fujii                     345 GIC          62 :     entry->mapping_hashvalue =
                                346              62 :         GetSysCacheHashValue1(USERMAPPINGOID,
                                347                 :                               ObjectIdGetDatum(user->umid));
  739 efujita                   348 CBC          62 :     memset(&entry->state, 0, sizeof(entry->state));
                                349                 : 
                                350                 :     /*
  737 fujii                     351 ECB             :      * Determine whether to keep the connection that we're about to make here
                                352                 :      * open even after the transaction using it ends, so that the subsequent
                                353                 :      * transactions can re-use it.
                                354                 :      *
                                355                 :      * By default, all the connections to any foreign servers are kept open.
                                356                 :      *
                                357                 :      * Also determine whether to commit/abort (sub)transactions opened on the
                                358                 :      * remote server in parallel at (sub)transaction end, which is disabled by
                                359                 :      * default.
                                360                 :      *
                                361                 :      * Note: it's enough to determine these only when making a new connection
  276 efujita                   362                 :      * because if these settings for it are changed, it will be closed and
                                363                 :      * re-made later.
  737 fujii                     364                 :      */
  737 fujii                     365 GIC          62 :     entry->keep_connections = true;
  409 efujita                   366              62 :     entry->parallel_commit = false;
    3 efujita                   367 GNC          62 :     entry->parallel_abort = false;
  737 fujii                     368 CBC         268 :     foreach(lc, server->options)
                                369                 :     {
  737 fujii                     370 GIC         206 :         DefElem    *def = (DefElem *) lfirst(lc);
  737 fujii                     371 ECB             : 
  737 fujii                     372 CBC         206 :         if (strcmp(def->defname, "keep_connections") == 0)
                                373               6 :             entry->keep_connections = defGetBoolean(def);
  409 efujita                   374             200 :         else if (strcmp(def->defname, "parallel_commit") == 0)
                                375               2 :             entry->parallel_commit = defGetBoolean(def);
    3 efujita                   376 GNC         198 :         else if (strcmp(def->defname, "parallel_abort") == 0)
                                377               2 :             entry->parallel_abort = defGetBoolean(def);
  737 fujii                     378 ECB             :     }
                                379                 : 
  905                           380                 :     /* Now try to make the connection */
  905 fujii                     381 GIC          62 :     entry->conn = connect_pg_server(server, user);
  905 fujii                     382 ECB             : 
  905 fujii                     383 CBC          56 :     elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
                                384                 :          entry->conn, server->servername, user->umid, user->userid);
                                385              56 : }
                                386                 : 
                                387                 : /*
                                388                 :  * Connect to remote server using specified server and user mapping properties.
                                389                 :  */
                                390                 : static PGconn *
 3699 tgl                       391 GIC          62 : connect_pg_server(ForeignServer *server, UserMapping *user)
                                392                 : {
                                393              62 :     PGconn     *volatile conn = NULL;
                                394                 : 
                                395                 :     /*
                                396                 :      * Use PG_TRY block to ensure closing connection on error.
                                397                 :      */
                                398              62 :     PG_TRY();
                                399                 :     {
                                400                 :         const char **keywords;
                                401                 :         const char **values;
  471 fujii                     402 CBC          62 :         char       *appname = NULL;
 3699 tgl                       403 ECB             :         int         n;
                                404                 : 
                                405                 :         /*
                                406                 :          * Construct connection params from generic options of ForeignServer
                                407                 :          * and UserMapping.  (Some of them might not be libpq options, in
                                408                 :          * which case we'll just waste a few array slots.)  Add 4 extra slots
  579 fujii                     409                 :          * for application_name, fallback_application_name, client_encoding,
                                410                 :          * end marker.
 3699 tgl                       411                 :          */
  579 fujii                     412 CBC          62 :         n = list_length(server->options) + list_length(user->options) + 4;
 3699 tgl                       413              62 :         keywords = (const char **) palloc(n * sizeof(char *));
                                414              62 :         values = (const char **) palloc(n * sizeof(char *));
                                415                 : 
 3699 tgl                       416 GIC          62 :         n = 0;
                                417             124 :         n += ExtractConnectionOptions(server->options,
 3699 tgl                       418 CBC          62 :                                       keywords + n, values + n);
 3699 tgl                       419 GIC         124 :         n += ExtractConnectionOptions(user->options,
 3699 tgl                       420 CBC          62 :                                       keywords + n, values + n);
                                421                 : 
  579 fujii                     422 ECB             :         /*
                                423                 :          * Use pgfdw_application_name as application_name if set.
                                424                 :          *
                                425                 :          * PQconnectdbParams() processes the parameter arrays from start to
                                426                 :          * end. If any key word is repeated, the last value is used. Therefore
                                427                 :          * note that pgfdw_application_name must be added to the arrays after
                                428                 :          * options of ForeignServer are, so that it can override
                                429                 :          * application_name set in ForeignServer.
                                430                 :          */
  579 fujii                     431 GIC          62 :         if (pgfdw_application_name && *pgfdw_application_name != '\0')
                                432                 :         {
                                433               1 :             keywords[n] = "application_name";
                                434               1 :             values[n] = pgfdw_application_name;
  579 fujii                     435 CBC           1 :             n++;
                                436                 :         }
                                437                 : 
                                438                 :         /*
  471 fujii                     439 ECB             :          * Search the parameter arrays to find application_name setting, and
                                440                 :          * replace escape sequences in it with status information if found.
                                441                 :          * The arrays are searched backwards because the last value is used if
                                442                 :          * application_name is repeatedly set.
                                443                 :          */
  471 fujii                     444 GIC         158 :         for (int i = n - 1; i >= 0; i--)
                                445                 :         {
                                446             112 :             if (strcmp(keywords[i], "application_name") == 0 &&
                                447              16 :                 *(values[i]) != '\0')
                                448                 :             {
  471 fujii                     449 ECB             :                 /*
                                450                 :                  * Use this application_name setting if it's not empty string
                                451                 :                  * even after any escape sequences in it are replaced.
                                452                 :                  */
  471 fujii                     453 CBC          16 :                 appname = process_pgfdw_appname(values[i]);
                                454              16 :                 if (appname[0] != '\0')
  471 fujii                     455 ECB             :                 {
  471 fujii                     456 CBC          16 :                     values[i] = appname;
                                457              16 :                     break;
                                458                 :                 }
                                459                 : 
                                460                 :                 /*
                                461                 :                  * This empty application_name is not used, so we set
                                462                 :                  * values[i] to NULL and keep searching the array to find the
                                463                 :                  * next one.
                                464                 :                  */
  471 fujii                     465 UIC           0 :                 values[i] = NULL;
                                466               0 :                 pfree(appname);
                                467               0 :                 appname = NULL;
  471 fujii                     468 ECB             :             }
                                469                 :         }
                                470                 : 
  579                           471                 :         /* Use "postgres_fdw" as fallback_application_name */
 3699 tgl                       472 CBC          62 :         keywords[n] = "fallback_application_name";
 3699 tgl                       473 GIC          62 :         values[n] = "postgres_fdw";
                                474              62 :         n++;
                                475                 : 
                                476                 :         /* Set client_encoding so that libpq can convert encoding properly. */
                                477              62 :         keywords[n] = "client_encoding";
                                478              62 :         values[n] = GetDatabaseEncodingName();
                                479              62 :         n++;
                                480                 : 
 3699 tgl                       481 CBC          62 :         keywords[n] = values[n] = NULL;
                                482                 : 
 1140 tgl                       483 ECB             :         /* verify the set of connection parameters */
 1951 rhaas                     484 CBC          62 :         check_conn_params(keywords, values, user);
                                485                 : 
 1140 tgl                       486 ECB             :         /* OK to make connection */
   76 andres                    487 GNC          60 :         conn = libpqsrv_connect_params(keywords, values,
                                488                 :                                        false,   /* expand_dbname */
                                489                 :                                        PG_WAIT_EXTENSION);
 1140 tgl                       490 ECB             : 
 3699 tgl                       491 CBC          60 :         if (!conn || PQstatus(conn) != CONNECTION_OK)
 3699 tgl                       492 GIC           2 :             ereport(ERROR,
 2118 tgl                       493 ECB             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
                                494                 :                      errmsg("could not connect to server \"%s\"",
                                495                 :                             server->servername),
                                496                 :                      errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
                                497                 : 
                                498                 :         /*
    1 sfrost                    499                 :          * Check that non-superuser has used password to establish connection;
                                500                 :          * otherwise, he's piggybacking on the postgres server's user
                                501                 :          * identity. See also dblink_security_check() in contrib/dblink and
                                502                 :          * check_conn_params.
                                503                 :          */
    1 sfrost                    504 CBC          58 :         if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
    1 sfrost                    505 GIC           2 :             !PQconnectionUsedPassword(conn))
                                506               2 :             ereport(ERROR,
                                507                 :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
                                508                 :                      errmsg("password is required"),
                                509                 :                      errdetail("Non-superuser cannot connect if the server does not request a password."),
                                510                 :                      errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
                                511                 : 
                                512                 :         /* Prepare new session for use */
 3698 tgl                       513              56 :         configure_remote_session(conn);
                                514                 : 
  471 fujii                     515              56 :         if (appname != NULL)
  471 fujii                     516 CBC          16 :             pfree(appname);
 3699 tgl                       517              56 :         pfree(keywords);
                                518              56 :         pfree(values);
                                519                 :     }
 3699 tgl                       520 GIC           6 :     PG_CATCH();
                                521                 :     {
   76 andres                    522 GNC           6 :         libpqsrv_disconnect(conn);
 3699 tgl                       523 CBC           6 :         PG_RE_THROW();
 3699 tgl                       524 ECB             :     }
 3699 tgl                       525 CBC          56 :     PG_END_TRY();
                                526                 : 
                                527              56 :     return conn;
                                528                 : }
 3699 tgl                       529 ECB             : 
 2088                           530                 : /*
                                531                 :  * Disconnect any open connection for a connection cache entry.
                                532                 :  */
                                533                 : static void
 2088 tgl                       534 CBC          53 : disconnect_pg_server(ConnCacheEntry *entry)
                                535                 : {
 2088 tgl                       536 GIC          53 :     if (entry->conn != NULL)
                                537                 :     {
   76 andres                    538 GNC          53 :         libpqsrv_disconnect(entry->conn);
 2088 tgl                       539 GIC          53 :         entry->conn = NULL;
 2088 tgl                       540 ECB             :     }
 2088 tgl                       541 GIC          53 : }
 2088 tgl                       542 ECB             : 
                                543                 : /*
 1206 andrew                    544                 :  * Return the password_required setting of this user mapping, or true if the
                                545                 :  * option is not set. The mapping has been pre-validated.
                                546                 :  */
                                547                 : static bool
 1206 andrew                    548 GIC           5 : UserMappingPasswordRequired(UserMapping *user)
                                549                 : {
                                550                 :     ListCell   *cell;
                                551                 : 
                                552               8 :     foreach(cell, user->options)
                                553                 :     {
 1206 andrew                    554 CBC           4 :         DefElem    *def = (DefElem *) lfirst(cell);
                                555                 : 
 1206 andrew                    556 GIC           4 :         if (strcmp(def->defname, "password_required") == 0)
                                557               1 :             return defGetBoolean(def);
 1206 andrew                    558 ECB             :     }
                                559                 : 
 1206 andrew                    560 CBC           4 :     return true;
                                561                 : }
 1206 andrew                    562 ECB             : 
 3699 tgl                       563                 : /*
                                564                 :  * For non-superusers, insist that the connstr specify a password (unless the
                                565                 :  * user mapping says password_required=false).  This prevents a password from
 1206 andrew                    566                 :  * being picked up from .pgpass, a service file, the environment, etc.  We
                                567                 :  * don't want the postgres user's passwords, certificates, etc to be accessible
                                568                 :  * to non-superusers.  (See also dblink_connstr_check in contrib/dblink.)
                                569                 :  */
                                570                 : static void
 1951 rhaas                     571 GIC          62 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
                                572                 : {
                                573                 :     int         i;
                                574                 : 
                                575                 :     /* no check required if superuser */
                                576              62 :     if (superuser_arg(user->userid))
 3699 tgl                       577 CBC          57 :         return;
                                578                 : 
                                579                 :     /* ok if params contain a non-empty password */
 3699 tgl                       580 GIC          19 :     for (i = 0; keywords[i] != NULL; i++)
                                581                 :     {
 3699 tgl                       582 CBC          17 :         if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
                                583               3 :             return;
                                584                 :     }
                                585                 : 
 1206 andrew                    586 ECB             :     /* ok if the superuser explicitly said so at user mapping creation time */
 1206 andrew                    587 GIC           2 :     if (!UserMappingPasswordRequired(user))
 1206 andrew                    588 LBC           0 :         return;
 1206 andrew                    589 ECB             : 
 3699 tgl                       590 GIC           2 :     ereport(ERROR,
                                591                 :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
                                592                 :              errmsg("password is required"),
    1 sfrost                    593 ECB             :              errdetail("Non-superusers must provide a password in the user mapping.")));
 3699 tgl                       594 EUB             : }
                                595                 : 
 3698 tgl                       596 ECB             : /*
                                597                 :  * Issue SET commands to make sure remote session is configured properly.
                                598                 :  *
                                599                 :  * We do this just once at connection, assuming nothing will change the
                                600                 :  * values later.  Since we'll never send volatile function calls to the
                                601                 :  * remote, there shouldn't be any way to break this assumption from our end.
                                602                 :  * It's possible to think of ways to break it at the remote end, eg making
                                603                 :  * a foreign table point to a view that includes a set_config call ---
                                604                 :  * but once you admit the possibility of a malicious view definition,
                                605                 :  * there are any number of ways to break things.
                                606                 :  */
                                607                 : static void
 3698 tgl                       608 GIC          56 : configure_remote_session(PGconn *conn)
                                609                 : {
 3681                           610              56 :     int         remoteversion = PQserverVersion(conn);
                                611                 : 
                                612                 :     /* Force the search path to contain only pg_catalog (see deparse.c) */
                                613              56 :     do_sql_command(conn, "SET search_path = pg_catalog");
 3681 tgl                       614 ECB             : 
                                615                 :     /*
                                616                 :      * Set remote timezone; this is basically just cosmetic, since all
                                617                 :      * transmitted and returned timestamptzs should specify a zone explicitly
                                618                 :      * anyway.  However it makes the regression test outputs more predictable.
                                619                 :      *
                                620                 :      * We don't risk setting remote zone equal to ours, since the remote
                                621                 :      * server might use a different timezone database.  Instead, use UTC
                                622                 :      * (quoted, because very old servers are picky about case).
                                623                 :      */
 3670 tgl                       624 GIC          56 :     do_sql_command(conn, "SET timezone = 'UTC'");
                                625                 : 
                                626                 :     /*
                                627                 :      * Set values needed to ensure unambiguous data output from remote.  (This
                                628                 :      * logic should match what pg_dump does.  See also set_transmission_modes
                                629                 :      * in postgres_fdw.c.)
 3681 tgl                       630 ECB             :      */
 3681 tgl                       631 GIC          56 :     do_sql_command(conn, "SET datestyle = ISO");
                                632              56 :     if (remoteversion >= 80400)
                                633              56 :         do_sql_command(conn, "SET intervalstyle = postgres");
                                634              56 :     if (remoteversion >= 90000)
                                635              56 :         do_sql_command(conn, "SET extra_float_digits = 3");
                                636                 :     else
 3681 tgl                       637 LBC           0 :         do_sql_command(conn, "SET extra_float_digits = 2");
 3681 tgl                       638 CBC          56 : }
 3681 tgl                       639 ECB             : 
                                640                 : /*
                                641                 :  * Convenience subroutine to issue a non-data-returning SQL command to remote
                                642                 :  */
  731 fujii                     643 EUB             : void
 3681 tgl                       644 CBC        1651 : do_sql_command(PGconn *conn, const char *sql)
                                645                 : {
  409 efujita                   646 GIC        1651 :     do_sql_command_begin(conn, sql);
                                647            1651 :     do_sql_command_end(conn, sql, false);
                                648            1648 : }
                                649                 : 
  409 efujita                   650 ECB             : static void
  409 efujita                   651 GIC        1669 : do_sql_command_begin(PGconn *conn, const char *sql)
  409 efujita                   652 ECB             : {
 2132 rhaas                     653 CBC        1669 :     if (!PQsendQuery(conn, sql))
 2132 rhaas                     654 LBC           0 :         pgfdw_report_error(ERROR, NULL, conn, false, sql);
  409 efujita                   655 GIC        1669 : }
                                656                 : 
  409 efujita                   657 ECB             : static void
  409 efujita                   658 GIC        1669 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
  409 efujita                   659 ECB             : {
  409 efujita                   660 EUB             :     PGresult   *res;
  409 efujita                   661 ECB             : 
                                662                 :     /*
                                663                 :      * If requested, consume whatever data is available from the socket. (Note
  332 tgl                       664                 :      * that if all data is available, this allows pgfdw_get_result to call
                                665                 :      * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
                                666                 :      * would be large compared to the overhead of PQconsumeInput.)
                                667                 :      */
  409 efujita                   668 GIC        1669 :     if (consume_input && !PQconsumeInput(conn))
  409 efujita                   669 UIC           0 :         pgfdw_report_error(ERROR, NULL, conn, false, sql);
 2132 rhaas                     670 GIC        1669 :     res = pgfdw_get_result(conn, sql);
 3698 tgl                       671            1667 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 3352                           672               1 :         pgfdw_report_error(ERROR, res, conn, true, sql);
 3698                           673            1666 :     PQclear(res);
 3698 tgl                       674 CBC        1666 : }
 3698 tgl                       675 EUB             : 
 3699 tgl                       676 ECB             : /*
                                677                 :  * Start remote transaction or subtransaction, if needed.
                                678                 :  *
                                679                 :  * Note that we always use at least REPEATABLE READ in the remote session.
                                680                 :  * This is so that, if a query initiates multiple scans of the same or
                                681                 :  * different foreign tables, we will get snapshot-consistent results from
                                682                 :  * those scans.  A disadvantage is that we can't provide sane emulation of
                                683                 :  * READ COMMITTED behavior --- it would be nice if we had some other way to
                                684                 :  * control which remote queries share a snapshot.
                                685                 :  */
                                686                 : static void
 3699 tgl                       687 GIC        1779 : begin_remote_xact(ConnCacheEntry *entry)
                                688                 : {
                                689            1779 :     int         curlevel = GetCurrentTransactionNestLevel();
                                690                 : 
                                691                 :     /* Start main transaction if we haven't yet */
                                692            1779 :     if (entry->xact_depth <= 0)
 3699 tgl                       693 ECB             :     {
                                694                 :         const char *sql;
                                695                 : 
 3699 tgl                       696 GIC         697 :         elog(DEBUG3, "starting remote transaction on connection %p",
                                697                 :              entry->conn);
 3699 tgl                       698 ECB             : 
 3699 tgl                       699 GIC         697 :         if (IsolationIsSerializable())
 3699 tgl                       700 UIC           0 :             sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
                                701                 :         else
 3699 tgl                       702 CBC         697 :             sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
 2132 rhaas                     703 GIC         697 :         entry->changing_xact_state = true;
 3681 tgl                       704             697 :         do_sql_command(entry->conn, sql);
 3699 tgl                       705 CBC         696 :         entry->xact_depth = 1;
 2132 rhaas                     706 GBC         696 :         entry->changing_xact_state = false;
                                707                 :     }
 3699 tgl                       708 ECB             : 
                                709                 :     /*
                                710                 :      * If we're in a subtransaction, stack up savepoints to match our level.
                                711                 :      * This ensures we can rollback just the desired effects when a
                                712                 :      * subtransaction aborts.
                                713                 :      */
 3699 tgl                       714 GIC        1792 :     while (entry->xact_depth < curlevel)
                                715                 :     {
                                716                 :         char        sql[64];
                                717                 : 
                                718              15 :         snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
 2132 rhaas                     719              15 :         entry->changing_xact_state = true;
 3681 tgl                       720 CBC          15 :         do_sql_command(entry->conn, sql);
 3699 tgl                       721 GIC          14 :         entry->xact_depth++;
 2132 rhaas                     722              14 :         entry->changing_xact_state = false;
                                723                 :     }
 3699 tgl                       724 CBC        1777 : }
 3699 tgl                       725 ECB             : 
                                726                 : /*
                                727                 :  * Release connection reference count created by calling GetConnection.
                                728                 :  */
                                729                 : void
 3699 tgl                       730 CBC        1728 : ReleaseConnection(PGconn *conn)
                                731                 : {
                                732                 :     /*
                                733                 :      * Currently, we don't actually track connection references because all
                                734                 :      * cleanup is managed on a transaction or subtransaction basis instead. So
                                735                 :      * there's nothing to do here.
 3699 tgl                       736 ECB             :      */
 3699 tgl                       737 GIC        1728 : }
                                738                 : 
                                739                 : /*
                                740                 :  * Assign a "unique" number for a cursor.
                                741                 :  *
                                742                 :  * These really only need to be unique per connection within a transaction.
 3699 tgl                       743 ECB             :  * For the moment we ignore the per-connection point and assign them across
                                744                 :  * all connections in the transaction, but we ask for the connection to be
                                745                 :  * supplied in case we want to refine that.
                                746                 :  *
                                747                 :  * Note that even if wraparound happens in a very long transaction, actual
                                748                 :  * collisions are highly improbable; just be sure to use %u not %d to print.
                                749                 :  */
                                750                 : unsigned int
 3699 tgl                       751 GIC         503 : GetCursorNumber(PGconn *conn)
                                752                 : {
                                753             503 :     return ++cursor_number;
                                754                 : }
                                755                 : 
                                756                 : /*
 3682 tgl                       757 ECB             :  * Assign a "unique" number for a prepared statement.
                                758                 :  *
                                759                 :  * This works much like GetCursorNumber, except that we never reset the counter
                                760                 :  * within a session.  That's because we can't be 100% sure we've gotten rid
                                761                 :  * of all prepared statements on all connections, and it's not really worth
                                762                 :  * increasing the risk of prepared-statement name collisions by resetting.
                                763                 :  */
                                764                 : unsigned int
 3682 tgl                       765 GIC         174 : GetPrepStmtNumber(PGconn *conn)
                                766                 : {
                                767             174 :     return ++prep_stmt_number;
                                768                 : }
                                769                 : 
                                770                 : /*
 2544 rhaas                     771 ECB             :  * Submit a query and wait for the result.
                                772                 :  *
                                773                 :  * This function is interruptible by signals.
                                774                 :  *
                                775                 :  * Caller is responsible for the error handling on the result.
                                776                 :  */
                                777                 : PGresult *
  739 efujita                   778 GIC        3560 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
                                779                 : {
                                780                 :     /* First, process a pending asynchronous request, if any. */
                                781            3560 :     if (state && state->pendingAreq)
                                782               4 :         process_pending_request(state->pendingAreq);
                                783                 : 
 2544 rhaas                     784 ECB             :     /*
                                785                 :      * Submit a query.  Since we don't use non-blocking mode, this also can
                                786                 :      * block.  But its risk is relatively small, so we ignore that for now.
                                787                 :      */
 2544 rhaas                     788 CBC        3560 :     if (!PQsendQuery(conn, query))
 2544 rhaas                     789 UIC           0 :         pgfdw_report_error(ERROR, NULL, conn, false, query);
                                790                 : 
                                791                 :     /* Wait for the result. */
 2544 rhaas                     792 GIC        3560 :     return pgfdw_get_result(conn, query);
                                793                 : }
 2544 rhaas                     794 ECB             : 
 2544 rhaas                     795 EUB             : /*
                                796                 :  * Wait for the result from a prior asynchronous execution function call.
                                797                 :  *
 2544 rhaas                     798 ECB             :  * This function offers quick responsiveness by checking for any interruptions.
                                799                 :  *
                                800                 :  * This function emulates PQexec()'s behavior of returning the last result
                                801                 :  * when there are many.
                                802                 :  *
                                803                 :  * Caller is responsible for the error handling on the result.
                                804                 :  */
                                805                 : PGresult *
 2544 rhaas                     806 GIC        7428 : pgfdw_get_result(PGconn *conn, const char *query)
                                807                 : {
 2124 tgl                       808            7428 :     PGresult   *volatile last_res = NULL;
                                809                 : 
                                810                 :     /* In what follows, do not leak any PGresults on an error. */
                                811            7428 :     PG_TRY();
 2544 rhaas                     812 ECB             :     {
                                813                 :         for (;;)
 2544 rhaas                     814 CBC        7428 :         {
                                815                 :             PGresult   *res;
                                816                 : 
 2124 tgl                       817           37034 :             while (PQisBusy(conn))
                                818                 :             {
                                819                 :                 int         wc;
 2544 rhaas                     820 ECB             : 
                                821                 :                 /* Sleep until there's something to do */
 2124 tgl                       822 GIC        7324 :                 wc = WaitLatchOrSocket(MyLatch,
 1598 tmunro                    823 ECB             :                                        WL_LATCH_SET | WL_SOCKET_READABLE |
                                824                 :                                        WL_EXIT_ON_PM_DEATH,
                                825                 :                                        PQsocket(conn),
                                826                 :                                        -1L, PG_WAIT_EXTENSION);
 2124 tgl                       827 GIC        7324 :                 ResetLatch(MyLatch);
 2544 rhaas                     828 ECB             : 
 2124 tgl                       829 GIC        7324 :                 CHECK_FOR_INTERRUPTS();
                                830                 : 
                                831                 :                 /* Data available in socket? */
                                832            7324 :                 if (wc & WL_SOCKET_READABLE)
 2124 tgl                       833 ECB             :                 {
 2124 tgl                       834 GIC        7322 :                     if (!PQconsumeInput(conn))
 2124 tgl                       835 CBC           2 :                         pgfdw_report_error(ERROR, NULL, conn, false, query);
                                836                 :                 }
                                837                 :             }
 2544 rhaas                     838 ECB             : 
 2124 tgl                       839 GIC       14854 :             res = PQgetResult(conn);
 2124 tgl                       840 CBC       14854 :             if (res == NULL)
                                841            7426 :                 break;          /* query is complete */
                                842                 : 
 2124 tgl                       843 GIC        7428 :             PQclear(last_res);
                                844            7428 :             last_res = res;
 2124 tgl                       845 ECB             :         }
                                846                 :     }
 2124 tgl                       847 CBC           2 :     PG_CATCH();
                                848                 :     {
 2544 rhaas                     849               2 :         PQclear(last_res);
 2124 tgl                       850               2 :         PG_RE_THROW();
                                851                 :     }
 2124 tgl                       852 GIC        7426 :     PG_END_TRY();
 2544 rhaas                     853 ECB             : 
 2544 rhaas                     854 GIC        7426 :     return last_res;
 2544 rhaas                     855 ECB             : }
                                856                 : 
                                857                 : /*
 3699 tgl                       858                 :  * Report an error we got from the remote server.
                                859                 :  *
                                860                 :  * elevel: error level to use (typically ERROR, but might be less)
                                861                 :  * res: PGresult containing the error
                                862                 :  * conn: connection we did the query on
                                863                 :  * clear: if true, PQclear the result (otherwise caller will handle it)
                                864                 :  * sql: NULL, or text of remote command we tried to execute
                                865                 :  *
                                866                 :  * Note: callers that choose not to throw ERROR for a remote error are
                                867                 :  * responsible for making sure that the associated ConnCacheEntry gets
                                868                 :  * marked with have_error = true.
                                869                 :  */
                                870                 : void
 3352 tgl                       871 GIC          15 : pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
                                872                 :                    bool clear, const char *sql)
                                873                 : {
                                874                 :     /* If requested, PGresult must be released before leaving this function. */
 3699                           875              15 :     PG_TRY();
                                876                 :     {
 3699 tgl                       877 CBC          15 :         char       *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
 3699 tgl                       878 GIC          15 :         char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
                                879              15 :         char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
                                880              15 :         char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
 3699 tgl                       881 CBC          15 :         char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
                                882                 :         int         sqlstate;
 3699 tgl                       883 ECB             : 
 3699 tgl                       884 CBC          15 :         if (diag_sqlstate)
                                885              13 :             sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
 3699 tgl                       886 ECB             :                                      diag_sqlstate[1],
                                887                 :                                      diag_sqlstate[2],
                                888                 :                                      diag_sqlstate[3],
                                889                 :                                      diag_sqlstate[4]);
                                890                 :         else
 3699 tgl                       891 CBC           2 :             sqlstate = ERRCODE_CONNECTION_FAILURE;
                                892                 : 
                                893                 :         /*
                                894                 :          * If we don't get a message from the PGresult, try the PGconn.  This
                                895                 :          * is needed because for connection-level failures, PQexec may just
                                896                 :          * return NULL, not a PGresult at all.
 3352 tgl                       897 ECB             :          */
 3352 tgl                       898 GIC          15 :         if (message_primary == NULL)
 2232 peter_e                   899               2 :             message_primary = pchomp(PQerrorMessage(conn));
                                900                 : 
 3699 tgl                       901              15 :         ereport(elevel,
                                902                 :                 (errcode(sqlstate),
                                903                 :                  (message_primary != NULL && message_primary[0] != '\0') ?
  492 fujii                     904 ECB             :                  errmsg_internal("%s", message_primary) :
 2300 mail                      905                 :                  errmsg("could not obtain message string for remote error"),
                                906                 :                  message_detail ? errdetail_internal("%s", message_detail) : 0,
 3699 tgl                       907                 :                  message_hint ? errhint("%s", message_hint) : 0,
                                908                 :                  message_context ? errcontext("%s", message_context) : 0,
                                909                 :                  sql ? errcontext("remote SQL command: %s", sql) : 0));
                                910                 :     }
 1255 peter                     911 GIC          15 :     PG_FINALLY();
                                912                 :     {
 3699 tgl                       913              15 :         if (clear)
                                914              12 :             PQclear(res);
                                915                 :     }
                                916              15 :     PG_END_TRY();
 3699 tgl                       917 LBC           0 : }
                                918                 : 
 3699 tgl                       919 ECB             : /*
                                920                 :  * pgfdw_xact_callback --- cleanup at main-transaction end.
                                921                 :  *
  881 noah                      922                 :  * This runs just late enough that it must not enter user-defined code
  881 noah                      923 EUB             :  * locally.  (Entering such code on the remote side is fine.  Its remote
                                924                 :  * COMMIT TRANSACTION may run deferred triggers.)
                                925                 :  */
                                926                 : static void
 3699 tgl                       927 GIC        3678 : pgfdw_xact_callback(XactEvent event, void *arg)
                                928                 : {
                                929                 :     HASH_SEQ_STATUS scan;
                                930                 :     ConnCacheEntry *entry;
  409 efujita                   931            3678 :     List       *pending_entries = NIL;
    3 efujita                   932 GNC        3678 :     List       *cancel_requested = NIL;
                                933                 : 
 3699 tgl                       934 ECB             :     /* Quick exit if no connections were touched in this transaction. */
 3699 tgl                       935 GIC        3678 :     if (!xact_got_connection)
                                936            3012 :         return;
                                937                 : 
 3699 tgl                       938 ECB             :     /*
                                939                 :      * Scan all connection cache entries to find open remote transactions, and
                                940                 :      * close them.
                                941                 :      */
 3699 tgl                       942 CBC         666 :     hash_seq_init(&scan, ConnectionHash);
                                943            2917 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
                                944                 :     {
                                945                 :         PGresult   *res;
                                946                 : 
                                947                 :         /* Ignore cache entry if no open connection right now */
 3352 tgl                       948 GIC        2252 :         if (entry->conn == NULL)
 3699 tgl                       949 CBC        1060 :             continue;
 3699 tgl                       950 ECB             : 
                                951                 :         /* If it has an open remote transaction, try to close it */
 3352 tgl                       952 GIC        1192 :         if (entry->xact_depth > 0)
                                953                 :         {
                                954             697 :             elog(DEBUG3, "closing remote transaction on connection %p",
 3352 tgl                       955 ECB             :                  entry->conn);
                                956                 : 
 3352 tgl                       957 GIC         697 :             switch (event)
                                958                 :             {
 2901 rhaas                     959 CBC         658 :                 case XACT_EVENT_PARALLEL_PRE_COMMIT:
                                960                 :                 case XACT_EVENT_PRE_COMMIT:
 2132 rhaas                     961 ECB             : 
                                962                 :                     /*
                                963                 :                      * If abort cleanup previously failed for this connection,
                                964                 :                      * we can't issue any more commands against it.
                                965                 :                      */
 2132 rhaas                     966 CBC         658 :                     pgfdw_reject_incomplete_xact_state_change(entry);
                                967                 : 
                                968                 :                     /* Commit all remote transactions during pre-commit */
 2132 rhaas                     969 GIC         658 :                     entry->changing_xact_state = true;
  409 efujita                   970             658 :                     if (entry->parallel_commit)
                                971                 :                     {
                                972              16 :                         do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
  409 efujita                   973 CBC          16 :                         pending_entries = lappend(pending_entries, entry);
  409 efujita                   974 GIC          16 :                         continue;
                                975                 :                     }
 3352 tgl                       976 CBC         642 :                     do_sql_command(entry->conn, "COMMIT TRANSACTION");
 2132 rhaas                     977             642 :                     entry->changing_xact_state = false;
                                978                 : 
 3352 tgl                       979 ECB             :                     /*
                                980                 :                      * If there were any errors in subtransactions, and we
                                981                 :                      * made prepared statements, do a DEALLOCATE ALL to make
                                982                 :                      * sure we get rid of all prepared statements. This is
                                983                 :                      * annoying and not terribly bulletproof, but it's
                                984                 :                      * probably not worth trying harder.
                                985                 :                      *
                                986                 :                      * DEALLOCATE ALL only exists in 8.3 and later, so this
                                987                 :                      * constrains how old a server postgres_fdw can
                                988                 :                      * communicate with.  We intentionally ignore errors in
                                989                 :                      * the DEALLOCATE, so that we can hobble along to some
                                990                 :                      * extent with older servers (leaking prepared statements
                                991                 :                      * as we go; but we don't really support update operations
                                992                 :                      * pre-8.3 anyway).
                                993                 :                      */
 3682 tgl                       994 GIC         642 :                     if (entry->have_prep_stmt && entry->have_error)
                                995                 :                     {
 3682 tgl                       996 UIC           0 :                         res = PQexec(entry->conn, "DEALLOCATE ALL");
                                997               0 :                         PQclear(res);
                                998                 :                     }
 3682 tgl                       999 GIC         642 :                     entry->have_prep_stmt = false;
                               1000             642 :                     entry->have_error = false;
 3352 tgl                      1001 CBC         642 :                     break;
 3352 tgl                      1002 GIC           1 :                 case XACT_EVENT_PRE_PREPARE:
 3352 tgl                      1003 EUB             : 
                               1004                 :                     /*
                               1005                 :                      * We disallow any remote transactions, since it's not
 1248 efujita                  1006 ECB             :                      * very reasonable to hold them open until the prepared
                               1007                 :                      * transaction is committed.  For the moment, throw error
                               1008                 :                      * unconditionally; later we might allow read-only cases.
                               1009                 :                      * Note that the error will cause us to come right back
                               1010                 :                      * here with event == XACT_EVENT_ABORT, so we'll clean up
                               1011                 :                      * the connection state at that point.
                               1012                 :                      */
 3352 tgl                      1013 GIC           1 :                     ereport(ERROR,
                               1014                 :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1015                 :                              errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
                               1016                 :                     break;
 2901 rhaas                    1017 UIC           0 :                 case XACT_EVENT_PARALLEL_COMMIT:
                               1018                 :                 case XACT_EVENT_COMMIT:
                               1019                 :                 case XACT_EVENT_PREPARE:
 3352 tgl                      1020 ECB             :                     /* Pre-commit should have closed the open transaction */
 3352 tgl                      1021 UIC           0 :                     elog(ERROR, "missed cleaning up connection during pre-commit");
                               1022                 :                     break;
 2901 rhaas                    1023 GIC          38 :                 case XACT_EVENT_PARALLEL_ABORT:
 3352 tgl                      1024 EUB             :                 case XACT_EVENT_ABORT:
                               1025                 :                     /* Rollback all remote transactions during abort */
    3 efujita                  1026 GNC          38 :                     if (entry->parallel_abort)
                               1027                 :                     {
                               1028               4 :                         if (pgfdw_abort_cleanup_begin(entry, true,
                               1029                 :                                                       &pending_entries,
                               1030                 :                                                       &cancel_requested))
                               1031               4 :                             continue;
                               1032                 :                     }
                               1033                 :                     else
                               1034              34 :                         pgfdw_abort_cleanup(entry, true);
 3352 tgl                      1035 GIC          34 :                     break;
 3352 tgl                      1036 EUB             :             }
                               1037                 :         }
 3699 tgl                      1038 ECB             : 
                               1039                 :         /* Reset state to show we're out of a transaction */
  409 efujita                  1040 GIC        1171 :         pgfdw_reset_xact_state(entry, true);
  409 efujita                  1041 ECB             :     }
                               1042                 : 
                               1043                 :     /* If there are any pending connections, finish cleaning them up */
    3 efujita                  1044 GNC         665 :     if (pending_entries || cancel_requested)
                               1045                 :     {
                               1046              15 :         if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
                               1047                 :             event == XACT_EVENT_PRE_COMMIT)
                               1048                 :         {
                               1049              13 :             Assert(cancel_requested == NIL);
                               1050              13 :             pgfdw_finish_pre_commit_cleanup(pending_entries);
                               1051                 :         }
                               1052                 :         else
                               1053                 :         {
                               1054               2 :             Assert(event == XACT_EVENT_PARALLEL_ABORT ||
                               1055                 :                    event == XACT_EVENT_ABORT);
                               1056               2 :             pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
                               1057                 :                                        true);
                               1058                 :         }
 3699 tgl                      1059 ECB             :     }
                               1060                 : 
                               1061                 :     /*
                               1062                 :      * Regardless of the event type, we can now mark ourselves as out of the
                               1063                 :      * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
                               1064                 :      * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
                               1065                 :      */
 3699 tgl                      1066 GIC         665 :     xact_got_connection = false;
                               1067                 : 
                               1068                 :     /* Also reset cursor numbering for next transaction */
 3699 tgl                      1069 CBC         665 :     cursor_number = 0;
                               1070                 : }
 3699 tgl                      1071 ECB             : 
                               1072                 : /*
                               1073                 :  * pgfdw_subxact_callback --- cleanup at subtransaction end.
                               1074                 :  */
                               1075                 : static void
 3699 tgl                      1076 GIC          38 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
                               1077                 :                        SubTransactionId parentSubid, void *arg)
                               1078                 : {
 3699 tgl                      1079 ECB             :     HASH_SEQ_STATUS scan;
                               1080                 :     ConnCacheEntry *entry;
                               1081                 :     int         curlevel;
  409 efujita                  1082 GIC          38 :     List       *pending_entries = NIL;
    3 efujita                  1083 GNC          38 :     List       *cancel_requested = NIL;
                               1084                 : 
                               1085                 :     /* Nothing to do at subxact start, nor after commit. */
 3699 tgl                      1086 GIC          38 :     if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
                               1087                 :           event == SUBXACT_EVENT_ABORT_SUB))
                               1088              23 :         return;
                               1089                 : 
                               1090                 :     /* Quick exit if no connections were touched in this transaction. */
                               1091              15 :     if (!xact_got_connection)
 3699 tgl                      1092 LBC           0 :         return;
                               1093                 : 
                               1094                 :     /*
 3699 tgl                      1095 ECB             :      * Scan all connection cache entries to find open remote subtransactions
                               1096                 :      * of the current level, and close them.
                               1097                 :      */
 3699 tgl                      1098 GIC          15 :     curlevel = GetCurrentTransactionNestLevel();
                               1099              15 :     hash_seq_init(&scan, ConnectionHash);
                               1100              87 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
                               1101                 :     {
 3699 tgl                      1102 ECB             :         char        sql[100];
                               1103                 : 
                               1104                 :         /*
                               1105                 :          * We only care about connections with open remote subtransactions of
                               1106                 :          * the current level.
                               1107                 :          */
 3699 tgl                      1108 CBC          72 :         if (entry->conn == NULL || entry->xact_depth < curlevel)
                               1109              64 :             continue;
                               1110                 : 
 3699 tgl                      1111 GIC          14 :         if (entry->xact_depth > curlevel)
 3699 tgl                      1112 LBC           0 :             elog(ERROR, "missed cleaning up remote subtransaction at level %d",
                               1113                 :                  entry->xact_depth);
 3699 tgl                      1114 ECB             : 
 3699 tgl                      1115 GIC          14 :         if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
                               1116                 :         {
 2132 rhaas                    1117 ECB             :             /*
 2132 rhaas                    1118 EUB             :              * If abort cleanup previously failed for this connection, we
                               1119                 :              * can't issue any more commands against it.
                               1120                 :              */
 2132 rhaas                    1121 GIC           7 :             pgfdw_reject_incomplete_xact_state_change(entry);
                               1122                 : 
                               1123                 :             /* Commit all remote subtransactions during pre-commit */
 3699 tgl                      1124 CBC           7 :             snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
 2132 rhaas                    1125               7 :             entry->changing_xact_state = true;
  409 efujita                  1126               7 :             if (entry->parallel_commit)
                               1127                 :             {
  409 efujita                  1128 GIC           2 :                 do_sql_command_begin(entry->conn, sql);
                               1129               2 :                 pending_entries = lappend(pending_entries, entry);
                               1130               2 :                 continue;
                               1131                 :             }
 3681 tgl                      1132               5 :             do_sql_command(entry->conn, sql);
 2132 rhaas                    1133               5 :             entry->changing_xact_state = false;
 3699 tgl                      1134 ECB             :         }
  564 fujii                    1135                 :         else
                               1136                 :         {
                               1137                 :             /* Rollback all remote subtransactions during abort */
    3 efujita                  1138 GNC           7 :             if (entry->parallel_abort)
                               1139                 :             {
                               1140               4 :                 if (pgfdw_abort_cleanup_begin(entry, false,
                               1141                 :                                               &pending_entries,
                               1142                 :                                               &cancel_requested))
                               1143               4 :                     continue;
                               1144                 :             }
                               1145                 :             else
                               1146               3 :                 pgfdw_abort_cleanup(entry, false);
                               1147                 :         }
                               1148                 : 
 3699 tgl                      1149 ECB             :         /* OK, we're outta that level of subtransaction */
  409 efujita                  1150 GIC           8 :         pgfdw_reset_xact_state(entry, false);
                               1151                 :     }
                               1152                 : 
                               1153                 :     /* If there are any pending connections, finish cleaning them up */
    3 efujita                  1154 GNC          15 :     if (pending_entries || cancel_requested)
  409 efujita                  1155 ECB             :     {
    3 efujita                  1156 GNC           3 :         if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
                               1157                 :         {
                               1158               1 :             Assert(cancel_requested == NIL);
                               1159               1 :             pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
                               1160                 :         }
                               1161                 :         else
                               1162                 :         {
                               1163               2 :             Assert(event == SUBXACT_EVENT_ABORT_SUB);
                               1164               2 :             pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
                               1165                 :                                        false);
                               1166                 :         }
 3699 tgl                      1167 ECB             :     }
                               1168                 : }
 2132 rhaas                    1169                 : 
                               1170                 : /*
 2088 tgl                      1171                 :  * Connection invalidation callback function
                               1172                 :  *
                               1173                 :  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
                               1174                 :  * close connections depending on that entry immediately if the current transaction
  832 fujii                    1175                 :  * has not used those connections yet. Otherwise, mark those connections as
                               1176                 :  * invalid and then make pgfdw_xact_callback() close them at the end of the current
                               1177                 :  * transaction, since they cannot be closed in the midst of the transaction
                               1178                 :  * using them. Closed connections will be remade at the next opportunity if
                               1179                 :  * necessary.
                               1180                 :  *
 2088 tgl                      1181                 :  * Although most cache invalidation callbacks blow away all the related stuff
                               1182                 :  * regardless of the given hashvalue, connections are expensive enough that
                               1183                 :  * it's worth trying to avoid that.
                               1184                 :  *
                               1185                 :  * NB: We could avoid unnecessary disconnection more strictly by examining
                               1186                 :  * individual option values, but it seems too much effort for the gain.
                               1187                 :  */
                               1188                 : static void
 2088 tgl                      1189 CBC         162 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
                               1190                 : {
                               1191                 :     HASH_SEQ_STATUS scan;
                               1192                 :     ConnCacheEntry *entry;
 2088 tgl                      1193 ECB             : 
 2088 tgl                      1194 GIC         162 :     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
                               1195                 : 
                               1196                 :     /* ConnectionHash must exist already, if we're registered */
 2088 tgl                      1197 CBC         162 :     hash_seq_init(&scan, ConnectionHash);
 2088 tgl                      1198 GIC         950 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
 2088 tgl                      1199 ECB             :     {
                               1200                 :         /* Ignore invalid entries */
 2088 tgl                      1201 CBC         788 :         if (entry->conn == NULL)
                               1202             625 :             continue;
                               1203                 : 
                               1204                 :         /* hashvalue == 0 means a cache reset, must clear all state */
 2088 tgl                      1205 GIC         163 :         if (hashvalue == 0 ||
 2088 tgl                      1206 CBC         123 :             (cacheid == FOREIGNSERVEROID &&
                               1207             163 :              entry->server_hashvalue == hashvalue) ||
 2088 tgl                      1208 GIC          40 :             (cacheid == USERMAPPINGOID &&
                               1209              40 :              entry->mapping_hashvalue == hashvalue))
                               1210                 :         {
                               1211                 :             /*
                               1212                 :              * Close the connection immediately if it's not used yet in this
                               1213                 :              * transaction. Otherwise mark it as invalid so that
                               1214                 :              * pgfdw_xact_callback() can close it at the end of this
                               1215                 :              * transaction.
                               1216                 :              */
  832 fujii                    1217              46 :             if (entry->xact_depth == 0)
                               1218                 :             {
                               1219              43 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
                               1220              43 :                 disconnect_pg_server(entry);
                               1221                 :             }
                               1222                 :             else
                               1223               3 :                 entry->invalidated = true;
                               1224                 :         }
                               1225                 :     }
 2088 tgl                      1226             162 : }
                               1227                 : 
                               1228                 : /*
                               1229                 :  * Raise an error if the given connection cache entry is marked as being
                               1230                 :  * in the middle of an xact state change.  This should be called at points where no
                               1231                 :  * such change is expected to be in progress; if one is found to be in
 2132 rhaas                    1232 ECB             :  * progress, it means that we aborted in the middle of a previous state change
                               1233                 :  * and now don't know what the remote transaction state actually is.
                               1234                 :  * Such connections can't safely be further used.  Re-establishing the
                               1235                 :  * connection would change the snapshot and roll back any writes already
                               1236                 :  * performed, so that's not an option, either. Thus, we must abort.
                               1237                 :  */
                               1238                 : static void
 2132 rhaas                    1239 GIC        2449 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
 2132 rhaas                    1240 ECB             : {
                               1241                 :     ForeignServer *server;
                               1242                 : 
                               1243                 :     /* nothing to do for inactive entries and entries of sane state */
 2088 tgl                      1244 CBC        2449 :     if (entry->conn == NULL || !entry->changing_xact_state)
 2132 rhaas                    1245            2449 :         return;
                               1246                 : 
                               1247                 :     /* make sure this entry is inactive */
 2088 tgl                      1248 LBC           0 :     disconnect_pg_server(entry);
 2088 tgl                      1249 ECB             : 
                               1250                 :     /* find server name to be shown in the message below */
  814 fujii                    1251 LBC           0 :     server = GetForeignServer(entry->serverid);
 2132 rhaas                    1252 ECB             : 
 2132 rhaas                    1253 UIC           0 :     ereport(ERROR,
                               1254                 :             (errcode(ERRCODE_CONNECTION_EXCEPTION),
                               1255                 :              errmsg("connection to server \"%s\" was lost",
                               1256                 :                     server->servername)));
                               1257                 : }
                               1258                 : 
                               1259                 : /*
  409 efujita                  1260 ECB             :  * Reset state to show we're out of a (sub)transaction.
                               1261                 :  */
                               1262                 : static void
  409 efujita                  1263 CBC        1205 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
                               1264                 : {
  409 efujita                  1265 GIC        1205 :     if (toplevel)
  409 efujita                  1266 ECB             :     {
                               1267                 :         /* Reset state to show we're out of a transaction */
  409 efujita                  1268 GIC        1191 :         entry->xact_depth = 0;
  409 efujita                  1269 ECB             : 
                               1270                 :         /*
                               1271                 :          * If the connection isn't in a good idle state, is marked as invalid,
                               1272                 :          * or the keep_connections option of its server is disabled, then
                               1273                 :          * discard it to recover. The next GetConnection will open a new
                               1274                 :          * connection.
                               1275                 :          */
  409 efujita                  1276 GIC        2381 :         if (PQstatus(entry->conn) != CONNECTION_OK ||
                               1277            1190 :             PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
                               1278            1190 :             entry->changing_xact_state ||
                               1279            1190 :             entry->invalidated ||
                               1280            1188 :             !entry->keep_connections)
                               1281                 :         {
  409 efujita                  1282 CBC           4 :             elog(DEBUG3, "discarding connection %p", entry->conn);
  409 efujita                  1283 GIC           4 :             disconnect_pg_server(entry);
                               1284                 :         }
                               1285                 :     }
                               1286                 :     else
  409 efujita                  1287 ECB             :     {
                               1288                 :         /* Reset state to show we're out of a subtransaction */
  409 efujita                  1289 GIC          14 :         entry->xact_depth--;
                               1290                 :     }
  409 efujita                  1291 GBC        1205 : }
                               1292                 : 
                               1293                 : /*
 2132 rhaas                    1294 EUB             :  * Cancel the currently-in-progress query (whose query text we do not have)
                               1295                 :  * and ignore the result.  Returns true if we successfully cancel the query
                               1296                 :  * and discard any pending result, and false if not.
                               1297                 :  *
                               1298                 :  * It's not a huge problem if we throw an ERROR here, but if we get into error
                               1299                 :  * recursion trouble, we'll end up slamming the connection shut, which will
                               1300                 :  * necessitate failing the entire toplevel transaction even if subtransactions
                               1301                 :  * were used.  Try to use WARNING where we can.
                               1302                 :  *
                               1303                 :  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
                               1304                 :  * query text from the pendingAreq saved in the per-connection state, then
                               1305                 :  * report the query using it.
 2132 rhaas                    1306 ECB             :  */
                               1307                 : static bool
 2132 rhaas                    1308 LBC           0 : pgfdw_cancel_query(PGconn *conn)
                               1309                 : {
                               1310                 :     TimestampTz endtime;
                               1311                 : 
                               1312                 :     /*
                               1313                 :      * If it takes too long to cancel the query and discard the result, assume
                               1314                 :      * the connection is dead.
 2132 rhaas                    1315 ECB             :      */
    3 efujita                  1316 UNC           0 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
                               1317                 :                                           CONNECTION_CLEANUP_TIMEOUT);
                               1318                 : 
                               1319               0 :     if (!pgfdw_cancel_query_begin(conn))
                               1320               0 :         return false;
                               1321               0 :     return pgfdw_cancel_query_end(conn, endtime, false);
                               1322                 : }
                               1323                 : 
                               1324                 : static bool
                               1325               0 : pgfdw_cancel_query_begin(PGconn *conn)
                               1326                 : {
                               1327                 :     PGcancel   *cancel;
                               1328                 :     char        errbuf[256];
 2132 rhaas                    1329 ECB             : 
                               1330                 :     /*
                               1331                 :      * Issue cancel request.  Unfortunately, there's no good way to limit the
                               1332                 :      * amount of time that we might block inside PQgetCancel().
                               1333                 :      */
 2132 rhaas                    1334 LBC           0 :     if ((cancel = PQgetCancel(conn)))
                               1335                 :     {
 2132 rhaas                    1336 UIC           0 :         if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
                               1337                 :         {
                               1338               0 :             ereport(WARNING,
                               1339                 :                     (errcode(ERRCODE_CONNECTION_FAILURE),
 2132 rhaas                    1340 ECB             :                      errmsg("could not send cancel request: %s",
                               1341                 :                             errbuf)));
 2132 rhaas                    1342 LBC           0 :             PQfreeCancel(cancel);
 2132 rhaas                    1343 UIC           0 :             return false;
                               1344                 :         }
                               1345               0 :         PQfreeCancel(cancel);
                               1346                 :     }
                               1347                 : 
    3 efujita                  1348 UNC           0 :     return true;
                               1349                 : }
                               1350                 : 
                               1351                 : static bool
                               1352               0 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
                               1353                 : {
                               1354               0 :     PGresult   *result = NULL;
                               1355                 :     bool        timed_out;
                               1356                 : 
                               1357                 :     /*
                               1358                 :      * If requested, consume whatever data is available from the socket. (Note
                               1359                 :      * that if all data is available, this allows pgfdw_get_cleanup_result to
                               1360                 :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
                               1361                 :      * which would be large compared to the overhead of PQconsumeInput.)
                               1362                 :      */
                               1363               0 :     if (consume_input && !PQconsumeInput(conn))
                               1364                 :     {
                               1365               0 :         ereport(WARNING,
                               1366                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1367                 :                  errmsg("could not get result of cancel request: %s",
                               1368                 :                         pchomp(PQerrorMessage(conn)))));
                               1369               0 :         return false;
                               1370                 :     }
                               1371                 : 
                               1372                 :     /* Get and discard the result of the query. */
  487 fujii                    1373 UIC           0 :     if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
                               1374                 :     {
                               1375               0 :         if (timed_out)
                               1376               0 :             ereport(WARNING,
                               1377                 :                     (errmsg("could not get result of cancel request due to timeout")));
                               1378                 :         else
                               1379               0 :             ereport(WARNING,
                               1380                 :                     (errcode(ERRCODE_CONNECTION_FAILURE),
                               1381                 :                      errmsg("could not get result of cancel request: %s",
                               1382                 :                             pchomp(PQerrorMessage(conn)))));
  487 fujii                    1383 EUB             : 
 2132 rhaas                    1384 UIC           0 :         return false;
                               1385                 :     }
                               1386               0 :     PQclear(result);
                               1387                 : 
                               1388               0 :     return true;
                               1389                 : }
                               1390                 : 
 2132 rhaas                    1391 EUB             : /*
                               1392                 :  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
                               1393                 :  * result.  If the query is executed without error, the return value is true.
                               1394                 :  * If the query is executed successfully but returns an error, the return
                               1395                 :  * value is true if and only if ignore_errors is set.  If the query can't be
                               1396                 :  * sent or times out, the return value is false.
                               1397                 :  *
                               1398                 :  * It's not a huge problem if we throw an ERROR here, but if we get into error
                               1399                 :  * recursion trouble, we'll end up slamming the connection shut, which will
  543 efujita                  1400                 :  * necessitate failing the entire toplevel transaction even if subtransactions
                               1401                 :  * were used.  Try to use WARNING where we can.
                               1402                 :  */
                               1403                 : static bool
 2132 rhaas                    1404 GIC          50 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
                               1405                 : {
                               1406                 :     TimestampTz endtime;
 2132 rhaas                    1407 EUB             : 
                               1408                 :     /*
                               1409                 :      * If it takes too long to execute a cleanup query, assume the connection
                               1410                 :      * is dead.  It's fairly likely that this is why we aborted in the first
                               1411                 :      * place (e.g. statement timeout, user cancel), so the timeout shouldn't
                               1412                 :      * be too long.
                               1413                 :      */
    3 efujita                  1414 GNC          50 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
                               1415                 :                                           CONNECTION_CLEANUP_TIMEOUT);
 2132 rhaas                    1416 EUB             : 
    3 efujita                  1417 GNC          50 :     if (!pgfdw_exec_cleanup_query_begin(conn, query))
    3 efujita                  1418 UNC           0 :         return false;
    3 efujita                  1419 GNC          50 :     return pgfdw_exec_cleanup_query_end(conn, query, endtime,
                               1420                 :                                         false, ignore_errors);
                               1421                 : }
                               1422                 : 
                               1423                 : static bool
                               1424              62 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
                               1425                 : {
 2132 rhaas                    1426 EUB             :     /*
                               1427                 :      * Submit a query.  Since we don't use non-blocking mode, this also can
                               1428                 :      * block.  But its risk is relatively small, so we ignore that for now.
                               1429                 :      */
 2132 rhaas                    1430 GIC          62 :     if (!PQsendQuery(conn, query))
 2132 rhaas                    1431 EUB             :     {
 2132 rhaas                    1432 UIC           0 :         pgfdw_report_error(WARNING, NULL, conn, false, query);
                               1433               0 :         return false;
                               1434                 :     }
 2132 rhaas                    1435 EUB             : 
    3 efujita                  1436 GNC          62 :     return true;
                               1437                 : }
                               1438                 : 
                               1439                 : static bool
                               1440              62 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
                               1441                 :                              TimestampTz endtime, bool consume_input,
                               1442                 :                              bool ignore_errors)
                               1443                 : {
                               1444              62 :     PGresult   *result = NULL;
                               1445                 :     bool        timed_out;
                               1446                 : 
                               1447                 :     /*
                               1448                 :      * If requested, consume whatever data is available from the socket. (Note
                               1449                 :      * that if all data is available, this allows pgfdw_get_cleanup_result to
                               1450                 :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
                               1451                 :      * which would be large compared to the overhead of PQconsumeInput.)
                               1452                 :      */
                               1453              62 :     if (consume_input && !PQconsumeInput(conn))
                               1454                 :     {
    3 efujita                  1455 UNC           0 :         pgfdw_report_error(WARNING, NULL, conn, false, query);
                               1456               0 :         return false;
                               1457                 :     }
                               1458                 : 
                               1459                 :     /* Get the result of the query. */
  487 fujii                    1460 GBC          62 :     if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
                               1461                 :     {
  487 fujii                    1462 UIC           0 :         if (timed_out)
                               1463               0 :             ereport(WARNING,
                               1464                 :                     (errmsg("could not get query result due to timeout"),
                               1465                 :                      query ? errcontext("remote SQL command: %s", query) : 0));
                               1466                 :         else
                               1467               0 :             pgfdw_report_error(WARNING, NULL, conn, false, query);
                               1468                 : 
 2132 rhaas                    1469 UBC           0 :         return false;
                               1470                 :     }
 2132 rhaas                    1471 EUB             : 
                               1472                 :     /* Issue a warning if not successful. */
 2132 rhaas                    1473 GIC          62 :     if (PQresultStatus(result) != PGRES_COMMAND_OK)
                               1474                 :     {
 2132 rhaas                    1475 UBC           0 :         pgfdw_report_error(WARNING, result, conn, true, query);
 2132 rhaas                    1476 UIC           0 :         return ignore_errors;
                               1477                 :     }
 2124 tgl                      1478 GIC          62 :     PQclear(result);
 2132 rhaas                    1479 EUB             : 
 2132 rhaas                    1480 GIC          62 :     return true;
 2132 rhaas                    1481 EUB             : }
                               1482                 : 
                               1483                 : /*
                               1484                 :  * Get, during abort cleanup, the result of a query that is in progress.  This
                               1485                 :  * might be a query that is being interrupted by transaction abort, or it might
                               1486                 :  * be a query that was initiated as part of transaction abort to get the remote
                               1487                 :  * side back to the appropriate state.
                               1488                 :  *
                               1489                 :  * endtime is the time at which we should give up and assume the remote
  487 fujii                    1490                 :  * side is dead.  Returns true if the timeout expired or connection trouble
                               1491                 :  * occurred, false otherwise.  Sets *result except when returning true.
                               1492                 :  * Sets *timed_out to true only when the timeout expired.
                               1493                 :  */
 2132 rhaas                    1494                 : static bool
  487 fujii                    1495 GIC          62 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
                               1496                 :                          bool *timed_out)
                               1497                 : {
                               1498              62 :     volatile bool failed = false;
 2124 tgl                      1499              62 :     PGresult   *volatile last_res = NULL;
                               1500                 : 
  487 fujii                    1501              62 :     *timed_out = false;
                               1502                 : 
                               1503                 :     /* In what follows, do not leak any PGresults on an error. */
 2124 tgl                      1504              62 :     PG_TRY();
                               1505                 :     {
                               1506                 :         for (;;)
 2132 rhaas                    1507              69 :         {
                               1508                 :             PGresult   *res;
                               1509                 : 
 2124 tgl                      1510 CBC         312 :             while (PQisBusy(conn))
                               1511                 :             {
                               1512                 :                 int         wc;
 2124 tgl                      1513 GIC          50 :                 TimestampTz now = GetCurrentTimestamp();
                               1514                 :                 long        cur_timeout;
                               1515                 : 
                               1516                 :                 /* If timeout has expired, give up, else get sleep time. */
  880                          1517              50 :                 cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
                               1518              50 :                 if (cur_timeout <= 0)
                               1519                 :                 {
  487 fujii                    1520 LBC           0 :                     *timed_out = true;
  487 fujii                    1521 UIC           0 :                     failed = true;
 2124 tgl                      1522               0 :                     goto exit;
 2124 tgl                      1523 ECB             :                 }
 2124 tgl                      1524 EUB             : 
 2124 tgl                      1525 ECB             :                 /* Sleep until there's something to do */
 2124 tgl                      1526 GIC          50 :                 wc = WaitLatchOrSocket(MyLatch,
                               1527                 :                                        WL_LATCH_SET | WL_SOCKET_READABLE |
                               1528                 :                                        WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                               1529                 :                                        PQsocket(conn),
 2124 tgl                      1530 ECB             :                                        cur_timeout, PG_WAIT_EXTENSION);
 2124 tgl                      1531 GIC          50 :                 ResetLatch(MyLatch);
                               1532                 : 
                               1533              50 :                 CHECK_FOR_INTERRUPTS();
                               1534                 : 
                               1535                 :                 /* Data available in socket? */
 2124 tgl                      1536 CBC          50 :                 if (wc & WL_SOCKET_READABLE)
                               1537                 :                 {
 2124 tgl                      1538 GBC          50 :                     if (!PQconsumeInput(conn))
 2124 tgl                      1539 EUB             :                     {
                               1540                 :                         /* connection trouble */
  487 fujii                    1541 UIC           0 :                         failed = true;
 2124 tgl                      1542 LBC           0 :                         goto exit;
                               1543                 :                     }
                               1544                 :                 }
                               1545                 :             }
 2132 rhaas                    1546 ECB             : 
 2124 tgl                      1547 GIC         131 :             res = PQgetResult(conn);
                               1548             131 :             if (res == NULL)
                               1549              62 :                 break;          /* query is complete */
 2132 rhaas                    1550 ECB             : 
 2124 tgl                      1551 GIC          69 :             PQclear(last_res);
                               1552              69 :             last_res = res;
                               1553                 :         }
                               1554              62 : exit:   ;
                               1555                 :     }
 2124 tgl                      1556 UIC           0 :     PG_CATCH();
                               1557                 :     {
 2132 rhaas                    1558               0 :         PQclear(last_res);
 2124 tgl                      1559 LBC           0 :         PG_RE_THROW();
                               1560                 :     }
 2124 tgl                      1561 GBC          62 :     PG_END_TRY();
 2132 rhaas                    1562 EUB             : 
  487 fujii                    1563 GIC          62 :     if (failed)
 2124 tgl                      1564 UIC           0 :         PQclear(last_res);
                               1565                 :     else
 2124 tgl                      1566 CBC          62 :         *result = last_res;
  487 fujii                    1567 GIC          62 :     return failed;
 2132 rhaas                    1568 EUB             : }
  811 fujii                    1569                 : 
                               1570                 : /*
                               1571                 :  * Abort remote transaction or subtransaction.
                               1572                 :  *
  564                          1573                 :  * "toplevel" should be set to true if toplevel (main) transaction is
                               1574                 :  * rolled back, false otherwise.
                               1575                 :  *
                               1576                 :  * Set entry->changing_xact_state to false on success, true on failure.
                               1577                 :  */
                               1578                 : static void
  380 efujita                  1579 CBC          37 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
                               1580                 : {
  380 efujita                  1581 EUB             :     char        sql[100];
                               1582                 : 
                               1583                 :     /*
  564 fujii                    1584 ECB             :      * Don't try to clean up the connection if we're already in error
                               1585                 :      * recursion trouble.
                               1586                 :      */
  564 fujii                    1587 GIC          37 :     if (in_error_recursion_trouble())
  564 fujii                    1588 UIC           0 :         entry->changing_xact_state = true;
                               1589                 : 
                               1590                 :     /*
                               1591                 :      * If connection is already unsalvageable, don't touch it further.
                               1592                 :      */
  564 fujii                    1593 GIC          37 :     if (entry->changing_xact_state)
                               1594               1 :         return;
                               1595                 : 
                               1596                 :     /*
                               1597                 :      * Mark this connection as in the process of changing transaction state.
                               1598                 :      */
                               1599              36 :     entry->changing_xact_state = true;
                               1600                 : 
  564 fujii                    1601 ECB             :     /* Assume we might have lost track of prepared statements */
  564 fujii                    1602 GIC          36 :     entry->have_error = true;
                               1603                 : 
  564 fujii                    1604 ECB             :     /*
                               1605                 :      * If a command has been submitted to the remote server by using an
                               1606                 :      * asynchronous execution function, the command might not have yet
                               1607                 :      * completed.  Check to see if a command is still being processed by the
                               1608                 :      * remote server, and if so, request cancellation of the command.
                               1609                 :      */
  564 fujii                    1610 CBC          36 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
  564 fujii                    1611 UIC           0 :         !pgfdw_cancel_query(entry->conn))
                               1612               0 :         return;                 /* Unable to cancel running query */
  564 fujii                    1613 ECB             : 
    3 efujita                  1614 GNC          36 :     CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
  564 fujii                    1615 GIC          36 :     if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
  380 efujita                  1616 UIC           0 :         return;                 /* Unable to abort remote (sub)transaction */
                               1617                 : 
  564 fujii                    1618 CBC          36 :     if (toplevel)
  564 fujii                    1619 ECB             :     {
  564 fujii                    1620 GIC          33 :         if (entry->have_prep_stmt && entry->have_error &&
  564 fujii                    1621 GBC          14 :             !pgfdw_exec_cleanup_query(entry->conn,
  564 fujii                    1622 EUB             :                                       "DEALLOCATE ALL",
                               1623                 :                                       true))
  564 fujii                    1624 UIC           0 :             return;             /* Trouble clearing prepared statements */
                               1625                 : 
  564 fujii                    1626 GIC          33 :         entry->have_prep_stmt = false;
  564 fujii                    1627 CBC          33 :         entry->have_error = false;
                               1628                 :     }
                               1629                 : 
                               1630                 :     /*
                               1631                 :      * If pendingAreq of the per-connection state is not NULL, it means that
  443 efujita                  1632 ECB             :      * an asynchronous fetch begun by fetch_more_data_begin() was not done
                               1633                 :      * successfully and thus the per-connection state was not reset in
                               1634                 :      * fetch_more_data(); in that case reset the per-connection state here.
                               1635                 :      */
  443 efujita                  1636 GIC          36 :     if (entry->state.pendingAreq)
  443 efujita                  1637 LBC           0 :         memset(&entry->state, 0, sizeof(entry->state));
                               1638                 : 
  564 fujii                    1639 ECB             :     /* Disarm changing_xact_state if it all worked */
  564 fujii                    1640 GIC          36 :     entry->changing_xact_state = false;
                               1641                 : }
  564 fujii                    1642 EUB             : 
                               1643                 : /*
                               1644                 :  * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
                               1645                 :  * don't wait for the result.
                               1646                 :  *
                               1647                 :  * Returns true if the abort command or cancel request is successfully issued,
                               1648                 :  * false otherwise.  If the abort command is successfully issued, the given
                               1649                 :  * connection cache entry is appended to *pending_entries.  Otherwise, if the
                               1650                 :  * cancel request is successfully issued, it is appended to *cancel_requested.
                               1651                 :  */
                               1652                 : static bool
    3 efujita                  1653 GNC           8 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
                               1654                 :                           List **pending_entries, List **cancel_requested)
                               1655                 : {
                               1656                 :     /*
                               1657                 :      * Don't try to clean up the connection if we're already in error
                               1658                 :      * recursion trouble.
                               1659                 :      */
                               1660               8 :     if (in_error_recursion_trouble())
    3 efujita                  1661 UNC           0 :         entry->changing_xact_state = true;
                               1662                 : 
                               1663                 :     /*
                               1664                 :      * If connection is already unsalvageable, don't touch it further.
                               1665                 :      */
    3 efujita                  1666 GNC           8 :     if (entry->changing_xact_state)
    3 efujita                  1667 UNC           0 :         return false;
                               1668                 : 
                               1669                 :     /*
                               1670                 :      * Mark this connection as in the process of changing transaction state.
                               1671                 :      */
    3 efujita                  1672 GNC           8 :     entry->changing_xact_state = true;
                               1673                 : 
                               1674                 :     /* Assume we might have lost track of prepared statements */
                               1675               8 :     entry->have_error = true;
                               1676                 : 
                               1677                 :     /*
                               1678                 :      * If a command has been submitted to the remote server by using an
                               1679                 :      * asynchronous execution function, the command might not have yet
                               1680                 :      * completed.  Check to see if a command is still being processed by the
                               1681                 :      * remote server, and if so, request cancellation of the command.
                               1682                 :      */
                               1683               8 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
                               1684                 :     {
    3 efujita                  1685 UNC           0 :         if (!pgfdw_cancel_query_begin(entry->conn))
                               1686               0 :             return false;       /* Unable to cancel running query */
                               1687               0 :         *cancel_requested = lappend(*cancel_requested, entry);
                               1688                 :     }
                               1689                 :     else
                               1690                 :     {
                               1691                 :         char        sql[100];
                               1692                 : 
    3 efujita                  1693 GNC           8 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
                               1694               8 :         if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    3 efujita                  1695 UNC           0 :             return false;       /* Unable to abort remote transaction */
    3 efujita                  1696 GNC           8 :         *pending_entries = lappend(*pending_entries, entry);
                               1697                 :     }
                               1698                 : 
                               1699               8 :     return true;
                               1700                 : }
                               1701                 : 
  409 efujita                  1702 EUB             : /*
                               1703                 :  * Finish pre-commit cleanup of connections on each of which we've sent a
                               1704                 :  * COMMIT command to the remote server.
                               1705                 :  */
                               1706                 : static void
  409 efujita                  1707 CBC          13 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
  409 efujita                  1708 ECB             : {
                               1709                 :     ConnCacheEntry *entry;
  409 efujita                  1710 GIC          13 :     List       *pending_deallocs = NIL;
  409 efujita                  1711 ECB             :     ListCell   *lc;
                               1712                 : 
  409 efujita                  1713 GIC          13 :     Assert(pending_entries);
  409 efujita                  1714 ECB             : 
                               1715                 :     /*
  409 efujita                  1716 EUB             :      * Get the result of the COMMIT command for each of the pending entries
                               1717                 :      */
  409 efujita                  1718 GBC          29 :     foreach(lc, pending_entries)
  409 efujita                  1719 EUB             :     {
  409 efujita                  1720 GIC          16 :         entry = (ConnCacheEntry *) lfirst(lc);
  409 efujita                  1721 ECB             : 
  409 efujita                  1722 GIC          16 :         Assert(entry->changing_xact_state);
  332 tgl                      1723 ECB             : 
  409 efujita                  1724 EUB             :         /*
                               1725                 :          * We might already have received the result on the socket, so pass
  409 efujita                  1726 ECB             :          * consume_input=true to try to consume it first
                               1727                 :          */
  409 efujita                  1728 GIC          16 :         do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
                               1729              16 :         entry->changing_xact_state = false;
                               1730                 : 
                               1731                 :         /* Do a DEALLOCATE ALL in parallel if needed */
                               1732              16 :         if (entry->have_prep_stmt && entry->have_error)
                               1733                 :         {
                               1734                 :             /* Ignore errors (see notes in pgfdw_xact_callback) */
                               1735               2 :             if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
                               1736                 :             {
                               1737               2 :                 pending_deallocs = lappend(pending_deallocs, entry);
                               1738               2 :                 continue;
  409 efujita                  1739 ECB             :             }
                               1740                 :         }
  409 efujita                  1741 GIC          14 :         entry->have_prep_stmt = false;
                               1742              14 :         entry->have_error = false;
                               1743                 : 
                               1744              14 :         pgfdw_reset_xact_state(entry, true);
                               1745                 :     }
                               1746                 : 
  409 efujita                  1747 ECB             :     /* No further work if no pending entries */
  409 efujita                  1748 GBC          13 :     if (!pending_deallocs)
  409 efujita                  1749 GIC          12 :         return;
                               1750                 : 
                               1751                 :     /*
                               1752                 :      * Get the result of the DEALLOCATE command for each of the pending
  409 efujita                  1753 ECB             :      * entries
                               1754                 :      */
  409 efujita                  1755 GIC           3 :     foreach(lc, pending_deallocs)
                               1756                 :     {
                               1757                 :         PGresult   *res;
                               1758                 : 
  409 efujita                  1759 CBC           2 :         entry = (ConnCacheEntry *) lfirst(lc);
                               1760                 : 
                               1761                 :         /* Ignore errors (see notes in pgfdw_xact_callback) */
                               1762               4 :         while ((res = PQgetResult(entry->conn)) != NULL)
                               1763                 :         {
  409 efujita                  1764 GIC           2 :             PQclear(res);
                               1765                 :             /* Stop if the connection is lost (else we'll loop infinitely) */
                               1766               2 :             if (PQstatus(entry->conn) == CONNECTION_BAD)
  409 efujita                  1767 UIC           0 :                 break;
                               1768                 :         }
  409 efujita                  1769 GIC           2 :         entry->have_prep_stmt = false;
  409 efujita                  1770 CBC           2 :         entry->have_error = false;
  409 efujita                  1771 EUB             : 
  409 efujita                  1772 GBC           2 :         pgfdw_reset_xact_state(entry, true);
                               1773                 :     }
  409 efujita                  1774 ECB             : }
                               1775                 : 
  409 efujita                  1776 EUB             : /*
                               1777                 :  * Finish pre-subcommit cleanup of connections on each of which we've sent a
  409 efujita                  1778 ECB             :  * RELEASE command to the remote server.
                               1779                 :  */
                               1780                 : static void
  409 efujita                  1781 CBC           1 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
                               1782                 : {
                               1783                 :     ConnCacheEntry *entry;
  409 efujita                  1784 EUB             :     char        sql[100];
                               1785                 :     ListCell   *lc;
  409 efujita                  1786 ECB             : 
  409 efujita                  1787 CBC           1 :     Assert(pending_entries);
                               1788                 : 
                               1789                 :     /*
                               1790                 :      * Get the result of the RELEASE command for each of the pending entries
                               1791                 :      */
  409 efujita                  1792 GIC           1 :     snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
                               1793               3 :     foreach(lc, pending_entries)
                               1794                 :     {
                               1795               2 :         entry = (ConnCacheEntry *) lfirst(lc);
  409 efujita                  1796 ECB             : 
  409 efujita                  1797 GBC           2 :         Assert(entry->changing_xact_state);
                               1798                 : 
                               1799                 :         /*
  409 efujita                  1800 ECB             :          * We might already have received the result on the socket, so pass
                               1801                 :          * consume_input=true to try to consume it first
                               1802                 :          */
  409 efujita                  1803 GIC           2 :         do_sql_command_end(entry->conn, sql, true);
                               1804               2 :         entry->changing_xact_state = false;
                               1805                 : 
                               1806               2 :         pgfdw_reset_xact_state(entry, false);
                               1807                 :     }
                               1808               1 : }
                               1809                 : 
                               1810                 : /*
                               1811                 :  * Finish abort cleanup of connections on each of which we've sent an abort
                               1812                 :  * command or cancel request to the remote server.
                               1813                 :  */
                               1814                 : static void
    3 efujita                  1815 GNC           4 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
                               1816                 :                            bool toplevel)
                               1817                 : {
                               1818               4 :     List       *pending_deallocs = NIL;
                               1819                 :     ListCell   *lc;
                               1820                 : 
                               1821                 :     /*
                               1822                 :      * For each of the pending cancel requests (if any), get and discard the
                               1823                 :      * result of the query, and submit an abort command to the remote server.
                               1824                 :      */
                               1825               4 :     if (cancel_requested)
                               1826                 :     {
    3 efujita                  1827 UNC           0 :         foreach(lc, cancel_requested)
                               1828                 :         {
                               1829               0 :             ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
                               1830                 :             TimestampTz endtime;
                               1831                 :             char        sql[100];
                               1832                 : 
                               1833               0 :             Assert(entry->changing_xact_state);
                               1834                 : 
                               1835                 :             /*
                               1836                 :              * Set end time.  You might think we should do this before issuing
                               1837                 :              * cancel request like in normal mode, but that is problematic,
                               1838                 :              * because if, for example, it took longer than 30 seconds to
                               1839                 :              * process the first few entries in the cancel_requested list, it
                               1840                 :              * would cause a timeout error when processing each of the
                               1841                 :              * remaining entries in the list, leading to slamming that entry's
                               1842                 :              * connection shut.
                               1843                 :              */
                               1844               0 :             endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
                               1845                 :                                                   CONNECTION_CLEANUP_TIMEOUT);
                               1846                 : 
                               1847               0 :             if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
                               1848                 :             {
                               1849                 :                 /* Unable to cancel running query */
                               1850               0 :                 pgfdw_reset_xact_state(entry, toplevel);
                               1851               0 :                 continue;
                               1852                 :             }
                               1853                 : 
                               1854                 :             /* Send an abort command in parallel if needed */
                               1855               0 :             CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
                               1856               0 :             if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
                               1857                 :             {
                               1858                 :                 /* Unable to abort remote (sub)transaction */
                               1859               0 :                 pgfdw_reset_xact_state(entry, toplevel);
                               1860                 :             }
                               1861                 :             else
                               1862               0 :                 pending_entries = lappend(pending_entries, entry);
                               1863                 :         }
                               1864                 :     }
                               1865                 : 
                               1866                 :     /* No further work if no pending entries */
    3 efujita                  1867 GNC           4 :     if (!pending_entries)
    3 efujita                  1868 UNC           0 :         return;
                               1869                 : 
                               1870                 :     /*
                               1871                 :      * Get the result of the abort command for each of the pending entries
                               1872                 :      */
    3 efujita                  1873 GNC          12 :     foreach(lc, pending_entries)
                               1874                 :     {
                               1875               8 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
                               1876                 :         TimestampTz endtime;
                               1877                 :         char        sql[100];
                               1878                 : 
                               1879               8 :         Assert(entry->changing_xact_state);
                               1880                 : 
                               1881                 :         /*
                               1882                 :          * Set end time.  We do this now, not before issuing the command like
                               1883                 :          * in normal mode, for the same reason as for the cancel_requested
                               1884                 :          * entries.
                               1885                 :          */
                               1886               8 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
                               1887                 :                                               CONNECTION_CLEANUP_TIMEOUT);
                               1888                 : 
                               1889               8 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
                               1890               8 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
                               1891                 :                                           true, false))
                               1892                 :         {
                               1893                 :             /* Unable to abort remote (sub)transaction */
    3 efujita                  1894 UNC           0 :             pgfdw_reset_xact_state(entry, toplevel);
    3 efujita                  1895 GNC           4 :             continue;
                               1896                 :         }
                               1897                 : 
                               1898               8 :         if (toplevel)
                               1899                 :         {
                               1900                 :             /* Do a DEALLOCATE ALL in parallel if needed */
                               1901               4 :             if (entry->have_prep_stmt && entry->have_error)
                               1902                 :             {
                               1903               4 :                 if (!pgfdw_exec_cleanup_query_begin(entry->conn,
                               1904                 :                                                     "DEALLOCATE ALL"))
                               1905                 :                 {
                               1906                 :                     /* Trouble clearing prepared statements */
    3 efujita                  1907 UNC           0 :                     pgfdw_reset_xact_state(entry, toplevel);
                               1908                 :                 }
                               1909                 :                 else
    3 efujita                  1910 GNC           4 :                     pending_deallocs = lappend(pending_deallocs, entry);
                               1911               4 :                 continue;
                               1912                 :             }
    3 efujita                  1913 UNC           0 :             entry->have_prep_stmt = false;
                               1914               0 :             entry->have_error = false;
                               1915                 :         }
                               1916                 : 
                               1917                 :         /* Reset the per-connection state if needed */
    3 efujita                  1918 GNC           4 :         if (entry->state.pendingAreq)
    3 efujita                  1919 UNC           0 :             memset(&entry->state, 0, sizeof(entry->state));
                               1920                 : 
                               1921                 :         /* We're done with this entry; unset the changing_xact_state flag */
    3 efujita                  1922 GNC           4 :         entry->changing_xact_state = false;
                               1923               4 :         pgfdw_reset_xact_state(entry, toplevel);
                               1924                 :     }
                               1925                 : 
                               1926                 :     /* No further work if no pending entries */
                               1927               4 :     if (!pending_deallocs)
                               1928               2 :         return;
                               1929               2 :     Assert(toplevel);
                               1930                 : 
                               1931                 :     /*
                               1932                 :      * Get the result of the DEALLOCATE command for each of the pending
                               1933                 :      * entries
                               1934                 :      */
                               1935               6 :     foreach(lc, pending_deallocs)
                               1936                 :     {
                               1937               4 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
                               1938                 :         TimestampTz endtime;
                               1939                 : 
                               1940               4 :         Assert(entry->changing_xact_state);
                               1941               4 :         Assert(entry->have_prep_stmt);
                               1942               4 :         Assert(entry->have_error);
                               1943                 : 
                               1944                 :         /*
                               1945                 :          * Set end time.  We do this now, not before issuing the command like
                               1946                 :          * in normal mode, for the same reason as for the cancel_requested
                               1947                 :          * entries.
                               1948                 :          */
                               1949               4 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
                               1950                 :                                               CONNECTION_CLEANUP_TIMEOUT);
                               1951                 : 
                               1952               4 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
                               1953                 :                                           endtime, true, true))
                               1954                 :         {
                               1955                 :             /* Trouble clearing prepared statements */
    3 efujita                  1956 UNC           0 :             pgfdw_reset_xact_state(entry, toplevel);
                               1957               0 :             continue;
                               1958                 :         }
    3 efujita                  1959 GNC           4 :         entry->have_prep_stmt = false;
                               1960               4 :         entry->have_error = false;
                               1961                 : 
                               1962                 :         /* Reset the per-connection state if needed */
                               1963               4 :         if (entry->state.pendingAreq)
    3 efujita                  1964 UNC           0 :             memset(&entry->state, 0, sizeof(entry->state));
                               1965                 : 
                               1966                 :         /* We're done with this entry; unset the changing_xact_state flag */
    3 efujita                  1967 GNC           4 :         entry->changing_xact_state = false;
                               1968               4 :         pgfdw_reset_xact_state(entry, toplevel);
                               1969                 :     }
                               1970                 : }
                               1971                 : 
                               1972                 : /*
                               1973                 :  * List active foreign server connections.
                               1974                 :  *
  811 fujii                    1975 ECB             :  * This function takes no input parameters and returns setof record made up
                               1976                 :  * of the following values:
                               1977                 :  * - server_name - server name of active connection. If the foreign server
                               1978                 :  *   has been dropped but the connection is still active, the server name
                               1979                 :  *   will be NULL in the output.
                               1980                 :  * - valid - true/false representing whether the connection is valid or not.
                               1981                 :  *   Note that the connections can get invalidated in pgfdw_inval_callback.
                               1982                 :  *
  811 fujii                    1983 EUB             :  * No records are returned when there are no cached connections at all.
                               1984                 :  */
                               1985                 : Datum
  811 fujii                    1986 GIC          11 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
                               1987                 : {
  811 fujii                    1988 ECB             : #define POSTGRES_FDW_GET_CONNECTIONS_COLS   2
  811 fujii                    1989 GBC          11 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                               1990                 :     HASH_SEQ_STATUS scan;
                               1991                 :     ConnCacheEntry *entry;
                               1992                 : 
  173 michael                  1993 GIC          11 :     InitMaterializedSRF(fcinfo, 0);
  811 fujii                    1994 ECB             : 
                               1995                 :     /* If cache doesn't exist, we return no records */
  811 fujii                    1996 GIC          11 :     if (!ConnectionHash)
  811 fujii                    1997 LBC           0 :         PG_RETURN_VOID();
                               1998                 : 
  811 fujii                    1999 GIC          11 :     hash_seq_init(&scan, ConnectionHash);
                               2000              82 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
                               2001                 :     {
                               2002                 :         ForeignServer *server;
  267 peter                    2003 GNC          71 :         Datum       values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
                               2004              71 :         bool        nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
  811 fujii                    2005 ECB             : 
                               2006                 :         /* We only look for open remote connections */
  811 fujii                    2007 GBC          71 :         if (!entry->conn)
                               2008              60 :             continue;
  811 fujii                    2009 EUB             : 
  811 fujii                    2010 GIC          11 :         server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
                               2011                 : 
  811 fujii                    2012 ECB             :         /*
                               2013                 :          * The foreign server may have been dropped in the current explicit
  811 fujii                    2014 EUB             :          * transaction. It is not possible to drop the server from another
  811 fujii                    2015 ECB             :          * session while the connection associated with it is in use in the
                               2016                 :          * current transaction; if attempted, the drop query in the other
                               2017                 :          * session blocks until the current transaction finishes.
                               2018                 :          *
                               2019                 :          * Even though the server is dropped in the current transaction, the
                               2020                 :          * cache can still have an associated active connection entry; we call
                               2021                 :          * such connections dangling. Since we cannot fetch the server name
                               2022                 :          * from the system catalogs for dangling connections, we instead show
                               2023                 :          * a NULL value for the server name in the output.
                               2024                 :          *
                               2025                 :          * We could have done better by storing the server name in the cache
                               2026                 :          * entry instead of server oid so that it could be used in the output.
                               2027                 :          * But the server name in each cache entry requires 64 bytes of
                               2028                 :          * memory, which is huge, when there are many cached connections and
                               2029                 :          * the use case i.e. dropping the foreign server within the explicit
                               2030                 :          * current transaction seems rare. So, we chose to show NULL value for
                               2031                 :          * server name in output.
                               2032                 :          *
                               2033                 :          * Such dangling connections get closed either in next use or at the
                               2034                 :          * end of current explicit transaction in pgfdw_xact_callback.
                               2035                 :          */
  811 fujii                    2036 GIC          11 :         if (!server)
  811 fujii                    2037 ECB             :         {
                               2038                 :             /*
                               2039                 :              * If the server has been dropped in the current explicit
                               2040                 :              * transaction, then this entry would have been invalidated in
  803                          2041                 :              * pgfdw_inval_callback at the end of drop server command. Note
                               2042                 :              * that this connection would not have been closed in
                               2043                 :              * pgfdw_inval_callback because it is still being used in the
                               2044                 :              * current explicit transaction. So, assert that here.
                               2045                 :              */
  811 fujii                    2046 GIC           1 :             Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
  811 fujii                    2047 ECB             : 
                               2048                 :             /* Show null, if no server name was found */
  811 fujii                    2049 GIC           1 :             nulls[0] = true;
                               2050                 :         }
  811 fujii                    2051 ECB             :         else
  811 fujii                    2052 GIC          10 :             values[0] = CStringGetTextDatum(server->servername);
                               2053                 : 
  811 fujii                    2054 CBC          11 :         values[1] = BoolGetDatum(!entry->invalidated);
                               2055                 : 
  397 michael                  2056              11 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
  811 fujii                    2057 ECB             :     }
                               2058                 : 
  811 fujii                    2059 GIC          11 :     PG_RETURN_VOID();
  811 fujii                    2060 ECB             : }
  803                          2061                 : 
                               2062                 : /*
                               2063                 :  * Disconnect the specified cached connections.
                               2064                 :  *
                               2065                 :  * This function discards the open connections that are established by
                               2066                 :  * postgres_fdw from the local session to the foreign server with
                               2067                 :  * the given name. Note that there can be multiple connections to
                               2068                 :  * the given server using different user mappings. If the connections
                               2069                 :  * are used in the current local transaction, they are not disconnected
                               2070                 :  * and warning messages are reported. This function returns true
                               2071                 :  * if it disconnects at least one connection, otherwise false. If no
                               2072                 :  * foreign server with the given name is found, an error is reported.
                               2073                 :  */
                               2074                 : Datum
  803 fujii                    2075 GIC           4 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
                               2076                 : {
                               2077                 :     ForeignServer *server;
  803 fujii                    2078 ECB             :     char       *servername;
                               2079                 : 
  803 fujii                    2080 GIC           4 :     servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
  803 fujii                    2081 CBC           4 :     server = GetForeignServerByName(servername, false);
                               2082                 : 
                               2083               3 :     PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
                               2084                 : }
  803 fujii                    2085 ECB             : 
  803 fujii                    2086 EUB             : /*
                               2087                 :  * Disconnect all the cached connections.
  803 fujii                    2088 ECB             :  *
                               2089                 :  * This function discards all the open connections that are established by
                               2090                 :  * postgres_fdw from the local session to the foreign servers.
                               2091                 :  * If the connections are used in the current local transaction, they are
                               2092                 :  * not disconnected and warning messages are reported. This function
                               2093                 :  * returns true if it disconnects at least one connection, otherwise false.
                               2094                 :  */
                               2095                 : Datum
  803 fujii                    2096 GIC           4 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
                               2097                 : {
                               2098               4 :     PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
                               2099                 : }
  803 fujii                    2100 ECB             : 
                               2101                 : /*
                               2102                 :  * Workhorse to disconnect cached connections.
                               2103                 :  *
                               2104                 :  * This function scans all the connection cache entries and disconnects
                               2105                 :  * the open connections whose foreign server OID matches with
                               2106                 :  * the specified one. If InvalidOid is specified, it disconnects all
                               2107                 :  * the cached connections.
                               2108                 :  *
                               2109                 :  * This function emits a warning for each connection that's used in
                               2110                 :  * the current transaction and doesn't close it. It returns true if
                               2111                 :  * it disconnects at least one connection, otherwise false.
                               2112                 :  *
                               2113                 :  * Note that this function disconnects even the connections that are
                               2114                 :  * established by other users in the same local session using different
                               2115                 :  * user mappings. This allows even a non-superuser to close
                               2116                 :  * the connections established by superusers in the same local session.
                               2117                 :  *
                               2118                 :  * XXX As of now we don't see any security risk doing this. But we should
                               2119                 :  * set some restrictions on that, for example, prevent non-superuser
                               2120                 :  * from closing the connections established by superusers even
                               2121                 :  * in the same session?
                               2122                 :  */
                               2123                 : static bool
  803 fujii                    2124 GIC           7 : disconnect_cached_connections(Oid serverid)
  803 fujii                    2125 ECB             : {
                               2126                 :     HASH_SEQ_STATUS scan;
                               2127                 :     ConnCacheEntry *entry;
  803 fujii                    2128 GIC           7 :     bool        all = !OidIsValid(serverid);    /* invalid OID: match every server */
                               2129               7 :     bool        result = false; /* set to true once a connection is discarded */
                               2130                 : 
                               2131                 :     /*
                               2132                 :      * If the connection cache hashtable has not been initialized yet in
                               2133                 :      * this session, there is nothing to scan, so return false.
  803 fujii                    2134 ECB             :      */
  803 fujii                    2135 GIC           7 :     if (!ConnectionHash)
  803 fujii                    2136 UIC           0 :         return false;
  803 fujii                    2137 ECB             : 
  803 fujii                    2138 GIC           7 :     hash_seq_init(&scan, ConnectionHash);
                               2139              50 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
                               2140                 :     {
                               2141                 :         /* Skip cache entries that have no open connection right now. */
                               2142              43 :         if (!entry->conn)
                               2143              32 :             continue;
  803 fujii                    2144 ECB             : 
  803 fujii                    2145 GIC          11 :         if (all || entry->serverid == serverid)
  803 fujii                    2146 EUB             :         {
                               2147                 :             /*
                               2148                 :              * The connection is in use in the current transaction (xact_depth
                               2149                 :              * > 0) and cannot be closed right now, so only emit a warning.
                               2150                 :              */
  803 fujii                    2151 GIC           8 :             if (entry->xact_depth > 0)
  803 fujii                    2152 EUB             :             {
                               2153                 :                 ForeignServer *server;
                               2154                 : 
  803 fujii                    2155 GIC           3 :                 server = GetForeignServerExtended(entry->serverid,
                               2156                 :                                                   FSV_MISSING_OK);
                               2157                 : 
                               2158               3 :                 if (!server)
                               2159                 :                 {
                               2160                 :                     /*
                               2161                 :                      * If the foreign server was dropped while its connection
                               2162                 :                      * was used in the current transaction, the connection
  803 fujii                    2163 EUB             :                      * must have been marked as invalid by
                               2164                 :                      * pgfdw_inval_callback at the end of DROP SERVER command.
                               2165                 :                      */
  803 fujii                    2166 UBC           0 :                     Assert(entry->invalidated);
                               2167                 : 
  803 fujii                    2168 UIC           0 :                     ereport(WARNING,
  803 fujii                    2169 EUB             :                             (errmsg("cannot close dropped server connection because it is still in use")));
                               2170                 :                 }
                               2171                 :                 else
  803 fujii                    2172 GIC           3 :                     ereport(WARNING,
                               2173                 :                             (errmsg("cannot close connection for server \"%s\" because it is still in use",
  803 fujii                    2174 EUB             :                                     server->servername)));
                               2175                 :             }
                               2176                 :             else    /* not used in the current transaction: discard it */
                               2177                 :             {
  803 fujii                    2178 GBC           5 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
  803 fujii                    2179 GIC           5 :                 disconnect_pg_server(entry);
                               2180               5 :                 result = true;  /* at least one connection was discarded */
  803 fujii                    2181 EUB             :             }
                               2182                 :         }
                               2183                 :     }
                               2184                 : 
  803 fujii                    2185 GIC           7 :     return result;  /* false if nothing matched or every match was in use */
  803 fujii                    2186 ECB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a