LCOV - differential code coverage report
Current view: top level - contrib/postgres_fdw - connection.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 83.2 % 619 515 40 17 44 3 14 303 84 114 83 368 4 16
Current Date: 2023-04-08 15:15:32 Functions: 92.7 % 41 38 2 1 34 4 3 38
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * connection.c
       4                 :  *        Connection management functions for postgres_fdw
       5                 :  *
       6                 :  * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group
       7                 :  *
       8                 :  * IDENTIFICATION
       9                 :  *        contrib/postgres_fdw/connection.c
      10                 :  *
      11                 :  *-------------------------------------------------------------------------
      12                 :  */
      13                 : #include "postgres.h"
      14                 : 
      15                 : #include "access/htup_details.h"
      16                 : #include "access/xact.h"
      17                 : #include "catalog/pg_user_mapping.h"
      18                 : #include "commands/defrem.h"
      19                 : #include "funcapi.h"
      20                 : #include "libpq/libpq-be-fe-helpers.h"
      21                 : #include "mb/pg_wchar.h"
      22                 : #include "miscadmin.h"
      23                 : #include "pgstat.h"
      24                 : #include "postgres_fdw.h"
      25                 : #include "storage/fd.h"
      26                 : #include "storage/latch.h"
      27                 : #include "utils/builtins.h"
      28                 : #include "utils/datetime.h"
      29                 : #include "utils/hsearch.h"
      30                 : #include "utils/inval.h"
      31                 : #include "utils/memutils.h"
      32                 : #include "utils/syscache.h"
      33                 : 
      34                 : /*
      35                 :  * Connection cache hash table entry
      36                 :  *
      37                 :  * The lookup key in this hash table is the user mapping OID. We use just one
      38                 :  * connection per user mapping ID, which ensures that all the scans use the
      39                 :  * same snapshot during a query.  Using the user mapping OID rather than
      40                 :  * the foreign server OID + user OID avoids creating multiple connections when
      41                 :  * the public user mapping applies to all user OIDs.
      42                 :  *
      43                 :  * The "conn" pointer can be NULL if we don't currently have a live connection.
      44                 :  * When we do have a connection, xact_depth tracks the current depth of
      45                 :  * transactions and subtransactions open on the remote side.  We need to issue
      46                 :  * commands at the same nesting depth on the remote as we're executing at
      47                 :  * ourselves, so that rolling back a subtransaction will kill the right
      48                 :  * queries and not the wrong ones.
                         :  *
                         :  * A freshly created entry has only "conn" cleared (see GetConnection);
                         :  * make_new_connection() assigns every other field before it stores a new
                         :  * connection in "conn".
      49                 :  */
      50                 : typedef Oid ConnCacheKey;
      51                 : 
      52                 : typedef struct ConnCacheEntry
      53                 : {
      54                 :     ConnCacheKey key;           /* hash key (must be first) */
      55                 :     PGconn     *conn;           /* connection to foreign server, or NULL */
      56                 :     /* Remaining fields are invalid when conn is NULL: */
      57                 :     int         xact_depth;     /* 0 = no xact open, 1 = main xact open, 2 =
      58                 :                                  * one level of subxact open, etc */
      59                 :     bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      60                 :     bool        have_error;     /* have any subxacts aborted in this xact? */
      61                 :     bool        changing_xact_state;    /* xact state change in process */
      62                 :     bool        parallel_commit;    /* do we commit (sub)xacts in parallel? */
      63                 :     bool        parallel_abort; /* do we abort (sub)xacts in parallel? */
      64                 :     bool        invalidated;    /* true if reconnect is pending */
      65                 :     bool        keep_connections;   /* setting value of keep_connections
      66                 :                                      * server option */
      67                 :     Oid         serverid;       /* foreign server OID used to get server name */
      68                 :     uint32      server_hashvalue;   /* hash value of foreign server OID */
      69                 :     uint32      mapping_hashvalue;  /* hash value of user mapping OID */
      70                 :     PgFdwConnState state;       /* extra per-connection state */
      71                 : } ConnCacheEntry;
      72                 : 
      73                 : /*
      74                 :  * Connection cache (initialized on first use)
                         :  *
                         :  * Created by GetConnection() with hash_create(); keys are ConnCacheKey
                         :  * values and entries are ConnCacheEntry structs.
      75                 :  */
      76                 : static HTAB *ConnectionHash = NULL;
      77                 : 
      78                 : /* for assigning cursor numbers and prepared statement numbers */
      79                 : static unsigned int cursor_number = 0;
      80                 : static unsigned int prep_stmt_number = 0;
      81                 : 
      82                 : /* tracks whether any work is needed in callback functions; set to true by every GetConnection() call */
      83                 : static bool xact_got_connection = false;
      84                 : 
      85                 : /*
      86                 :  * Milliseconds to wait to cancel an in-progress query or execute a cleanup
      87                 :  * query; if it takes longer than 30 seconds to do these, we assume the
      88                 :  * connection is dead.
      89                 :  */
      90                 : #define CONNECTION_CLEANUP_TIMEOUT  30000
      91                 : 
      92                 : /*
                         :  * Macro for constructing abort command to be sent.
                         :  *
                         :  * "sql" must be a char array rather than a pointer, because sizeof(sql)
                         :  * is used as the buffer size.  If "toplevel" is true the command is
                         :  * "ABORT TRANSACTION"; otherwise it rolls back to, and then releases,
                         :  * savepoint s<N>, where N is (entry)->xact_depth.
                         :  */
      93                 : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
      94                 :     do { \
      95                 :         if (toplevel) \
      96                 :             snprintf((sql), sizeof(sql), \
      97                 :                      "ABORT TRANSACTION"); \
      98                 :         else \
      99                 :             snprintf((sql), sizeof(sql), \
     100                 :                      "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
     101                 :                      (entry)->xact_depth, (entry)->xact_depth); \
     102                 :     } while(0)
     103                 : 
     104                 : /*
     105                 :  * SQL functions
                         :  *
                         :  * Each function named below is declared through PG_FUNCTION_INFO_V1
                         :  * (presumably the version-1 call convention record that SQL-callable C
                         :  * functions need -- confirm against the fmgr documentation).
     106                 :  */
     107 GIC           2 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
     108               2 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
     109               2 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
     110                 : 
     111                 : /*
                         :  * Prototypes of private functions.  Every function declared here is
                         :  * static, i.e. visible only within this file.
                         :  */
     112                 : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
     113                 : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
     114                 : static void disconnect_pg_server(ConnCacheEntry *entry);
     115                 : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
     116                 : static void configure_remote_session(PGconn *conn);
     117                 : static void do_sql_command_begin(PGconn *conn, const char *sql);
     118                 : static void do_sql_command_end(PGconn *conn, const char *sql,
     119                 :                                bool consume_input);
     120                 : static void begin_remote_xact(ConnCacheEntry *entry);
     121                 : static void pgfdw_xact_callback(XactEvent event, void *arg);
     122                 : static void pgfdw_subxact_callback(SubXactEvent event,
     123                 :                                    SubTransactionId mySubid,
     124                 :                                    SubTransactionId parentSubid,
     125                 :                                    void *arg);
     126                 : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
     127                 : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
     128 ECB             : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
     129                 : static bool pgfdw_cancel_query(PGconn *conn);
     130                 : static bool pgfdw_cancel_query_begin(PGconn *conn);
     131                 : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
     132                 :                                    bool consume_input);
     133                 : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
     134                 :                                      bool ignore_errors);
     135                 : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
     136                 : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
     137                 :                                          TimestampTz endtime,
     138                 :                                          bool consume_input,
     139                 :                                          bool ignore_errors);
     140                 : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
     141                 :                                      PGresult **result, bool *timed_out);
     142                 : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
     143                 : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
     144                 :                                       List **pending_entries,
     145                 :                                       List **cancel_requested);
     146                 : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
     147                 : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
     148                 :                                                int curlevel);
     149                 : static void pgfdw_finish_abort_cleanup(List *pending_entries,
     150                 :                                        List *cancel_requested,
     151                 :                                        bool toplevel);
     152                 : static bool UserMappingPasswordRequired(UserMapping *user);
     153                 : static bool disconnect_cached_connections(Oid serverid);
     154                 : 
     155                 : /*
     156                 :  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
     157                 :  * server with the user's authorization.  A new connection is established
     158                 :  * if we don't already have a suitable one, and a transaction is opened at
     159                 :  * the right subtransaction nesting depth if we didn't do that already.
     160                 :  *
     161                 :  * will_prep_stmt must be true if caller intends to create any prepared
     162                 :  * statements.  Since those don't go away automatically at transaction end
     163                 :  * (not even on error), we need this flag to cue manual cleanup.
     164                 :  *
     165                 :  * If state is not NULL, *state receives the per-connection state associated
     166                 :  * with the PGconn.
                         :  *
                         :  * Returns the PGconn stored in the cache entry for user->umid.
                         :  *
                         :  * If starting the remote transaction fails, one reconnect is attempted,
                         :  * but only when all of the following hold: the error's sqlerrcode is
                         :  * ERRCODE_CONNECTION_FAILURE, PQstatus() reports CONNECTION_BAD, and no
                         :  * remote transaction is open on the entry (xact_depth == 0).  Any other
                         :  * error is re-thrown, and the second attempt runs outside the PG_TRY
                         :  * block, so an error raised there propagates to the caller.
     167                 :  */
     168                 : PGconn *
     169 GIC        1784 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
     170                 : {
     171                 :     bool        found;
     172            1784 :     bool        retry = false;
     173                 :     ConnCacheEntry *entry;
     174                 :     ConnCacheKey key;
     175            1784 :     MemoryContext ccxt = CurrentMemoryContext;
     176                 : 
     177                 :     /* First time through, initialize connection cache hashtable */
     178            1784 :     if (ConnectionHash == NULL)
     179                 :     {
     180                 :         HASHCTL     ctl;
     181                 : 
     182               4 :         ctl.keysize = sizeof(ConnCacheKey);
     183               4 :         ctl.entrysize = sizeof(ConnCacheEntry);
     184               4 :         ConnectionHash = hash_create("postgres_fdw connections", 8,
     185                 :                                      &ctl,
     186                 :                                      HASH_ELEM | HASH_BLOBS);
     187                 : 
     188                 :         /*
     189                 :          * Register some callback functions that manage connection cleanup.
     190                 :          * This should be done just once in each backend.
     191                 :          */
     192               4 :         RegisterXactCallback(pgfdw_xact_callback, NULL);
     193               4 :         RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
     194               4 :         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
     195                 :                                       pgfdw_inval_callback, (Datum) 0);
     196               4 :         CacheRegisterSyscacheCallback(USERMAPPINGOID,
     197                 :                                       pgfdw_inval_callback, (Datum) 0);
     198                 :     }
     199                 : 
     200                 :     /* Set flag that we did GetConnection during the current transaction */
     201            1784 :     xact_got_connection = true;
     202                 : 
     203                 :     /* The hash key is the user mapping OID (ConnCacheKey is a plain Oid) */
     204 CBC        1784 :     key = user->umid;
     205                 : 
     206                 :     /*
     207 ECB             :      * Find or create cached entry for requested connection.
     208                 :      */
     209 GIC        1784 :     entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
     210 CBC        1784 :     if (!found)
     211                 :     {
     212                 :         /*
     213 ECB             :          * We need only clear "conn" here; remaining fields will be filled
     214                 :          * later when "conn" is set.
     215                 :          */
     216 GIC          11 :         entry->conn = NULL;
     217 ECB             :     }
     218                 : 
     219                 :     /* Reject further use of connections which failed abort cleanup. */
     220 GIC        1784 :     pgfdw_reject_incomplete_xact_state_change(entry);
     221                 : 
     222                 :     /*
     223                 :      * If the connection needs to be remade due to invalidation, disconnect as
     224                 :      * soon as we're out of all transactions.
     225                 :      */
     226            1784 :     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
     227 ECB             :     {
     228 LBC           0 :         elog(DEBUG3, "closing connection %p for option changes to take effect",
     229 ECB             :              entry->conn);
     230 UIC           0 :         disconnect_pg_server(entry);
     231 ECB             :     }
     232                 : 
     233                 :     /*
     234                 :      * If cache entry doesn't have a connection, we have to establish a new
     235                 :      * connection.  (If connect_pg_server throws an error, the cache entry
     236                 :      * will remain in a valid empty state, ie conn == NULL.)
     237                 :      */
     238 GIC        1784 :     if (entry->conn == NULL)
     239 CBC          61 :         make_new_connection(entry, user);
     240                 : 
     241                 :     /*
     242                 :      * We check the health of the cached connection here when using it.  In
     243                 :      * cases where we're out of all transactions, if a broken connection is
     244 ECB             :      * detected, we try to reestablish a new connection later.
     245                 :      */
     246 GIC        1778 :     PG_TRY();
     247                 :     {
     248                 :         /* Process a pending asynchronous request if any. */
     249            1778 :         if (entry->state.pendingAreq)
     250 UIC           0 :             process_pending_request(entry->state.pendingAreq);
     251 ECB             :         /* Start a new transaction or subtransaction if needed. */
     252 GIC        1778 :         begin_remote_xact(entry);
     253                 :     }
     254               2 :     PG_CATCH();
     255 ECB             :     {
     256 GIC           2 :         MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
     257               2 :         ErrorData  *errdata = CopyErrorData();
     258                 : 
     259                 :         /*
     260                 :          * Determine whether to try to reestablish the connection.
     261 ECB             :          *
     262                 :          * After a broken connection is detected in libpq, any error other
     263 EUB             :          * than connection failure (e.g., out-of-memory) can be thrown
     264                 :          * somewhere between return from libpq and the expected ereport() call
     265                 :          * in pgfdw_report_error(). In this case, since PQstatus() indicates
     266                 :          * CONNECTION_BAD, checking only PQstatus() causes the false detection
     267                 :          * of connection failure. To avoid this, we also verify that the
     268                 :          * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
     269                 :          * checking only the sqlstate can cause another false detection
     270                 :          * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
     271                 :          * for any libpq-originated error condition.
     272                 :          */
     273 CBC           2 :         if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
     274               2 :             PQstatus(entry->conn) != CONNECTION_BAD ||
     275 GIC           2 :             entry->xact_depth > 0)
     276                 :         {
     277               1 :             MemoryContextSwitchTo(ecxt);
     278               1 :             PG_RE_THROW();
     279                 :         }
     280                 : 
     281 ECB             :         /* Clean up the error state */
     282 GIC           1 :         FlushErrorState();
     283               1 :         FreeErrorData(errdata);
     284 CBC           1 :         errdata = NULL;
     285 EUB             : 
     286 GIC           1 :         retry = true;
     287 ECB             :     }
     288 GIC        1777 :     PG_END_TRY();
     289 ECB             : 
     290                 :     /*
     291                 :      * If a broken connection is detected, disconnect it, reestablish a new
     292                 :      * connection and retry a new remote transaction. If connection failure is
     293                 :      * reported again, we give up getting a connection.
     294                 :      */
     295 GIC        1777 :     if (retry)
     296                 :     {
     297               1 :         Assert(entry->xact_depth == 0);
     298                 : 
     299               1 :         ereport(DEBUG3,
     300                 :                 (errmsg_internal("could not start remote transaction on connection %p",
     301                 :                                  entry->conn)),
     302                 :                 errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
     303                 : 
     304               1 :         elog(DEBUG3, "closing connection %p to reestablish a new one",
     305                 :              entry->conn);
     306               1 :         disconnect_pg_server(entry);
     307                 : 
     308 GNC           1 :         make_new_connection(entry, user);
     309 ECB             : 
     310 GIC           1 :         begin_remote_xact(entry);
     311 ECB             :     }
     312                 : 
     313                 :     /* Remember if caller will prepare statements */
     314 GIC        1777 :     entry->have_prep_stmt |= will_prep_stmt;
     315                 : 
     316 ECB             :     /* If caller needs access to the per-connection state, return it. */
     317 CBC        1777 :     if (state)
     318             704 :         *state = &entry->state;
     319                 : 
     320            1777 :     return entry->conn;
     321                 : }
     322 ECB             : 
     323                 : /*
     324                 :  * Reset all transient state fields in the cached connection entry and
     325                 :  * establish new connection to the remote server.
                         :  *
                         :  * entry->conn must be NULL on entry (checked with Assert).  entry->conn
                         :  * is assigned only after connect_pg_server() returns, so it is still NULL
                         :  * if that call throws an error.
                         :  *
                         :  * keep_connections, parallel_commit and parallel_abort are taken from the
                         :  * foreign server's options when present; the option loop in this function
                         :  * skips every other option name.
     326                 :  */
     327                 : static void
     328 GIC          62 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
     329 ECB             : {
     330 GIC          62 :     ForeignServer *server = GetForeignServer(user->serverid);
     331 ECB             :     ListCell   *lc;
     332                 : 
     333 CBC          62 :     Assert(entry->conn == NULL);
     334                 : 
     335                 :     /* Reset all transient state fields, to be sure all are clean */
     336 GIC          62 :     entry->xact_depth = 0;
     337              62 :     entry->have_prep_stmt = false;
     338 CBC          62 :     entry->have_error = false;
     339 GIC          62 :     entry->changing_xact_state = false;
     340 CBC          62 :     entry->invalidated = false;
     341 GIC          62 :     entry->serverid = server->serverid;
     342 CBC          62 :     entry->server_hashvalue =
     343 GIC          62 :         GetSysCacheHashValue1(FOREIGNSERVEROID,
     344 ECB             :                               ObjectIdGetDatum(server->serverid));
     345 GIC          62 :     entry->mapping_hashvalue =
     346              62 :         GetSysCacheHashValue1(USERMAPPINGOID,
     347                 :                               ObjectIdGetDatum(user->umid));
     348 CBC          62 :     memset(&entry->state, 0, sizeof(entry->state));
     349                 : 
     350                 :     /*
     351 ECB             :      * Determine whether to keep the connection that we're about to make here
     352                 :      * open even after the transaction using it ends, so that the subsequent
     353                 :      * transactions can re-use it.
     354                 :      *
     355                 :      * By default, all the connections to any foreign servers are kept open.
     356                 :      *
     357                 :      * Also determine whether to commit/abort (sub)transactions opened on the
     358                 :      * remote server in parallel at (sub)transaction end, which is disabled by
     359                 :      * default.
     360                 :      *
     361                 :      * Note: it's enough to determine these only when making a new connection
     362                 :      * because if these settings for it are changed, it will be closed and
     363                 :      * re-made later.
     364                 :      */
     365 GIC          62 :     entry->keep_connections = true;
     366              62 :     entry->parallel_commit = false;
     367 GNC          62 :     entry->parallel_abort = false;
     368 CBC         268 :     foreach(lc, server->options)
     369                 :     {
     370 GIC         206 :         DefElem    *def = (DefElem *) lfirst(lc);
     371 ECB             : 
     372 CBC         206 :         if (strcmp(def->defname, "keep_connections") == 0)
     373               6 :             entry->keep_connections = defGetBoolean(def);
     374             200 :         else if (strcmp(def->defname, "parallel_commit") == 0)
     375               2 :             entry->parallel_commit = defGetBoolean(def);
     376 GNC         198 :         else if (strcmp(def->defname, "parallel_abort") == 0)
     377               2 :             entry->parallel_abort = defGetBoolean(def);
     378 ECB             :     }
     379                 : 
     380                 :     /* Now try to make the connection */
     381 GIC          62 :     entry->conn = connect_pg_server(server, user);
     382 ECB             : 
     383 CBC          56 :     elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
     384                 :          entry->conn, server->servername, user->umid, user->userid);
     385              56 : }
     386                 : 
     387                 : /*
     388                 :  * Connect to remote server using specified server and user mapping properties.
     389                 :  */
     390                 : static PGconn *
     391 GIC          62 : connect_pg_server(ForeignServer *server, UserMapping *user)
     392                 : {
     393              62 :     PGconn     *volatile conn = NULL;
     394                 : 
     395                 :     /*
     396                 :      * Use PG_TRY block to ensure closing connection on error.
     397                 :      */
     398              62 :     PG_TRY();
     399                 :     {
     400                 :         const char **keywords;
     401                 :         const char **values;
     402 CBC          62 :         char       *appname = NULL;
     403 ECB             :         int         n;
     404                 : 
     405                 :         /*
     406                 :          * Construct connection params from generic options of ForeignServer
     407                 :          * and UserMapping.  (Some of them might not be libpq options, in
     408                 :          * which case we'll just waste a few array slots.)  Add 4 extra slots
     409                 :          * for application_name, fallback_application_name, client_encoding,
     410                 :          * end marker.
     411                 :          */
     412 CBC          62 :         n = list_length(server->options) + list_length(user->options) + 4;
     413              62 :         keywords = (const char **) palloc(n * sizeof(char *));
     414              62 :         values = (const char **) palloc(n * sizeof(char *));
     415                 : 
     416 GIC          62 :         n = 0;
     417             124 :         n += ExtractConnectionOptions(server->options,
     418 CBC          62 :                                       keywords + n, values + n);
     419 GIC         124 :         n += ExtractConnectionOptions(user->options,
     420 CBC          62 :                                       keywords + n, values + n);
     421                 : 
     422 ECB             :         /*
     423                 :          * Use pgfdw_application_name as application_name if set.
     424                 :          *
     425                 :          * PQconnectdbParams() processes the parameter arrays from start to
     426                 :          * end. If any key word is repeated, the last value is used. Therefore
     427                 :          * note that pgfdw_application_name must be added to the arrays after
     428                 :          * options of ForeignServer are, so that it can override
     429                 :          * application_name set in ForeignServer.
     430                 :          */
     431 GIC          62 :         if (pgfdw_application_name && *pgfdw_application_name != '\0')
     432                 :         {
     433               1 :             keywords[n] = "application_name";
     434               1 :             values[n] = pgfdw_application_name;
     435 CBC           1 :             n++;
     436                 :         }
     437                 : 
     438                 :         /*
     439 ECB             :          * Search the parameter arrays to find application_name setting, and
     440                 :          * replace escape sequences in it with status information if found.
     441                 :          * The arrays are searched backwards because the last value is used if
     442                 :          * application_name is repeatedly set.
     443                 :          */
     444 GIC         158 :         for (int i = n - 1; i >= 0; i--)
     445                 :         {
     446             112 :             if (strcmp(keywords[i], "application_name") == 0 &&
     447              16 :                 *(values[i]) != '\0')
     448                 :             {
     449 ECB             :                 /*
     450                 :                  * Use this application_name setting if it's not empty string
     451                 :                  * even after any escape sequences in it are replaced.
     452                 :                  */
     453 CBC          16 :                 appname = process_pgfdw_appname(values[i]);
     454              16 :                 if (appname[0] != '\0')
     455 ECB             :                 {
     456 CBC          16 :                     values[i] = appname;
     457              16 :                     break;
     458                 :                 }
     459                 : 
     460                 :                 /*
     461                 :                  * This empty application_name is not used, so we set
     462                 :                  * values[i] to NULL and keep searching the array to find the
     463                 :                  * next one.
     464                 :                  */
     465 UIC           0 :                 values[i] = NULL;
     466               0 :                 pfree(appname);
     467               0 :                 appname = NULL;
     468 ECB             :             }
     469                 :         }
     470                 : 
     471                 :         /* Use "postgres_fdw" as fallback_application_name */
     472 CBC          62 :         keywords[n] = "fallback_application_name";
     473 GIC          62 :         values[n] = "postgres_fdw";
     474              62 :         n++;
     475                 : 
     476                 :         /* Set client_encoding so that libpq can convert encoding properly. */
     477              62 :         keywords[n] = "client_encoding";
     478              62 :         values[n] = GetDatabaseEncodingName();
     479              62 :         n++;
     480                 : 
     481 CBC          62 :         keywords[n] = values[n] = NULL;
     482                 : 
     483 ECB             :         /* verify the set of connection parameters */
     484 CBC          62 :         check_conn_params(keywords, values, user);
     485                 : 
     486 ECB             :         /* OK to make connection */
     487 GNC          60 :         conn = libpqsrv_connect_params(keywords, values,
     488                 :                                        false,   /* expand_dbname */
     489                 :                                        PG_WAIT_EXTENSION);
     490 ECB             : 
     491 CBC          60 :         if (!conn || PQstatus(conn) != CONNECTION_OK)
     492 GIC           2 :             ereport(ERROR,
     493 ECB             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     494                 :                      errmsg("could not connect to server \"%s\"",
     495                 :                             server->servername),
     496                 :                      errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
     497                 : 
     498                 :         /*
     499                 :          * Check that non-superuser has used password to establish connection;
     500                 :          * otherwise, he's piggybacking on the postgres server's user
     501                 :          * identity. See also dblink_security_check() in contrib/dblink and
     502                 :          * check_conn_params.
     503                 :          */
     504 CBC          58 :         if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
     505 GIC           2 :             !PQconnectionUsedPassword(conn))
     506               2 :             ereport(ERROR,
     507                 :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     508                 :                      errmsg("password is required"),
     509                 :                      errdetail("Non-superuser cannot connect if the server does not request a password."),
     510                 :                      errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
     511                 : 
     512                 :         /* Prepare new session for use */
     513              56 :         configure_remote_session(conn);
     514                 : 
     515              56 :         if (appname != NULL)
     516 CBC          16 :             pfree(appname);
     517              56 :         pfree(keywords);
     518              56 :         pfree(values);
     519                 :     }
     520 GIC           6 :     PG_CATCH();
     521                 :     {
     522 GNC           6 :         libpqsrv_disconnect(conn);
     523 CBC           6 :         PG_RE_THROW();
     524 ECB             :     }
     525 CBC          56 :     PG_END_TRY();
     526                 : 
     527              56 :     return conn;
     528                 : }
     529 ECB             : 
     530                 : /*
     531                 :  * Disconnect any open connection for a connection cache entry.
     532                 :  */
     533                 : static void
     534 CBC          53 : disconnect_pg_server(ConnCacheEntry *entry)
     535                 : {
     536 GIC          53 :     if (entry->conn != NULL)
     537                 :     {
     538 GNC          53 :         libpqsrv_disconnect(entry->conn);
     539 GIC          53 :         entry->conn = NULL;
     540 ECB             :     }
     541 GIC          53 : }
     542 ECB             : 
     543                 : /*
     544                 :  * Return the value of the password_required option if it is defined for
     545                 :  * this user mapping, otherwise true. The mapping has been pre-validated.
     546                 :  */
     547                 : static bool
     548 GIC           5 : UserMappingPasswordRequired(UserMapping *user)
     549                 : {
     550                 :     ListCell   *cell;
     551                 : 
     552               8 :     foreach(cell, user->options)
     553                 :     {
     554 CBC           4 :         DefElem    *def = (DefElem *) lfirst(cell);
     555                 : 
     556 GIC           4 :         if (strcmp(def->defname, "password_required") == 0)
     557               1 :             return defGetBoolean(def);
     558 ECB             :     }
     559                 : 
     560 CBC           4 :     return true;
     561                 : }
     562 ECB             : 
     563                 : /*
     564                 :  * For non-superusers, insist that the connstr specify a password.  This
     565                 :  * prevents a password from being picked up from .pgpass, a service file, the
     566                 :  * environment, etc.  We don't want the postgres user's passwords,
     567                 :  * certificates, etc to be accessible to non-superusers.  (See also
     568                 :  * dblink_connstr_check in contrib/dblink.)
     569                 :  */
     570                 : static void
     571 GIC          62 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
     572                 : {
     573                 :     int         i;
     574                 : 
     575                 :     /* no check required if superuser */
     576              62 :     if (superuser_arg(user->userid))
     577 CBC          57 :         return;
     578                 : 
     579                 :     /* ok if params contain a non-empty password */
     580 GIC          19 :     for (i = 0; keywords[i] != NULL; i++)
     581                 :     {
     582 CBC          17 :         if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
     583               3 :             return;
     584                 :     }
     585                 : 
     586 ECB             :     /* ok if the superuser explicitly said so at user mapping creation time */
     587 GIC           2 :     if (!UserMappingPasswordRequired(user))
     588 LBC           0 :         return;
     589 ECB             : 
     590 GIC           2 :     ereport(ERROR,
     591                 :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     592                 :              errmsg("password is required"),
     593 ECB             :              errdetail("Non-superusers must provide a password in the user mapping.")));
     594 EUB             : }
     595                 : 
     596 ECB             : /*
     597                 :  * Issue SET commands to make sure remote session is configured properly.
     598                 :  *
     599                 :  * We do this just once at connection, assuming nothing will change the
     600                 :  * values later.  Since we'll never send volatile function calls to the
     601                 :  * remote, there shouldn't be any way to break this assumption from our end.
     602                 :  * It's possible to think of ways to break it at the remote end, eg making
     603                 :  * a foreign table point to a view that includes a set_config call ---
     604                 :  * but once you admit the possibility of a malicious view definition,
     605                 :  * there are any number of ways to break things.
     606                 :  */
     607                 : static void
     608 GIC          56 : configure_remote_session(PGconn *conn)
     609                 : {
     610              56 :     int         remoteversion = PQserverVersion(conn);
     611                 : 
     612                 :     /* Force the search path to contain only pg_catalog (see deparse.c) */
     613              56 :     do_sql_command(conn, "SET search_path = pg_catalog");
     614 ECB             : 
     615                 :     /*
     616                 :      * Set remote timezone; this is basically just cosmetic, since all
     617                 :      * transmitted and returned timestamptzs should specify a zone explicitly
     618                 :      * anyway.  However it makes the regression test outputs more predictable.
     619                 :      *
     620                 :      * We don't risk setting remote zone equal to ours, since the remote
     621                 :      * server might use a different timezone database.  Instead, use UTC
     622                 :      * (quoted, because very old servers are picky about case).
     623                 :      */
     624 GIC          56 :     do_sql_command(conn, "SET timezone = 'UTC'");
     625                 : 
     626                 :     /*
     627                 :      * Set values needed to ensure unambiguous data output from remote.  (This
     628                 :      * logic should match what pg_dump does.  See also set_transmission_modes
     629                 :      * in postgres_fdw.c.)
     630 ECB             :      */
     631 GIC          56 :     do_sql_command(conn, "SET datestyle = ISO");
     632              56 :     if (remoteversion >= 80400)
     633              56 :         do_sql_command(conn, "SET intervalstyle = postgres");
     634              56 :     if (remoteversion >= 90000)
     635              56 :         do_sql_command(conn, "SET extra_float_digits = 3");
     636                 :     else
     637 LBC           0 :         do_sql_command(conn, "SET extra_float_digits = 2");
     638 CBC          56 : }
     639 ECB             : 
     640                 : /*
     641                 :  * Convenience subroutine to issue a non-data-returning SQL command to remote
     642                 :  */
     643 EUB             : void
     644 CBC        1651 : do_sql_command(PGconn *conn, const char *sql)
     645                 : {
     646 GIC        1651 :     do_sql_command_begin(conn, sql);
     647            1651 :     do_sql_command_end(conn, sql, false);
     648            1648 : }
     649                 : 
     650 ECB             : static void
     651 GIC        1669 : do_sql_command_begin(PGconn *conn, const char *sql)
     652 ECB             : {
     653 CBC        1669 :     if (!PQsendQuery(conn, sql))
     654 LBC           0 :         pgfdw_report_error(ERROR, NULL, conn, false, sql);
     655 GIC        1669 : }
     656                 : 
     657 ECB             : static void
     658 GIC        1669 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
     659 ECB             : {
     660 EUB             :     PGresult   *res;
     661 ECB             : 
     662                 :     /*
     663                 :      * If requested, consume whatever data is available from the socket. (Note
     664                 :      * that if all data is available, this allows pgfdw_get_result to call
     665                 :      * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
     666                 :      * would be large compared to the overhead of PQconsumeInput.)
     667                 :      */
     668 GIC        1669 :     if (consume_input && !PQconsumeInput(conn))
     669 UIC           0 :         pgfdw_report_error(ERROR, NULL, conn, false, sql);
     670 GIC        1669 :     res = pgfdw_get_result(conn, sql);
     671            1667 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     672               1 :         pgfdw_report_error(ERROR, res, conn, true, sql);
     673            1666 :     PQclear(res);
     674 CBC        1666 : }
     675 EUB             : 
     676 ECB             : /*
     677                 :  * Start remote transaction or subtransaction, if needed.
     678                 :  *
     679                 :  * Note that we always use at least REPEATABLE READ in the remote session.
     680                 :  * This is so that, if a query initiates multiple scans of the same or
     681                 :  * different foreign tables, we will get snapshot-consistent results from
     682                 :  * those scans.  A disadvantage is that we can't provide sane emulation of
     683                 :  * READ COMMITTED behavior --- it would be nice if we had some other way to
     684                 :  * control which remote queries share a snapshot.
     685                 :  */
     686                 : static void
     687 GIC        1779 : begin_remote_xact(ConnCacheEntry *entry)
     688                 : {
     689            1779 :     int         curlevel = GetCurrentTransactionNestLevel();
     690                 : 
     691                 :     /* Start main transaction if we haven't yet */
     692            1779 :     if (entry->xact_depth <= 0)
     693 ECB             :     {
     694                 :         const char *sql;
     695                 : 
     696 GIC         697 :         elog(DEBUG3, "starting remote transaction on connection %p",
     697                 :              entry->conn);
     698 ECB             : 
     699 GIC         697 :         if (IsolationIsSerializable())
     700 UIC           0 :             sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
     701                 :         else
     702 CBC         697 :             sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
     703 GIC         697 :         entry->changing_xact_state = true;
     704             697 :         do_sql_command(entry->conn, sql);
     705 CBC         696 :         entry->xact_depth = 1;
     706 GBC         696 :         entry->changing_xact_state = false;
     707                 :     }
     708 ECB             : 
     709                 :     /*
     710                 :      * If we're in a subtransaction, stack up savepoints to match our level.
     711                 :      * This ensures we can rollback just the desired effects when a
     712                 :      * subtransaction aborts.
     713                 :      */
     714 GIC        1792 :     while (entry->xact_depth < curlevel)
     715                 :     {
     716                 :         char        sql[64];
     717                 : 
     718              15 :         snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
     719              15 :         entry->changing_xact_state = true;
     720 CBC          15 :         do_sql_command(entry->conn, sql);
     721 GIC          14 :         entry->xact_depth++;
     722              14 :         entry->changing_xact_state = false;
     723                 :     }
     724 CBC        1777 : }
     725 ECB             : 
     726                 : /*
     727                 :  * Release connection reference count created by calling GetConnection.
     728                 :  */
     729                 : void
     730 CBC        1728 : ReleaseConnection(PGconn *conn)
     731                 : {
     732                 :     /*
     733                 :      * Currently, we don't actually track connection references because all
     734                 :      * cleanup is managed on a transaction or subtransaction basis instead. So
     735                 :      * there's nothing to do here.
     736 ECB             :      */
     737 GIC        1728 : }
     738                 : 
     739                 : /*
     740                 :  * Assign a "unique" number for a cursor.
     741                 :  *
     742                 :  * These really only need to be unique per connection within a transaction.
     743 ECB             :  * For the moment we ignore the per-connection point and assign them across
     744                 :  * all connections in the transaction, but we ask for the connection to be
     745                 :  * supplied in case we want to refine that.
     746                 :  *
     747                 :  * Note that even if wraparound happens in a very long transaction, actual
     748                 :  * collisions are highly improbable; just be sure to use %u not %d to print.
     749                 :  */
     750                 : unsigned int
     751 GIC         503 : GetCursorNumber(PGconn *conn)
     752                 : {
     753             503 :     return ++cursor_number;
     754                 : }
     755                 : 
     756                 : /*
     757 ECB             :  * Assign a "unique" number for a prepared statement.
     758                 :  *
     759                 :  * This works much like GetCursorNumber, except that we never reset the counter
     760                 :  * within a session.  That's because we can't be 100% sure we've gotten rid
     761                 :  * of all prepared statements on all connections, and it's not really worth
     762                 :  * increasing the risk of prepared-statement name collisions by resetting.
     763                 :  */
     764                 : unsigned int
     765 GIC         174 : GetPrepStmtNumber(PGconn *conn)
     766                 : {
     767             174 :     return ++prep_stmt_number;
     768                 : }
     769                 : 
     770                 : /*
     771 ECB             :  * Submit a query and wait for the result.
     772                 :  *
     773                 :  * This function is interruptible by signals.
     774                 :  *
     775                 :  * Caller is responsible for the error handling on the result.
     776                 :  */
     777                 : PGresult *
     778 GIC        3560 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
     779                 : {
     780                 :     /* First, process a pending asynchronous request, if any. */
     781            3560 :     if (state && state->pendingAreq)
     782               4 :         process_pending_request(state->pendingAreq);
     783                 : 
     784 ECB             :     /*
     785                 :      * Submit a query.  Since we don't use non-blocking mode, this also can
     786                 :      * block.  But its risk is relatively small, so we ignore that for now.
     787                 :      */
     788 CBC        3560 :     if (!PQsendQuery(conn, query))
     789 UIC           0 :         pgfdw_report_error(ERROR, NULL, conn, false, query);
     790                 : 
     791                 :     /* Wait for the result. */
     792 GIC        3560 :     return pgfdw_get_result(conn, query);
     793                 : }
     794 ECB             : 
     795 EUB             : /*
     796                 :  * Wait for the result from a prior asynchronous execution function call.
     797                 :  *
     798 ECB             :  * This function offers quick responsiveness by checking for any interruptions.
     799                 :  *
     800                 :  * This function emulates PQexec()'s behavior of returning the last result
     801                 :  * when there are many.
     802                 :  *
     803                 :  * Caller is responsible for the error handling on the result.
     804                 :  */
     805                 : PGresult *
     806 GIC        7428 : pgfdw_get_result(PGconn *conn, const char *query)
     807                 : {
     808            7428 :     PGresult   *volatile last_res = NULL;
     809                 : 
     810                 :     /* In what follows, do not leak any PGresults on an error. */
     811            7428 :     PG_TRY();
     812 ECB             :     {
     813                 :         for (;;)
     814 CBC        7428 :         {
     815                 :             PGresult   *res;
     816                 : 
     817           37034 :             while (PQisBusy(conn))
     818                 :             {
     819                 :                 int         wc;
     820 ECB             : 
     821                 :                 /* Sleep until there's something to do */
     822 GIC        7324 :                 wc = WaitLatchOrSocket(MyLatch,
     823 ECB             :                                        WL_LATCH_SET | WL_SOCKET_READABLE |
     824                 :                                        WL_EXIT_ON_PM_DEATH,
     825                 :                                        PQsocket(conn),
     826                 :                                        -1L, PG_WAIT_EXTENSION);
     827 GIC        7324 :                 ResetLatch(MyLatch);
     828 ECB             : 
     829 GIC        7324 :                 CHECK_FOR_INTERRUPTS();
     830                 : 
     831                 :                 /* Data available in socket? */
     832            7324 :                 if (wc & WL_SOCKET_READABLE)
     833 ECB             :                 {
     834 GIC        7322 :                     if (!PQconsumeInput(conn))
     835 CBC           2 :                         pgfdw_report_error(ERROR, NULL, conn, false, query);
     836                 :                 }
     837                 :             }
     838 ECB             : 
     839 GIC       14854 :             res = PQgetResult(conn);
     840 CBC       14854 :             if (res == NULL)
     841            7426 :                 break;          /* query is complete */
     842                 : 
     843 GIC        7428 :             PQclear(last_res);
     844            7428 :             last_res = res;
     845 ECB             :         }
     846                 :     }
     847 CBC           2 :     PG_CATCH();
     848                 :     {
     849               2 :         PQclear(last_res);
     850               2 :         PG_RE_THROW();
     851                 :     }
     852 GIC        7426 :     PG_END_TRY();
     853 ECB             : 
     854 GIC        7426 :     return last_res;
     855 ECB             : }
     856                 : 
     857                 : /*
     858                 :  * Report an error we got from the remote server.
     859                 :  *
     860                 :  * elevel: error level to use (typically ERROR, but might be less)
     861                 :  * res: PGresult containing the error (might be NULL)
     862                 :  * conn: connection we did the query on
     863                 :  * clear: if true, PQclear the result (otherwise caller will handle it)
     864                 :  * sql: NULL, or text of remote command we tried to execute
     865                 :  *
     866                 :  * Note: callers that choose not to throw ERROR for a remote error are
     867                 :  * responsible for making sure that the associated ConnCacheEntry gets
     868                 :  * marked with have_error = true.
     869                 :  */
     870                 : void
     871 GIC          15 : pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
     872                 :                    bool clear, const char *sql)
     873                 : {
     874                 :     /* If requested, PGresult must be released before leaving this function. */
     875              15 :     PG_TRY();
     876                 :     {
     877 CBC          15 :         char       *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     878 GIC          15 :         char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
     879              15 :         char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
     880              15 :         char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
     881 CBC          15 :         char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
     882                 :         int         sqlstate;
     883 ECB             : 
     884 CBC          15 :         if (diag_sqlstate)
     885              13 :             sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
     886 ECB             :                                      diag_sqlstate[1],
     887                 :                                      diag_sqlstate[2],
     888                 :                                      diag_sqlstate[3],
     889                 :                                      diag_sqlstate[4]);
     890                 :         else
     891 CBC           2 :             sqlstate = ERRCODE_CONNECTION_FAILURE;
     892                 : 
     893                 :         /*
     894                 :          * If we don't get a message from the PGresult, try the PGconn.  This
     895                 :          * is needed because for connection-level failures, PQexec may just
     896                 :          * return NULL, not a PGresult at all.
     897 ECB             :          */
     898 GIC          15 :         if (message_primary == NULL)
     899               2 :             message_primary = pchomp(PQerrorMessage(conn));
     900                 : 
     901              15 :         ereport(elevel,
     902                 :                 (errcode(sqlstate),
     903                 :                  (message_primary != NULL && message_primary[0] != '\0') ?
     904 ECB             :                  errmsg_internal("%s", message_primary) :
     905                 :                  errmsg("could not obtain message string for remote error"),
     906                 :                  message_detail ? errdetail_internal("%s", message_detail) : 0,
     907                 :                  message_hint ? errhint("%s", message_hint) : 0,
     908                 :                  message_context ? errcontext("%s", message_context) : 0,
     909                 :                  sql ? errcontext("remote SQL command: %s", sql) : 0));
     910                 :     }
     911 GIC          15 :     PG_FINALLY();
     912                 :     {
     913              15 :         if (clear)
     914              12 :             PQclear(res);
     915                 :     }
     916              15 :     PG_END_TRY();
     917 LBC           0 : }
     918                 : 
     919 ECB             : /*
     920                 :  * pgfdw_xact_callback --- cleanup at main-transaction end.
     921                 :  *
     922                 :  * This runs just late enough that it must not enter user-defined code
     923 EUB             :  * locally.  (Entering such code on the remote side is fine.  Its remote
     924                 :  * COMMIT TRANSACTION may run deferred triggers.)
     925                 :  */
     926                 : static void
     927 GIC        3678 : pgfdw_xact_callback(XactEvent event, void *arg)
     928                 : {
     929                 :     HASH_SEQ_STATUS scan;
     930                 :     ConnCacheEntry *entry;
     931            3678 :     List       *pending_entries = NIL;
     932 GNC        3678 :     List       *cancel_requested = NIL;
     933                 : 
     934 ECB             :     /* Quick exit if no connections were touched in this transaction. */
     935 GIC        3678 :     if (!xact_got_connection)
     936            3012 :         return;
     937                 : 
     938 ECB             :     /*
     939                 :      * Scan all connection cache entries to find open remote transactions, and
     940                 :      * close them.
     941                 :      */
     942 CBC         666 :     hash_seq_init(&scan, ConnectionHash);
     943            2917 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
     944                 :     {
     945                 :         PGresult   *res;
     946                 : 
     947                 :         /* Ignore cache entry if no open connection right now */
     948 GIC        2252 :         if (entry->conn == NULL)
     949 CBC        1060 :             continue;
     950 ECB             : 
     951                 :         /* If it has an open remote transaction, try to close it */
     952 GIC        1192 :         if (entry->xact_depth > 0)
     953                 :         {
     954             697 :             elog(DEBUG3, "closing remote transaction on connection %p",
     955 ECB             :                  entry->conn);
     956                 : 
     957 GIC         697 :             switch (event)
     958                 :             {
     959 CBC         658 :                 case XACT_EVENT_PARALLEL_PRE_COMMIT:
     960                 :                 case XACT_EVENT_PRE_COMMIT:
     961 ECB             : 
     962                 :                     /*
     963                 :                      * If abort cleanup previously failed for this connection,
     964                 :                      * we can't issue any more commands against it.
     965                 :                      */
     966 CBC         658 :                     pgfdw_reject_incomplete_xact_state_change(entry);
     967                 : 
     968                 :                     /* Commit all remote transactions during pre-commit */
     969 GIC         658 :                     entry->changing_xact_state = true;
     970             658 :                     if (entry->parallel_commit)
     971                 :                     {
     972              16 :                         do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
     973 CBC          16 :                         pending_entries = lappend(pending_entries, entry);
     974 GIC          16 :                         continue;
     975                 :                     }
     976 CBC         642 :                     do_sql_command(entry->conn, "COMMIT TRANSACTION");
     977             642 :                     entry->changing_xact_state = false;
     978                 : 
     979 ECB             :                     /*
     980                 :                      * If there were any errors in subtransactions, and we
     981                 :                      * made prepared statements, do a DEALLOCATE ALL to make
     982                 :                      * sure we get rid of all prepared statements. This is
     983                 :                      * annoying and not terribly bulletproof, but it's
     984                 :                      * probably not worth trying harder.
     985                 :                      *
     986                 :                      * DEALLOCATE ALL only exists in 8.3 and later, so this
     987                 :                      * constrains how old a server postgres_fdw can
     988                 :                      * communicate with.  We intentionally ignore errors in
     989                 :                      * the DEALLOCATE, so that we can hobble along to some
     990                 :                      * extent with older servers (leaking prepared statements
     991                 :                      * as we go; but we don't really support update operations
     992                 :                      * pre-8.3 anyway).
     993                 :                      */
     994 GIC         642 :                     if (entry->have_prep_stmt && entry->have_error)
     995                 :                     {
     996 UIC           0 :                         res = PQexec(entry->conn, "DEALLOCATE ALL");
     997               0 :                         PQclear(res);
     998                 :                     }
     999 GIC         642 :                     entry->have_prep_stmt = false;
    1000             642 :                     entry->have_error = false;
    1001 CBC         642 :                     break;
    1002 GIC           1 :                 case XACT_EVENT_PRE_PREPARE:
    1003 EUB             : 
    1004                 :                     /*
    1005                 :                      * We disallow any remote transactions, since it's not
    1006 ECB             :                      * very reasonable to hold them open until the prepared
    1007                 :                      * transaction is committed.  For the moment, throw error
    1008                 :                      * unconditionally; later we might allow read-only cases.
    1009                 :                      * Note that the error will cause us to come right back
    1010                 :                      * here with event == XACT_EVENT_ABORT, so we'll clean up
    1011                 :                      * the connection state at that point.
    1012                 :                      */
    1013 GIC           1 :                     ereport(ERROR,
    1014                 :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1015                 :                              errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
    1016                 :                     break;
    1017 UIC           0 :                 case XACT_EVENT_PARALLEL_COMMIT:
    1018                 :                 case XACT_EVENT_COMMIT:
    1019                 :                 case XACT_EVENT_PREPARE:
    1020 ECB             :                     /* Pre-commit should have closed the open transaction */
    1021 UIC           0 :                     elog(ERROR, "missed cleaning up connection during pre-commit");
    1022                 :                     break;
    1023 GIC          38 :                 case XACT_EVENT_PARALLEL_ABORT:
    1024 EUB             :                 case XACT_EVENT_ABORT:
    1025                 :                     /* Rollback all remote transactions during abort */
    1026 GNC          38 :                     if (entry->parallel_abort)
    1027                 :                     {
    1028               4 :                         if (pgfdw_abort_cleanup_begin(entry, true,
    1029                 :                                                       &pending_entries,
    1030                 :                                                       &cancel_requested))
    1031               4 :                             continue;
    1032                 :                     }
    1033                 :                     else
    1034              34 :                         pgfdw_abort_cleanup(entry, true);
    1035 GIC          34 :                     break;
    1036 EUB             :             }
    1037                 :         }
    1038 ECB             : 
    1039                 :         /* Reset state to show we're out of a transaction */
    1040 GIC        1171 :         pgfdw_reset_xact_state(entry, true);
    1041 ECB             :     }
    1042                 : 
    1043                 :     /* If there are any pending connections, finish cleaning them up */
    1044 GNC         665 :     if (pending_entries || cancel_requested)
    1045                 :     {
    1046              15 :         if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
    1047                 :             event == XACT_EVENT_PRE_COMMIT)
    1048                 :         {
    1049              13 :             Assert(cancel_requested == NIL);
    1050              13 :             pgfdw_finish_pre_commit_cleanup(pending_entries);
    1051                 :         }
    1052                 :         else
    1053                 :         {
    1054               2 :             Assert(event == XACT_EVENT_PARALLEL_ABORT ||
    1055                 :                    event == XACT_EVENT_ABORT);
    1056               2 :             pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
    1057                 :                                        true);
    1058                 :         }
    1059 ECB             :     }
    1060                 : 
    1061                 :     /*
    1062                 :      * Regardless of the event type, we can now mark ourselves as out of the
    1063                 :      * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
    1064                 :      * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
    1065                 :      */
    1066 GIC         665 :     xact_got_connection = false;
    1067                 : 
    1068                 :     /* Also reset cursor numbering for next transaction */
    1069 CBC         665 :     cursor_number = 0;
    1070                 : }
    1071 ECB             : 
    1072                 : /*
    1073                 :  * pgfdw_subxact_callback --- cleanup at subtransaction end.
    1074                 :  */
    1075                 : static void
    1076 GIC          38 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
    1077                 :                        SubTransactionId parentSubid, void *arg)
    1078                 : {
    1079 ECB             :     HASH_SEQ_STATUS scan;
    1080                 :     ConnCacheEntry *entry;
    1081                 :     int         curlevel;
    1082 GIC          38 :     List       *pending_entries = NIL;
    1083 GNC          38 :     List       *cancel_requested = NIL;
    1084                 : 
    1085                 :     /* Nothing to do at subxact start, nor after commit. */
    1086 GIC          38 :     if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
    1087                 :           event == SUBXACT_EVENT_ABORT_SUB))
    1088              23 :         return;
    1089                 : 
    1090                 :     /* Quick exit if no connections were touched in this transaction. */
    1091              15 :     if (!xact_got_connection)
    1092 LBC           0 :         return;
    1093                 : 
    1094                 :     /*
    1095 ECB             :      * Scan all connection cache entries to find open remote subtransactions
    1096                 :      * of the current level, and close them.
    1097                 :      */
    1098 GIC          15 :     curlevel = GetCurrentTransactionNestLevel();
    1099              15 :     hash_seq_init(&scan, ConnectionHash);
    1100              87 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1101                 :     {
    1102 ECB             :         char        sql[100];
    1103                 : 
    1104                 :         /*
    1105                 :          * We only care about connections with open remote subtransactions of
    1106                 :          * the current level.
    1107                 :          */
    1108 CBC          72 :         if (entry->conn == NULL || entry->xact_depth < curlevel)
    1109              64 :             continue;
    1110                 : 
    1111 GIC          14 :         if (entry->xact_depth > curlevel)
    1112 LBC           0 :             elog(ERROR, "missed cleaning up remote subtransaction at level %d",
    1113                 :                  entry->xact_depth);
    1114 ECB             : 
    1115 GIC          14 :         if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
    1116                 :         {
    1117 ECB             :             /*
    1118 EUB             :              * If abort cleanup previously failed for this connection, we
    1119                 :              * can't issue any more commands against it.
    1120                 :              */
    1121 GIC           7 :             pgfdw_reject_incomplete_xact_state_change(entry);
    1122                 : 
    1123                 :             /* Commit all remote subtransactions during pre-commit */
    1124 CBC           7 :             snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
    1125               7 :             entry->changing_xact_state = true;
    1126               7 :             if (entry->parallel_commit)
    1127                 :             {
    1128 GIC           2 :                 do_sql_command_begin(entry->conn, sql);
    1129               2 :                 pending_entries = lappend(pending_entries, entry);
    1130               2 :                 continue;
    1131                 :             }
    1132               5 :             do_sql_command(entry->conn, sql);
    1133               5 :             entry->changing_xact_state = false;
    1134 ECB             :         }
    1135                 :         else
    1136                 :         {
    1137                 :             /* Rollback all remote subtransactions during abort */
    1138 GNC           7 :             if (entry->parallel_abort)
    1139                 :             {
    1140               4 :                 if (pgfdw_abort_cleanup_begin(entry, false,
    1141                 :                                               &pending_entries,
    1142                 :                                               &cancel_requested))
    1143               4 :                     continue;
    1144                 :             }
    1145                 :             else
    1146               3 :                 pgfdw_abort_cleanup(entry, false);
    1147                 :         }
    1148                 : 
    1149 ECB             :         /* OK, we're outta that level of subtransaction */
    1150 GIC           8 :         pgfdw_reset_xact_state(entry, false);
    1151                 :     }
    1152                 : 
    1153                 :     /* If there are any pending connections, finish cleaning them up */
    1154 GNC          15 :     if (pending_entries || cancel_requested)
    1155 ECB             :     {
    1156 GNC           3 :         if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
    1157                 :         {
    1158               1 :             Assert(cancel_requested == NIL);
    1159               1 :             pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
    1160                 :         }
    1161                 :         else
    1162                 :         {
    1163               2 :             Assert(event == SUBXACT_EVENT_ABORT_SUB);
    1164               2 :             pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
    1165                 :                                        false);
    1166                 :         }
    1167 ECB             :     }
    1168                 : }
    1169                 : 
    1170                 : /*
    1171                 :  * Connection invalidation callback function
    1172                 :  *
    1173                 :  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
    1174                 :  * close connections depending on that entry immediately if current transaction
    1175                 :  * has not used those connections yet. Otherwise, mark those connections as
    1176                 :  * invalid and then make pgfdw_xact_callback() close them at the end of current
    1177                 :  * transaction, since they cannot be closed in the midst of the transaction
    1178                 :  * using them. Closed connections will be remade at the next opportunity if
    1179                 :  * necessary.
    1180                 :  *
    1181                 :  * Although most cache invalidation callbacks blow away all the related stuff
    1182                 :  * regardless of the given hashvalue, connections are expensive enough that
    1183                 :  * it's worth trying to avoid that.
    1184                 :  *
    1185                 :  * NB: We could avoid unnecessary disconnection more strictly by examining
    1186                 :  * individual option values, but it seems too much effort for the gain.
    1187                 :  */
    1188                 : static void
    1189 CBC         162 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
    1190                 : {
    1191                 :     HASH_SEQ_STATUS scan;
    1192                 :     ConnCacheEntry *entry;
    1193 ECB             : 
    1194 GIC         162 :     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
    1195                 : 
    1196                 :     /* ConnectionHash must exist already, if we're registered */
    1197 CBC         162 :     hash_seq_init(&scan, ConnectionHash);
    1198 GIC         950 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1199 ECB             :     {
    1200                 :         /* Ignore invalid entries */
    1201 CBC         788 :         if (entry->conn == NULL)
    1202             625 :             continue;
    1203                 : 
    1204                 :         /* hashvalue == 0 means a cache reset, must clear all state */
    1205 GIC         163 :         if (hashvalue == 0 ||
    1206 CBC         123 :             (cacheid == FOREIGNSERVEROID &&
    1207             163 :              entry->server_hashvalue == hashvalue) ||
    1208 GIC          40 :             (cacheid == USERMAPPINGOID &&
    1209              40 :              entry->mapping_hashvalue == hashvalue))
    1210                 :         {
    1211                 :             /*
    1212                 :              * Close the connection immediately if it's not used yet in this
    1213                 :              * transaction. Otherwise mark it as invalid so that
    1214                 :              * pgfdw_xact_callback() can close it at the end of this
    1215                 :              * transaction.
    1216                 :              */
    1217              46 :             if (entry->xact_depth == 0)
    1218                 :             {
    1219              43 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
    1220              43 :                 disconnect_pg_server(entry);
    1221                 :             }
    1222                 :             else
    1223               3 :                 entry->invalidated = true;
    1224                 :         }
    1225                 :     }
    1226             162 : }
    1227                 : 
    1228                 : /*
    1229                 :  * Raise an error if the given connection cache entry is marked as being
    1230                 :  * in the middle of an xact state change.  This should be called at which no
    1231                 :  * such change is expected to be in progress; if one is found to be in
    1232 ECB             :  * progress, it means that we aborted in the middle of a previous state change
    1233                 :  * and now don't know what the remote transaction state actually is.
    1234                 :  * Such connections can't safely be further used.  Re-establishing the
    1235                 :  * connection would change the snapshot and roll back any writes already
    1236                 :  * performed, so that's not an option, either. Thus, we must abort.
    1237                 :  */
    1238                 : static void
    1239 GIC        2449 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
    1240 ECB             : {
    1241                 :     ForeignServer *server;
    1242                 : 
    1243                 :     /* nothing to do for inactive entries and entries of sane state */
    1244 CBC        2449 :     if (entry->conn == NULL || !entry->changing_xact_state)
    1245            2449 :         return;
    1246                 : 
    1247                 :     /* make sure this entry is inactive */
    1248 LBC           0 :     disconnect_pg_server(entry);
    1249 ECB             : 
    1250                 :     /* find server name to be shown in the message below */
    1251 LBC           0 :     server = GetForeignServer(entry->serverid);
    1252 ECB             : 
    1253 UIC           0 :     ereport(ERROR,
    1254                 :             (errcode(ERRCODE_CONNECTION_EXCEPTION),
    1255                 :              errmsg("connection to server \"%s\" was lost",
    1256                 :                     server->servername)));
    1257                 : }
    1258                 : 
    1259                 : /*
    1260 ECB             :  * Reset state to show we're out of a (sub)transaction.
    1261                 :  */
    1262                 : static void
    1263 CBC        1205 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
    1264                 : {
    1265 GIC        1205 :     if (toplevel)
    1266 ECB             :     {
    1267                 :         /* Reset state to show we're out of a transaction */
    1268 GIC        1191 :         entry->xact_depth = 0;
    1269 ECB             : 
    1270                 :         /*
    1271                 :          * If the connection isn't in a good idle state, it is marked as
    1272                 :          * invalid or keep_connections option of its server is disabled, then
    1273                 :          * discard it to recover. Next GetConnection will open a new
    1274                 :          * connection.
    1275                 :          */
    1276 GIC        2381 :         if (PQstatus(entry->conn) != CONNECTION_OK ||
    1277            1190 :             PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
    1278            1190 :             entry->changing_xact_state ||
    1279            1190 :             entry->invalidated ||
    1280            1188 :             !entry->keep_connections)
    1281                 :         {
    1282 CBC           4 :             elog(DEBUG3, "discarding connection %p", entry->conn);
    1283 GIC           4 :             disconnect_pg_server(entry);
    1284                 :         }
    1285                 :     }
    1286                 :     else
    1287 ECB             :     {
    1288                 :         /* Reset state to show we're out of a subtransaction */
    1289 GIC          14 :         entry->xact_depth--;
    1290                 :     }
    1291 GBC        1205 : }
    1292                 : 
    1293                 : /*
    1294 EUB             :  * Cancel the currently-in-progress query (whose query text we do not have)
    1295                 :  * and ignore the result.  Returns true if we successfully cancel the query
    1296                 :  * and discard any pending result, and false if not.
    1297                 :  *
    1298                 :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1299                 :  * recursion trouble, we'll end up slamming the connection shut, which will
    1300                 :  * necessitate failing the entire toplevel transaction even if subtransactions
    1301                 :  * were used.  Try to use WARNING where we can.
    1302                 :  *
    1303                 :  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
    1304                 :  * query text from the pendingAreq saved in the per-connection state, then
    1305                 :  * report the query using it.
    1306 ECB             :  */
    1307                 : static bool
    1308 LBC           0 : pgfdw_cancel_query(PGconn *conn)
    1309                 : {
    1310                 :     TimestampTz endtime;
    1311                 : 
    1312                 :     /*
    1313                 :      * If it takes too long to cancel the query and discard the result, assume
    1314                 :      * the connection is dead.
    1315 ECB             :      */
    1316 UNC           0 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1317                 :                                           CONNECTION_CLEANUP_TIMEOUT);
    1318                 : 
    1319               0 :     if (!pgfdw_cancel_query_begin(conn))
    1320               0 :         return false;
    1321               0 :     return pgfdw_cancel_query_end(conn, endtime, false);
    1322                 : }
    1323                 : 
    1324                 : static bool
    1325               0 : pgfdw_cancel_query_begin(PGconn *conn)
    1326                 : {
    1327                 :     PGcancel   *cancel;
    1328                 :     char        errbuf[256];
    1329 ECB             : 
    1330                 :     /*
    1331                 :      * Issue cancel request.  Unfortunately, there's no good way to limit the
    1332                 :      * amount of time that we might block inside PQgetCancel().
    1333                 :      */
    1334 LBC           0 :     if ((cancel = PQgetCancel(conn)))
    1335                 :     {
    1336 UIC           0 :         if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
    1337                 :         {
    1338               0 :             ereport(WARNING,
    1339                 :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    1340 ECB             :                      errmsg("could not send cancel request: %s",
    1341                 :                             errbuf)));
    1342 LBC           0 :             PQfreeCancel(cancel);
    1343 UIC           0 :             return false;
    1344                 :         }
    1345               0 :         PQfreeCancel(cancel);
    1346                 :     }
    1347                 : 
    1348 UNC           0 :     return true;
    1349                 : }
    1350                 : 
    1351                 : static bool
    1352               0 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
    1353                 : {
    1354               0 :     PGresult   *result = NULL;
    1355                 :     bool        timed_out;
    1356                 : 
    1357                 :     /*
    1358                 :      * If requested, consume whatever data is available from the socket. (Note
    1359                 :      * that if all data is available, this allows pgfdw_get_cleanup_result to
    1360                 :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1361                 :      * which would be large compared to the overhead of PQconsumeInput.)
    1362                 :      */
    1363               0 :     if (consume_input && !PQconsumeInput(conn))
    1364                 :     {
    1365               0 :         ereport(WARNING,
    1366                 :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1367                 :                  errmsg("could not get result of cancel request: %s",
    1368                 :                         pchomp(PQerrorMessage(conn)))));
    1369               0 :         return false;
    1370                 :     }
    1371                 : 
    1372                 :     /* Get and discard the result of the query. */
    1373 UIC           0 :     if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
    1374                 :     {
    1375               0 :         if (timed_out)
    1376               0 :             ereport(WARNING,
    1377                 :                     (errmsg("could not get result of cancel request due to timeout")));
    1378                 :         else
    1379               0 :             ereport(WARNING,
    1380                 :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    1381                 :                      errmsg("could not get result of cancel request: %s",
    1382                 :                             pchomp(PQerrorMessage(conn)))));
    1383 EUB             : 
    1384 UIC           0 :         return false;
    1385                 :     }
    1386               0 :     PQclear(result);
    1387                 : 
    1388               0 :     return true;
    1389                 : }
    1390                 : 
    1391 EUB             : /*
    1392                 :  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
    1393                 :  * result.  If the query is executed without error, the return value is true.
    1394                 :  * If the query is executed successfully but returns an error, the return
    1395                 :  * value is true if and only if ignore_errors is set.  If the query can't be
    1396                 :  * sent or times out, the return value is false.
    1397                 :  *
    1398                 :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1399                 :  * recursion trouble, we'll end up slamming the connection shut, which will
    1400                 :  * necessitate failing the entire toplevel transaction even if subtransactions
    1401                 :  * were used.  Try to use WARNING where we can.
    1402                 :  */
    1403                 : static bool
    1404 GIC          50 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
    1405                 : {
    1406                 :     TimestampTz endtime;
    1407 EUB             : 
    1408                 :     /*
    1409                 :      * If it takes too long to execute a cleanup query, assume the connection
    1410                 :      * is dead.  It's fairly likely that this is why we aborted in the first
    1411                 :      * place (e.g. statement timeout, user cancel), so the timeout shouldn't
    1412                 :      * be too long.
    1413                 :      */
    1414 GNC          50 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1415                 :                                           CONNECTION_CLEANUP_TIMEOUT);
    1416 EUB             : 
    1417 GNC          50 :     if (!pgfdw_exec_cleanup_query_begin(conn, query))
    1418 UNC           0 :         return false;
    1419 GNC          50 :     return pgfdw_exec_cleanup_query_end(conn, query, endtime,
    1420                 :                                         false, ignore_errors);
    1421                 : }
    1422                 : 
    1423                 : static bool
    1424              62 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
    1425                 : {
    1426 EUB             :     /*
    1427                 :      * Submit a query.  Since we don't use non-blocking mode, this also can
    1428                 :      * block.  But its risk is relatively small, so we ignore that for now.
    1429                 :      */
    1430 GIC          62 :     if (!PQsendQuery(conn, query))
    1431 EUB             :     {
    1432 UIC           0 :         pgfdw_report_error(WARNING, NULL, conn, false, query);
    1433               0 :         return false;
    1434                 :     }
    1435 EUB             : 
    1436 GNC          62 :     return true;
    1437                 : }
    1438                 : 
    1439                 : static bool
    1440              62 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
    1441                 :                              TimestampTz endtime, bool consume_input,
    1442                 :                              bool ignore_errors)
    1443                 : {
    1444              62 :     PGresult   *result = NULL;
    1445                 :     bool        timed_out;
    1446                 : 
    1447                 :     /*
    1448                 :      * If requested, consume whatever data is available from the socket. (Note
    1449                 :      * that if all data is available, this allows pgfdw_get_cleanup_result to
    1450                 :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1451                 :      * which would be large compared to the overhead of PQconsumeInput.)
    1452                 :      */
    1453              62 :     if (consume_input && !PQconsumeInput(conn))
    1454                 :     {
    1455 UNC           0 :         pgfdw_report_error(WARNING, NULL, conn, false, query);
    1456               0 :         return false;
    1457                 :     }
    1458                 : 
    1459                 :     /* Get the result of the query. */
    1460 GBC          62 :     if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
    1461                 :     {
    1462 UIC           0 :         if (timed_out)
    1463               0 :             ereport(WARNING,
    1464                 :                     (errmsg("could not get query result due to timeout"),
    1465                 :                      query ? errcontext("remote SQL command: %s", query) : 0));
    1466                 :         else
    1467               0 :             pgfdw_report_error(WARNING, NULL, conn, false, query);
    1468                 : 
    1469 UBC           0 :         return false;
    1470                 :     }
    1471 EUB             : 
    1472                 :     /* Issue a warning if not successful. */
    1473 GIC          62 :     if (PQresultStatus(result) != PGRES_COMMAND_OK)
    1474                 :     {
    1475 UBC           0 :         pgfdw_report_error(WARNING, result, conn, true, query);
    1476 UIC           0 :         return ignore_errors;
    1477                 :     }
    1478 GIC          62 :     PQclear(result);
    1479 EUB             : 
    1480 GIC          62 :     return true;
    1481 EUB             : }
    1482                 : 
    1483                 : /*
    1484                 :  * Get, during abort cleanup, the result of a query that is in progress.  This
    1485                 :  * might be a query that is being interrupted by transaction abort, or it might
    1486                 :  * be a query that was initiated as part of transaction abort to get the remote
    1487                 :  * side back to the appropriate state.
    1488                 :  *
    1489                 :  * endtime is the time at which we should give up and assume the remote
    1490                 :  * side is dead.  Returns true if the timeout expired or connection trouble
    1491                 :  * occurred, false otherwise.  Sets *result except in case of a timeout.
    1492                 :  * Sets timed_out to true only when the timeout expired.
    1493                 :  */
    1494                 : static bool
    1495 GIC          62 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
    1496                 :                          bool *timed_out)
    1497                 : {
    1498              62 :     volatile bool failed = false;
    1499              62 :     PGresult   *volatile last_res = NULL;
    1500                 : 
    1501              62 :     *timed_out = false;
    1502                 : 
    1503                 :     /* In what follows, do not leak any PGresults on an error. */
    1504              62 :     PG_TRY();
    1505                 :     {
    1506                 :         for (;;)
    1507              69 :         {
    1508                 :             PGresult   *res;
    1509                 : 
    1510 CBC         312 :             while (PQisBusy(conn))
    1511                 :             {
    1512                 :                 int         wc;
    1513 GIC          50 :                 TimestampTz now = GetCurrentTimestamp();
    1514                 :                 long        cur_timeout;
    1515                 : 
    1516                 :                 /* If timeout has expired, give up, else get sleep time. */
    1517              50 :                 cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
    1518              50 :                 if (cur_timeout <= 0)
    1519                 :                 {
    1520 LBC           0 :                     *timed_out = true;
    1521 UIC           0 :                     failed = true;
    1522               0 :                     goto exit;
    1523 ECB             :                 }
    1524 EUB             : 
    1525 ECB             :                 /* Sleep until there's something to do */
    1526 GIC          50 :                 wc = WaitLatchOrSocket(MyLatch,
    1527                 :                                        WL_LATCH_SET | WL_SOCKET_READABLE |
    1528                 :                                        WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1529                 :                                        PQsocket(conn),
    1530 ECB             :                                        cur_timeout, PG_WAIT_EXTENSION);
    1531 GIC          50 :                 ResetLatch(MyLatch);
    1532                 : 
    1533              50 :                 CHECK_FOR_INTERRUPTS();
    1534                 : 
    1535                 :                 /* Data available in socket? */
    1536 CBC          50 :                 if (wc & WL_SOCKET_READABLE)
    1537                 :                 {
    1538 GBC          50 :                     if (!PQconsumeInput(conn))
    1539 EUB             :                     {
    1540                 :                         /* connection trouble */
    1541 UIC           0 :                         failed = true;
    1542 LBC           0 :                         goto exit;
    1543                 :                     }
    1544                 :                 }
    1545                 :             }
    1546 ECB             : 
    1547 GIC         131 :             res = PQgetResult(conn);
    1548             131 :             if (res == NULL)
    1549              62 :                 break;          /* query is complete */
    1550 ECB             : 
    1551 GIC          69 :             PQclear(last_res);
    1552              69 :             last_res = res;
    1553                 :         }
    1554              62 : exit:   ;
    1555                 :     }
    1556 UIC           0 :     PG_CATCH();
    1557                 :     {
    1558               0 :         PQclear(last_res);
    1559 LBC           0 :         PG_RE_THROW();
    1560                 :     }
    1561 GBC          62 :     PG_END_TRY();
    1562 EUB             : 
    1563 GIC          62 :     if (failed)
    1564 UIC           0 :         PQclear(last_res);
    1565                 :     else
    1566 CBC          62 :         *result = last_res;
    1567 GIC          62 :     return failed;
    1568 EUB             : }
    1569                 : 
    1570                 : /*
    1571                 :  * Abort remote transaction or subtransaction.
    1572                 :  *
    1573                 :  * "toplevel" should be set to true if toplevel (main) transaction is
    1574                 :  * rolled back, false otherwise.
    1575                 :  *
    1576                 :  * Set entry->changing_xact_state to false on success, true on failure.
    1577                 :  */
    1578                 : static void
    1579 CBC          37 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
    1580                 : {
    1581 EUB             :     char        sql[100];
    1582                 : 
    1583                 :     /*
    1584 ECB             :      * Don't try to clean up the connection if we're already in error
    1585                 :      * recursion trouble.
    1586                 :      */
    1587 GIC          37 :     if (in_error_recursion_trouble())
    1588 UIC           0 :         entry->changing_xact_state = true;
    1589                 : 
    1590                 :     /*
    1591                 :      * If connection is already unsalvageable, don't touch it further.
    1592                 :      */
    1593 GIC          37 :     if (entry->changing_xact_state)
    1594               1 :         return;
    1595                 : 
    1596                 :     /*
    1597                 :      * Mark this connection as in the process of changing transaction state.
    1598                 :      */
    1599              36 :     entry->changing_xact_state = true;
    1600                 : 
    1601 ECB             :     /* Assume we might have lost track of prepared statements */
    1602 GIC          36 :     entry->have_error = true;
    1603                 : 
    1604 ECB             :     /*
    1605                 :      * If a command has been submitted to the remote server by using an
    1606                 :      * asynchronous execution function, the command might not have yet
    1607                 :      * completed.  Check to see if a command is still being processed by the
    1608                 :      * remote server, and if so, request cancellation of the command.
    1609                 :      */
    1610 CBC          36 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
    1611 UIC           0 :         !pgfdw_cancel_query(entry->conn))
    1612               0 :         return;                 /* Unable to cancel running query */
    1613 ECB             : 
    1614 GNC          36 :     CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1615 GIC          36 :     if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
    1616 UIC           0 :         return;                 /* Unable to abort remote (sub)transaction */
    1617                 : 
    1618 CBC          36 :     if (toplevel)
    1619 ECB             :     {
    1620 GIC          33 :         if (entry->have_prep_stmt && entry->have_error &&
    1621 GBC          14 :             !pgfdw_exec_cleanup_query(entry->conn,
    1622 EUB             :                                       "DEALLOCATE ALL",
    1623                 :                                       true))
    1624 UIC           0 :             return;             /* Trouble clearing prepared statements */
    1625                 : 
    1626 GIC          33 :         entry->have_prep_stmt = false;
    1627 CBC          33 :         entry->have_error = false;
    1628                 :     }
    1629                 : 
    1630                 :     /*
    1631                 :      * If pendingAreq of the per-connection state is not NULL, it means that
    1632 ECB             :      * an asynchronous fetch begun by fetch_more_data_begin() was not done
    1633                 :      * successfully and thus the per-connection state was not reset in
    1634                 :      * fetch_more_data(); in that case reset the per-connection state here.
    1635                 :      */
    1636 GIC          36 :     if (entry->state.pendingAreq)
    1637 LBC           0 :         memset(&entry->state, 0, sizeof(entry->state));
    1638                 : 
    1639 ECB             :     /* Disarm changing_xact_state if it all worked */
    1640 GIC          36 :     entry->changing_xact_state = false;
    1641                 : }
    1642 EUB             : 
    1643                 : /*
    1644                 :  * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
    1645                 :  * don't wait for the result.
    1646                 :  *
    1647                 :  * Returns true if the abort command or cancel request is successfully issued,
    1648                 :  * false otherwise.  If the abort command is successfully issued, the given
    1649                 :  * connection cache entry is appended to *pending_entries.  Otherwise, if the
    1650                 :  * cancel request is successfully issued, it is appended to *cancel_requested.
    1651                 :  */
    1652                 : static bool
    1653 GNC           8 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
    1654                 :                           List **pending_entries, List **cancel_requested)
    1655                 : {
    1656                 :     /*
    1657                 :      * Don't try to clean up the connection if we're already in error
    1658                 :      * recursion trouble.
    1659                 :      */
    1660               8 :     if (in_error_recursion_trouble())
    1661 UNC           0 :         entry->changing_xact_state = true;
    1662                 : 
    1663                 :     /*
    1664                 :      * If connection is already unsalvageable, don't touch it further.
    1665                 :      */
    1666 GNC           8 :     if (entry->changing_xact_state)
    1667 UNC           0 :         return false;
    1668                 : 
    1669                 :     /*
    1670                 :      * Mark this connection as in the process of changing transaction state.
    1671                 :      */
    1672 GNC           8 :     entry->changing_xact_state = true;
    1673                 : 
    1674                 :     /* Assume we might have lost track of prepared statements */
    1675               8 :     entry->have_error = true;
    1676                 : 
    1677                 :     /*
    1678                 :      * If a command has been submitted to the remote server by using an
    1679                 :      * asynchronous execution function, the command might not have yet
    1680                 :      * completed.  Check to see if a command is still being processed by the
    1681                 :      * remote server, and if so, request cancellation of the command.
    1682                 :      */
    1683               8 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
    1684                 :     {
    1685 UNC           0 :         if (!pgfdw_cancel_query_begin(entry->conn))
    1686               0 :             return false;       /* Unable to cancel running query */
    1687               0 :         *cancel_requested = lappend(*cancel_requested, entry);
    1688                 :     }
    1689                 :     else
    1690                 :     {
    1691                 :         char        sql[100];
    1692                 : 
    1693 GNC           8 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1694               8 :         if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    1695 UNC           0 :             return false;       /* Unable to abort remote transaction */
    1696 GNC           8 :         *pending_entries = lappend(*pending_entries, entry);
    1697                 :     }
    1698                 : 
    1699               8 :     return true;
    1700                 : }
    1701                 : 
    1702 EUB             : /*
    1703                 :  * Finish pre-commit cleanup of connections on each of which we've sent a
    1704                 :  * COMMIT command to the remote server.
    1705                 :  */
    1706                 : static void
    1707 CBC          13 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
    1708 ECB             : {
    1709                 :     ConnCacheEntry *entry;
    1710 GIC          13 :     List       *pending_deallocs = NIL;
    1711 ECB             :     ListCell   *lc;
    1712                 : 
    1713 GIC          13 :     Assert(pending_entries);
    1714 ECB             : 
    1715                 :     /*
    1716 EUB             :      * Get the result of the COMMIT command for each of the pending entries
    1717                 :      */
    1718 GBC          29 :     foreach(lc, pending_entries)
    1719 EUB             :     {
    1720 GIC          16 :         entry = (ConnCacheEntry *) lfirst(lc);
    1721 ECB             : 
    1722 GIC          16 :         Assert(entry->changing_xact_state);
    1723 ECB             : 
    1724 EUB             :         /*
    1725                 :          * We might already have received the result on the socket, so pass
    1726 ECB             :          * consume_input=true to try to consume it first
    1727                 :          */
    1728 GIC          16 :         do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
    1729              16 :         entry->changing_xact_state = false;
    1730                 : 
    1731                 :         /* Do a DEALLOCATE ALL in parallel if needed */
    1732              16 :         if (entry->have_prep_stmt && entry->have_error)
    1733                 :         {
    1734                 :             /* Ignore errors (see notes in pgfdw_xact_callback) */
    1735               2 :             if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
    1736                 :             {
    1737               2 :                 pending_deallocs = lappend(pending_deallocs, entry);
    1738               2 :                 continue;
    1739 ECB             :             }
    1740                 :         }
    1741 GIC          14 :         entry->have_prep_stmt = false;
    1742              14 :         entry->have_error = false;
    1743                 : 
    1744              14 :         pgfdw_reset_xact_state(entry, true);
    1745                 :     }
    1746                 : 
    1747 ECB             :     /* No further work if no pending entries */
    1748 GBC          13 :     if (!pending_deallocs)
    1749 GIC          12 :         return;
    1750                 : 
    1751                 :     /*
    1752                 :      * Get the result of the DEALLOCATE command for each of the pending
    1753 ECB             :      * entries
    1754                 :      */
    1755 GIC           3 :     foreach(lc, pending_deallocs)
    1756                 :     {
    1757                 :         PGresult   *res;
    1758                 : 
    1759 CBC           2 :         entry = (ConnCacheEntry *) lfirst(lc);
    1760                 : 
    1761                 :         /* Ignore errors (see notes in pgfdw_xact_callback) */
    1762               4 :         while ((res = PQgetResult(entry->conn)) != NULL)
    1763                 :         {
    1764 GIC           2 :             PQclear(res);
    1765                 :             /* Stop if the connection is lost (else we'll loop infinitely) */
    1766               2 :             if (PQstatus(entry->conn) == CONNECTION_BAD)
    1767 UIC           0 :                 break;
    1768                 :         }
    1769 GIC           2 :         entry->have_prep_stmt = false;
    1770 CBC           2 :         entry->have_error = false;
    1771 EUB             : 
    1772 GBC           2 :         pgfdw_reset_xact_state(entry, true);
    1773                 :     }
    1774 ECB             : }
    1775                 : 
    1776 EUB             : /*
    1777                 :  * Finish pre-subcommit cleanup of connections on each of which we've sent a
    1778 ECB             :  * RELEASE command to the remote server.
    1779                 :  */
    1780                 : static void
    1781 CBC           1 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
    1782                 : {
    1783                 :     ConnCacheEntry *entry;
    1784 EUB             :     char        sql[100];
    1785                 :     ListCell   *lc;
    1786 ECB             : 
    1787 CBC           1 :     Assert(pending_entries);
    1788                 : 
    1789                 :     /*
    1790                 :      * Get the result of the RELEASE command for each of the pending entries
    1791                 :      */
    1792 GIC           1 :     snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
    1793               3 :     foreach(lc, pending_entries)
    1794                 :     {
    1795               2 :         entry = (ConnCacheEntry *) lfirst(lc);
    1796 ECB             : 
    1797 GBC           2 :         Assert(entry->changing_xact_state);
    1798                 : 
    1799                 :         /*
    1800 ECB             :          * We might already have received the result on the socket, so pass
    1801                 :          * consume_input=true to try to consume it first
    1802                 :          */
    1803 GIC           2 :         do_sql_command_end(entry->conn, sql, true);
    1804               2 :         entry->changing_xact_state = false;
    1805                 : 
    1806               2 :         pgfdw_reset_xact_state(entry, false);
    1807                 :     }
    1808               1 : }
    1809                 : 
    1810                 : /*
    1811                 :  * Finish abort cleanup of connections on each of which we've sent an abort
    1812                 :  * command or cancel request to the remote server.
    1813                 :  */
    1814                 : static void
    1815 GNC           4 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
    1816                 :                            bool toplevel)
    1817                 : {
    1818               4 :     List       *pending_deallocs = NIL;
    1819                 :     ListCell   *lc;
    1820                 : 
    1821                 :     /*
    1822                 :      * For each of the pending cancel requests (if any), get and discard the
    1823                 :      * result of the query, and submit an abort command to the remote server.
    1824                 :      */
    1825               4 :     if (cancel_requested)
    1826                 :     {
    1827 UNC           0 :         foreach(lc, cancel_requested)
    1828                 :         {
    1829               0 :             ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    1830                 :             TimestampTz endtime;
    1831                 :             char        sql[100];
    1832                 : 
    1833               0 :             Assert(entry->changing_xact_state);
    1834                 : 
    1835                 :             /*
    1836                 :              * Set end time.  You might think we should do this before issuing
    1837                 :              * cancel request like in normal mode, but that is problematic,
    1838                 :              * because if, for example, it took longer than 30 seconds to
    1839                 :              * process the first few entries in the cancel_requested list, it
    1840                 :              * would cause a timeout error when processing each of the
    1841                 :              * remaining entries in the list, leading to slamming that entry's
    1842                 :              * connection shut.
    1843                 :              */
    1844               0 :             endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1845                 :                                                   CONNECTION_CLEANUP_TIMEOUT);
    1846                 : 
    1847               0 :             if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
    1848                 :             {
    1849                 :                 /* Unable to cancel running query */
    1850               0 :                 pgfdw_reset_xact_state(entry, toplevel);
    1851               0 :                 continue;
    1852                 :             }
    1853                 : 
    1854                 :             /* Send an abort command in parallel if needed */
    1855               0 :             CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1856               0 :             if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    1857                 :             {
    1858                 :                 /* Unable to abort remote (sub)transaction */
    1859               0 :                 pgfdw_reset_xact_state(entry, toplevel);
    1860                 :             }
    1861                 :             else
    1862               0 :                 pending_entries = lappend(pending_entries, entry);
    1863                 :         }
    1864                 :     }
    1865                 : 
    1866                 :     /* No further work if no pending entries */
    1867 GNC           4 :     if (!pending_entries)
    1868 UNC           0 :         return;
    1869                 : 
    1870                 :     /*
    1871                 :      * Get the result of the abort command for each of the pending entries
    1872                 :      */
    1873 GNC          12 :     foreach(lc, pending_entries)
    1874                 :     {
    1875               8 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    1876                 :         TimestampTz endtime;
    1877                 :         char        sql[100];
    1878                 : 
    1879               8 :         Assert(entry->changing_xact_state);
    1880                 : 
    1881                 :         /*
    1882                 :          * Set end time.  We do this now, not before issuing the command like
    1883                 :          * in normal mode, for the same reason as for the cancel_requested
    1884                 :          * entries.
    1885                 :          */
    1886               8 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1887                 :                                               CONNECTION_CLEANUP_TIMEOUT);
    1888                 : 
    1889               8 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1890               8 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
    1891                 :                                           true, false))
    1892                 :         {
    1893                 :             /* Unable to abort remote (sub)transaction */
    1894 UNC           0 :             pgfdw_reset_xact_state(entry, toplevel);
    1895 GNC           4 :             continue;
    1896                 :         }
    1897                 : 
    1898               8 :         if (toplevel)
    1899                 :         {
    1900                 :             /* Do a DEALLOCATE ALL in parallel if needed */
    1901               4 :             if (entry->have_prep_stmt && entry->have_error)
    1902                 :             {
    1903               4 :                 if (!pgfdw_exec_cleanup_query_begin(entry->conn,
    1904                 :                                                     "DEALLOCATE ALL"))
    1905                 :                 {
    1906                 :                     /* Trouble clearing prepared statements */
    1907 UNC           0 :                     pgfdw_reset_xact_state(entry, toplevel);
    1908                 :                 }
    1909                 :                 else
    1910 GNC           4 :                     pending_deallocs = lappend(pending_deallocs, entry);
    1911               4 :                 continue;
    1912                 :             }
    1913 UNC           0 :             entry->have_prep_stmt = false;
    1914               0 :             entry->have_error = false;
    1915                 :         }
    1916                 : 
    1917                 :         /* Reset the per-connection state if needed */
    1918 GNC           4 :         if (entry->state.pendingAreq)
    1919 UNC           0 :             memset(&entry->state, 0, sizeof(entry->state));
    1920                 : 
    1921                 :         /* We're done with this entry; unset the changing_xact_state flag */
    1922 GNC           4 :         entry->changing_xact_state = false;
    1923               4 :         pgfdw_reset_xact_state(entry, toplevel);
    1924                 :     }
    1925                 : 
    1926                 :     /* No further work if no pending entries */
    1927               4 :     if (!pending_deallocs)
    1928               2 :         return;
    1929               2 :     Assert(toplevel);
    1930                 : 
    1931                 :     /*
    1932                 :      * Get the result of the DEALLOCATE command for each of the pending
    1933                 :      * entries
    1934                 :      */
    1935               6 :     foreach(lc, pending_deallocs)
    1936                 :     {
    1937               4 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    1938                 :         TimestampTz endtime;
    1939                 : 
    1940               4 :         Assert(entry->changing_xact_state);
    1941               4 :         Assert(entry->have_prep_stmt);
    1942               4 :         Assert(entry->have_error);
    1943                 : 
    1944                 :         /*
    1945                 :          * Set end time.  We do this now, not before issuing the command like
    1946                 :          * in normal mode, for the same reason as for the cancel_requested
    1947                 :          * entries.
    1948                 :          */
    1949               4 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1950                 :                                               CONNECTION_CLEANUP_TIMEOUT);
    1951                 : 
    1952               4 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
    1953                 :                                           endtime, true, true))
    1954                 :         {
    1955                 :             /* Trouble clearing prepared statements */
    1956 UNC           0 :             pgfdw_reset_xact_state(entry, toplevel);
    1957               0 :             continue;
    1958                 :         }
    1959 GNC           4 :         entry->have_prep_stmt = false;
    1960               4 :         entry->have_error = false;
    1961                 : 
    1962                 :         /* Reset the per-connection state if needed */
    1963               4 :         if (entry->state.pendingAreq)
    1964 UNC           0 :             memset(&entry->state, 0, sizeof(entry->state));
    1965                 : 
    1966                 :         /* We're done with this entry; unset the changing_xact_state flag */
    1967 GNC           4 :         entry->changing_xact_state = false;
    1968               4 :         pgfdw_reset_xact_state(entry, toplevel);
    1969                 :     }
    1970                 : }
    1971                 : 
    1972                 : /*
    1973                 :  * List active foreign server connections.
    1974                 :  *
    1975 ECB             :  * This function takes no input parameter and returns setof record made of
    1976                 :  * following values:
    1977                 :  * - server_name - server name of active connection. In case the foreign server
    1978                 :  *   is dropped but still the connection is active, then the server name will
    1979                 :  *   be NULL in output.
    1980                 :  * - valid - true/false representing whether the connection is valid or not.
    1981                 :  *   Note that the connections can get invalidated in pgfdw_inval_callback.
    1982                 :  *
    1983 EUB             :  * No records are returned when there are no cached connections at all.
    1984                 :  */
    1985                 : Datum
    1986 GIC          11 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
    1987                 : {
    1988 ECB             : #define POSTGRES_FDW_GET_CONNECTIONS_COLS   2
    1989 GBC          11 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1990                 :     HASH_SEQ_STATUS scan;
    1991                 :     ConnCacheEntry *entry;
    1992                 : 
    1993 GIC          11 :     InitMaterializedSRF(fcinfo, 0);
    1994 ECB             : 
    1995                 :     /* If cache doesn't exist, we return no records */
    1996 GIC          11 :     if (!ConnectionHash)
    1997 LBC           0 :         PG_RETURN_VOID();
    1998                 : 
    1999 GIC          11 :     hash_seq_init(&scan, ConnectionHash);
    2000              82 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2001                 :     {
    2002                 :         ForeignServer *server;
    2003 GNC          71 :         Datum       values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2004              71 :         bool        nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2005 ECB             : 
    2006                 :         /* We only look for open remote connections */
    2007 GBC          71 :         if (!entry->conn)
    2008              60 :             continue;
    2009 EUB             : 
    2010 GIC          11 :         server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
    2011                 : 
    2012 ECB             :         /*
    2013                 :          * The foreign server may have been dropped in current explicit
    2014 EUB             :          * transaction. It is not possible to drop the server from another
    2015 ECB             :          * session when the connection associated with it is in use in the
    2016                 :          * current transaction; if attempted, the drop query in another session
    2017                 :          * blocks until the current transaction finishes.
    2018                 :          *
    2019                 :          * Even though the server is dropped in the current transaction, the
    2020                 :          * cache can still have associated active connection entry, say we
    2021                 :          * call such connections dangling. Since we can not fetch the server
    2022                 :          * name from system catalogs for dangling connections, instead we show
    2023                 :          * NULL value for server name in output.
    2024                 :          *
    2025                 :          * We could have done better by storing the server name in the cache
    2026                 :          * entry instead of server oid so that it could be used in the output.
    2027                 :          * But the server name in each cache entry requires 64 bytes of
    2028                 :          * memory, which is huge, when there are many cached connections and
    2029                 :          * the use case i.e. dropping the foreign server within the explicit
    2030                 :          * current transaction seems rare. So, we chose to show NULL value for
    2031                 :          * server name in output.
    2032                 :          *
    2033                 :          * Such dangling connections get closed either in next use or at the
    2034                 :          * end of current explicit transaction in pgfdw_xact_callback.
    2035                 :          */
    2036 GIC          11 :         if (!server)
    2037 ECB             :         {
    2038                 :             /*
    2039                 :              * If the server has been dropped in the current explicit
    2040                 :              * transaction, then this entry would have been invalidated in
    2041                 :              * pgfdw_inval_callback at the end of drop server command. Note
    2042                 :              * that this connection would not have been closed in
    2043                 :              * pgfdw_inval_callback because it is still being used in the
    2044                 :              * current explicit transaction. So, assert that here.
    2045                 :              */
    2046 GIC           1 :             Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
    2047 ECB             : 
    2048                 :             /* Show null, if no server name was found */
    2049 GIC           1 :             nulls[0] = true;
    2050                 :         }
    2051 ECB             :         else
    2052 GIC          10 :             values[0] = CStringGetTextDatum(server->servername);
    2053                 : 
    2054 CBC          11 :         values[1] = BoolGetDatum(!entry->invalidated);
    2055                 : 
    2056              11 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    2057 ECB             :     }
    2058                 : 
    2059 GIC          11 :     PG_RETURN_VOID();
    2060 ECB             : }
    2061                 : 
    2062                 : /*
    2063                 :  * Disconnect the specified cached connections.
    2064                 :  *
    2065                 :  * This function discards the open connections that are established by
    2066                 :  * postgres_fdw from the local session to the foreign server with
    2067                 :  * the given name. Note that there can be multiple connections to
    2068                 :  * the given server using different user mappings. If the connections
    2069                 :  * are used in the current local transaction, they are not disconnected
    2070                 :  * and warning messages are reported. This function returns true
    2071                 :  * if it disconnects at least one connection, otherwise false. If no
    2072                 :  * foreign server with the given name is found, an error is reported.
    2073                 :  */
    2074                 : Datum
    2075 GIC           4 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
    2076                 : {
    2077                 :     ForeignServer *server;
    2078 ECB             :     char       *servername;
    2079                 : 
    2080 GIC           4 :     servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
    2081 CBC           4 :     server = GetForeignServerByName(servername, false);
    2082                 : 
    2083               3 :     PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
    2084                 : }
    2085 ECB             : 
    2086 EUB             : /*
    2087                 :  * Disconnect all the cached connections.
    2088 ECB             :  *
    2089                 :  * This function discards all the open connections that are established by
    2090                 :  * postgres_fdw from the local session to the foreign servers.
    2091                 :  * If the connections are used in the current local transaction, they are
    2092                 :  * not disconnected and warning messages are reported. This function
    2093                 :  * returns true if it disconnects at least one connection, otherwise false.
    2094                 :  */
    2095                 : Datum
    2096 GIC           4 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
    2097                 : {
    2098               4 :     PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
    2099                 : }
    2100 ECB             : 
    2101                 : /*
    2102                 :  * Workhorse to disconnect cached connections.
    2103                 :  *
    2104                 :  * This function scans all the connection cache entries and disconnects
    2105                 :  * the open connections whose foreign server OID matches with
    2106                 :  * the specified one. If InvalidOid is specified, it disconnects all
    2107                 :  * the cached connections.
    2108                 :  *
    2109                 :  * This function emits a warning for each connection that's used in
    2110                 :  * the current transaction and doesn't close it. It returns true if
    2111                 :  * it disconnects at least one connection, otherwise false.
    2112                 :  *
    2113                 :  * Note that this function disconnects even the connections that are
    2114                 :  * established by other users in the same local session using different
    2115                 :  * user mappings. This leads even non-superuser to be able to close
    2116                 :  * the connections established by superusers in the same local session.
    2117                 :  *
    2118                 :  * XXX As of now we don't see any security risk doing this. But we should
    2119                 :  * set some restrictions on that, for example, prevent non-superuser
    2120                 :  * from closing the connections established by superusers even
    2121                 :  * in the same session?
    2122                 :  */
    2123                 : static bool
    2124 GIC           7 : disconnect_cached_connections(Oid serverid)
    2125 ECB             : {
    2126                 :     HASH_SEQ_STATUS scan;
    2127                 :     ConnCacheEntry *entry;
    2128 GIC           7 :     bool        all = !OidIsValid(serverid);
    2129               7 :     bool        result = false;
    2130                 : 
    2131                 :     /*
    2132                 :      * Connection cache hashtable has not been initialized yet in this
    2133                 :      * session, so return false.
    2134 ECB             :      */
    2135 GIC           7 :     if (!ConnectionHash)
    2136 UIC           0 :         return false;
    2137 ECB             : 
    2138 GIC           7 :     hash_seq_init(&scan, ConnectionHash);
    2139              50 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2140                 :     {
    2141                 :         /* Ignore cache entry if no open connection right now. */
    2142              43 :         if (!entry->conn)
    2143              32 :             continue;
    2144 ECB             : 
    2145 GIC          11 :         if (all || entry->serverid == serverid)
    2146 EUB             :         {
    2147                 :             /*
    2148                 :              * Emit a warning because the connection to close is used in the
    2149                 :              * current transaction and cannot be disconnected right now.
    2150                 :              */
    2151 GIC           8 :             if (entry->xact_depth > 0)
    2152 EUB             :             {
    2153                 :                 ForeignServer *server;
    2154                 : 
    2155 GIC           3 :                 server = GetForeignServerExtended(entry->serverid,
    2156                 :                                                   FSV_MISSING_OK);
    2157                 : 
    2158               3 :                 if (!server)
    2159                 :                 {
    2160                 :                     /*
    2161                 :                      * If the foreign server was dropped while its connection
    2162                 :                      * was used in the current transaction, the connection
    2163 EUB             :                      * must have been marked as invalid by
    2164                 :                      * pgfdw_inval_callback at the end of DROP SERVER command.
    2165                 :                      */
    2166 UBC           0 :                     Assert(entry->invalidated);
    2167                 : 
    2168 UIC           0 :                     ereport(WARNING,
    2169 EUB             :                             (errmsg("cannot close dropped server connection because it is still in use")));
    2170                 :                 }
    2171                 :                 else
    2172 GIC           3 :                     ereport(WARNING,
    2173                 :                             (errmsg("cannot close connection for server \"%s\" because it is still in use",
    2174 EUB             :                                     server->servername)));
    2175                 :             }
    2176                 :             else
    2177                 :             {
    2178 GBC           5 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
    2179 GIC           5 :                 disconnect_pg_server(entry);
    2180               5 :                 result = true;
    2181 EUB             :             }
    2182                 :         }
    2183                 :     }
    2184                 : 
    2185 GIC           7 :     return result;
    2186 ECB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a