Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * connection.c
4 : * Connection management functions for postgres_fdw
5 : *
6 : * Portions Copyright (c) 2012-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/postgres_fdw/connection.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/htup_details.h"
16 : #include "access/xact.h"
17 : #include "catalog/pg_user_mapping.h"
18 : #include "commands/defrem.h"
19 : #include "funcapi.h"
20 : #include "libpq/libpq-be-fe-helpers.h"
21 : #include "mb/pg_wchar.h"
22 : #include "miscadmin.h"
23 : #include "pgstat.h"
24 : #include "postgres_fdw.h"
25 : #include "storage/fd.h"
26 : #include "storage/latch.h"
27 : #include "utils/builtins.h"
28 : #include "utils/datetime.h"
29 : #include "utils/hsearch.h"
30 : #include "utils/inval.h"
31 : #include "utils/memutils.h"
32 : #include "utils/syscache.h"
33 :
34 : /*
35 : * Connection cache hash table entry
36 : *
37 : * The lookup key in this hash table is the user mapping OID. We use just one
38 : * connection per user mapping ID, which ensures that all the scans use the
39 : * same snapshot during a query. Using the user mapping OID rather than
40 : * the foreign server OID + user OID avoids creating multiple connections when
41 : * the public user mapping applies to all user OIDs.
42 : *
43 : * The "conn" pointer can be NULL if we don't currently have a live connection.
44 : * When we do have a connection, xact_depth tracks the current depth of
45 : * transactions and subtransactions open on the remote side. We need to issue
46 : * commands at the same nesting depth on the remote as we're executing at
47 : * ourselves, so that rolling back a subtransaction will kill the right
48 : * queries and not the wrong ones.
49 : */
50 : typedef Oid ConnCacheKey;
51 :
52 : typedef struct ConnCacheEntry
53 : {
54 : ConnCacheKey key; /* hash key (must be first) */
55 : PGconn *conn; /* connection to foreign server, or NULL */
56 : /* Remaining fields are invalid when conn is NULL: */
57 : int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
58 : * one level of subxact open, etc */
59 : bool have_prep_stmt; /* have we prepared any stmts in this xact? */
60 : bool have_error; /* have any subxacts aborted in this xact? */
61 : bool changing_xact_state; /* xact state change in process */
62 : bool parallel_commit; /* do we commit (sub)xacts in parallel? */
63 : bool parallel_abort; /* do we abort (sub)xacts in parallel? */
64 : bool invalidated; /* true if reconnect is pending */
65 : bool keep_connections; /* setting value of keep_connections
66 : * server option */
67 : Oid serverid; /* foreign server OID used to get server name */
68 : uint32 server_hashvalue; /* hash value of foreign server OID */
69 : uint32 mapping_hashvalue; /* hash value of user mapping OID */
70 : PgFdwConnState state; /* extra per-connection state */
71 : } ConnCacheEntry;
72 :
73 : /*
74 : * Connection cache (initialized on first use)
75 : */
76 : static HTAB *ConnectionHash = NULL;
77 :
78 : /* for assigning cursor numbers and prepared statement numbers */
79 : static unsigned int cursor_number = 0;
80 : static unsigned int prep_stmt_number = 0;
81 :
82 : /* tracks whether any work is needed in callback functions */
83 : static bool xact_got_connection = false;
84 :
85 : /*
86 : * Milliseconds to wait to cancel an in-progress query or execute a cleanup
87 : * query; if it takes longer than 30 seconds to do these, we assume the
88 : * connection is dead.
89 : */
90 : #define CONNECTION_CLEANUP_TIMEOUT 30000
91 :
92 : /* Macro for constructing abort command to be sent */
93 : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
94 : do { \
95 : if (toplevel) \
96 : snprintf((sql), sizeof(sql), \
97 : "ABORT TRANSACTION"); \
98 : else \
99 : snprintf((sql), sizeof(sql), \
100 : "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
101 : (entry)->xact_depth, (entry)->xact_depth); \
102 : } while(0)
103 :
104 : /*
105 : * SQL functions
106 : */
811 fujii 107 GIC 2 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
803 108 2 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
109 2 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
110 :
111 : /* prototypes of private functions */
112 : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
113 : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
114 : static void disconnect_pg_server(ConnCacheEntry *entry);
115 : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
116 : static void configure_remote_session(PGconn *conn);
117 : static void do_sql_command_begin(PGconn *conn, const char *sql);
118 : static void do_sql_command_end(PGconn *conn, const char *sql,
119 : bool consume_input);
120 : static void begin_remote_xact(ConnCacheEntry *entry);
121 : static void pgfdw_xact_callback(XactEvent event, void *arg);
122 : static void pgfdw_subxact_callback(SubXactEvent event,
123 : SubTransactionId mySubid,
124 : SubTransactionId parentSubid,
125 : void *arg);
126 : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
127 : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
409 efujita 128 ECB : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
2132 rhaas 129 : static bool pgfdw_cancel_query(PGconn *conn);
130 : static bool pgfdw_cancel_query_begin(PGconn *conn);
131 : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
132 : bool consume_input);
133 : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
134 : bool ignore_errors);
135 : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
136 : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
137 : TimestampTz endtime,
138 : bool consume_input,
139 : bool ignore_errors);
140 : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
141 : PGresult **result, bool *timed_out);
142 : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
143 : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
144 : List **pending_entries,
145 : List **cancel_requested);
146 : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
147 : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
148 : int curlevel);
149 : static void pgfdw_finish_abort_cleanup(List *pending_entries,
150 : List *cancel_requested,
151 : bool toplevel);
152 : static bool UserMappingPasswordRequired(UserMapping *user);
153 : static bool disconnect_cached_connections(Oid serverid);
154 :
155 : /*
156 : * Get a PGconn which can be used to execute queries on the remote PostgreSQL
157 : * server with the user's authorization. A new connection is established
158 : * if we don't already have a suitable one, and a transaction is opened at
159 : * the right subtransaction nesting depth if we didn't do that already.
160 : *
161 : * will_prep_stmt must be true if caller intends to create any prepared
162 : * statements. Since those don't go away automatically at transaction end
163 : * (not even on error), we need this flag to cue manual cleanup.
164 : *
165 : * If state is not NULL, *state receives the per-connection state associated
166 : * with the PGconn.
167 : */
168 : PGconn *
739 efujita 169 GIC 1784 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
170 : {
171 : bool found;
905 fujii 172 1784 : bool retry = false;
173 : ConnCacheEntry *entry;
174 : ConnCacheKey key;
175 1784 : MemoryContext ccxt = CurrentMemoryContext;
176 :
177 : /* First time through, initialize connection cache hashtable */
3699 tgl 178 1784 : if (ConnectionHash == NULL)
179 : {
180 : HASHCTL ctl;
181 :
182 4 : ctl.keysize = sizeof(ConnCacheKey);
183 4 : ctl.entrysize = sizeof(ConnCacheEntry);
184 4 : ConnectionHash = hash_create("postgres_fdw connections", 8,
185 : &ctl,
186 : HASH_ELEM | HASH_BLOBS);
187 :
188 : /*
189 : * Register some callback functions that manage connection cleanup.
190 : * This should be done just once in each backend.
191 : */
192 4 : RegisterXactCallback(pgfdw_xact_callback, NULL);
193 4 : RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
2088 194 4 : CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
195 : pgfdw_inval_callback, (Datum) 0);
196 4 : CacheRegisterSyscacheCallback(USERMAPPINGOID,
197 : pgfdw_inval_callback, (Datum) 0);
198 : }
199 :
200 : /* Set flag that we did GetConnection during the current transaction */
3699 201 1784 : xact_got_connection = true;
202 :
203 : /* Create hash key for the entry. Assume no pad bytes in key struct */
2628 rhaas 204 CBC 1784 : key = user->umid;
205 :
206 : /*
3699 tgl 207 ECB : * Find or create cached entry for requested connection.
208 : */
3699 tgl 209 GIC 1784 : entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
3699 tgl 210 CBC 1784 : if (!found)
211 : {
212 : /*
2088 tgl 213 ECB : * We need only clear "conn" here; remaining fields will be filled
214 : * later when "conn" is set.
215 : */
3699 tgl 216 GIC 11 : entry->conn = NULL;
3699 tgl 217 ECB : }
218 :
2132 rhaas 219 : /* Reject further use of connections which failed abort cleanup. */
2132 rhaas 220 GIC 1784 : pgfdw_reject_incomplete_xact_state_change(entry);
221 :
222 : /*
223 : * If the connection needs to be remade due to invalidation, disconnect as
224 : * soon as we're out of all transactions.
225 : */
905 fujii 226 1784 : if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
2088 tgl 227 ECB : {
905 fujii 228 LBC 0 : elog(DEBUG3, "closing connection %p for option changes to take effect",
905 fujii 229 ECB : entry->conn);
2088 tgl 230 UIC 0 : disconnect_pg_server(entry);
2088 tgl 231 ECB : }
232 :
233 : /*
234 : * If cache entry doesn't have a connection, we have to establish a new
235 : * connection. (If connect_pg_server throws an error, the cache entry
236 : * will remain in a valid empty state, ie conn == NULL.)
237 : */
3699 tgl 238 GIC 1784 : if (entry->conn == NULL)
905 fujii 239 CBC 61 : make_new_connection(entry, user);
240 :
241 : /*
242 : * We check the health of the cached connection here when using it. In
243 : * cases where we're out of all transactions, if a broken connection is
549 efujita 244 ECB : * detected, we try to reestablish a new connection later.
3699 tgl 245 : */
915 fujii 246 GIC 1778 : PG_TRY();
247 : {
248 : /* Process a pending asynchronous request if any. */
739 efujita 249 1778 : if (entry->state.pendingAreq)
739 efujita 250 UIC 0 : process_pending_request(entry->state.pendingAreq);
915 fujii 251 ECB : /* Start a new transaction or subtransaction if needed. */
915 fujii 252 GIC 1778 : begin_remote_xact(entry);
253 : }
254 2 : PG_CATCH();
915 fujii 255 ECB : {
905 fujii 256 GIC 2 : MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
257 2 : ErrorData *errdata = CopyErrorData();
258 :
259 : /*
260 : * Determine whether to try to reestablish the connection.
905 fujii 261 ECB : *
262 : * After a broken connection is detected in libpq, any error other
905 fujii 263 EUB : * than connection failure (e.g., out-of-memory) can be thrown
264 : * somewhere between return from libpq and the expected ereport() call
265 : * in pgfdw_report_error(). In this case, since PQstatus() indicates
266 : * CONNECTION_BAD, checking only PQstatus() causes the false detection
267 : * of connection failure. To avoid this, we also verify that the
268 : * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
269 : * checking only the sqlstate can cause another false detection
270 : * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
271 : * for any libpq-originated error condition.
272 : */
905 fujii 273 CBC 2 : if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
274 2 : PQstatus(entry->conn) != CONNECTION_BAD ||
905 fujii 275 GIC 2 : entry->xact_depth > 0)
276 : {
277 1 : MemoryContextSwitchTo(ecxt);
915 278 1 : PG_RE_THROW();
279 : }
280 :
905 fujii 281 ECB : /* Clean up the error state */
905 fujii 282 GIC 1 : FlushErrorState();
283 1 : FreeErrorData(errdata);
905 fujii 284 CBC 1 : errdata = NULL;
905 fujii 285 EUB :
905 fujii 286 GIC 1 : retry = true;
915 fujii 287 ECB : }
915 fujii 288 GIC 1777 : PG_END_TRY();
915 fujii 289 ECB :
290 : /*
905 291 : * If a broken connection is detected, disconnect it, reestablish a new
292 : * connection and retry a new remote transaction. If connection failure is
293 : * reported again, we give up getting a connection.
294 : */
905 fujii 295 GIC 1777 : if (retry)
296 : {
297 1 : Assert(entry->xact_depth == 0);
298 :
915 299 1 : ereport(DEBUG3,
300 : (errmsg_internal("could not start remote transaction on connection %p",
301 : entry->conn)),
302 : errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
303 :
905 304 1 : elog(DEBUG3, "closing connection %p to reestablish a new one",
305 : entry->conn);
306 1 : disconnect_pg_server(entry);
307 :
23 efujita 308 GNC 1 : make_new_connection(entry, user);
905 fujii 309 ECB :
905 fujii 310 GIC 1 : begin_remote_xact(entry);
915 fujii 311 ECB : }
3699 tgl 312 :
313 : /* Remember if caller will prepare statements */
3682 tgl 314 GIC 1777 : entry->have_prep_stmt |= will_prep_stmt;
315 :
739 efujita 316 ECB : /* If caller needs access to the per-connection state, return it. */
739 efujita 317 CBC 1777 : if (state)
318 704 : *state = &entry->state;
319 :
3699 tgl 320 1777 : return entry->conn;
321 : }
3699 tgl 322 ECB :
323 : /*
324 : * Reset all transient state fields in the cached connection entry and
325 : * establish new connection to the remote server.
326 : */
327 : static void
905 fujii 328 GIC 62 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
905 fujii 329 ECB : {
905 fujii 330 GIC 62 : ForeignServer *server = GetForeignServer(user->serverid);
737 fujii 331 ECB : ListCell *lc;
332 :
905 fujii 333 CBC 62 : Assert(entry->conn == NULL);
334 :
335 : /* Reset all transient state fields, to be sure all are clean */
905 fujii 336 GIC 62 : entry->xact_depth = 0;
337 62 : entry->have_prep_stmt = false;
905 fujii 338 CBC 62 : entry->have_error = false;
905 fujii 339 GIC 62 : entry->changing_xact_state = false;
905 fujii 340 CBC 62 : entry->invalidated = false;
814 fujii 341 GIC 62 : entry->serverid = server->serverid;
905 fujii 342 CBC 62 : entry->server_hashvalue =
905 fujii 343 GIC 62 : GetSysCacheHashValue1(FOREIGNSERVEROID,
905 fujii 344 ECB : ObjectIdGetDatum(server->serverid));
905 fujii 345 GIC 62 : entry->mapping_hashvalue =
346 62 : GetSysCacheHashValue1(USERMAPPINGOID,
347 : ObjectIdGetDatum(user->umid));
739 efujita 348 CBC 62 : memset(&entry->state, 0, sizeof(entry->state));
349 :
350 : /*
737 fujii 351 ECB : * Determine whether to keep the connection that we're about to make here
352 : * open even after the transaction using it ends, so that the subsequent
353 : * transactions can re-use it.
354 : *
355 : * By default, all the connections to any foreign servers are kept open.
356 : *
357 : * Also determine whether to commit/abort (sub)transactions opened on the
358 : * remote server in parallel at (sub)transaction end, which is disabled by
359 : * default.
360 : *
361 : * Note: it's enough to determine these only when making a new connection
276 efujita 362 : * because if these settings for it are changed, it will be closed and
363 : * re-made later.
737 fujii 364 : */
737 fujii 365 GIC 62 : entry->keep_connections = true;
409 efujita 366 62 : entry->parallel_commit = false;
3 efujita 367 GNC 62 : entry->parallel_abort = false;
737 fujii 368 CBC 268 : foreach(lc, server->options)
369 : {
737 fujii 370 GIC 206 : DefElem *def = (DefElem *) lfirst(lc);
737 fujii 371 ECB :
737 fujii 372 CBC 206 : if (strcmp(def->defname, "keep_connections") == 0)
373 6 : entry->keep_connections = defGetBoolean(def);
409 efujita 374 200 : else if (strcmp(def->defname, "parallel_commit") == 0)
375 2 : entry->parallel_commit = defGetBoolean(def);
3 efujita 376 GNC 198 : else if (strcmp(def->defname, "parallel_abort") == 0)
377 2 : entry->parallel_abort = defGetBoolean(def);
737 fujii 378 ECB : }
379 :
905 380 : /* Now try to make the connection */
905 fujii 381 GIC 62 : entry->conn = connect_pg_server(server, user);
905 fujii 382 ECB :
905 fujii 383 CBC 56 : elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
384 : entry->conn, server->servername, user->umid, user->userid);
385 56 : }
386 :
387 : /*
388 : * Connect to remote server using specified server and user mapping properties.
389 : */
390 : static PGconn *
3699 tgl 391 GIC 62 : connect_pg_server(ForeignServer *server, UserMapping *user)
392 : {
393 62 : PGconn *volatile conn = NULL;
394 :
395 : /*
396 : * Use PG_TRY block to ensure closing connection on error.
397 : */
398 62 : PG_TRY();
399 : {
400 : const char **keywords;
401 : const char **values;
471 fujii 402 CBC 62 : char *appname = NULL;
3699 tgl 403 ECB : int n;
404 :
405 : /*
406 : * Construct connection params from generic options of ForeignServer
407 : * and UserMapping. (Some of them might not be libpq options, in
408 : * which case we'll just waste a few array slots.) Add 4 extra slots
579 fujii 409 : * for application_name, fallback_application_name, client_encoding,
410 : * end marker.
3699 tgl 411 : */
579 fujii 412 CBC 62 : n = list_length(server->options) + list_length(user->options) + 4;
3699 tgl 413 62 : keywords = (const char **) palloc(n * sizeof(char *));
414 62 : values = (const char **) palloc(n * sizeof(char *));
415 :
3699 tgl 416 GIC 62 : n = 0;
417 124 : n += ExtractConnectionOptions(server->options,
3699 tgl 418 CBC 62 : keywords + n, values + n);
3699 tgl 419 GIC 124 : n += ExtractConnectionOptions(user->options,
3699 tgl 420 CBC 62 : keywords + n, values + n);
421 :
579 fujii 422 ECB : /*
423 : * Use pgfdw_application_name as application_name if set.
424 : *
425 : * PQconnectdbParams() processes the parameter arrays from start to
426 : * end. If any key word is repeated, the last value is used. Therefore
427 : * note that pgfdw_application_name must be added to the arrays after
428 : * options of ForeignServer are, so that it can override
429 : * application_name set in ForeignServer.
430 : */
579 fujii 431 GIC 62 : if (pgfdw_application_name && *pgfdw_application_name != '\0')
432 : {
433 1 : keywords[n] = "application_name";
434 1 : values[n] = pgfdw_application_name;
579 fujii 435 CBC 1 : n++;
436 : }
437 :
438 : /*
471 fujii 439 ECB : * Search the parameter arrays to find application_name setting, and
440 : * replace escape sequences in it with status information if found.
441 : * The arrays are searched backwards because the last value is used if
442 : * application_name is repeatedly set.
443 : */
471 fujii 444 GIC 158 : for (int i = n - 1; i >= 0; i--)
445 : {
446 112 : if (strcmp(keywords[i], "application_name") == 0 &&
447 16 : *(values[i]) != '\0')
448 : {
471 fujii 449 ECB : /*
450 : * Use this application_name setting if it's not empty string
451 : * even after any escape sequences in it are replaced.
452 : */
471 fujii 453 CBC 16 : appname = process_pgfdw_appname(values[i]);
454 16 : if (appname[0] != '\0')
471 fujii 455 ECB : {
471 fujii 456 CBC 16 : values[i] = appname;
457 16 : break;
458 : }
459 :
460 : /*
461 : * This empty application_name is not used, so we set
462 : * values[i] to NULL and keep searching the array to find the
463 : * next one.
464 : */
471 fujii 465 UIC 0 : values[i] = NULL;
466 0 : pfree(appname);
467 0 : appname = NULL;
471 fujii 468 ECB : }
469 : }
470 :
579 471 : /* Use "postgres_fdw" as fallback_application_name */
3699 tgl 472 CBC 62 : keywords[n] = "fallback_application_name";
3699 tgl 473 GIC 62 : values[n] = "postgres_fdw";
474 62 : n++;
475 :
476 : /* Set client_encoding so that libpq can convert encoding properly. */
477 62 : keywords[n] = "client_encoding";
478 62 : values[n] = GetDatabaseEncodingName();
479 62 : n++;
480 :
3699 tgl 481 CBC 62 : keywords[n] = values[n] = NULL;
482 :
1140 tgl 483 ECB : /* verify the set of connection parameters */
1951 rhaas 484 CBC 62 : check_conn_params(keywords, values, user);
485 :
1140 tgl 486 ECB : /* OK to make connection */
76 andres 487 GNC 60 : conn = libpqsrv_connect_params(keywords, values,
488 : false, /* expand_dbname */
489 : PG_WAIT_EXTENSION);
1140 tgl 490 ECB :
3699 tgl 491 CBC 60 : if (!conn || PQstatus(conn) != CONNECTION_OK)
3699 tgl 492 GIC 2 : ereport(ERROR,
2118 tgl 493 ECB : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
494 : errmsg("could not connect to server \"%s\"",
495 : server->servername),
496 : errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
497 :
498 : /*
1 sfrost 499 : * Check that non-superuser has used password to establish connection;
500 : * otherwise, he's piggybacking on the postgres server's user
501 : * identity. See also dblink_security_check() in contrib/dblink and
502 : * check_conn_params.
503 : */
1 sfrost 504 CBC 58 : if (!superuser_arg(user->userid) && UserMappingPasswordRequired(user) &&
1 sfrost 505 GIC 2 : !PQconnectionUsedPassword(conn))
506 2 : ereport(ERROR,
507 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
508 : errmsg("password is required"),
509 : errdetail("Non-superuser cannot connect if the server does not request a password."),
510 : errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
511 :
512 : /* Prepare new session for use */
3698 tgl 513 56 : configure_remote_session(conn);
514 :
471 fujii 515 56 : if (appname != NULL)
471 fujii 516 CBC 16 : pfree(appname);
3699 tgl 517 56 : pfree(keywords);
518 56 : pfree(values);
519 : }
3699 tgl 520 GIC 6 : PG_CATCH();
521 : {
76 andres 522 GNC 6 : libpqsrv_disconnect(conn);
3699 tgl 523 CBC 6 : PG_RE_THROW();
3699 tgl 524 ECB : }
3699 tgl 525 CBC 56 : PG_END_TRY();
526 :
527 56 : return conn;
528 : }
3699 tgl 529 ECB :
2088 530 : /*
531 : * Disconnect any open connection for a connection cache entry.
532 : */
533 : static void
2088 tgl 534 CBC 53 : disconnect_pg_server(ConnCacheEntry *entry)
535 : {
2088 tgl 536 GIC 53 : if (entry->conn != NULL)
537 : {
76 andres 538 GNC 53 : libpqsrv_disconnect(entry->conn);
2088 tgl 539 GIC 53 : entry->conn = NULL;
2088 tgl 540 ECB : }
2088 tgl 541 GIC 53 : }
2088 tgl 542 ECB :
543 : /*
1206 andrew 544 : * Return true if the password_required is defined and false for this user
545 : * mapping, otherwise false. The mapping has been pre-validated.
546 : */
547 : static bool
1206 andrew 548 GIC 5 : UserMappingPasswordRequired(UserMapping *user)
549 : {
550 : ListCell *cell;
551 :
552 8 : foreach(cell, user->options)
553 : {
1206 andrew 554 CBC 4 : DefElem *def = (DefElem *) lfirst(cell);
555 :
1206 andrew 556 GIC 4 : if (strcmp(def->defname, "password_required") == 0)
557 1 : return defGetBoolean(def);
1206 andrew 558 ECB : }
559 :
1206 andrew 560 CBC 4 : return true;
561 : }
1206 andrew 562 ECB :
3699 tgl 563 : /*
564 : * For non-superusers, insist that the connstr specify a password. This
565 : * prevents a password from being picked up from .pgpass, a service file, the
1206 andrew 566 : * environment, etc. We don't want the postgres user's passwords,
567 : * certificates, etc to be accessible to non-superusers. (See also
568 : * dblink_connstr_check in contrib/dblink.)
569 : */
570 : static void
1951 rhaas 571 GIC 62 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
572 : {
573 : int i;
574 :
575 : /* no check required if superuser */
576 62 : if (superuser_arg(user->userid))
3699 tgl 577 CBC 57 : return;
578 :
579 : /* ok if params contain a non-empty password */
3699 tgl 580 GIC 19 : for (i = 0; keywords[i] != NULL; i++)
581 : {
3699 tgl 582 CBC 17 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
583 3 : return;
584 : }
585 :
1206 andrew 586 ECB : /* ok if the superuser explicitly said so at user mapping creation time */
1206 andrew 587 GIC 2 : if (!UserMappingPasswordRequired(user))
1206 andrew 588 LBC 0 : return;
1206 andrew 589 ECB :
3699 tgl 590 GIC 2 : ereport(ERROR,
591 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
592 : errmsg("password is required"),
1 sfrost 593 ECB : errdetail("Non-superusers must provide a password in the user mapping.")));
3699 tgl 594 EUB : }
595 :
3698 tgl 596 ECB : /*
597 : * Issue SET commands to make sure remote session is configured properly.
598 : *
599 : * We do this just once at connection, assuming nothing will change the
600 : * values later. Since we'll never send volatile function calls to the
601 : * remote, there shouldn't be any way to break this assumption from our end.
602 : * It's possible to think of ways to break it at the remote end, eg making
603 : * a foreign table point to a view that includes a set_config call ---
604 : * but once you admit the possibility of a malicious view definition,
605 : * there are any number of ways to break things.
606 : */
607 : static void
3698 tgl 608 GIC 56 : configure_remote_session(PGconn *conn)
609 : {
3681 610 56 : int remoteversion = PQserverVersion(conn);
611 :
612 : /* Force the search path to contain only pg_catalog (see deparse.c) */
613 56 : do_sql_command(conn, "SET search_path = pg_catalog");
3681 tgl 614 ECB :
615 : /*
616 : * Set remote timezone; this is basically just cosmetic, since all
617 : * transmitted and returned timestamptzs should specify a zone explicitly
618 : * anyway. However it makes the regression test outputs more predictable.
619 : *
620 : * We don't risk setting remote zone equal to ours, since the remote
621 : * server might use a different timezone database. Instead, use UTC
622 : * (quoted, because very old servers are picky about case).
623 : */
3670 tgl 624 GIC 56 : do_sql_command(conn, "SET timezone = 'UTC'");
625 :
626 : /*
627 : * Set values needed to ensure unambiguous data output from remote. (This
628 : * logic should match what pg_dump does. See also set_transmission_modes
629 : * in postgres_fdw.c.)
3681 tgl 630 ECB : */
3681 tgl 631 GIC 56 : do_sql_command(conn, "SET datestyle = ISO");
632 56 : if (remoteversion >= 80400)
633 56 : do_sql_command(conn, "SET intervalstyle = postgres");
634 56 : if (remoteversion >= 90000)
635 56 : do_sql_command(conn, "SET extra_float_digits = 3");
636 : else
3681 tgl 637 LBC 0 : do_sql_command(conn, "SET extra_float_digits = 2");
3681 tgl 638 CBC 56 : }
3681 tgl 639 ECB :
640 : /*
641 : * Convenience subroutine to issue a non-data-returning SQL command to remote
642 : */
731 fujii 643 EUB : void
3681 tgl 644 CBC 1651 : do_sql_command(PGconn *conn, const char *sql)
645 : {
409 efujita 646 GIC 1651 : do_sql_command_begin(conn, sql);
647 1651 : do_sql_command_end(conn, sql, false);
648 1648 : }
649 :
409 efujita 650 ECB : static void
409 efujita 651 GIC 1669 : do_sql_command_begin(PGconn *conn, const char *sql)
409 efujita 652 ECB : {
2132 rhaas 653 CBC 1669 : if (!PQsendQuery(conn, sql))
2132 rhaas 654 LBC 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
409 efujita 655 GIC 1669 : }
656 :
409 efujita 657 ECB : static void
409 efujita 658 GIC 1669 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
409 efujita 659 ECB : {
409 efujita 660 EUB : PGresult *res;
409 efujita 661 ECB :
662 : /*
663 : * If requested, consume whatever data is available from the socket. (Note
332 tgl 664 : * that if all data is available, this allows pgfdw_get_result to call
665 : * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
666 : * would be large compared to the overhead of PQconsumeInput.)
667 : */
409 efujita 668 GIC 1669 : if (consume_input && !PQconsumeInput(conn))
409 efujita 669 UIC 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
2132 rhaas 670 GIC 1669 : res = pgfdw_get_result(conn, sql);
3698 tgl 671 1667 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
3352 672 1 : pgfdw_report_error(ERROR, res, conn, true, sql);
3698 673 1666 : PQclear(res);
3698 tgl 674 CBC 1666 : }
3698 tgl 675 EUB :
3699 tgl 676 ECB : /*
677 : * Start remote transaction or subtransaction, if needed.
678 : *
679 : * Note that we always use at least REPEATABLE READ in the remote session.
680 : * This is so that, if a query initiates multiple scans of the same or
681 : * different foreign tables, we will get snapshot-consistent results from
682 : * those scans. A disadvantage is that we can't provide sane emulation of
683 : * READ COMMITTED behavior --- it would be nice if we had some other way to
684 : * control which remote queries share a snapshot.
685 : */
686 : static void
3699 tgl 687 GIC 1779 : begin_remote_xact(ConnCacheEntry *entry)
688 : {
689 1779 : int curlevel = GetCurrentTransactionNestLevel();
690 :
691 : /* Start main transaction if we haven't yet */
692 1779 : if (entry->xact_depth <= 0)
3699 tgl 693 ECB : {
694 : const char *sql;
695 :
3699 tgl 696 GIC 697 : elog(DEBUG3, "starting remote transaction on connection %p",
697 : entry->conn);
3699 tgl 698 ECB :
3699 tgl 699 GIC 697 : if (IsolationIsSerializable())
3699 tgl 700 UIC 0 : sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
701 : else
3699 tgl 702 CBC 697 : sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
2132 rhaas 703 GIC 697 : entry->changing_xact_state = true;
3681 tgl 704 697 : do_sql_command(entry->conn, sql);
3699 tgl 705 CBC 696 : entry->xact_depth = 1;
2132 rhaas 706 GBC 696 : entry->changing_xact_state = false;
707 : }
3699 tgl 708 ECB :
709 : /*
710 : * If we're in a subtransaction, stack up savepoints to match our level.
711 : * This ensures we can rollback just the desired effects when a
712 : * subtransaction aborts.
713 : */
3699 tgl 714 GIC 1792 : while (entry->xact_depth < curlevel)
715 : {
716 : char sql[64];
717 :
718 15 : snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
2132 rhaas 719 15 : entry->changing_xact_state = true;
3681 tgl 720 CBC 15 : do_sql_command(entry->conn, sql);
3699 tgl 721 GIC 14 : entry->xact_depth++;
2132 rhaas 722 14 : entry->changing_xact_state = false;
723 : }
3699 tgl 724 CBC 1777 : }
3699 tgl 725 ECB :
726 : /*
727 : * Release connection reference count created by calling GetConnection.
728 : */
729 : void
3699 tgl 730 CBC 1728 : ReleaseConnection(PGconn *conn)
731 : {
732 : /*
733 : * Currently, we don't actually track connection references because all
734 : * cleanup is managed on a transaction or subtransaction basis instead. So
735 : * there's nothing to do here.
3699 tgl 736 ECB : */
3699 tgl 737 GIC 1728 : }
738 :
739 : /*
740 : * Assign a "unique" number for a cursor.
741 : *
742 : * These really only need to be unique per connection within a transaction.
3699 tgl 743 ECB : * For the moment we ignore the per-connection point and assign them across
744 : * all connections in the transaction, but we ask for the connection to be
745 : * supplied in case we want to refine that.
746 : *
747 : * Note that even if wraparound happens in a very long transaction, actual
748 : * collisions are highly improbable; just be sure to use %u not %d to print.
749 : */
750 : unsigned int
3699 tgl 751 GIC 503 : GetCursorNumber(PGconn *conn)
752 : {
753 503 : return ++cursor_number;
754 : }
755 :
756 : /*
3682 tgl 757 ECB : * Assign a "unique" number for a prepared statement.
758 : *
759 : * This works much like GetCursorNumber, except that we never reset the counter
760 : * within a session. That's because we can't be 100% sure we've gotten rid
761 : * of all prepared statements on all connections, and it's not really worth
762 : * increasing the risk of prepared-statement name collisions by resetting.
763 : */
764 : unsigned int
3682 tgl 765 GIC 174 : GetPrepStmtNumber(PGconn *conn)
766 : {
767 174 : return ++prep_stmt_number;
768 : }
769 :
770 : /*
2544 rhaas 771 ECB : * Submit a query and wait for the result.
772 : *
773 : * This function is interruptible by signals.
774 : *
775 : * Caller is responsible for the error handling on the result.
776 : */
777 : PGresult *
739 efujita 778 GIC 3560 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
779 : {
780 : /* First, process a pending asynchronous request, if any. */
781 3560 : if (state && state->pendingAreq)
782 4 : process_pending_request(state->pendingAreq);
783 :
2544 rhaas 784 ECB : /*
785 : * Submit a query. Since we don't use non-blocking mode, this also can
786 : * block. But its risk is relatively small, so we ignore that for now.
787 : */
2544 rhaas 788 CBC 3560 : if (!PQsendQuery(conn, query))
2544 rhaas 789 UIC 0 : pgfdw_report_error(ERROR, NULL, conn, false, query);
790 :
791 : /* Wait for the result. */
2544 rhaas 792 GIC 3560 : return pgfdw_get_result(conn, query);
793 : }
2544 rhaas 794 ECB :
2544 rhaas 795 EUB : /*
796 : * Wait for the result from a prior asynchronous execution function call.
797 : *
2544 rhaas 798 ECB : * This function offers quick responsiveness by checking for any interruptions.
799 : *
800 : * This function emulates PQexec()'s behavior of returning the last result
801 : * when there are many.
802 : *
803 : * Caller is responsible for the error handling on the result.
804 : */
805 : PGresult *
2544 rhaas 806 GIC 7428 : pgfdw_get_result(PGconn *conn, const char *query)
807 : {
2124 tgl 808 7428 : PGresult *volatile last_res = NULL;
809 :
810 : /* In what follows, do not leak any PGresults on an error. */
811 7428 : PG_TRY();
2544 rhaas 812 ECB : {
813 : for (;;)
2544 rhaas 814 CBC 7428 : {
815 : PGresult *res;
816 :
2124 tgl 817 37034 : while (PQisBusy(conn))
818 : {
819 : int wc;
2544 rhaas 820 ECB :
821 : /* Sleep until there's something to do */
2124 tgl 822 GIC 7324 : wc = WaitLatchOrSocket(MyLatch,
1598 tmunro 823 ECB : WL_LATCH_SET | WL_SOCKET_READABLE |
824 : WL_EXIT_ON_PM_DEATH,
825 : PQsocket(conn),
826 : -1L, PG_WAIT_EXTENSION);
2124 tgl 827 GIC 7324 : ResetLatch(MyLatch);
2544 rhaas 828 ECB :
2124 tgl 829 GIC 7324 : CHECK_FOR_INTERRUPTS();
830 :
831 : /* Data available in socket? */
832 7324 : if (wc & WL_SOCKET_READABLE)
2124 tgl 833 ECB : {
2124 tgl 834 GIC 7322 : if (!PQconsumeInput(conn))
2124 tgl 835 CBC 2 : pgfdw_report_error(ERROR, NULL, conn, false, query);
836 : }
837 : }
2544 rhaas 838 ECB :
2124 tgl 839 GIC 14854 : res = PQgetResult(conn);
2124 tgl 840 CBC 14854 : if (res == NULL)
841 7426 : break; /* query is complete */
842 :
2124 tgl 843 GIC 7428 : PQclear(last_res);
844 7428 : last_res = res;
2124 tgl 845 ECB : }
846 : }
2124 tgl 847 CBC 2 : PG_CATCH();
848 : {
2544 rhaas 849 2 : PQclear(last_res);
2124 tgl 850 2 : PG_RE_THROW();
851 : }
2124 tgl 852 GIC 7426 : PG_END_TRY();
2544 rhaas 853 ECB :
2544 rhaas 854 GIC 7426 : return last_res;
2544 rhaas 855 ECB : }
856 :
857 : /*
3699 tgl 858 : * Report an error we got from the remote server.
859 : *
860 : * elevel: error level to use (typically ERROR, but might be less)
861 : * res: PGresult containing the error
862 : * conn: connection we did the query on
863 : * clear: if true, PQclear the result (otherwise caller will handle it)
864 : * sql: NULL, or text of remote command we tried to execute
865 : *
866 : * Note: callers that choose not to throw ERROR for a remote error are
867 : * responsible for making sure that the associated ConnCacheEntry gets
868 : * marked with have_error = true.
869 : */
870 : void
3352 tgl 871 GIC 15 : pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
872 : bool clear, const char *sql)
873 : {
874 : /* If requested, PGresult must be released before leaving this function. */
3699 875 15 : PG_TRY();
876 : {
3699 tgl 877 CBC 15 : char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
3699 tgl 878 GIC 15 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
879 15 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
880 15 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
3699 tgl 881 CBC 15 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
882 : int sqlstate;
3699 tgl 883 ECB :
3699 tgl 884 CBC 15 : if (diag_sqlstate)
885 13 : sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
3699 tgl 886 ECB : diag_sqlstate[1],
887 : diag_sqlstate[2],
888 : diag_sqlstate[3],
889 : diag_sqlstate[4]);
890 : else
3699 tgl 891 CBC 2 : sqlstate = ERRCODE_CONNECTION_FAILURE;
892 :
893 : /*
894 : * If we don't get a message from the PGresult, try the PGconn. This
895 : * is needed because for connection-level failures, PQexec may just
896 : * return NULL, not a PGresult at all.
3352 tgl 897 ECB : */
3352 tgl 898 GIC 15 : if (message_primary == NULL)
2232 peter_e 899 2 : message_primary = pchomp(PQerrorMessage(conn));
900 :
3699 tgl 901 15 : ereport(elevel,
902 : (errcode(sqlstate),
903 : (message_primary != NULL && message_primary[0] != '\0') ?
492 fujii 904 ECB : errmsg_internal("%s", message_primary) :
2300 mail 905 : errmsg("could not obtain message string for remote error"),
906 : message_detail ? errdetail_internal("%s", message_detail) : 0,
3699 tgl 907 : message_hint ? errhint("%s", message_hint) : 0,
908 : message_context ? errcontext("%s", message_context) : 0,
909 : sql ? errcontext("remote SQL command: %s", sql) : 0));
910 : }
1255 peter 911 GIC 15 : PG_FINALLY();
912 : {
3699 tgl 913 15 : if (clear)
914 12 : PQclear(res);
915 : }
916 15 : PG_END_TRY();
3699 tgl 917 LBC 0 : }
918 :
3699 tgl 919 ECB : /*
920 : * pgfdw_xact_callback --- cleanup at main-transaction end.
921 : *
881 noah 922 : * This runs just late enough that it must not enter user-defined code
881 noah 923 EUB : * locally. (Entering such code on the remote side is fine. Its remote
924 : * COMMIT TRANSACTION may run deferred triggers.)
925 : */
926 : static void
3699 tgl 927 GIC 3678 : pgfdw_xact_callback(XactEvent event, void *arg)
928 : {
929 : HASH_SEQ_STATUS scan;
930 : ConnCacheEntry *entry;
409 efujita 931 3678 : List *pending_entries = NIL;
3 efujita 932 GNC 3678 : List *cancel_requested = NIL;
933 :
3699 tgl 934 ECB : /* Quick exit if no connections were touched in this transaction. */
3699 tgl 935 GIC 3678 : if (!xact_got_connection)
936 3012 : return;
937 :
3699 tgl 938 ECB : /*
939 : * Scan all connection cache entries to find open remote transactions, and
940 : * close them.
941 : */
3699 tgl 942 CBC 666 : hash_seq_init(&scan, ConnectionHash);
943 2917 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
944 : {
945 : PGresult *res;
946 :
947 : /* Ignore cache entry if no open connection right now */
3352 tgl 948 GIC 2252 : if (entry->conn == NULL)
3699 tgl 949 CBC 1060 : continue;
3699 tgl 950 ECB :
951 : /* If it has an open remote transaction, try to close it */
3352 tgl 952 GIC 1192 : if (entry->xact_depth > 0)
953 : {
954 697 : elog(DEBUG3, "closing remote transaction on connection %p",
3352 tgl 955 ECB : entry->conn);
956 :
3352 tgl 957 GIC 697 : switch (event)
958 : {
2901 rhaas 959 CBC 658 : case XACT_EVENT_PARALLEL_PRE_COMMIT:
960 : case XACT_EVENT_PRE_COMMIT:
2132 rhaas 961 ECB :
962 : /*
963 : * If abort cleanup previously failed for this connection,
964 : * we can't issue any more commands against it.
965 : */
2132 rhaas 966 CBC 658 : pgfdw_reject_incomplete_xact_state_change(entry);
967 :
968 : /* Commit all remote transactions during pre-commit */
2132 rhaas 969 GIC 658 : entry->changing_xact_state = true;
409 efujita 970 658 : if (entry->parallel_commit)
971 : {
972 16 : do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
409 efujita 973 CBC 16 : pending_entries = lappend(pending_entries, entry);
409 efujita 974 GIC 16 : continue;
975 : }
3352 tgl 976 CBC 642 : do_sql_command(entry->conn, "COMMIT TRANSACTION");
2132 rhaas 977 642 : entry->changing_xact_state = false;
978 :
3352 tgl 979 ECB : /*
980 : * If there were any errors in subtransactions, and we
981 : * made prepared statements, do a DEALLOCATE ALL to make
982 : * sure we get rid of all prepared statements. This is
983 : * annoying and not terribly bulletproof, but it's
984 : * probably not worth trying harder.
985 : *
986 : * DEALLOCATE ALL only exists in 8.3 and later, so this
987 : * constrains how old a server postgres_fdw can
988 : * communicate with. We intentionally ignore errors in
989 : * the DEALLOCATE, so that we can hobble along to some
990 : * extent with older servers (leaking prepared statements
991 : * as we go; but we don't really support update operations
992 : * pre-8.3 anyway).
993 : */
3682 tgl 994 GIC 642 : if (entry->have_prep_stmt && entry->have_error)
995 : {
3682 tgl 996 UIC 0 : res = PQexec(entry->conn, "DEALLOCATE ALL");
997 0 : PQclear(res);
998 : }
3682 tgl 999 GIC 642 : entry->have_prep_stmt = false;
1000 642 : entry->have_error = false;
3352 tgl 1001 CBC 642 : break;
3352 tgl 1002 GIC 1 : case XACT_EVENT_PRE_PREPARE:
3352 tgl 1003 EUB :
1004 : /*
1005 : * We disallow any remote transactions, since it's not
1248 efujita 1006 ECB : * very reasonable to hold them open until the prepared
1007 : * transaction is committed. For the moment, throw error
1008 : * unconditionally; later we might allow read-only cases.
1009 : * Note that the error will cause us to come right back
1010 : * here with event == XACT_EVENT_ABORT, so we'll clean up
1011 : * the connection state at that point.
1012 : */
3352 tgl 1013 GIC 1 : ereport(ERROR,
1014 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1015 : errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1016 : break;
2901 rhaas 1017 UIC 0 : case XACT_EVENT_PARALLEL_COMMIT:
1018 : case XACT_EVENT_COMMIT:
1019 : case XACT_EVENT_PREPARE:
3352 tgl 1020 ECB : /* Pre-commit should have closed the open transaction */
3352 tgl 1021 UIC 0 : elog(ERROR, "missed cleaning up connection during pre-commit");
1022 : break;
2901 rhaas 1023 GIC 38 : case XACT_EVENT_PARALLEL_ABORT:
3352 tgl 1024 EUB : case XACT_EVENT_ABORT:
1025 : /* Rollback all remote transactions during abort */
3 efujita 1026 GNC 38 : if (entry->parallel_abort)
1027 : {
1028 4 : if (pgfdw_abort_cleanup_begin(entry, true,
1029 : &pending_entries,
1030 : &cancel_requested))
1031 4 : continue;
1032 : }
1033 : else
1034 34 : pgfdw_abort_cleanup(entry, true);
3352 tgl 1035 GIC 34 : break;
3352 tgl 1036 EUB : }
1037 : }
3699 tgl 1038 ECB :
1039 : /* Reset state to show we're out of a transaction */
409 efujita 1040 GIC 1171 : pgfdw_reset_xact_state(entry, true);
409 efujita 1041 ECB : }
1042 :
1043 : /* If there are any pending connections, finish cleaning them up */
3 efujita 1044 GNC 665 : if (pending_entries || cancel_requested)
1045 : {
1046 15 : if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1047 : event == XACT_EVENT_PRE_COMMIT)
1048 : {
1049 13 : Assert(cancel_requested == NIL);
1050 13 : pgfdw_finish_pre_commit_cleanup(pending_entries);
1051 : }
1052 : else
1053 : {
1054 2 : Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1055 : event == XACT_EVENT_ABORT);
1056 2 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1057 : true);
1058 : }
3699 tgl 1059 ECB : }
1060 :
1061 : /*
1062 : * Regardless of the event type, we can now mark ourselves as out of the
1063 : * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1064 : * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1065 : */
3699 tgl 1066 GIC 665 : xact_got_connection = false;
1067 :
1068 : /* Also reset cursor numbering for next transaction */
3699 tgl 1069 CBC 665 : cursor_number = 0;
1070 : }
3699 tgl 1071 ECB :
1072 : /*
1073 : * pgfdw_subxact_callback --- cleanup at subtransaction end.
1074 : */
1075 : static void
3699 tgl 1076 GIC 38 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1077 : SubTransactionId parentSubid, void *arg)
1078 : {
3699 tgl 1079 ECB : HASH_SEQ_STATUS scan;
1080 : ConnCacheEntry *entry;
1081 : int curlevel;
409 efujita 1082 GIC 38 : List *pending_entries = NIL;
3 efujita 1083 GNC 38 : List *cancel_requested = NIL;
1084 :
1085 : /* Nothing to do at subxact start, nor after commit. */
3699 tgl 1086 GIC 38 : if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1087 : event == SUBXACT_EVENT_ABORT_SUB))
1088 23 : return;
1089 :
1090 : /* Quick exit if no connections were touched in this transaction. */
1091 15 : if (!xact_got_connection)
3699 tgl 1092 LBC 0 : return;
1093 :
1094 : /*
3699 tgl 1095 ECB : * Scan all connection cache entries to find open remote subtransactions
1096 : * of the current level, and close them.
1097 : */
3699 tgl 1098 GIC 15 : curlevel = GetCurrentTransactionNestLevel();
1099 15 : hash_seq_init(&scan, ConnectionHash);
1100 87 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1101 : {
3699 tgl 1102 ECB : char sql[100];
1103 :
1104 : /*
1105 : * We only care about connections with open remote subtransactions of
1106 : * the current level.
1107 : */
3699 tgl 1108 CBC 72 : if (entry->conn == NULL || entry->xact_depth < curlevel)
1109 64 : continue;
1110 :
3699 tgl 1111 GIC 14 : if (entry->xact_depth > curlevel)
3699 tgl 1112 LBC 0 : elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1113 : entry->xact_depth);
3699 tgl 1114 ECB :
3699 tgl 1115 GIC 14 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1116 : {
2132 rhaas 1117 ECB : /*
2132 rhaas 1118 EUB : * If abort cleanup previously failed for this connection, we
1119 : * can't issue any more commands against it.
1120 : */
2132 rhaas 1121 GIC 7 : pgfdw_reject_incomplete_xact_state_change(entry);
1122 :
1123 : /* Commit all remote subtransactions during pre-commit */
3699 tgl 1124 CBC 7 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
2132 rhaas 1125 7 : entry->changing_xact_state = true;
409 efujita 1126 7 : if (entry->parallel_commit)
1127 : {
409 efujita 1128 GIC 2 : do_sql_command_begin(entry->conn, sql);
1129 2 : pending_entries = lappend(pending_entries, entry);
1130 2 : continue;
1131 : }
3681 tgl 1132 5 : do_sql_command(entry->conn, sql);
2132 rhaas 1133 5 : entry->changing_xact_state = false;
3699 tgl 1134 ECB : }
564 fujii 1135 : else
1136 : {
1137 : /* Rollback all remote subtransactions during abort */
3 efujita 1138 GNC 7 : if (entry->parallel_abort)
1139 : {
1140 4 : if (pgfdw_abort_cleanup_begin(entry, false,
1141 : &pending_entries,
1142 : &cancel_requested))
1143 4 : continue;
1144 : }
1145 : else
1146 3 : pgfdw_abort_cleanup(entry, false);
1147 : }
1148 :
3699 tgl 1149 ECB : /* OK, we're outta that level of subtransaction */
409 efujita 1150 GIC 8 : pgfdw_reset_xact_state(entry, false);
1151 : }
1152 :
1153 : /* If there are any pending connections, finish cleaning them up */
3 efujita 1154 GNC 15 : if (pending_entries || cancel_requested)
409 efujita 1155 ECB : {
3 efujita 1156 GNC 3 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1157 : {
1158 1 : Assert(cancel_requested == NIL);
1159 1 : pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1160 : }
1161 : else
1162 : {
1163 2 : Assert(event == SUBXACT_EVENT_ABORT_SUB);
1164 2 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1165 : false);
1166 : }
3699 tgl 1167 ECB : }
1168 : }
2132 rhaas 1169 :
1170 : /*
2088 tgl 1171 : * Connection invalidation callback function
1172 : *
1173 : * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1174 : * close connections depending on that entry immediately if current transaction
832 fujii 1175 : * has not used those connections yet. Otherwise, mark those connections as
1176 : * invalid and then make pgfdw_xact_callback() close them at the end of current
1177 : * transaction, since they cannot be closed in the midst of the transaction
1178 : * using them. Closed connections will be remade at the next opportunity if
1179 : * necessary.
1180 : *
2088 tgl 1181 : * Although most cache invalidation callbacks blow away all the related stuff
1182 : * regardless of the given hashvalue, connections are expensive enough that
1183 : * it's worth trying to avoid that.
1184 : *
1185 : * NB: We could avoid unnecessary disconnection more strictly by examining
1186 : * individual option values, but it seems too much effort for the gain.
1187 : */
1188 : static void
2088 tgl 1189 CBC 162 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1190 : {
1191 : HASH_SEQ_STATUS scan;
1192 : ConnCacheEntry *entry;
2088 tgl 1193 ECB :
2088 tgl 1194 GIC 162 : Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1195 :
1196 : /* ConnectionHash must exist already, if we're registered */
2088 tgl 1197 CBC 162 : hash_seq_init(&scan, ConnectionHash);
2088 tgl 1198 GIC 950 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2088 tgl 1199 ECB : {
1200 : /* Ignore invalid entries */
2088 tgl 1201 CBC 788 : if (entry->conn == NULL)
1202 625 : continue;
1203 :
1204 : /* hashvalue == 0 means a cache reset, must clear all state */
2088 tgl 1205 GIC 163 : if (hashvalue == 0 ||
2088 tgl 1206 CBC 123 : (cacheid == FOREIGNSERVEROID &&
1207 163 : entry->server_hashvalue == hashvalue) ||
2088 tgl 1208 GIC 40 : (cacheid == USERMAPPINGOID &&
1209 40 : entry->mapping_hashvalue == hashvalue))
1210 : {
1211 : /*
1212 : * Close the connection immediately if it's not used yet in this
1213 : * transaction. Otherwise mark it as invalid so that
1214 : * pgfdw_xact_callback() can close it at the end of this
1215 : * transaction.
1216 : */
832 fujii 1217 46 : if (entry->xact_depth == 0)
1218 : {
1219 43 : elog(DEBUG3, "discarding connection %p", entry->conn);
1220 43 : disconnect_pg_server(entry);
1221 : }
1222 : else
1223 3 : entry->invalidated = true;
1224 : }
1225 : }
2088 tgl 1226 162 : }
1227 :
1228 : /*
1229 : * Raise an error if the given connection cache entry is marked as being
1230 : * in the middle of an xact state change. This should be called at which no
1231 : * such change is expected to be in progress; if one is found to be in
2132 rhaas 1232 ECB : * progress, it means that we aborted in the middle of a previous state change
1233 : * and now don't know what the remote transaction state actually is.
1234 : * Such connections can't safely be further used. Re-establishing the
1235 : * connection would change the snapshot and roll back any writes already
1236 : * performed, so that's not an option, either. Thus, we must abort.
1237 : */
1238 : static void
2132 rhaas 1239 GIC 2449 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
2132 rhaas 1240 ECB : {
1241 : ForeignServer *server;
1242 :
1243 : /* nothing to do for inactive entries and entries of sane state */
2088 tgl 1244 CBC 2449 : if (entry->conn == NULL || !entry->changing_xact_state)
2132 rhaas 1245 2449 : return;
1246 :
1247 : /* make sure this entry is inactive */
2088 tgl 1248 LBC 0 : disconnect_pg_server(entry);
2088 tgl 1249 ECB :
1250 : /* find server name to be shown in the message below */
814 fujii 1251 LBC 0 : server = GetForeignServer(entry->serverid);
2132 rhaas 1252 ECB :
2132 rhaas 1253 UIC 0 : ereport(ERROR,
1254 : (errcode(ERRCODE_CONNECTION_EXCEPTION),
1255 : errmsg("connection to server \"%s\" was lost",
1256 : server->servername)));
1257 : }
1258 :
1259 : /*
409 efujita 1260 ECB : * Reset state to show we're out of a (sub)transaction.
1261 : */
1262 : static void
409 efujita 1263 CBC 1205 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1264 : {
409 efujita 1265 GIC 1205 : if (toplevel)
409 efujita 1266 ECB : {
1267 : /* Reset state to show we're out of a transaction */
409 efujita 1268 GIC 1191 : entry->xact_depth = 0;
409 efujita 1269 ECB :
1270 : /*
1271 : * If the connection isn't in a good idle state, it is marked as
1272 : * invalid or keep_connections option of its server is disabled, then
1273 : * discard it to recover. Next GetConnection will open a new
1274 : * connection.
1275 : */
409 efujita 1276 GIC 2381 : if (PQstatus(entry->conn) != CONNECTION_OK ||
1277 1190 : PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1278 1190 : entry->changing_xact_state ||
1279 1190 : entry->invalidated ||
1280 1188 : !entry->keep_connections)
1281 : {
409 efujita 1282 CBC 4 : elog(DEBUG3, "discarding connection %p", entry->conn);
409 efujita 1283 GIC 4 : disconnect_pg_server(entry);
1284 : }
1285 : }
1286 : else
409 efujita 1287 ECB : {
1288 : /* Reset state to show we're out of a subtransaction */
409 efujita 1289 GIC 14 : entry->xact_depth--;
1290 : }
409 efujita 1291 GBC 1205 : }
1292 :
1293 : /*
2132 rhaas 1294 EUB : * Cancel the currently-in-progress query (whose query text we do not have)
1295 : * and ignore the result. Returns true if we successfully cancel the query
1296 : * and discard any pending result, and false if not.
1297 : *
1298 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1299 : * recursion trouble, we'll end up slamming the connection shut, which will
1300 : * necessitate failing the entire toplevel transaction even if subtransactions
1301 : * were used. Try to use WARNING where we can.
1302 : *
1303 : * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1304 : * query text from the pendingAreq saved in the per-connection state, then
1305 : * report the query using it.
2132 rhaas 1306 ECB : */
1307 : static bool
2132 rhaas 1308 LBC 0 : pgfdw_cancel_query(PGconn *conn)
1309 : {
1310 : TimestampTz endtime;
1311 :
1312 : /*
1313 : * If it takes too long to cancel the query and discard the result, assume
1314 : * the connection is dead.
2132 rhaas 1315 ECB : */
3 efujita 1316 UNC 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1317 : CONNECTION_CLEANUP_TIMEOUT);
1318 :
1319 0 : if (!pgfdw_cancel_query_begin(conn))
1320 0 : return false;
1321 0 : return pgfdw_cancel_query_end(conn, endtime, false);
1322 : }
1323 :
1324 : static bool
1325 0 : pgfdw_cancel_query_begin(PGconn *conn)
1326 : {
1327 : PGcancel *cancel;
1328 : char errbuf[256];
2132 rhaas 1329 ECB :
1330 : /*
1331 : * Issue cancel request. Unfortunately, there's no good way to limit the
1332 : * amount of time that we might block inside PQgetCancel().
1333 : */
2132 rhaas 1334 LBC 0 : if ((cancel = PQgetCancel(conn)))
1335 : {
2132 rhaas 1336 UIC 0 : if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
1337 : {
1338 0 : ereport(WARNING,
1339 : (errcode(ERRCODE_CONNECTION_FAILURE),
2132 rhaas 1340 ECB : errmsg("could not send cancel request: %s",
1341 : errbuf)));
2132 rhaas 1342 LBC 0 : PQfreeCancel(cancel);
2132 rhaas 1343 UIC 0 : return false;
1344 : }
1345 0 : PQfreeCancel(cancel);
1346 : }
1347 :
3 efujita 1348 UNC 0 : return true;
1349 : }
1350 :
1351 : static bool
1352 0 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
1353 : {
1354 0 : PGresult *result = NULL;
1355 : bool timed_out;
1356 :
1357 : /*
1358 : * If requested, consume whatever data is available from the socket. (Note
1359 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1360 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1361 : * which would be large compared to the overhead of PQconsumeInput.)
1362 : */
1363 0 : if (consume_input && !PQconsumeInput(conn))
1364 : {
1365 0 : ereport(WARNING,
1366 : (errcode(ERRCODE_CONNECTION_FAILURE),
1367 : errmsg("could not get result of cancel request: %s",
1368 : pchomp(PQerrorMessage(conn)))));
1369 0 : return false;
1370 : }
1371 :
1372 : /* Get and discard the result of the query. */
487 fujii 1373 UIC 0 : if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1374 : {
1375 0 : if (timed_out)
1376 0 : ereport(WARNING,
1377 : (errmsg("could not get result of cancel request due to timeout")));
1378 : else
1379 0 : ereport(WARNING,
1380 : (errcode(ERRCODE_CONNECTION_FAILURE),
1381 : errmsg("could not get result of cancel request: %s",
1382 : pchomp(PQerrorMessage(conn)))));
487 fujii 1383 EUB :
2132 rhaas 1384 UIC 0 : return false;
1385 : }
1386 0 : PQclear(result);
1387 :
1388 0 : return true;
1389 : }
1390 :
2132 rhaas 1391 EUB : /*
1392 : * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1393 : * result. If the query is executed without error, the return value is true.
1394 : * If the query is executed successfully but returns an error, the return
1395 : * value is true if and only if ignore_errors is set. If the query can't be
1396 : * sent or times out, the return value is false.
1397 : *
1398 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1399 : * recursion trouble, we'll end up slamming the connection shut, which will
543 efujita 1400 : * necessitate failing the entire toplevel transaction even if subtransactions
1401 : * were used. Try to use WARNING where we can.
1402 : */
1403 : static bool
2132 rhaas 1404 GIC 50 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1405 : {
1406 : TimestampTz endtime;
2132 rhaas 1407 EUB :
1408 : /*
1409 : * If it takes too long to execute a cleanup query, assume the connection
1410 : * is dead. It's fairly likely that this is why we aborted in the first
1411 : * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1412 : * be too long.
1413 : */
3 efujita 1414 GNC 50 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1415 : CONNECTION_CLEANUP_TIMEOUT);
2132 rhaas 1416 EUB :
3 efujita 1417 GNC 50 : if (!pgfdw_exec_cleanup_query_begin(conn, query))
3 efujita 1418 UNC 0 : return false;
3 efujita 1419 GNC 50 : return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1420 : false, ignore_errors);
1421 : }
1422 :
1423 : static bool
1424 62 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
1425 : {
2132 rhaas 1426 EUB : /*
1427 : * Submit a query. Since we don't use non-blocking mode, this also can
1428 : * block. But its risk is relatively small, so we ignore that for now.
1429 : */
2132 rhaas 1430 GIC 62 : if (!PQsendQuery(conn, query))
2132 rhaas 1431 EUB : {
2132 rhaas 1432 UIC 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1433 0 : return false;
1434 : }
2132 rhaas 1435 EUB :
3 efujita 1436 GNC 62 : return true;
1437 : }
1438 :
1439 : static bool
1440 62 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1441 : TimestampTz endtime, bool consume_input,
1442 : bool ignore_errors)
1443 : {
1444 62 : PGresult *result = NULL;
1445 : bool timed_out;
1446 :
1447 : /*
1448 : * If requested, consume whatever data is available from the socket. (Note
1449 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1450 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1451 : * which would be large compared to the overhead of PQconsumeInput.)
1452 : */
1453 62 : if (consume_input && !PQconsumeInput(conn))
1454 : {
3 efujita 1455 UNC 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1456 0 : return false;
1457 : }
1458 :
1459 : /* Get the result of the query. */
487 fujii 1460 GBC 62 : if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1461 : {
487 fujii 1462 UIC 0 : if (timed_out)
1463 0 : ereport(WARNING,
1464 : (errmsg("could not get query result due to timeout"),
1465 : query ? errcontext("remote SQL command: %s", query) : 0));
1466 : else
1467 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1468 :
2132 rhaas 1469 UBC 0 : return false;
1470 : }
2132 rhaas 1471 EUB :
1472 : /* Issue a warning if not successful. */
2132 rhaas 1473 GIC 62 : if (PQresultStatus(result) != PGRES_COMMAND_OK)
1474 : {
2132 rhaas 1475 UBC 0 : pgfdw_report_error(WARNING, result, conn, true, query);
2132 rhaas 1476 UIC 0 : return ignore_errors;
1477 : }
2124 tgl 1478 GIC 62 : PQclear(result);
2132 rhaas 1479 EUB :
2132 rhaas 1480 GIC 62 : return true;
2132 rhaas 1481 EUB : }
1482 :
1483 : /*
1484 : * Get, during abort cleanup, the result of a query that is in progress. This
1485 : * might be a query that is being interrupted by transaction abort, or it might
1486 : * be a query that was initiated as part of transaction abort to get the remote
1487 : * side back to the appropriate state.
1488 : *
1489 : * endtime is the time at which we should give up and assume the remote
487 fujii 1490 : * side is dead. Returns true if the timeout expired or connection trouble
1491 : * occurred, false otherwise. Sets *result except in case of a timeout.
1492 : * Sets timed_out to true only when the timeout expired.
1493 : */
2132 rhaas 1494 : static bool
487 fujii 1495 GIC 62 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
1496 : bool *timed_out)
1497 : {
1498 62 : volatile bool failed = false;
2124 tgl 1499 62 : PGresult *volatile last_res = NULL;
1500 :
487 fujii 1501 62 : *timed_out = false;
1502 :
1503 : /* In what follows, do not leak any PGresults on an error. */
2124 tgl 1504 62 : PG_TRY();
1505 : {
1506 : for (;;)
2132 rhaas 1507 69 : {
1508 : PGresult *res;
1509 :
2124 tgl 1510 CBC 312 : while (PQisBusy(conn))
1511 : {
1512 : int wc;
2124 tgl 1513 GIC 50 : TimestampTz now = GetCurrentTimestamp();
1514 : long cur_timeout;
1515 :
1516 : /* If timeout has expired, give up, else get sleep time. */
880 1517 50 : cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1518 50 : if (cur_timeout <= 0)
1519 : {
487 fujii 1520 LBC 0 : *timed_out = true;
487 fujii 1521 UIC 0 : failed = true;
2124 tgl 1522 0 : goto exit;
2124 tgl 1523 ECB : }
2124 tgl 1524 EUB :
2124 tgl 1525 ECB : /* Sleep until there's something to do */
2124 tgl 1526 GIC 50 : wc = WaitLatchOrSocket(MyLatch,
1527 : WL_LATCH_SET | WL_SOCKET_READABLE |
1528 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1529 : PQsocket(conn),
2124 tgl 1530 ECB : cur_timeout, PG_WAIT_EXTENSION);
2124 tgl 1531 GIC 50 : ResetLatch(MyLatch);
1532 :
1533 50 : CHECK_FOR_INTERRUPTS();
1534 :
1535 : /* Data available in socket? */
2124 tgl 1536 CBC 50 : if (wc & WL_SOCKET_READABLE)
1537 : {
2124 tgl 1538 GBC 50 : if (!PQconsumeInput(conn))
2124 tgl 1539 EUB : {
1540 : /* connection trouble */
487 fujii 1541 UIC 0 : failed = true;
2124 tgl 1542 LBC 0 : goto exit;
1543 : }
1544 : }
1545 : }
2132 rhaas 1546 ECB :
2124 tgl 1547 GIC 131 : res = PQgetResult(conn);
1548 131 : if (res == NULL)
1549 62 : break; /* query is complete */
2132 rhaas 1550 ECB :
2124 tgl 1551 GIC 69 : PQclear(last_res);
1552 69 : last_res = res;
1553 : }
1554 62 : exit: ;
1555 : }
2124 tgl 1556 UIC 0 : PG_CATCH();
1557 : {
2132 rhaas 1558 0 : PQclear(last_res);
2124 tgl 1559 LBC 0 : PG_RE_THROW();
1560 : }
2124 tgl 1561 GBC 62 : PG_END_TRY();
2132 rhaas 1562 EUB :
487 fujii 1563 GIC 62 : if (failed)
2124 tgl 1564 UIC 0 : PQclear(last_res);
1565 : else
2124 tgl 1566 CBC 62 : *result = last_res;
487 fujii 1567 GIC 62 : return failed;
2132 rhaas 1568 EUB : }
811 fujii 1569 :
1570 : /*
1571 : * Abort remote transaction or subtransaction.
1572 : *
564 1573 : * "toplevel" should be set to true if toplevel (main) transaction is
1574 : * rollbacked, false otherwise.
1575 : *
1576 : * Set entry->changing_xact_state to false on success, true on failure.
1577 : */
1578 : static void
380 efujita 1579 CBC 37 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1580 : {
380 efujita 1581 EUB : char sql[100];
1582 :
1583 : /*
564 fujii 1584 ECB : * Don't try to clean up the connection if we're already in error
1585 : * recursion trouble.
1586 : */
564 fujii 1587 GIC 37 : if (in_error_recursion_trouble())
564 fujii 1588 UIC 0 : entry->changing_xact_state = true;
1589 :
1590 : /*
1591 : * If connection is already unsalvageable, don't touch it further.
1592 : */
564 fujii 1593 GIC 37 : if (entry->changing_xact_state)
1594 1 : return;
1595 :
1596 : /*
1597 : * Mark this connection as in the process of changing transaction state.
1598 : */
1599 36 : entry->changing_xact_state = true;
1600 :
564 fujii 1601 ECB : /* Assume we might have lost track of prepared statements */
564 fujii 1602 GIC 36 : entry->have_error = true;
1603 :
564 fujii 1604 ECB : /*
1605 : * If a command has been submitted to the remote server by using an
1606 : * asynchronous execution function, the command might not have yet
1607 : * completed. Check to see if a command is still being processed by the
1608 : * remote server, and if so, request cancellation of the command.
1609 : */
564 fujii 1610 CBC 36 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
564 fujii 1611 UIC 0 : !pgfdw_cancel_query(entry->conn))
1612 0 : return; /* Unable to cancel running query */
564 fujii 1613 ECB :
3 efujita 1614 GNC 36 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
564 fujii 1615 GIC 36 : if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
380 efujita 1616 UIC 0 : return; /* Unable to abort remote (sub)transaction */
1617 :
564 fujii 1618 CBC 36 : if (toplevel)
564 fujii 1619 ECB : {
564 fujii 1620 GIC 33 : if (entry->have_prep_stmt && entry->have_error &&
564 fujii 1621 GBC 14 : !pgfdw_exec_cleanup_query(entry->conn,
564 fujii 1622 EUB : "DEALLOCATE ALL",
1623 : true))
564 fujii 1624 UIC 0 : return; /* Trouble clearing prepared statements */
1625 :
564 fujii 1626 GIC 33 : entry->have_prep_stmt = false;
564 fujii 1627 CBC 33 : entry->have_error = false;
1628 : }
1629 :
1630 : /*
1631 : * If pendingAreq of the per-connection state is not NULL, it means that
443 efujita 1632 ECB : * an asynchronous fetch begun by fetch_more_data_begin() was not done
1633 : * successfully and thus the per-connection state was not reset in
1634 : * fetch_more_data(); in that case reset the per-connection state here.
1635 : */
443 efujita 1636 GIC 36 : if (entry->state.pendingAreq)
443 efujita 1637 LBC 0 : memset(&entry->state, 0, sizeof(entry->state));
1638 :
564 fujii 1639 ECB : /* Disarm changing_xact_state if it all worked */
564 fujii 1640 GIC 36 : entry->changing_xact_state = false;
1641 : }
564 fujii 1642 EUB :
1643 : /*
1644 : * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1645 : * don't wait for the result.
1646 : *
1647 : * Returns true if the abort command or cancel request is successfully issued,
1648 : * false otherwise. If the abort command is successfully issued, the given
1649 : * connection cache entry is appended to *pending_entries. Othewise, if the
1650 : * cancel request is successfully issued, it is appended to *cancel_requested.
1651 : */
1652 : static bool
3 efujita 1653 GNC 8 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
1654 : List **pending_entries, List **cancel_requested)
1655 : {
1656 : /*
1657 : * Don't try to clean up the connection if we're already in error
1658 : * recursion trouble.
1659 : */
1660 8 : if (in_error_recursion_trouble())
3 efujita 1661 UNC 0 : entry->changing_xact_state = true;
1662 :
1663 : /*
1664 : * If connection is already unsalvageable, don't touch it further.
1665 : */
3 efujita 1666 GNC 8 : if (entry->changing_xact_state)
3 efujita 1667 UNC 0 : return false;
1668 :
1669 : /*
1670 : * Mark this connection as in the process of changing transaction state.
1671 : */
3 efujita 1672 GNC 8 : entry->changing_xact_state = true;
1673 :
1674 : /* Assume we might have lost track of prepared statements */
1675 8 : entry->have_error = true;
1676 :
1677 : /*
1678 : * If a command has been submitted to the remote server by using an
1679 : * asynchronous execution function, the command might not have yet
1680 : * completed. Check to see if a command is still being processed by the
1681 : * remote server, and if so, request cancellation of the command.
1682 : */
1683 8 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1684 : {
3 efujita 1685 UNC 0 : if (!pgfdw_cancel_query_begin(entry->conn))
1686 0 : return false; /* Unable to cancel running query */
1687 0 : *cancel_requested = lappend(*cancel_requested, entry);
1688 : }
1689 : else
1690 : {
1691 : char sql[100];
1692 :
3 efujita 1693 GNC 8 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1694 8 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
3 efujita 1695 UNC 0 : return false; /* Unable to abort remote transaction */
3 efujita 1696 GNC 8 : *pending_entries = lappend(*pending_entries, entry);
1697 : }
1698 :
1699 8 : return true;
1700 : }
1701 :
409 efujita 1702 EUB : /*
1703 : * Finish pre-commit cleanup of connections on each of which we've sent a
1704 : * COMMIT command to the remote server.
1705 : */
1706 : static void
409 efujita 1707 CBC 13 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
409 efujita 1708 ECB : {
1709 : ConnCacheEntry *entry;
409 efujita 1710 GIC 13 : List *pending_deallocs = NIL;
409 efujita 1711 ECB : ListCell *lc;
1712 :
409 efujita 1713 GIC 13 : Assert(pending_entries);
409 efujita 1714 ECB :
1715 : /*
409 efujita 1716 EUB : * Get the result of the COMMIT command for each of the pending entries
1717 : */
409 efujita 1718 GBC 29 : foreach(lc, pending_entries)
409 efujita 1719 EUB : {
409 efujita 1720 GIC 16 : entry = (ConnCacheEntry *) lfirst(lc);
409 efujita 1721 ECB :
409 efujita 1722 GIC 16 : Assert(entry->changing_xact_state);
332 tgl 1723 ECB :
409 efujita 1724 EUB : /*
1725 : * We might already have received the result on the socket, so pass
409 efujita 1726 ECB : * consume_input=true to try to consume it first
1727 : */
409 efujita 1728 GIC 16 : do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1729 16 : entry->changing_xact_state = false;
1730 :
1731 : /* Do a DEALLOCATE ALL in parallel if needed */
1732 16 : if (entry->have_prep_stmt && entry->have_error)
1733 : {
1734 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1735 2 : if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1736 : {
1737 2 : pending_deallocs = lappend(pending_deallocs, entry);
1738 2 : continue;
409 efujita 1739 ECB : }
1740 : }
409 efujita 1741 GIC 14 : entry->have_prep_stmt = false;
1742 14 : entry->have_error = false;
1743 :
1744 14 : pgfdw_reset_xact_state(entry, true);
1745 : }
1746 :
409 efujita 1747 ECB : /* No further work if no pending entries */
409 efujita 1748 GBC 13 : if (!pending_deallocs)
409 efujita 1749 GIC 12 : return;
1750 :
1751 : /*
1752 : * Get the result of the DEALLOCATE command for each of the pending
409 efujita 1753 ECB : * entries
1754 : */
409 efujita 1755 GIC 3 : foreach(lc, pending_deallocs)
1756 : {
1757 : PGresult *res;
1758 :
409 efujita 1759 CBC 2 : entry = (ConnCacheEntry *) lfirst(lc);
1760 :
1761 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1762 4 : while ((res = PQgetResult(entry->conn)) != NULL)
1763 : {
409 efujita 1764 GIC 2 : PQclear(res);
1765 : /* Stop if the connection is lost (else we'll loop infinitely) */
1766 2 : if (PQstatus(entry->conn) == CONNECTION_BAD)
409 efujita 1767 UIC 0 : break;
1768 : }
409 efujita 1769 GIC 2 : entry->have_prep_stmt = false;
409 efujita 1770 CBC 2 : entry->have_error = false;
409 efujita 1771 EUB :
409 efujita 1772 GBC 2 : pgfdw_reset_xact_state(entry, true);
1773 : }
409 efujita 1774 ECB : }
1775 :
409 efujita 1776 EUB : /*
1777 : * Finish pre-subcommit cleanup of connections on each of which we've sent a
409 efujita 1778 ECB : * RELEASE command to the remote server.
1779 : */
1780 : static void
409 efujita 1781 CBC 1 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1782 : {
1783 : ConnCacheEntry *entry;
409 efujita 1784 EUB : char sql[100];
1785 : ListCell *lc;
409 efujita 1786 ECB :
409 efujita 1787 CBC 1 : Assert(pending_entries);
1788 :
1789 : /*
1790 : * Get the result of the RELEASE command for each of the pending entries
1791 : */
409 efujita 1792 GIC 1 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1793 3 : foreach(lc, pending_entries)
1794 : {
1795 2 : entry = (ConnCacheEntry *) lfirst(lc);
409 efujita 1796 ECB :
409 efujita 1797 GBC 2 : Assert(entry->changing_xact_state);
1798 :
1799 : /*
409 efujita 1800 ECB : * We might already have received the result on the socket, so pass
1801 : * consume_input=true to try to consume it first
1802 : */
409 efujita 1803 GIC 2 : do_sql_command_end(entry->conn, sql, true);
1804 2 : entry->changing_xact_state = false;
1805 :
1806 2 : pgfdw_reset_xact_state(entry, false);
1807 : }
1808 1 : }
1809 :
1810 : /*
1811 : * Finish abort cleanup of connections on each of which we've sent an abort
1812 : * command or cancel request to the remote server.
1813 : */
1814 : static void
3 efujita 1815 GNC 4 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1816 : bool toplevel)
1817 : {
1818 4 : List *pending_deallocs = NIL;
1819 : ListCell *lc;
1820 :
1821 : /*
1822 : * For each of the pending cancel requests (if any), get and discard the
1823 : * result of the query, and submit an abort command to the remote server.
1824 : */
1825 4 : if (cancel_requested)
1826 : {
3 efujita 1827 UNC 0 : foreach(lc, cancel_requested)
1828 : {
1829 0 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1830 : TimestampTz endtime;
1831 : char sql[100];
1832 :
1833 0 : Assert(entry->changing_xact_state);
1834 :
1835 : /*
1836 : * Set end time. You might think we should do this before issuing
1837 : * cancel request like in normal mode, but that is problematic,
1838 : * because if, for example, it took longer than 30 seconds to
1839 : * process the first few entries in the cancel_requested list, it
1840 : * would cause a timeout error when processing each of the
1841 : * remaining entries in the list, leading to slamming that entry's
1842 : * connection shut.
1843 : */
1844 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1845 : CONNECTION_CLEANUP_TIMEOUT);
1846 :
1847 0 : if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
1848 : {
1849 : /* Unable to cancel running query */
1850 0 : pgfdw_reset_xact_state(entry, toplevel);
1851 0 : continue;
1852 : }
1853 :
1854 : /* Send an abort command in parallel if needed */
1855 0 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1856 0 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1857 : {
1858 : /* Unable to abort remote (sub)transaction */
1859 0 : pgfdw_reset_xact_state(entry, toplevel);
1860 : }
1861 : else
1862 0 : pending_entries = lappend(pending_entries, entry);
1863 : }
1864 : }
1865 :
1866 : /* No further work if no pending entries */
3 efujita 1867 GNC 4 : if (!pending_entries)
3 efujita 1868 UNC 0 : return;
1869 :
1870 : /*
1871 : * Get the result of the abort command for each of the pending entries
1872 : */
3 efujita 1873 GNC 12 : foreach(lc, pending_entries)
1874 : {
1875 8 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1876 : TimestampTz endtime;
1877 : char sql[100];
1878 :
1879 8 : Assert(entry->changing_xact_state);
1880 :
1881 : /*
1882 : * Set end time. We do this now, not before issuing the command like
1883 : * in normal mode, for the same reason as for the cancel_requested
1884 : * entries.
1885 : */
1886 8 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1887 : CONNECTION_CLEANUP_TIMEOUT);
1888 :
1889 8 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1890 8 : if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
1891 : true, false))
1892 : {
1893 : /* Unable to abort remote (sub)transaction */
3 efujita 1894 UNC 0 : pgfdw_reset_xact_state(entry, toplevel);
3 efujita 1895 GNC 4 : continue;
1896 : }
1897 :
1898 8 : if (toplevel)
1899 : {
1900 : /* Do a DEALLOCATE ALL in parallel if needed */
1901 4 : if (entry->have_prep_stmt && entry->have_error)
1902 : {
1903 4 : if (!pgfdw_exec_cleanup_query_begin(entry->conn,
1904 : "DEALLOCATE ALL"))
1905 : {
1906 : /* Trouble clearing prepared statements */
3 efujita 1907 UNC 0 : pgfdw_reset_xact_state(entry, toplevel);
1908 : }
1909 : else
3 efujita 1910 GNC 4 : pending_deallocs = lappend(pending_deallocs, entry);
1911 4 : continue;
1912 : }
3 efujita 1913 UNC 0 : entry->have_prep_stmt = false;
1914 0 : entry->have_error = false;
1915 : }
1916 :
1917 : /* Reset the per-connection state if needed */
3 efujita 1918 GNC 4 : if (entry->state.pendingAreq)
3 efujita 1919 UNC 0 : memset(&entry->state, 0, sizeof(entry->state));
1920 :
1921 : /* We're done with this entry; unset the changing_xact_state flag */
3 efujita 1922 GNC 4 : entry->changing_xact_state = false;
1923 4 : pgfdw_reset_xact_state(entry, toplevel);
1924 : }
1925 :
1926 : /* No further work if no pending entries */
1927 4 : if (!pending_deallocs)
1928 2 : return;
1929 2 : Assert(toplevel);
1930 :
1931 : /*
1932 : * Get the result of the DEALLOCATE command for each of the pending
1933 : * entries
1934 : */
1935 6 : foreach(lc, pending_deallocs)
1936 : {
1937 4 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1938 : TimestampTz endtime;
1939 :
1940 4 : Assert(entry->changing_xact_state);
1941 4 : Assert(entry->have_prep_stmt);
1942 4 : Assert(entry->have_error);
1943 :
1944 : /*
1945 : * Set end time. We do this now, not before issuing the command like
1946 : * in normal mode, for the same reason as for the cancel_requested
1947 : * entries.
1948 : */
1949 4 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1950 : CONNECTION_CLEANUP_TIMEOUT);
1951 :
1952 4 : if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
1953 : endtime, true, true))
1954 : {
1955 : /* Trouble clearing prepared statements */
3 efujita 1956 UNC 0 : pgfdw_reset_xact_state(entry, toplevel);
1957 0 : continue;
1958 : }
3 efujita 1959 GNC 4 : entry->have_prep_stmt = false;
1960 4 : entry->have_error = false;
1961 :
1962 : /* Reset the per-connection state if needed */
1963 4 : if (entry->state.pendingAreq)
3 efujita 1964 UNC 0 : memset(&entry->state, 0, sizeof(entry->state));
1965 :
1966 : /* We're done with this entry; unset the changing_xact_state flag */
3 efujita 1967 GNC 4 : entry->changing_xact_state = false;
1968 4 : pgfdw_reset_xact_state(entry, toplevel);
1969 : }
1970 : }
1971 :
1972 : /*
1973 : * List active foreign server connections.
1974 : *
811 fujii 1975 ECB : * This function takes no input parameter and returns setof record made of
1976 : * following values:
1977 : * - server_name - server name of active connection. In case the foreign server
1978 : * is dropped but still the connection is active, then the server name will
1979 : * be NULL in output.
1980 : * - valid - true/false representing whether the connection is valid or not.
1981 : * Note that the connections can get invalidated in pgfdw_inval_callback.
1982 : *
811 fujii 1983 EUB : * No records are returned when there are no cached connections at all.
1984 : */
1985 : Datum
811 fujii 1986 GIC 11 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
1987 : {
811 fujii 1988 ECB : #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
811 fujii 1989 GBC 11 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1990 : HASH_SEQ_STATUS scan;
1991 : ConnCacheEntry *entry;
1992 :
173 michael 1993 GIC 11 : InitMaterializedSRF(fcinfo, 0);
811 fujii 1994 ECB :
1995 : /* If cache doesn't exist, we return no records */
811 fujii 1996 GIC 11 : if (!ConnectionHash)
811 fujii 1997 LBC 0 : PG_RETURN_VOID();
1998 :
811 fujii 1999 GIC 11 : hash_seq_init(&scan, ConnectionHash);
2000 82 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2001 : {
2002 : ForeignServer *server;
267 peter 2003 GNC 71 : Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2004 71 : bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
811 fujii 2005 ECB :
2006 : /* We only look for open remote connections */
811 fujii 2007 GBC 71 : if (!entry->conn)
2008 60 : continue;
811 fujii 2009 EUB :
811 fujii 2010 GIC 11 : server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2011 :
811 fujii 2012 ECB : /*
2013 : * The foreign server may have been dropped in current explicit
811 fujii 2014 EUB : * transaction. It is not possible to drop the server from another
811 fujii 2015 ECB : * session when the connection associated with it is in use in the
2016 : * current transaction, if tried so, the drop query in another session
2017 : * blocks until the current transaction finishes.
2018 : *
2019 : * Even though the server is dropped in the current transaction, the
2020 : * cache can still have associated active connection entry, say we
2021 : * call such connections dangling. Since we can not fetch the server
2022 : * name from system catalogs for dangling connections, instead we show
2023 : * NULL value for server name in output.
2024 : *
2025 : * We could have done better by storing the server name in the cache
2026 : * entry instead of server oid so that it could be used in the output.
2027 : * But the server name in each cache entry requires 64 bytes of
2028 : * memory, which is huge, when there are many cached connections and
2029 : * the use case i.e. dropping the foreign server within the explicit
2030 : * current transaction seems rare. So, we chose to show NULL value for
2031 : * server name in output.
2032 : *
2033 : * Such dangling connections get closed either in next use or at the
2034 : * end of current explicit transaction in pgfdw_xact_callback.
2035 : */
811 fujii 2036 GIC 11 : if (!server)
811 fujii 2037 ECB : {
2038 : /*
2039 : * If the server has been dropped in the current explicit
2040 : * transaction, then this entry would have been invalidated in
803 2041 : * pgfdw_inval_callback at the end of drop server command. Note
2042 : * that this connection would not have been closed in
2043 : * pgfdw_inval_callback because it is still being used in the
2044 : * current explicit transaction. So, assert that here.
2045 : */
811 fujii 2046 GIC 1 : Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
811 fujii 2047 ECB :
2048 : /* Show null, if no server name was found */
811 fujii 2049 GIC 1 : nulls[0] = true;
2050 : }
811 fujii 2051 ECB : else
811 fujii 2052 GIC 10 : values[0] = CStringGetTextDatum(server->servername);
2053 :
811 fujii 2054 CBC 11 : values[1] = BoolGetDatum(!entry->invalidated);
2055 :
397 michael 2056 11 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
811 fujii 2057 ECB : }
2058 :
811 fujii 2059 GIC 11 : PG_RETURN_VOID();
811 fujii 2060 ECB : }
803 2061 :
2062 : /*
2063 : * Disconnect the specified cached connections.
2064 : *
2065 : * This function discards the open connections that are established by
2066 : * postgres_fdw from the local session to the foreign server with
2067 : * the given name. Note that there can be multiple connections to
2068 : * the given server using different user mappings. If the connections
2069 : * are used in the current local transaction, they are not disconnected
2070 : * and warning messages are reported. This function returns true
2071 : * if it disconnects at least one connection, otherwise false. If no
2072 : * foreign server with the given name is found, an error is reported.
2073 : */
2074 : Datum
803 fujii 2075 GIC 4 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
2076 : {
2077 : ForeignServer *server;
803 fujii 2078 ECB : char *servername;
2079 :
803 fujii 2080 GIC 4 : servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
803 fujii 2081 CBC 4 : server = GetForeignServerByName(servername, false);
2082 :
2083 3 : PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
2084 : }
803 fujii 2085 ECB :
803 fujii 2086 EUB : /*
2087 : * Disconnect all the cached connections.
803 fujii 2088 ECB : *
2089 : * This function discards all the open connections that are established by
2090 : * postgres_fdw from the local session to the foreign servers.
2091 : * If the connections are used in the current local transaction, they are
2092 : * not disconnected and warning messages are reported. This function
2093 : * returns true if it disconnects at least one connection, otherwise false.
2094 : */
2095 : Datum
803 fujii 2096 GIC 4 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
2097 : {
2098 4 : PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
2099 : }
803 fujii 2100 ECB :
2101 : /*
2102 : * Workhorse to disconnect cached connections.
2103 : *
2104 : * This function scans all the connection cache entries and disconnects
2105 : * the open connections whose foreign server OID matches with
2106 : * the specified one. If InvalidOid is specified, it disconnects all
2107 : * the cached connections.
2108 : *
2109 : * This function emits a warning for each connection that's used in
2110 : * the current transaction and doesn't close it. It returns true if
2111 : * it disconnects at least one connection, otherwise false.
2112 : *
2113 : * Note that this function disconnects even the connections that are
2114 : * established by other users in the same local session using different
2115 : * user mappings. This leads even non-superuser to be able to close
2116 : * the connections established by superusers in the same local session.
2117 : *
2118 : * XXX As of now we don't see any security risk doing this. But we should
2119 : * set some restrictions on that, for example, prevent non-superuser
2120 : * from closing the connections established by superusers even
2121 : * in the same session?
2122 : */
2123 : static bool
803 fujii 2124 GIC 7 : disconnect_cached_connections(Oid serverid)
803 fujii 2125 ECB : {
2126 : HASH_SEQ_STATUS scan;
2127 : ConnCacheEntry *entry;
803 fujii 2128 GIC 7 : bool all = !OidIsValid(serverid);
2129 7 : bool result = false;
2130 :
2131 : /*
2132 : * Connection cache hashtable has not been initialized yet in this
2133 : * session, so return false.
803 fujii 2134 ECB : */
803 fujii 2135 GIC 7 : if (!ConnectionHash)
803 fujii 2136 UIC 0 : return false;
803 fujii 2137 ECB :
803 fujii 2138 GIC 7 : hash_seq_init(&scan, ConnectionHash);
2139 50 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2140 : {
2141 : /* Ignore cache entry if no open connection right now. */
2142 43 : if (!entry->conn)
2143 32 : continue;
803 fujii 2144 ECB :
803 fujii 2145 GIC 11 : if (all || entry->serverid == serverid)
803 fujii 2146 EUB : {
2147 : /*
2148 : * Emit a warning because the connection to close is used in the
2149 : * current transaction and cannot be disconnected right now.
2150 : */
803 fujii 2151 GIC 8 : if (entry->xact_depth > 0)
803 fujii 2152 EUB : {
2153 : ForeignServer *server;
2154 :
803 fujii 2155 GIC 3 : server = GetForeignServerExtended(entry->serverid,
2156 : FSV_MISSING_OK);
2157 :
2158 3 : if (!server)
2159 : {
2160 : /*
2161 : * If the foreign server was dropped while its connection
2162 : * was used in the current transaction, the connection
803 fujii 2163 EUB : * must have been marked as invalid by
2164 : * pgfdw_inval_callback at the end of DROP SERVER command.
2165 : */
803 fujii 2166 UBC 0 : Assert(entry->invalidated);
2167 :
803 fujii 2168 UIC 0 : ereport(WARNING,
803 fujii 2169 EUB : (errmsg("cannot close dropped server connection because it is still in use")));
2170 : }
2171 : else
803 fujii 2172 GIC 3 : ereport(WARNING,
2173 : (errmsg("cannot close connection for server \"%s\" because it is still in use",
803 fujii 2174 EUB : server->servername)));
2175 : }
2176 : else
2177 : {
803 fujii 2178 GBC 5 : elog(DEBUG3, "discarding connection %p", entry->conn);
803 fujii 2179 GIC 5 : disconnect_pg_server(entry);
2180 5 : result = true;
803 fujii 2181 EUB : }
2182 : }
2183 : }
2184 :
803 fujii 2185 GIC 7 : return result;
803 fujii 2186 ECB : }
|