Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * connection.c
4 : : * Connection management functions for postgres_fdw
5 : : *
6 : : * Portions Copyright (c) 2012-2024, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * contrib/postgres_fdw/connection.c
10 : : *
11 : : *-------------------------------------------------------------------------
12 : : */
13 : : #include "postgres.h"
14 : :
15 : : #include "access/htup_details.h"
16 : : #include "access/xact.h"
17 : : #include "catalog/pg_user_mapping.h"
18 : : #include "commands/defrem.h"
19 : : #include "funcapi.h"
20 : : #include "libpq/libpq-be.h"
21 : : #include "libpq/libpq-be-fe-helpers.h"
22 : : #include "mb/pg_wchar.h"
23 : : #include "miscadmin.h"
24 : : #include "pgstat.h"
25 : : #include "postgres_fdw.h"
26 : : #include "storage/fd.h"
27 : : #include "storage/latch.h"
28 : : #include "utils/builtins.h"
29 : : #include "utils/datetime.h"
30 : : #include "utils/hsearch.h"
31 : : #include "utils/inval.h"
32 : : #include "utils/memutils.h"
33 : : #include "utils/syscache.h"
34 : :
35 : : /*
36 : : * Connection cache hash table entry
37 : : *
38 : : * The lookup key in this hash table is the user mapping OID. We use just one
39 : : * connection per user mapping ID, which ensures that all the scans use the
40 : : * same snapshot during a query. Using the user mapping OID rather than
41 : : * the foreign server OID + user OID avoids creating multiple connections when
42 : : * the public user mapping applies to all user OIDs.
43 : : *
44 : : * The "conn" pointer can be NULL if we don't currently have a live connection.
45 : : * When we do have a connection, xact_depth tracks the current depth of
46 : : * transactions and subtransactions open on the remote side. We need to issue
47 : : * commands at the same nesting depth on the remote as we're executing at
48 : : * ourselves, so that rolling back a subtransaction will kill the right
49 : : * queries and not the wrong ones.
50 : : */
51 : : typedef Oid ConnCacheKey;
52 : :
53 : : typedef struct ConnCacheEntry
54 : : {
55 : : ConnCacheKey key; /* hash key (must be first) */
56 : : PGconn *conn; /* connection to foreign server, or NULL */
57 : : /* Remaining fields are invalid when conn is NULL: */
58 : : int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
59 : : * one level of subxact open, etc */
60 : : bool have_prep_stmt; /* have we prepared any stmts in this xact? */
61 : : bool have_error; /* have any subxacts aborted in this xact? */
62 : : bool changing_xact_state; /* xact state change in process */
63 : : bool parallel_commit; /* do we commit (sub)xacts in parallel? */
64 : : bool parallel_abort; /* do we abort (sub)xacts in parallel? */
65 : : bool invalidated; /* true if reconnect is pending */
66 : : bool keep_connections; /* setting value of keep_connections
67 : : * server option */
68 : : Oid serverid; /* foreign server OID used to get server name */
69 : : uint32 server_hashvalue; /* hash value of foreign server OID */
70 : : uint32 mapping_hashvalue; /* hash value of user mapping OID */
71 : : PgFdwConnState state; /* extra per-connection state */
72 : : } ConnCacheEntry;
73 : :
74 : : /*
75 : : * Connection cache (initialized on first use)
76 : : */
77 : : static HTAB *ConnectionHash = NULL;
78 : :
79 : : /* for assigning cursor numbers and prepared statement numbers */
80 : : static unsigned int cursor_number = 0;
81 : : static unsigned int prep_stmt_number = 0;
82 : :
83 : : /* tracks whether any work is needed in callback functions */
84 : : static bool xact_got_connection = false;
85 : :
86 : : /* custom wait event values, retrieved from shared memory */
87 : : static uint32 pgfdw_we_cleanup_result = 0;
88 : : static uint32 pgfdw_we_connect = 0;
89 : : static uint32 pgfdw_we_get_result = 0;
90 : :
91 : : /*
92 : : * Milliseconds to wait to cancel an in-progress query or execute a cleanup
93 : : * query; if it takes longer than 30 seconds to do these, we assume the
94 : : * connection is dead.
95 : : */
96 : : #define CONNECTION_CLEANUP_TIMEOUT 30000
97 : :
98 : : /* Macro for constructing abort command to be sent */
99 : : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
100 : : do { \
101 : : if (toplevel) \
102 : : snprintf((sql), sizeof(sql), \
103 : : "ABORT TRANSACTION"); \
104 : : else \
105 : : snprintf((sql), sizeof(sql), \
106 : : "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
107 : : (entry)->xact_depth, (entry)->xact_depth); \
108 : : } while(0)
109 : :
110 : : /*
111 : : * SQL functions
112 : : */
1182 fujii@postgresql.org 113 :CBC 3 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
1174 114 : 3 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
115 : 3 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
116 : :
117 : : /* prototypes of private functions */
118 : : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
119 : : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
120 : : static void disconnect_pg_server(ConnCacheEntry *entry);
121 : : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
122 : : static void configure_remote_session(PGconn *conn);
123 : : static void do_sql_command_begin(PGconn *conn, const char *sql);
124 : : static void do_sql_command_end(PGconn *conn, const char *sql,
125 : : bool consume_input);
126 : : static void begin_remote_xact(ConnCacheEntry *entry);
127 : : static void pgfdw_xact_callback(XactEvent event, void *arg);
128 : : static void pgfdw_subxact_callback(SubXactEvent event,
129 : : SubTransactionId mySubid,
130 : : SubTransactionId parentSubid,
131 : : void *arg);
132 : : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
133 : : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
134 : : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
135 : : static bool pgfdw_cancel_query(PGconn *conn);
136 : : static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
137 : : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
138 : : bool consume_input);
139 : : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
140 : : bool ignore_errors);
141 : : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
142 : : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
143 : : TimestampTz endtime,
144 : : bool consume_input,
145 : : bool ignore_errors);
146 : : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
147 : : PGresult **result, bool *timed_out);
148 : : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
149 : : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
150 : : List **pending_entries,
151 : : List **cancel_requested);
152 : : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
153 : : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
154 : : int curlevel);
155 : : static void pgfdw_finish_abort_cleanup(List *pending_entries,
156 : : List *cancel_requested,
157 : : bool toplevel);
158 : : static void pgfdw_security_check(const char **keywords, const char **values,
159 : : UserMapping *user, PGconn *conn);
160 : : static bool UserMappingPasswordRequired(UserMapping *user);
161 : : static bool disconnect_cached_connections(Oid serverid);
162 : :
163 : : /*
164 : : * Get a PGconn which can be used to execute queries on the remote PostgreSQL
165 : : * server with the user's authorization. A new connection is established
166 : : * if we don't already have a suitable one, and a transaction is opened at
167 : : * the right subtransaction nesting depth if we didn't do that already.
168 : : *
169 : : * will_prep_stmt must be true if caller intends to create any prepared
170 : : * statements. Since those don't go away automatically at transaction end
171 : : * (not even on error), we need this flag to cue manual cleanup.
172 : : *
173 : : * If state is not NULL, *state receives the per-connection state associated
174 : : * with the PGconn.
175 : : */
176 : : PGconn *
1110 efujita@postgresql.o 177 : 2105 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
178 : : {
179 : : bool found;
1276 fujii@postgresql.org 180 : 2105 : bool retry = false;
181 : : ConnCacheEntry *entry;
182 : : ConnCacheKey key;
183 : 2105 : MemoryContext ccxt = CurrentMemoryContext;
184 : :
185 : : /* First time through, initialize connection cache hashtable */
4070 tgl@sss.pgh.pa.us 186 [ + + ]: 2105 : if (ConnectionHash == NULL)
187 : : {
188 : : HASHCTL ctl;
189 : :
97 noah@leadboat.com 190 [ + - ]:GNC 9 : if (pgfdw_we_get_result == 0)
191 : 9 : pgfdw_we_get_result =
192 : 9 : WaitEventExtensionNew("PostgresFdwGetResult");
193 : :
4070 tgl@sss.pgh.pa.us 194 :CBC 9 : ctl.keysize = sizeof(ConnCacheKey);
195 : 9 : ctl.entrysize = sizeof(ConnCacheEntry);
196 : 9 : ConnectionHash = hash_create("postgres_fdw connections", 8,
197 : : &ctl,
198 : : HASH_ELEM | HASH_BLOBS);
199 : :
200 : : /*
201 : : * Register some callback functions that manage connection cleanup.
202 : : * This should be done just once in each backend.
203 : : */
204 : 9 : RegisterXactCallback(pgfdw_xact_callback, NULL);
205 : 9 : RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
2459 206 : 9 : CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
207 : : pgfdw_inval_callback, (Datum) 0);
208 : 9 : CacheRegisterSyscacheCallback(USERMAPPINGOID,
209 : : pgfdw_inval_callback, (Datum) 0);
210 : : }
211 : :
212 : : /* Set flag that we did GetConnection during the current transaction */
4070 213 : 2105 : xact_got_connection = true;
214 : :
215 : : /* Create hash key for the entry. Assume no pad bytes in key struct */
2999 rhaas@postgresql.org 216 : 2105 : key = user->umid;
217 : :
218 : : /*
219 : : * Find or create cached entry for requested connection.
220 : : */
4070 tgl@sss.pgh.pa.us 221 : 2105 : entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
222 [ + + ]: 2105 : if (!found)
223 : : {
224 : : /*
225 : : * We need only clear "conn" here; remaining fields will be filled
226 : : * later when "conn" is set.
227 : : */
228 : 17 : entry->conn = NULL;
229 : : }
230 : :
231 : : /* Reject further use of connections which failed abort cleanup. */
2503 rhaas@postgresql.org 232 : 2105 : pgfdw_reject_incomplete_xact_state_change(entry);
233 : :
234 : : /*
235 : : * If the connection needs to be remade due to invalidation, disconnect as
236 : : * soon as we're out of all transactions.
237 : : */
1276 fujii@postgresql.org 238 [ + + - + : 2105 : if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
- - ]
239 : : {
1276 fujii@postgresql.org 240 [ # # ]:UBC 0 : elog(DEBUG3, "closing connection %p for option changes to take effect",
241 : : entry->conn);
2459 tgl@sss.pgh.pa.us 242 : 0 : disconnect_pg_server(entry);
243 : : }
244 : :
245 : : /*
246 : : * If cache entry doesn't have a connection, we have to establish a new
247 : : * connection. (If connect_pg_server throws an error, the cache entry
248 : : * will remain in a valid empty state, ie conn == NULL.)
249 : : */
4070 tgl@sss.pgh.pa.us 250 [ + + ]:CBC 2105 : if (entry->conn == NULL)
1276 fujii@postgresql.org 251 : 69 : make_new_connection(entry, user);
252 : :
253 : : /*
254 : : * We check the health of the cached connection here when using it. In
255 : : * cases where we're out of all transactions, if a broken connection is
256 : : * detected, we try to reestablish a new connection later.
257 : : */
1286 258 [ + + ]: 2096 : PG_TRY();
259 : : {
260 : : /* Process a pending asynchronous request if any. */
1110 efujita@postgresql.o 261 [ - + ]: 2096 : if (entry->state.pendingAreq)
1110 efujita@postgresql.o 262 :UBC 0 : process_pending_request(entry->state.pendingAreq);
263 : : /* Start a new transaction or subtransaction if needed. */
1286 fujii@postgresql.org 264 :CBC 2096 : begin_remote_xact(entry);
265 : : }
266 : 2 : PG_CATCH();
267 : : {
1276 268 : 2 : MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
269 : 2 : ErrorData *errdata = CopyErrorData();
270 : :
271 : : /*
272 : : * Determine whether to try to reestablish the connection.
273 : : *
274 : : * After a broken connection is detected in libpq, any error other
275 : : * than connection failure (e.g., out-of-memory) can be thrown
276 : : * somewhere between return from libpq and the expected ereport() call
277 : : * in pgfdw_report_error(). In this case, since PQstatus() indicates
278 : : * CONNECTION_BAD, checking only PQstatus() causes the false detection
279 : : * of connection failure. To avoid this, we also verify that the
280 : : * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
281 : : * checking only the sqlstate can cause another false detection
282 : : * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
283 : : * for any libpq-originated error condition.
284 : : */
285 [ + - ]: 2 : if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
286 [ + - ]: 2 : PQstatus(entry->conn) != CONNECTION_BAD ||
287 [ + + ]: 2 : entry->xact_depth > 0)
288 : : {
289 : 1 : MemoryContextSwitchTo(ecxt);
1286 290 : 1 : PG_RE_THROW();
291 : : }
292 : :
293 : : /* Clean up the error state */
1276 294 : 1 : FlushErrorState();
295 : 1 : FreeErrorData(errdata);
296 : 1 : errdata = NULL;
297 : :
298 : 1 : retry = true;
299 : : }
1286 300 [ - + ]: 2095 : PG_END_TRY();
301 : :
302 : : /*
303 : : * If a broken connection is detected, disconnect it, reestablish a new
304 : : * connection and retry a new remote transaction. If connection failure is
305 : : * reported again, we give up getting a connection.
306 : : */
1276 307 [ + + ]: 2095 : if (retry)
308 : : {
309 [ - + ]: 1 : Assert(entry->xact_depth == 0);
310 : :
1286 311 [ - + ]: 1 : ereport(DEBUG3,
312 : : (errmsg_internal("could not start remote transaction on connection %p",
313 : : entry->conn)),
314 : : errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
315 : :
1276 316 [ - + ]: 1 : elog(DEBUG3, "closing connection %p to reestablish a new one",
317 : : entry->conn);
318 : 1 : disconnect_pg_server(entry);
319 : :
394 efujita@postgresql.o 320 : 1 : make_new_connection(entry, user);
321 : :
1276 fujii@postgresql.org 322 : 1 : begin_remote_xact(entry);
323 : : }
324 : :
325 : : /* Remember if caller will prepare statements */
4053 tgl@sss.pgh.pa.us 326 : 2095 : entry->have_prep_stmt |= will_prep_stmt;
327 : :
328 : : /* If caller needs access to the per-connection state, return it. */
1110 efujita@postgresql.o 329 [ + + ]: 2095 : if (state)
330 : 712 : *state = &entry->state;
331 : :
4070 tgl@sss.pgh.pa.us 332 : 2095 : return entry->conn;
333 : : }
334 : :
335 : : /*
336 : : * Reset all transient state fields in the cached connection entry and
337 : : * establish new connection to the remote server.
338 : : */
339 : : static void
1276 fujii@postgresql.org 340 : 70 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
341 : : {
342 : 70 : ForeignServer *server = GetForeignServer(user->serverid);
343 : : ListCell *lc;
344 : :
345 [ - + ]: 70 : Assert(entry->conn == NULL);
346 : :
347 : : /* Reset all transient state fields, to be sure all are clean */
348 : 70 : entry->xact_depth = 0;
349 : 70 : entry->have_prep_stmt = false;
350 : 70 : entry->have_error = false;
351 : 70 : entry->changing_xact_state = false;
352 : 70 : entry->invalidated = false;
1185 353 : 70 : entry->serverid = server->serverid;
1276 354 : 70 : entry->server_hashvalue =
355 : 70 : GetSysCacheHashValue1(FOREIGNSERVEROID,
356 : : ObjectIdGetDatum(server->serverid));
357 : 70 : entry->mapping_hashvalue =
358 : 70 : GetSysCacheHashValue1(USERMAPPINGOID,
359 : : ObjectIdGetDatum(user->umid));
1110 efujita@postgresql.o 360 : 70 : memset(&entry->state, 0, sizeof(entry->state));
361 : :
362 : : /*
363 : : * Determine whether to keep the connection that we're about to make here
364 : : * open even after the transaction using it ends, so that the subsequent
365 : : * transactions can re-use it.
366 : : *
367 : : * By default, all the connections to any foreign servers are kept open.
368 : : *
369 : : * Also determine whether to commit/abort (sub)transactions opened on the
370 : : * remote server in parallel at (sub)transaction end, which is disabled by
371 : : * default.
372 : : *
373 : : * Note: it's enough to determine these only when making a new connection
374 : : * because if these settings for it are changed, it will be closed and
375 : : * re-made later.
376 : : */
1108 fujii@postgresql.org 377 : 70 : entry->keep_connections = true;
780 efujita@postgresql.o 378 : 70 : entry->parallel_commit = false;
374 379 : 70 : entry->parallel_abort = false;
1108 fujii@postgresql.org 380 [ + - + + : 304 : foreach(lc, server->options)
+ + ]
381 : : {
382 : 234 : DefElem *def = (DefElem *) lfirst(lc);
383 : :
384 [ + + ]: 234 : if (strcmp(def->defname, "keep_connections") == 0)
385 : 6 : entry->keep_connections = defGetBoolean(def);
780 efujita@postgresql.o 386 [ + + ]: 228 : else if (strcmp(def->defname, "parallel_commit") == 0)
387 : 2 : entry->parallel_commit = defGetBoolean(def);
374 388 [ + + ]: 226 : else if (strcmp(def->defname, "parallel_abort") == 0)
389 : 2 : entry->parallel_abort = defGetBoolean(def);
390 : : }
391 : :
392 : : /* Now try to make the connection */
1276 fujii@postgresql.org 393 : 70 : entry->conn = connect_pg_server(server, user);
394 : :
395 [ - + ]: 61 : elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
396 : : entry->conn, server->servername, user->umid, user->userid);
397 : 61 : }
398 : :
399 : : /*
400 : : * Check that non-superuser has used password or delegated credentials
401 : : * to establish connection; otherwise, he's piggybacking on the
402 : : * postgres server's user identity. See also dblink_security_check()
403 : : * in contrib/dblink and check_conn_params.
404 : : */
405 : : static void
367 sfrost@snowman.net 406 : 64 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
407 : : {
408 : : /* Superusers bypass the check */
409 [ + + ]: 64 : if (superuser_arg(user->userid))
410 : 57 : return;
411 : :
412 : : #ifdef ENABLE_GSS
413 : : /* Connected via GSSAPI with delegated credentials- all good. */
330 bruce@momjian.us 414 [ + + + - ]: 7 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
367 sfrost@snowman.net 415 : 2 : return;
416 : : #endif
417 : :
418 : : /* Ok if superuser set PW required false. */
419 [ + + ]: 5 : if (!UserMappingPasswordRequired(user))
420 : 2 : return;
421 : :
422 : : /* Connected via PW, with PW required true, and provided non-empty PW. */
423 [ + + ]: 3 : if (PQconnectionUsedPassword(conn))
424 : : {
425 : : /* ok if params contain a non-empty password */
426 [ + + ]: 7 : for (int i = 0; keywords[i] != NULL; i++)
427 : : {
428 [ - + - - ]: 6 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
367 sfrost@snowman.net 429 :UBC 0 : return;
430 : : }
431 : : }
432 : :
367 sfrost@snowman.net 433 [ + - ]:CBC 3 : ereport(ERROR,
434 : : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
435 : : errmsg("password or GSSAPI delegated credentials required"),
436 : : errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
437 : : errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
438 : : }
439 : :
440 : : /*
441 : : * Connect to remote server using specified server and user mapping properties.
442 : : */
443 : : static PGconn *
4070 tgl@sss.pgh.pa.us 444 : 70 : connect_pg_server(ForeignServer *server, UserMapping *user)
445 : : {
446 : 70 : PGconn *volatile conn = NULL;
447 : :
448 : : /*
449 : : * Use PG_TRY block to ensure closing connection on error.
450 : : */
451 [ + + ]: 70 : PG_TRY();
452 : : {
453 : : const char **keywords;
454 : : const char **values;
842 fujii@postgresql.org 455 : 70 : char *appname = NULL;
456 : : int n;
457 : :
458 : : /*
459 : : * Construct connection params from generic options of ForeignServer
460 : : * and UserMapping. (Some of them might not be libpq options, in
461 : : * which case we'll just waste a few array slots.) Add 4 extra slots
462 : : * for application_name, fallback_application_name, client_encoding,
463 : : * end marker.
464 : : */
950 465 : 70 : n = list_length(server->options) + list_length(user->options) + 4;
4070 tgl@sss.pgh.pa.us 466 : 70 : keywords = (const char **) palloc(n * sizeof(char *));
467 : 70 : values = (const char **) palloc(n * sizeof(char *));
468 : :
469 : 70 : n = 0;
470 : 140 : n += ExtractConnectionOptions(server->options,
471 : 70 : keywords + n, values + n);
472 : 140 : n += ExtractConnectionOptions(user->options,
473 : 70 : keywords + n, values + n);
474 : :
475 : : /*
476 : : * Use pgfdw_application_name as application_name if set.
477 : : *
478 : : * PQconnectdbParams() processes the parameter arrays from start to
479 : : * end. If any key word is repeated, the last value is used. Therefore
480 : : * note that pgfdw_application_name must be added to the arrays after
481 : : * options of ForeignServer are, so that it can override
482 : : * application_name set in ForeignServer.
483 : : */
950 fujii@postgresql.org 484 [ + + + - ]: 70 : if (pgfdw_application_name && *pgfdw_application_name != '\0')
485 : : {
486 : 1 : keywords[n] = "application_name";
487 : 1 : values[n] = pgfdw_application_name;
488 : 1 : n++;
489 : : }
490 : :
491 : : /*
492 : : * Search the parameter arrays to find application_name setting, and
493 : : * replace escape sequences in it with status information if found.
494 : : * The arrays are searched backwards because the last value is used if
495 : : * application_name is repeatedly set.
496 : : */
842 497 [ + + ]: 195 : for (int i = n - 1; i >= 0; i--)
498 : : {
499 [ + + ]: 141 : if (strcmp(keywords[i], "application_name") == 0 &&
500 [ + - ]: 16 : *(values[i]) != '\0')
501 : : {
502 : : /*
503 : : * Use this application_name setting if it's not empty string
504 : : * even after any escape sequences in it are replaced.
505 : : */
506 : 16 : appname = process_pgfdw_appname(values[i]);
507 [ + - ]: 16 : if (appname[0] != '\0')
508 : : {
509 : 16 : values[i] = appname;
510 : 16 : break;
511 : : }
512 : :
513 : : /*
514 : : * This empty application_name is not used, so we set
515 : : * values[i] to NULL and keep searching the array to find the
516 : : * next one.
517 : : */
842 fujii@postgresql.org 518 :UBC 0 : values[i] = NULL;
519 : 0 : pfree(appname);
520 : 0 : appname = NULL;
521 : : }
522 : : }
523 : :
524 : : /* Use "postgres_fdw" as fallback_application_name */
4070 tgl@sss.pgh.pa.us 525 :CBC 70 : keywords[n] = "fallback_application_name";
526 : 70 : values[n] = "postgres_fdw";
527 : 70 : n++;
528 : :
529 : : /* Set client_encoding so that libpq can convert encoding properly. */
530 : 70 : keywords[n] = "client_encoding";
531 : 70 : values[n] = GetDatabaseEncodingName();
532 : 70 : n++;
533 : :
534 : 70 : keywords[n] = values[n] = NULL;
535 : :
536 : : /* verify the set of connection parameters */
2322 rhaas@postgresql.org 537 : 70 : check_conn_params(keywords, values, user);
538 : :
539 : : /* first time, allocate or get the custom wait event */
192 michael@paquier.xyz 540 [ + + ]:GNC 66 : if (pgfdw_we_connect == 0)
541 : 7 : pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
542 : :
543 : : /* OK to make connection */
447 andres@anarazel.de 544 :CBC 66 : conn = libpqsrv_connect_params(keywords, values,
545 : : false, /* expand_dbname */
546 : : pgfdw_we_connect);
547 : :
4070 tgl@sss.pgh.pa.us 548 [ + - + + ]: 66 : if (!conn || PQstatus(conn) != CONNECTION_OK)
549 [ + - ]: 2 : ereport(ERROR,
550 : : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
551 : : errmsg("could not connect to server \"%s\"",
552 : : server->servername),
553 : : errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
554 : :
555 : : /* Perform post-connection security checks */
367 sfrost@snowman.net 556 : 64 : pgfdw_security_check(keywords, values, user, conn);
557 : :
558 : : /* Prepare new session for use */
4069 tgl@sss.pgh.pa.us 559 : 61 : configure_remote_session(conn);
560 : :
842 fujii@postgresql.org 561 [ + + ]: 61 : if (appname != NULL)
562 : 16 : pfree(appname);
4070 tgl@sss.pgh.pa.us 563 : 61 : pfree(keywords);
564 : 61 : pfree(values);
565 : : }
566 : 9 : PG_CATCH();
567 : : {
447 andres@anarazel.de 568 : 9 : libpqsrv_disconnect(conn);
4070 tgl@sss.pgh.pa.us 569 : 9 : PG_RE_THROW();
570 : : }
571 [ - + ]: 61 : PG_END_TRY();
572 : :
573 : 61 : return conn;
574 : : }
575 : :
576 : : /*
577 : : * Disconnect any open connection for a connection cache entry.
578 : : */
579 : : static void
2459 580 : 56 : disconnect_pg_server(ConnCacheEntry *entry)
581 : : {
582 [ + - ]: 56 : if (entry->conn != NULL)
583 : : {
447 andres@anarazel.de 584 : 56 : libpqsrv_disconnect(entry->conn);
2459 tgl@sss.pgh.pa.us 585 : 56 : entry->conn = NULL;
586 : : }
587 : 56 : }
588 : :
589 : : /*
590 : : * Return true if the password_required is defined and false for this user
591 : : * mapping, otherwise false. The mapping has been pre-validated.
592 : : */
593 : : static bool
1577 andrew@dunslane.net 594 : 10 : UserMappingPasswordRequired(UserMapping *user)
595 : : {
596 : : ListCell *cell;
597 : :
598 [ + + + + : 16 : foreach(cell, user->options)
+ + ]
599 : : {
600 : 9 : DefElem *def = (DefElem *) lfirst(cell);
601 : :
602 [ + + ]: 9 : if (strcmp(def->defname, "password_required") == 0)
603 : 3 : return defGetBoolean(def);
604 : : }
605 : :
606 : 7 : return true;
607 : : }
608 : :
609 : : /*
610 : : * For non-superusers, insist that the connstr specify a password or that the
611 : : * user provided their own GSSAPI delegated credentials. This
612 : : * prevents a password from being picked up from .pgpass, a service file, the
613 : : * environment, etc. We don't want the postgres user's passwords,
614 : : * certificates, etc to be accessible to non-superusers. (See also
615 : : * dblink_connstr_check in contrib/dblink.)
616 : : */
617 : : static void
2322 rhaas@postgresql.org 618 : 70 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
619 : : {
620 : : int i;
621 : :
622 : : /* no check required if superuser */
623 [ + + ]: 70 : if (superuser_arg(user->userid))
4070 tgl@sss.pgh.pa.us 624 : 59 : return;
625 : :
626 : : #ifdef ENABLE_GSS
627 : : /* ok if the user provided their own delegated credentials */
330 bruce@momjian.us 628 [ + + ]: 11 : if (be_gssapi_get_delegation(MyProcPort))
367 sfrost@snowman.net 629 : 3 : return;
630 : : #endif
631 : :
632 : : /* ok if params contain a non-empty password */
4070 tgl@sss.pgh.pa.us 633 [ + + ]: 39 : for (i = 0; keywords[i] != NULL; i++)
634 : : {
635 [ + + + - ]: 34 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
636 : 3 : return;
637 : : }
638 : :
639 : : /* ok if the superuser explicitly said so at user mapping creation time */
1577 andrew@dunslane.net 640 [ + + ]: 5 : if (!UserMappingPasswordRequired(user))
641 : 1 : return;
642 : :
4070 tgl@sss.pgh.pa.us 643 [ + - ]: 4 : ereport(ERROR,
644 : : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
645 : : errmsg("password or GSSAPI delegated credentials required"),
646 : : errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
647 : : }
648 : :
649 : : /*
650 : : * Issue SET commands to make sure remote session is configured properly.
651 : : *
652 : : * We do this just once at connection, assuming nothing will change the
653 : : * values later. Since we'll never send volatile function calls to the
654 : : * remote, there shouldn't be any way to break this assumption from our end.
655 : : * It's possible to think of ways to break it at the remote end, eg making
656 : : * a foreign table point to a view that includes a set_config call ---
657 : : * but once you admit the possibility of a malicious view definition,
658 : : * there are any number of ways to break things.
659 : : */
660 : : static void
4069 661 : 61 : configure_remote_session(PGconn *conn)
662 : : {
4052 663 : 61 : int remoteversion = PQserverVersion(conn);
664 : :
665 : : /* Force the search path to contain only pg_catalog (see deparse.c) */
666 : 61 : do_sql_command(conn, "SET search_path = pg_catalog");
667 : :
668 : : /*
669 : : * Set remote timezone; this is basically just cosmetic, since all
670 : : * transmitted and returned timestamptzs should specify a zone explicitly
671 : : * anyway. However it makes the regression test outputs more predictable.
672 : : *
673 : : * We don't risk setting remote zone equal to ours, since the remote
674 : : * server might use a different timezone database. Instead, use UTC
675 : : * (quoted, because very old servers are picky about case).
676 : : */
4041 677 : 61 : do_sql_command(conn, "SET timezone = 'UTC'");
678 : :
679 : : /*
680 : : * Set values needed to ensure unambiguous data output from remote. (This
681 : : * logic should match what pg_dump does. See also set_transmission_modes
682 : : * in postgres_fdw.c.)
683 : : */
4052 684 : 61 : do_sql_command(conn, "SET datestyle = ISO");
685 [ + - ]: 61 : if (remoteversion >= 80400)
686 : 61 : do_sql_command(conn, "SET intervalstyle = postgres");
687 [ + - ]: 61 : if (remoteversion >= 90000)
688 : 61 : do_sql_command(conn, "SET extra_float_digits = 3");
689 : : else
4052 tgl@sss.pgh.pa.us 690 :UBC 0 : do_sql_command(conn, "SET extra_float_digits = 2");
4052 tgl@sss.pgh.pa.us 691 :CBC 61 : }
692 : :
693 : : /*
694 : : * Convenience subroutine to issue a non-data-returning SQL command to remote
695 : : */
696 : : void
697 : 1725 : do_sql_command(PGconn *conn, const char *sql)
698 : : {
780 efujita@postgresql.o 699 : 1725 : do_sql_command_begin(conn, sql);
700 : 1725 : do_sql_command_end(conn, sql, false);
701 : 1722 : }
702 : :
703 : : static void
704 : 1743 : do_sql_command_begin(PGconn *conn, const char *sql)
705 : : {
2503 rhaas@postgresql.org 706 [ - + ]: 1743 : if (!PQsendQuery(conn, sql))
2503 rhaas@postgresql.org 707 :UBC 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
780 efujita@postgresql.o 708 :CBC 1743 : }
709 : :
710 : : static void
711 : 1743 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
712 : : {
713 : : PGresult *res;
714 : :
715 : : /*
716 : : * If requested, consume whatever data is available from the socket. (Note
717 : : * that if all data is available, this allows pgfdw_get_result to call
718 : : * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
719 : : * would be large compared to the overhead of PQconsumeInput.)
720 : : */
721 [ + + - + ]: 1743 : if (consume_input && !PQconsumeInput(conn))
780 efujita@postgresql.o 722 :UBC 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
97 noah@leadboat.com 723 :GNC 1743 : res = pgfdw_get_result(conn);
4069 tgl@sss.pgh.pa.us 724 [ + + ]:CBC 1743 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
3723 725 : 3 : pgfdw_report_error(ERROR, res, conn, true, sql);
4069 726 : 1740 : PQclear(res);
727 : 1740 : }
728 : :
729 : : /*
730 : : * Start remote transaction or subtransaction, if needed.
731 : : *
732 : : * Note that we always use at least REPEATABLE READ in the remote session.
733 : : * This is so that, if a query initiates multiple scans of the same or
734 : : * different foreign tables, we will get snapshot-consistent results from
735 : : * those scans. A disadvantage is that we can't provide sane emulation of
736 : : * READ COMMITTED behavior --- it would be nice if we had some other way to
737 : : * control which remote queries share a snapshot.
738 : : */
739 : : static void
4070 740 : 2097 : begin_remote_xact(ConnCacheEntry *entry)
741 : : {
742 : 2097 : int curlevel = GetCurrentTransactionNestLevel();
743 : :
744 : : /* Start main transaction if we haven't yet */
745 [ + + ]: 2097 : if (entry->xact_depth <= 0)
746 : : {
747 : : const char *sql;
748 : :
749 [ - + ]: 723 : elog(DEBUG3, "starting remote transaction on connection %p",
750 : : entry->conn);
751 : :
752 [ - + ]: 723 : if (IsolationIsSerializable())
4070 tgl@sss.pgh.pa.us 753 :UBC 0 : sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
754 : : else
4070 tgl@sss.pgh.pa.us 755 :CBC 723 : sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
2503 rhaas@postgresql.org 756 : 723 : entry->changing_xact_state = true;
4052 tgl@sss.pgh.pa.us 757 : 723 : do_sql_command(entry->conn, sql);
4070 758 : 722 : entry->xact_depth = 1;
2503 rhaas@postgresql.org 759 : 722 : entry->changing_xact_state = false;
760 : : }
761 : :
762 : : /*
763 : : * If we're in a subtransaction, stack up savepoints to match our level.
764 : : * This ensures we can rollback just the desired effects when a
765 : : * subtransaction aborts.
766 : : */
4070 tgl@sss.pgh.pa.us 767 [ + + ]: 2110 : while (entry->xact_depth < curlevel)
768 : : {
769 : : char sql[64];
770 : :
771 : 15 : snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
2503 rhaas@postgresql.org 772 : 15 : entry->changing_xact_state = true;
4052 tgl@sss.pgh.pa.us 773 : 15 : do_sql_command(entry->conn, sql);
4070 774 : 14 : entry->xact_depth++;
2503 rhaas@postgresql.org 775 : 14 : entry->changing_xact_state = false;
776 : : }
4070 tgl@sss.pgh.pa.us 777 : 2095 : }
778 : :
779 : : /*
780 : : * Release connection reference count created by calling GetConnection.
781 : : */
782 : : void
783 : 2042 : ReleaseConnection(PGconn *conn)
784 : : {
785 : : /*
786 : : * Currently, we don't actually track connection references because all
787 : : * cleanup is managed on a transaction or subtransaction basis instead. So
788 : : * there's nothing to do here.
789 : : */
790 : 2042 : }
791 : :
792 : : /*
793 : : * Assign a "unique" number for a cursor.
794 : : *
795 : : * These really only need to be unique per connection within a transaction.
796 : : * For the moment we ignore the per-connection point and assign them across
797 : : * all connections in the transaction, but we ask for the connection to be
798 : : * supplied in case we want to refine that.
799 : : *
800 : : * Note that even if wraparound happens in a very long transaction, actual
801 : : * collisions are highly improbable; just be sure to use %u not %d to print.
802 : : */
803 : : unsigned int
804 : 511 : GetCursorNumber(PGconn *conn)
805 : : {
806 : 511 : return ++cursor_number;
807 : : }
808 : :
809 : : /*
810 : : * Assign a "unique" number for a prepared statement.
811 : : *
812 : : * This works much like GetCursorNumber, except that we never reset the counter
813 : : * within a session. That's because we can't be 100% sure we've gotten rid
814 : : * of all prepared statements on all connections, and it's not really worth
815 : : * increasing the risk of prepared-statement name collisions by resetting.
816 : : */
817 : : unsigned int
4053 818 : 174 : GetPrepStmtNumber(PGconn *conn)
819 : : {
820 : 174 : return ++prep_stmt_number;
821 : : }
822 : :
823 : : /*
824 : : * Submit a query and wait for the result.
825 : : *
826 : : * Since we don't use non-blocking mode, this can't process interrupts while
827 : : * pushing the query text to the server. That risk is relatively small, so we
828 : : * ignore that for now.
829 : : *
830 : : * Caller is responsible for the error handling on the result.
831 : : */
832 : : PGresult *
1110 efujita@postgresql.o 833 : 3868 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
834 : : {
835 : : /* First, process a pending asynchronous request, if any. */
836 [ + + + + ]: 3868 : if (state && state->pendingAreq)
837 : 4 : process_pending_request(state->pendingAreq);
838 : :
2915 rhaas@postgresql.org 839 [ - + ]: 3868 : if (!PQsendQuery(conn, query))
97 noah@leadboat.com 840 :UNC 0 : return NULL;
97 noah@leadboat.com 841 :GNC 3868 : return pgfdw_get_result(conn);
842 : : }
843 : :
844 : : /*
845 : : * Wrap libpqsrv_get_result_last(), adding wait event.
846 : : *
847 : : * Caller is responsible for the error handling on the result.
848 : : */
849 : : PGresult *
850 : 7824 : pgfdw_get_result(PGconn *conn)
851 : : {
852 : 7824 : return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
853 : : }
854 : :
855 : : /*
856 : : * Report an error we got from the remote server.
857 : : *
858 : : * elevel: error level to use (typically ERROR, but might be less)
859 : : * res: PGresult containing the error
860 : : * conn: connection we did the query on
861 : : * clear: if true, PQclear the result (otherwise caller will handle it)
862 : : * sql: NULL, or text of remote command we tried to execute
863 : : *
864 : : * Note: callers that choose not to throw ERROR for a remote error are
865 : : * responsible for making sure that the associated ConnCacheEntry gets
866 : : * marked with have_error = true.
867 : : */
868 : : void
3723 tgl@sss.pgh.pa.us 869 :CBC 16 : pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
870 : : bool clear, const char *sql)
871 : : {
872 : : /* If requested, PGresult must be released before leaving this function. */
4070 873 [ + + ]: 16 : PG_TRY();
874 : : {
875 : 16 : char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
876 : 16 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
877 : 16 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
878 : 16 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
879 : 16 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
880 : : int sqlstate;
881 : :
882 [ + + ]: 16 : if (diag_sqlstate)
883 : 14 : sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
884 : : diag_sqlstate[1],
885 : : diag_sqlstate[2],
886 : : diag_sqlstate[3],
887 : : diag_sqlstate[4]);
888 : : else
889 : 2 : sqlstate = ERRCODE_CONNECTION_FAILURE;
890 : :
891 : : /*
892 : : * If we don't get a message from the PGresult, try the PGconn. This
893 : : * is needed because for connection-level failures, PQgetResult may
894 : : * just return NULL, not a PGresult at all.
895 : : */
3723 896 [ + + ]: 16 : if (message_primary == NULL)
2603 peter_e@gmx.net 897 : 2 : message_primary = pchomp(PQerrorMessage(conn));
898 : :
4070 tgl@sss.pgh.pa.us 899 [ + - + - : 16 : ereport(elevel,
+ - + + +
+ - + +
- ]
900 : : (errcode(sqlstate),
901 : : (message_primary != NULL && message_primary[0] != '\0') ?
902 : : errmsg_internal("%s", message_primary) :
903 : : errmsg("could not obtain message string for remote error"),
904 : : message_detail ? errdetail_internal("%s", message_detail) : 0,
905 : : message_hint ? errhint("%s", message_hint) : 0,
906 : : message_context ? errcontext("%s", message_context) : 0,
907 : : sql ? errcontext("remote SQL command: %s", sql) : 0));
908 : : }
1626 peter@eisentraut.org 909 : 16 : PG_FINALLY();
910 : : {
4070 tgl@sss.pgh.pa.us 911 [ + + ]: 16 : if (clear)
912 : 15 : PQclear(res);
913 : : }
914 [ + - ]: 16 : PG_END_TRY();
4070 tgl@sss.pgh.pa.us 915 :UBC 0 : }
916 : :
917 : : /*
918 : : * pgfdw_xact_callback --- cleanup at main-transaction end.
919 : : *
920 : : * This runs just late enough that it must not enter user-defined code
921 : : * locally. (Entering such code on the remote side is fine. Its remote
922 : : * COMMIT TRANSACTION may run deferred triggers.)
923 : : */
924 : : static void
4070 tgl@sss.pgh.pa.us 925 :CBC 3785 : pgfdw_xact_callback(XactEvent event, void *arg)
926 : : {
927 : : HASH_SEQ_STATUS scan;
928 : : ConnCacheEntry *entry;
780 efujita@postgresql.o 929 : 3785 : List *pending_entries = NIL;
374 930 : 3785 : List *cancel_requested = NIL;
931 : :
932 : : /* Quick exit if no connections were touched in this transaction. */
4070 tgl@sss.pgh.pa.us 933 [ + + ]: 3785 : if (!xact_got_connection)
934 : 3091 : return;
935 : :
936 : : /*
937 : : * Scan all connection cache entries to find open remote transactions, and
938 : : * close them.
939 : : */
940 : 694 : hash_seq_init(&scan, ConnectionHash);
941 [ + + ]: 3562 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
942 : : {
943 : : PGresult *res;
944 : :
945 : : /* Ignore cache entry if no open connection right now */
3723 946 [ + + ]: 2869 : if (entry->conn == NULL)
4070 947 : 1623 : continue;
948 : :
949 : : /* If it has an open remote transaction, try to close it */
3723 950 [ + + ]: 1246 : if (entry->xact_depth > 0)
951 : : {
952 [ - + ]: 723 : elog(DEBUG3, "closing remote transaction on connection %p",
953 : : entry->conn);
954 : :
955 [ + + - + : 723 : switch (event)
- ]
956 : : {
3272 rhaas@postgresql.org 957 : 681 : case XACT_EVENT_PARALLEL_PRE_COMMIT:
958 : : case XACT_EVENT_PRE_COMMIT:
959 : :
960 : : /*
961 : : * If abort cleanup previously failed for this connection,
962 : : * we can't issue any more commands against it.
963 : : */
2503 964 : 681 : pgfdw_reject_incomplete_xact_state_change(entry);
965 : :
966 : : /* Commit all remote transactions during pre-commit */
967 : 681 : entry->changing_xact_state = true;
780 efujita@postgresql.o 968 [ + + ]: 681 : if (entry->parallel_commit)
969 : : {
970 : 16 : do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
971 : 16 : pending_entries = lappend(pending_entries, entry);
972 : 16 : continue;
973 : : }
3723 tgl@sss.pgh.pa.us 974 : 665 : do_sql_command(entry->conn, "COMMIT TRANSACTION");
2503 rhaas@postgresql.org 975 : 665 : entry->changing_xact_state = false;
976 : :
977 : : /*
978 : : * If there were any errors in subtransactions, and we
979 : : * made prepared statements, do a DEALLOCATE ALL to make
980 : : * sure we get rid of all prepared statements. This is
981 : : * annoying and not terribly bulletproof, but it's
982 : : * probably not worth trying harder.
983 : : *
984 : : * DEALLOCATE ALL only exists in 8.3 and later, so this
985 : : * constrains how old a server postgres_fdw can
986 : : * communicate with. We intentionally ignore errors in
987 : : * the DEALLOCATE, so that we can hobble along to some
988 : : * extent with older servers (leaking prepared statements
989 : : * as we go; but we don't really support update operations
990 : : * pre-8.3 anyway).
991 : : */
4053 tgl@sss.pgh.pa.us 992 [ + + - + ]: 665 : if (entry->have_prep_stmt && entry->have_error)
993 : : {
97 noah@leadboat.com 994 :UNC 0 : res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
995 : : NULL);
4053 tgl@sss.pgh.pa.us 996 :UBC 0 : PQclear(res);
997 : : }
4053 tgl@sss.pgh.pa.us 998 :CBC 665 : entry->have_prep_stmt = false;
999 : 665 : entry->have_error = false;
3723 1000 : 665 : break;
1001 : 1 : case XACT_EVENT_PRE_PREPARE:
1002 : :
1003 : : /*
1004 : : * We disallow any remote transactions, since it's not
1005 : : * very reasonable to hold them open until the prepared
1006 : : * transaction is committed. For the moment, throw error
1007 : : * unconditionally; later we might allow read-only cases.
1008 : : * Note that the error will cause us to come right back
1009 : : * here with event == XACT_EVENT_ABORT, so we'll clean up
1010 : : * the connection state at that point.
1011 : : */
1012 [ + - ]: 1 : ereport(ERROR,
1013 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1014 : : errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1015 : : break;
3272 rhaas@postgresql.org 1016 :UBC 0 : case XACT_EVENT_PARALLEL_COMMIT:
1017 : : case XACT_EVENT_COMMIT:
1018 : : case XACT_EVENT_PREPARE:
1019 : : /* Pre-commit should have closed the open transaction */
3723 tgl@sss.pgh.pa.us 1020 [ # # ]: 0 : elog(ERROR, "missed cleaning up connection during pre-commit");
1021 : : break;
3272 rhaas@postgresql.org 1022 :CBC 41 : case XACT_EVENT_PARALLEL_ABORT:
1023 : : case XACT_EVENT_ABORT:
1024 : : /* Rollback all remote transactions during abort */
374 efujita@postgresql.o 1025 [ + + ]: 41 : if (entry->parallel_abort)
1026 : : {
1027 [ + - ]: 4 : if (pgfdw_abort_cleanup_begin(entry, true,
1028 : : &pending_entries,
1029 : : &cancel_requested))
1030 : 4 : continue;
1031 : : }
1032 : : else
1033 : 37 : pgfdw_abort_cleanup(entry, true);
3723 tgl@sss.pgh.pa.us 1034 : 37 : break;
1035 : : }
1036 : : }
1037 : :
1038 : : /* Reset state to show we're out of a transaction */
780 efujita@postgresql.o 1039 : 1225 : pgfdw_reset_xact_state(entry, true);
1040 : : }
1041 : :
1042 : : /* If there are any pending connections, finish cleaning them up */
374 1043 [ + + - + ]: 693 : if (pending_entries || cancel_requested)
1044 : : {
1045 [ + - + + ]: 15 : if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1046 : : event == XACT_EVENT_PRE_COMMIT)
1047 : : {
1048 [ - + ]: 13 : Assert(cancel_requested == NIL);
1049 : 13 : pgfdw_finish_pre_commit_cleanup(pending_entries);
1050 : : }
1051 : : else
1052 : : {
1053 [ + - - + ]: 2 : Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1054 : : event == XACT_EVENT_ABORT);
1055 : 2 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1056 : : true);
1057 : : }
1058 : : }
1059 : :
1060 : : /*
1061 : : * Regardless of the event type, we can now mark ourselves as out of the
1062 : : * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1063 : : * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1064 : : */
4070 tgl@sss.pgh.pa.us 1065 : 693 : xact_got_connection = false;
1066 : :
1067 : : /* Also reset cursor numbering for next transaction */
1068 : 693 : cursor_number = 0;
1069 : : }
1070 : :
1071 : : /*
1072 : : * pgfdw_subxact_callback --- cleanup at subtransaction end.
1073 : : */
1074 : : static void
1075 : 38 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1076 : : SubTransactionId parentSubid, void *arg)
1077 : : {
1078 : : HASH_SEQ_STATUS scan;
1079 : : ConnCacheEntry *entry;
1080 : : int curlevel;
780 efujita@postgresql.o 1081 : 38 : List *pending_entries = NIL;
374 1082 : 38 : List *cancel_requested = NIL;
1083 : :
1084 : : /* Nothing to do at subxact start, nor after commit. */
4070 tgl@sss.pgh.pa.us 1085 [ + + + + ]: 38 : if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1086 : : event == SUBXACT_EVENT_ABORT_SUB))
1087 : 23 : return;
1088 : :
1089 : : /* Quick exit if no connections were touched in this transaction. */
1090 [ - + ]: 15 : if (!xact_got_connection)
4070 tgl@sss.pgh.pa.us 1091 :UBC 0 : return;
1092 : :
1093 : : /*
1094 : : * Scan all connection cache entries to find open remote subtransactions
1095 : : * of the current level, and close them.
1096 : : */
4070 tgl@sss.pgh.pa.us 1097 :CBC 15 : curlevel = GetCurrentTransactionNestLevel();
1098 : 15 : hash_seq_init(&scan, ConnectionHash);
1099 [ + + ]: 102 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1100 : : {
1101 : : char sql[100];
1102 : :
1103 : : /*
1104 : : * We only care about connections with open remote subtransactions of
1105 : : * the current level.
1106 : : */
1107 [ + + + + ]: 87 : if (entry->conn == NULL || entry->xact_depth < curlevel)
1108 : 79 : continue;
1109 : :
1110 [ - + ]: 14 : if (entry->xact_depth > curlevel)
4070 tgl@sss.pgh.pa.us 1111 [ # # ]:UBC 0 : elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1112 : : entry->xact_depth);
1113 : :
4070 tgl@sss.pgh.pa.us 1114 [ + + ]:CBC 14 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1115 : : {
1116 : : /*
1117 : : * If abort cleanup previously failed for this connection, we
1118 : : * can't issue any more commands against it.
1119 : : */
2503 rhaas@postgresql.org 1120 : 7 : pgfdw_reject_incomplete_xact_state_change(entry);
1121 : :
1122 : : /* Commit all remote subtransactions during pre-commit */
4070 tgl@sss.pgh.pa.us 1123 : 7 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
2503 rhaas@postgresql.org 1124 : 7 : entry->changing_xact_state = true;
780 efujita@postgresql.o 1125 [ + + ]: 7 : if (entry->parallel_commit)
1126 : : {
1127 : 2 : do_sql_command_begin(entry->conn, sql);
1128 : 2 : pending_entries = lappend(pending_entries, entry);
1129 : 2 : continue;
1130 : : }
4052 tgl@sss.pgh.pa.us 1131 : 5 : do_sql_command(entry->conn, sql);
2503 rhaas@postgresql.org 1132 : 5 : entry->changing_xact_state = false;
1133 : : }
1134 : : else
1135 : : {
1136 : : /* Rollback all remote subtransactions during abort */
374 efujita@postgresql.o 1137 [ + + ]: 7 : if (entry->parallel_abort)
1138 : : {
1139 [ + - ]: 4 : if (pgfdw_abort_cleanup_begin(entry, false,
1140 : : &pending_entries,
1141 : : &cancel_requested))
1142 : 4 : continue;
1143 : : }
1144 : : else
1145 : 3 : pgfdw_abort_cleanup(entry, false);
1146 : : }
1147 : :
1148 : : /* OK, we're outta that level of subtransaction */
780 1149 : 8 : pgfdw_reset_xact_state(entry, false);
1150 : : }
1151 : :
1152 : : /* If there are any pending connections, finish cleaning them up */
374 1153 [ + + - + ]: 15 : if (pending_entries || cancel_requested)
1154 : : {
1155 [ + + ]: 3 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1156 : : {
1157 [ - + ]: 1 : Assert(cancel_requested == NIL);
1158 : 1 : pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1159 : : }
1160 : : else
1161 : : {
1162 [ - + ]: 2 : Assert(event == SUBXACT_EVENT_ABORT_SUB);
1163 : 2 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1164 : : false);
1165 : : }
1166 : : }
1167 : : }
1168 : :
1169 : : /*
1170 : : * Connection invalidation callback function
1171 : : *
1172 : : * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1173 : : * close connections depending on that entry immediately if current transaction
1174 : : * has not used those connections yet. Otherwise, mark those connections as
1175 : : * invalid and then make pgfdw_xact_callback() close them at the end of current
1176 : : * transaction, since they cannot be closed in the midst of the transaction
1177 : : * using them. Closed connections will be remade at the next opportunity if
1178 : : * necessary.
1179 : : *
1180 : : * Although most cache invalidation callbacks blow away all the related stuff
1181 : : * regardless of the given hashvalue, connections are expensive enough that
1182 : : * it's worth trying to avoid that.
1183 : : *
1184 : : * NB: We could avoid unnecessary disconnection more strictly by examining
1185 : : * individual option values, but it seems too much effort for the gain.
1186 : : */
1187 : : static void
2459 tgl@sss.pgh.pa.us 1188 : 170 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1189 : : {
1190 : : HASH_SEQ_STATUS scan;
1191 : : ConnCacheEntry *entry;
1192 : :
1193 [ + + - + ]: 170 : Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1194 : :
1195 : : /* ConnectionHash must exist already, if we're registered */
1196 : 170 : hash_seq_init(&scan, ConnectionHash);
1197 [ + + ]: 1146 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1198 : : {
1199 : : /* Ignore invalid entries */
1200 [ + + ]: 976 : if (entry->conn == NULL)
1201 : 794 : continue;
1202 : :
1203 : : /* hashvalue == 0 means a cache reset, must clear all state */
1204 [ + - + + ]: 182 : if (hashvalue == 0 ||
1205 : 129 : (cacheid == FOREIGNSERVEROID &&
1206 [ + + + + ]: 182 : entry->server_hashvalue == hashvalue) ||
1207 : 53 : (cacheid == USERMAPPINGOID &&
1208 [ + + ]: 53 : entry->mapping_hashvalue == hashvalue))
1209 : : {
1210 : : /*
1211 : : * Close the connection immediately if it's not used yet in this
1212 : : * transaction. Otherwise mark it as invalid so that
1213 : : * pgfdw_xact_callback() can close it at the end of this
1214 : : * transaction.
1215 : : */
1203 fujii@postgresql.org 1216 [ + + ]: 49 : if (entry->xact_depth == 0)
1217 : : {
1218 [ - + ]: 46 : elog(DEBUG3, "discarding connection %p", entry->conn);
1219 : 46 : disconnect_pg_server(entry);
1220 : : }
1221 : : else
1222 : 3 : entry->invalidated = true;
1223 : : }
1224 : : }
2459 tgl@sss.pgh.pa.us 1225 : 170 : }
1226 : :
1227 : : /*
1228 : : * Raise an error if the given connection cache entry is marked as being
1229 : : * in the middle of an xact state change. This should be called at which no
1230 : : * such change is expected to be in progress; if one is found to be in
1231 : : * progress, it means that we aborted in the middle of a previous state change
1232 : : * and now don't know what the remote transaction state actually is.
1233 : : * Such connections can't safely be further used. Re-establishing the
1234 : : * connection would change the snapshot and roll back any writes already
1235 : : * performed, so that's not an option, either. Thus, we must abort.
1236 : : */
1237 : : static void
2503 rhaas@postgresql.org 1238 : 2793 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
1239 : : {
1240 : : ForeignServer *server;
1241 : :
1242 : : /* nothing to do for inactive entries and entries of sane state */
2459 tgl@sss.pgh.pa.us 1243 [ + + + - ]: 2793 : if (entry->conn == NULL || !entry->changing_xact_state)
2503 rhaas@postgresql.org 1244 : 2793 : return;
1245 : :
1246 : : /* make sure this entry is inactive */
2459 tgl@sss.pgh.pa.us 1247 :UBC 0 : disconnect_pg_server(entry);
1248 : :
1249 : : /* find server name to be shown in the message below */
1185 fujii@postgresql.org 1250 : 0 : server = GetForeignServer(entry->serverid);
1251 : :
2503 rhaas@postgresql.org 1252 [ # # ]: 0 : ereport(ERROR,
1253 : : (errcode(ERRCODE_CONNECTION_EXCEPTION),
1254 : : errmsg("connection to server \"%s\" was lost",
1255 : : server->servername)));
1256 : : }
1257 : :
1258 : : /*
1259 : : * Reset state to show we're out of a (sub)transaction.
1260 : : */
1261 : : static void
780 efujita@postgresql.o 1262 :CBC 1259 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1263 : : {
1264 [ + + ]: 1259 : if (toplevel)
1265 : : {
1266 : : /* Reset state to show we're out of a transaction */
1267 : 1245 : entry->xact_depth = 0;
1268 : :
1269 : : /*
1270 : : * If the connection isn't in a good idle state, it is marked as
1271 : : * invalid or keep_connections option of its server is disabled, then
1272 : : * discard it to recover. Next GetConnection will open a new
1273 : : * connection.
1274 : : */
1275 [ + + + - ]: 2489 : if (PQstatus(entry->conn) != CONNECTION_OK ||
1276 : 1244 : PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1277 [ + - ]: 1244 : entry->changing_xact_state ||
1278 [ + + ]: 1244 : entry->invalidated ||
1279 [ + + ]: 1242 : !entry->keep_connections)
1280 : : {
1281 [ - + ]: 4 : elog(DEBUG3, "discarding connection %p", entry->conn);
1282 : 4 : disconnect_pg_server(entry);
1283 : : }
1284 : : }
1285 : : else
1286 : : {
1287 : : /* Reset state to show we're out of a subtransaction */
1288 : 14 : entry->xact_depth--;
1289 : : }
1290 : 1259 : }
1291 : :
1292 : : /*
1293 : : * Cancel the currently-in-progress query (whose query text we do not have)
1294 : : * and ignore the result. Returns true if we successfully cancel the query
1295 : : * and discard any pending result, and false if not.
1296 : : *
1297 : : * It's not a huge problem if we throw an ERROR here, but if we get into error
1298 : : * recursion trouble, we'll end up slamming the connection shut, which will
1299 : : * necessitate failing the entire toplevel transaction even if subtransactions
1300 : : * were used. Try to use WARNING where we can.
1301 : : *
1302 : : * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1303 : : * query text from the pendingAreq saved in the per-connection state, then
1304 : : * report the query using it.
1305 : : */
1306 : : static bool
2503 rhaas@postgresql.org 1307 : 2 : pgfdw_cancel_query(PGconn *conn)
1308 : : {
1309 : : TimestampTz endtime;
1310 : :
1311 : : /*
1312 : : * If it takes too long to cancel the query and discard the result, assume
1313 : : * the connection is dead.
1314 : : */
374 efujita@postgresql.o 1315 : 2 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1316 : : CONNECTION_CLEANUP_TIMEOUT);
1317 : :
17 alvherre@alvh.no-ip. 1318 [ - + ]:GNC 2 : if (!pgfdw_cancel_query_begin(conn, endtime))
374 efujita@postgresql.o 1319 :UBC 0 : return false;
374 efujita@postgresql.o 1320 :CBC 2 : return pgfdw_cancel_query_end(conn, endtime, false);
1321 : : }
1322 : :
1323 : : /*
1324 : : * Submit a cancel request to the given connection, waiting only until
1325 : : * the given time.
1326 : : *
1327 : : * We sleep interruptibly until we receive confirmation that the cancel
1328 : : * request has been accepted, and if it is, return true; if the timeout
1329 : : * lapses without that, or the request fails for whatever reason, return
1330 : : * false.
1331 : : */
1332 : : static bool
17 alvherre@alvh.no-ip. 1333 :GNC 2 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
1334 : : {
9 1335 : 2 : const char *errormsg = libpqsrv_cancel(conn, endtime);
1336 : :
17 1337 [ - + ]: 2 : if (errormsg != NULL)
17 alvherre@alvh.no-ip. 1338 [ # # ]:UNC 0 : ereport(WARNING,
1339 : : errcode(ERRCODE_CONNECTION_FAILURE),
1340 : : errmsg("could not send cancel request: %s", errormsg));
1341 : :
17 alvherre@alvh.no-ip. 1342 :GNC 2 : return errormsg == NULL;
1343 : : }
1344 : :
1345 : : static bool
374 efujita@postgresql.o 1346 :CBC 2 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
1347 : : {
1348 : 2 : PGresult *result = NULL;
1349 : : bool timed_out;
1350 : :
1351 : : /*
1352 : : * If requested, consume whatever data is available from the socket. (Note
1353 : : * that if all data is available, this allows pgfdw_get_cleanup_result to
1354 : : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1355 : : * which would be large compared to the overhead of PQconsumeInput.)
1356 : : */
1357 [ - + - - ]: 2 : if (consume_input && !PQconsumeInput(conn))
1358 : : {
374 efujita@postgresql.o 1359 [ # # ]:UBC 0 : ereport(WARNING,
1360 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1361 : : errmsg("could not get result of cancel request: %s",
1362 : : pchomp(PQerrorMessage(conn)))));
1363 : 0 : return false;
1364 : : }
1365 : :
1366 : : /* Get and discard the result of the query. */
858 fujii@postgresql.org 1367 [ - + ]:CBC 2 : if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1368 : : {
858 fujii@postgresql.org 1369 [ # # ]:UBC 0 : if (timed_out)
1370 [ # # ]: 0 : ereport(WARNING,
1371 : : (errmsg("could not get result of cancel request due to timeout")));
1372 : : else
1373 [ # # ]: 0 : ereport(WARNING,
1374 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1375 : : errmsg("could not get result of cancel request: %s",
1376 : : pchomp(PQerrorMessage(conn)))));
1377 : :
2503 rhaas@postgresql.org 1378 : 0 : return false;
1379 : : }
2503 rhaas@postgresql.org 1380 :CBC 2 : PQclear(result);
1381 : :
1382 : 2 : return true;
1383 : : }
1384 : :
1385 : : /*
1386 : : * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1387 : : * result. If the query is executed without error, the return value is true.
1388 : : * If the query is executed successfully but returns an error, the return
1389 : : * value is true if and only if ignore_errors is set. If the query can't be
1390 : : * sent or times out, the return value is false.
1391 : : *
1392 : : * It's not a huge problem if we throw an ERROR here, but if we get into error
1393 : : * recursion trouble, we'll end up slamming the connection shut, which will
1394 : : * necessitate failing the entire toplevel transaction even if subtransactions
1395 : : * were used. Try to use WARNING where we can.
1396 : : */
1397 : : static bool
1398 : 53 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1399 : : {
1400 : : TimestampTz endtime;
1401 : :
1402 : : /*
1403 : : * If it takes too long to execute a cleanup query, assume the connection
1404 : : * is dead. It's fairly likely that this is why we aborted in the first
1405 : : * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1406 : : * be too long.
1407 : : */
374 efujita@postgresql.o 1408 : 53 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1409 : : CONNECTION_CLEANUP_TIMEOUT);
1410 : :
1411 [ - + ]: 53 : if (!pgfdw_exec_cleanup_query_begin(conn, query))
374 efujita@postgresql.o 1412 :UBC 0 : return false;
374 efujita@postgresql.o 1413 :CBC 53 : return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1414 : : false, ignore_errors);
1415 : : }
1416 : :
1417 : : static bool
1418 : 65 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
1419 : : {
10 efujita@postgresql.o 1420 [ - + ]:GNC 65 : Assert(query != NULL);
1421 : :
1422 : : /*
1423 : : * Submit a query. Since we don't use non-blocking mode, this also can
1424 : : * block. But its risk is relatively small, so we ignore that for now.
1425 : : */
2503 rhaas@postgresql.org 1426 [ - + ]:CBC 65 : if (!PQsendQuery(conn, query))
1427 : : {
2503 rhaas@postgresql.org 1428 :UBC 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1429 : 0 : return false;
1430 : : }
1431 : :
374 efujita@postgresql.o 1432 :CBC 65 : return true;
1433 : : }
1434 : :
1435 : : static bool
1436 : 65 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1437 : : TimestampTz endtime, bool consume_input,
1438 : : bool ignore_errors)
1439 : : {
1440 : 65 : PGresult *result = NULL;
1441 : : bool timed_out;
1442 : :
10 efujita@postgresql.o 1443 [ - + ]:GNC 65 : Assert(query != NULL);
1444 : :
1445 : : /*
1446 : : * If requested, consume whatever data is available from the socket. (Note
1447 : : * that if all data is available, this allows pgfdw_get_cleanup_result to
1448 : : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1449 : : * which would be large compared to the overhead of PQconsumeInput.)
1450 : : */
374 efujita@postgresql.o 1451 [ + + - + ]:CBC 65 : if (consume_input && !PQconsumeInput(conn))
1452 : : {
374 efujita@postgresql.o 1453 :UBC 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1454 : 0 : return false;
1455 : : }
1456 : :
1457 : : /* Get the result of the query. */
858 fujii@postgresql.org 1458 [ - + ]:CBC 65 : if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1459 : : {
858 fujii@postgresql.org 1460 [ # # ]:UBC 0 : if (timed_out)
1461 [ # # # # ]: 0 : ereport(WARNING,
1462 : : (errmsg("could not get query result due to timeout"),
1463 : : errcontext("remote SQL command: %s", query)));
1464 : : else
1465 : 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1466 : :
2503 rhaas@postgresql.org 1467 : 0 : return false;
1468 : : }
1469 : :
1470 : : /* Issue a warning if not successful. */
2503 rhaas@postgresql.org 1471 [ - + ]:CBC 65 : if (PQresultStatus(result) != PGRES_COMMAND_OK)
1472 : : {
2503 rhaas@postgresql.org 1473 :UBC 0 : pgfdw_report_error(WARNING, result, conn, true, query);
1474 : 0 : return ignore_errors;
1475 : : }
2495 tgl@sss.pgh.pa.us 1476 :CBC 65 : PQclear(result);
1477 : :
2503 rhaas@postgresql.org 1478 : 65 : return true;
1479 : : }
1480 : :
1481 : : /*
1482 : : * Get, during abort cleanup, the result of a query that is in progress. This
1483 : : * might be a query that is being interrupted by transaction abort, or it might
1484 : : * be a query that was initiated as part of transaction abort to get the remote
1485 : : * side back to the appropriate state.
1486 : : *
1487 : : * endtime is the time at which we should give up and assume the remote
1488 : : * side is dead. Returns true if the timeout expired or connection trouble
1489 : : * occurred, false otherwise. Sets *result except in case of a timeout.
1490 : : * Sets timed_out to true only when the timeout expired.
1491 : : */
1492 : : static bool
858 fujii@postgresql.org 1493 : 67 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
1494 : : bool *timed_out)
1495 : : {
1496 : 67 : volatile bool failed = false;
2495 tgl@sss.pgh.pa.us 1497 : 67 : PGresult *volatile last_res = NULL;
1498 : :
858 fujii@postgresql.org 1499 : 67 : *timed_out = false;
1500 : :
1501 : : /* In what follows, do not leak any PGresults on an error. */
2495 tgl@sss.pgh.pa.us 1502 [ + - ]: 67 : PG_TRY();
1503 : : {
1504 : : for (;;)
2503 rhaas@postgresql.org 1505 : 74 : {
1506 : : PGresult *res;
1507 : :
2495 tgl@sss.pgh.pa.us 1508 [ + + ]: 347 : while (PQisBusy(conn))
1509 : : {
1510 : : int wc;
1511 : 65 : TimestampTz now = GetCurrentTimestamp();
1512 : : long cur_timeout;
1513 : :
1514 : : /* If timeout has expired, give up, else get sleep time. */
1251 1515 : 65 : cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1516 [ - + ]: 65 : if (cur_timeout <= 0)
1517 : : {
858 fujii@postgresql.org 1518 :UBC 0 : *timed_out = true;
1519 : 0 : failed = true;
2495 tgl@sss.pgh.pa.us 1520 : 0 : goto exit;
1521 : : }
1522 : :
1523 : : /* first time, allocate or get the custom wait event */
192 michael@paquier.xyz 1524 [ + + ]:GNC 65 : if (pgfdw_we_cleanup_result == 0)
1525 : 1 : pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1526 : :
1527 : : /* Sleep until there's something to do */
2495 tgl@sss.pgh.pa.us 1528 :CBC 65 : wc = WaitLatchOrSocket(MyLatch,
1529 : : WL_LATCH_SET | WL_SOCKET_READABLE |
1530 : : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1531 : : PQsocket(conn),
1532 : : cur_timeout, pgfdw_we_cleanup_result);
1533 : 65 : ResetLatch(MyLatch);
1534 : :
1535 [ - + ]: 65 : CHECK_FOR_INTERRUPTS();
1536 : :
1537 : : /* Data available in socket? */
1538 [ + - ]: 65 : if (wc & WL_SOCKET_READABLE)
1539 : : {
1540 [ - + ]: 65 : if (!PQconsumeInput(conn))
1541 : : {
1542 : : /* connection trouble */
858 fujii@postgresql.org 1543 :UBC 0 : failed = true;
2495 tgl@sss.pgh.pa.us 1544 : 0 : goto exit;
1545 : : }
1546 : : }
1547 : : }
1548 : :
2495 tgl@sss.pgh.pa.us 1549 :CBC 141 : res = PQgetResult(conn);
1550 [ + + ]: 141 : if (res == NULL)
1551 : 67 : break; /* query is complete */
1552 : :
1553 : 74 : PQclear(last_res);
1554 : 74 : last_res = res;
1555 : : }
1556 : 67 : exit: ;
1557 : : }
2495 tgl@sss.pgh.pa.us 1558 :UBC 0 : PG_CATCH();
1559 : : {
2503 rhaas@postgresql.org 1560 : 0 : PQclear(last_res);
2495 tgl@sss.pgh.pa.us 1561 : 0 : PG_RE_THROW();
1562 : : }
2495 tgl@sss.pgh.pa.us 1563 [ - + ]:CBC 67 : PG_END_TRY();
1564 : :
858 fujii@postgresql.org 1565 [ - + ]: 67 : if (failed)
2495 tgl@sss.pgh.pa.us 1566 :UBC 0 : PQclear(last_res);
1567 : : else
2495 tgl@sss.pgh.pa.us 1568 :CBC 67 : *result = last_res;
858 fujii@postgresql.org 1569 : 67 : return failed;
1570 : : }
1571 : :
1572 : : /*
1573 : : * Abort remote transaction or subtransaction.
1574 : : *
1575 : : * "toplevel" should be set to true if toplevel (main) transaction is
1576 : : * rollbacked, false otherwise.
1577 : : *
1578 : : * Set entry->changing_xact_state to false on success, true on failure.
1579 : : */
1580 : : static void
751 efujita@postgresql.o 1581 : 40 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1582 : : {
1583 : : char sql[100];
1584 : :
1585 : : /*
1586 : : * Don't try to clean up the connection if we're already in error
1587 : : * recursion trouble.
1588 : : */
935 fujii@postgresql.org 1589 [ - + ]: 40 : if (in_error_recursion_trouble())
935 fujii@postgresql.org 1590 :UBC 0 : entry->changing_xact_state = true;
1591 : :
1592 : : /*
1593 : : * If connection is already unsalvageable, don't touch it further.
1594 : : */
935 fujii@postgresql.org 1595 [ + + ]:CBC 40 : if (entry->changing_xact_state)
1596 : 1 : return;
1597 : :
1598 : : /*
1599 : : * Mark this connection as in the process of changing transaction state.
1600 : : */
1601 : 39 : entry->changing_xact_state = true;
1602 : :
1603 : : /* Assume we might have lost track of prepared statements */
1604 : 39 : entry->have_error = true;
1605 : :
1606 : : /*
1607 : : * If a command has been submitted to the remote server by using an
1608 : : * asynchronous execution function, the command might not have yet
1609 : : * completed. Check to see if a command is still being processed by the
1610 : : * remote server, and if so, request cancellation of the command.
1611 : : */
1612 [ + + ]: 39 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1613 [ - + ]: 2 : !pgfdw_cancel_query(entry->conn))
935 fujii@postgresql.org 1614 :UBC 0 : return; /* Unable to cancel running query */
1615 : :
374 efujita@postgresql.o 1616 [ + + ]:CBC 39 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
935 fujii@postgresql.org 1617 [ - + ]: 39 : if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
751 efujita@postgresql.o 1618 :UBC 0 : return; /* Unable to abort remote (sub)transaction */
1619 : :
935 fujii@postgresql.org 1620 [ + + ]:CBC 39 : if (toplevel)
1621 : : {
1622 [ + + + - ]: 36 : if (entry->have_prep_stmt && entry->have_error &&
1623 [ - + ]: 14 : !pgfdw_exec_cleanup_query(entry->conn,
1624 : : "DEALLOCATE ALL",
1625 : : true))
935 fujii@postgresql.org 1626 :UBC 0 : return; /* Trouble clearing prepared statements */
1627 : :
935 fujii@postgresql.org 1628 :CBC 36 : entry->have_prep_stmt = false;
1629 : 36 : entry->have_error = false;
1630 : : }
1631 : :
1632 : : /*
1633 : : * If pendingAreq of the per-connection state is not NULL, it means that
1634 : : * an asynchronous fetch begun by fetch_more_data_begin() was not done
1635 : : * successfully and thus the per-connection state was not reset in
1636 : : * fetch_more_data(); in that case reset the per-connection state here.
1637 : : */
814 efujita@postgresql.o 1638 [ + + ]: 39 : if (entry->state.pendingAreq)
1639 : 1 : memset(&entry->state, 0, sizeof(entry->state));
1640 : :
1641 : : /* Disarm changing_xact_state if it all worked */
935 fujii@postgresql.org 1642 : 39 : entry->changing_xact_state = false;
1643 : : }
1644 : :
1645 : : /*
1646 : : * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1647 : : * don't wait for the result.
1648 : : *
1649 : : * Returns true if the abort command or cancel request is successfully issued,
1650 : : * false otherwise. If the abort command is successfully issued, the given
1651 : : * connection cache entry is appended to *pending_entries. Otherwise, if the
1652 : : * cancel request is successfully issued, it is appended to *cancel_requested.
1653 : : */
1654 : : static bool
374 efujita@postgresql.o 1655 : 8 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
1656 : : List **pending_entries, List **cancel_requested)
1657 : : {
1658 : : /*
1659 : : * Don't try to clean up the connection if we're already in error
1660 : : * recursion trouble.
1661 : : */
1662 [ - + ]: 8 : if (in_error_recursion_trouble())
374 efujita@postgresql.o 1663 :UBC 0 : entry->changing_xact_state = true;
1664 : :
1665 : : /*
1666 : : * If connection is already unsalvageable, don't touch it further.
1667 : : */
374 efujita@postgresql.o 1668 [ - + ]:CBC 8 : if (entry->changing_xact_state)
374 efujita@postgresql.o 1669 :UBC 0 : return false;
1670 : :
1671 : : /*
1672 : : * Mark this connection as in the process of changing transaction state.
1673 : : */
374 efujita@postgresql.o 1674 :CBC 8 : entry->changing_xact_state = true;
1675 : :
1676 : : /* Assume we might have lost track of prepared statements */
1677 : 8 : entry->have_error = true;
1678 : :
1679 : : /*
1680 : : * If a command has been submitted to the remote server by using an
1681 : : * asynchronous execution function, the command might not have yet
1682 : : * completed. Check to see if a command is still being processed by the
1683 : : * remote server, and if so, request cancellation of the command.
1684 : : */
1685 [ - + ]: 8 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1686 : : {
1687 : : TimestampTz endtime;
1688 : :
17 alvherre@alvh.no-ip. 1689 :UNC 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1690 : : CONNECTION_CLEANUP_TIMEOUT);
1691 [ # # ]: 0 : if (!pgfdw_cancel_query_begin(entry->conn, endtime))
374 efujita@postgresql.o 1692 :UBC 0 : return false; /* Unable to cancel running query */
1693 : 0 : *cancel_requested = lappend(*cancel_requested, entry);
1694 : : }
1695 : : else
1696 : : {
1697 : : char sql[100];
1698 : :
374 efujita@postgresql.o 1699 [ + + ]:CBC 8 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1700 [ - + ]: 8 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
374 efujita@postgresql.o 1701 :UBC 0 : return false; /* Unable to abort remote transaction */
374 efujita@postgresql.o 1702 :CBC 8 : *pending_entries = lappend(*pending_entries, entry);
1703 : : }
1704 : :
1705 : 8 : return true;
1706 : : }
1707 : :
1708 : : /*
1709 : : * Finish pre-commit cleanup of connections on each of which we've sent a
1710 : : * COMMIT command to the remote server.
1711 : : */
1712 : : static void
780 1713 : 13 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
1714 : : {
1715 : : ConnCacheEntry *entry;
1716 : 13 : List *pending_deallocs = NIL;
1717 : : ListCell *lc;
1718 : :
1719 [ - + ]: 13 : Assert(pending_entries);
1720 : :
1721 : : /*
1722 : : * Get the result of the COMMIT command for each of the pending entries
1723 : : */
1724 [ + - + + : 29 : foreach(lc, pending_entries)
+ + ]
1725 : : {
1726 : 16 : entry = (ConnCacheEntry *) lfirst(lc);
1727 : :
1728 [ - + ]: 16 : Assert(entry->changing_xact_state);
1729 : :
1730 : : /*
1731 : : * We might already have received the result on the socket, so pass
1732 : : * consume_input=true to try to consume it first
1733 : : */
1734 : 16 : do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1735 : 16 : entry->changing_xact_state = false;
1736 : :
1737 : : /* Do a DEALLOCATE ALL in parallel if needed */
1738 [ + + + + ]: 16 : if (entry->have_prep_stmt && entry->have_error)
1739 : : {
1740 : : /* Ignore errors (see notes in pgfdw_xact_callback) */
1741 [ + - ]: 2 : if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1742 : : {
1743 : 2 : pending_deallocs = lappend(pending_deallocs, entry);
1744 : 2 : continue;
1745 : : }
1746 : : }
1747 : 14 : entry->have_prep_stmt = false;
1748 : 14 : entry->have_error = false;
1749 : :
1750 : 14 : pgfdw_reset_xact_state(entry, true);
1751 : : }
1752 : :
1753 : : /* No further work if no pending entries */
1754 [ + + ]: 13 : if (!pending_deallocs)
1755 : 12 : return;
1756 : :
1757 : : /*
1758 : : * Get the result of the DEALLOCATE command for each of the pending
1759 : : * entries
1760 : : */
1761 [ + - + + : 3 : foreach(lc, pending_deallocs)
+ + ]
1762 : : {
1763 : : PGresult *res;
1764 : :
1765 : 2 : entry = (ConnCacheEntry *) lfirst(lc);
1766 : :
1767 : : /* Ignore errors (see notes in pgfdw_xact_callback) */
1768 [ + + ]: 4 : while ((res = PQgetResult(entry->conn)) != NULL)
1769 : : {
1770 : 2 : PQclear(res);
1771 : : /* Stop if the connection is lost (else we'll loop infinitely) */
1772 [ - + ]: 2 : if (PQstatus(entry->conn) == CONNECTION_BAD)
780 efujita@postgresql.o 1773 :UBC 0 : break;
1774 : : }
780 efujita@postgresql.o 1775 :CBC 2 : entry->have_prep_stmt = false;
1776 : 2 : entry->have_error = false;
1777 : :
1778 : 2 : pgfdw_reset_xact_state(entry, true);
1779 : : }
1780 : : }
1781 : :
1782 : : /*
1783 : : * Finish pre-subcommit cleanup of connections on each of which we've sent a
1784 : : * RELEASE command to the remote server.
1785 : : */
1786 : : static void
1787 : 1 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1788 : : {
1789 : : ConnCacheEntry *entry;
1790 : : char sql[100];
1791 : : ListCell *lc;
1792 : :
1793 [ - + ]: 1 : Assert(pending_entries);
1794 : :
1795 : : /*
1796 : : * Get the result of the RELEASE command for each of the pending entries
1797 : : */
1798 : 1 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1799 [ + - + + : 3 : foreach(lc, pending_entries)
+ + ]
1800 : : {
1801 : 2 : entry = (ConnCacheEntry *) lfirst(lc);
1802 : :
1803 [ - + ]: 2 : Assert(entry->changing_xact_state);
1804 : :
1805 : : /*
1806 : : * We might already have received the result on the socket, so pass
1807 : : * consume_input=true to try to consume it first
1808 : : */
1809 : 2 : do_sql_command_end(entry->conn, sql, true);
1810 : 2 : entry->changing_xact_state = false;
1811 : :
1812 : 2 : pgfdw_reset_xact_state(entry, false);
1813 : : }
1814 : 1 : }
1815 : :
1816 : : /*
1817 : : * Finish abort cleanup of connections on each of which we've sent an abort
1818 : : * command or cancel request to the remote server.
1819 : : */
1820 : : static void
374 1821 : 4 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1822 : : bool toplevel)
1823 : : {
1824 : 4 : List *pending_deallocs = NIL;
1825 : : ListCell *lc;
1826 : :
1827 : : /*
1828 : : * For each of the pending cancel requests (if any), get and discard the
1829 : : * result of the query, and submit an abort command to the remote server.
1830 : : */
1831 [ - + ]: 4 : if (cancel_requested)
1832 : : {
374 efujita@postgresql.o 1833 [ # # # # :UBC 0 : foreach(lc, cancel_requested)
# # ]
1834 : : {
1835 : 0 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1836 : : TimestampTz endtime;
1837 : : char sql[100];
1838 : :
1839 [ # # ]: 0 : Assert(entry->changing_xact_state);
1840 : :
1841 : : /*
1842 : : * Set end time. You might think we should do this before issuing
1843 : : * cancel request like in normal mode, but that is problematic,
1844 : : * because if, for example, it took longer than 30 seconds to
1845 : : * process the first few entries in the cancel_requested list, it
1846 : : * would cause a timeout error when processing each of the
1847 : : * remaining entries in the list, leading to slamming that entry's
1848 : : * connection shut.
1849 : : */
1850 : 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1851 : : CONNECTION_CLEANUP_TIMEOUT);
1852 : :
1853 [ # # ]: 0 : if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
1854 : : {
1855 : : /* Unable to cancel running query */
1856 : 0 : pgfdw_reset_xact_state(entry, toplevel);
1857 : 0 : continue;
1858 : : }
1859 : :
1860 : : /* Send an abort command in parallel if needed */
1861 [ # # ]: 0 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1862 [ # # ]: 0 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1863 : : {
1864 : : /* Unable to abort remote (sub)transaction */
1865 : 0 : pgfdw_reset_xact_state(entry, toplevel);
1866 : : }
1867 : : else
1868 : 0 : pending_entries = lappend(pending_entries, entry);
1869 : : }
1870 : : }
1871 : :
1872 : : /* No further work if no pending entries */
374 efujita@postgresql.o 1873 [ - + ]:CBC 4 : if (!pending_entries)
374 efujita@postgresql.o 1874 :UBC 0 : return;
1875 : :
1876 : : /*
1877 : : * Get the result of the abort command for each of the pending entries
1878 : : */
374 efujita@postgresql.o 1879 [ + - + + :CBC 12 : foreach(lc, pending_entries)
+ + ]
1880 : : {
1881 : 8 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1882 : : TimestampTz endtime;
1883 : : char sql[100];
1884 : :
1885 [ - + ]: 8 : Assert(entry->changing_xact_state);
1886 : :
1887 : : /*
1888 : : * Set end time. We do this now, not before issuing the command like
1889 : : * in normal mode, for the same reason as for the cancel_requested
1890 : : * entries.
1891 : : */
1892 : 8 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1893 : : CONNECTION_CLEANUP_TIMEOUT);
1894 : :
1895 [ + + ]: 8 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1896 [ - + ]: 8 : if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
1897 : : true, false))
1898 : : {
1899 : : /* Unable to abort remote (sub)transaction */
374 efujita@postgresql.o 1900 :UBC 0 : pgfdw_reset_xact_state(entry, toplevel);
1901 : 0 : continue;
1902 : : }
1903 : :
374 efujita@postgresql.o 1904 [ + + ]:CBC 8 : if (toplevel)
1905 : : {
1906 : : /* Do a DEALLOCATE ALL in parallel if needed */
1907 [ + - + - ]: 4 : if (entry->have_prep_stmt && entry->have_error)
1908 : : {
1909 [ - + ]: 4 : if (!pgfdw_exec_cleanup_query_begin(entry->conn,
1910 : : "DEALLOCATE ALL"))
1911 : : {
1912 : : /* Trouble clearing prepared statements */
374 efujita@postgresql.o 1913 :UBC 0 : pgfdw_reset_xact_state(entry, toplevel);
1914 : : }
1915 : : else
374 efujita@postgresql.o 1916 :CBC 4 : pending_deallocs = lappend(pending_deallocs, entry);
1917 : 4 : continue;
1918 : : }
374 efujita@postgresql.o 1919 :UBC 0 : entry->have_prep_stmt = false;
1920 : 0 : entry->have_error = false;
1921 : : }
1922 : :
1923 : : /* Reset the per-connection state if needed */
374 efujita@postgresql.o 1924 [ - + ]:CBC 4 : if (entry->state.pendingAreq)
374 efujita@postgresql.o 1925 :UBC 0 : memset(&entry->state, 0, sizeof(entry->state));
1926 : :
1927 : : /* We're done with this entry; unset the changing_xact_state flag */
374 efujita@postgresql.o 1928 :CBC 4 : entry->changing_xact_state = false;
1929 : 4 : pgfdw_reset_xact_state(entry, toplevel);
1930 : : }
1931 : :
1932 : : /* No further work if no pending entries */
1933 [ + + ]: 4 : if (!pending_deallocs)
1934 : 2 : return;
1935 [ - + ]: 2 : Assert(toplevel);
1936 : :
1937 : : /*
1938 : : * Get the result of the DEALLOCATE command for each of the pending
1939 : : * entries
1940 : : */
1941 [ + - + + : 6 : foreach(lc, pending_deallocs)
+ + ]
1942 : : {
1943 : 4 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1944 : : TimestampTz endtime;
1945 : :
1946 [ - + ]: 4 : Assert(entry->changing_xact_state);
1947 [ - + ]: 4 : Assert(entry->have_prep_stmt);
1948 [ - + ]: 4 : Assert(entry->have_error);
1949 : :
1950 : : /*
1951 : : * Set end time. We do this now, not before issuing the command like
1952 : : * in normal mode, for the same reason as for the cancel_requested
1953 : : * entries.
1954 : : */
1955 : 4 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1956 : : CONNECTION_CLEANUP_TIMEOUT);
1957 : :
1958 [ - + ]: 4 : if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
1959 : : endtime, true, true))
1960 : : {
1961 : : /* Trouble clearing prepared statements */
374 efujita@postgresql.o 1962 :UBC 0 : pgfdw_reset_xact_state(entry, toplevel);
1963 : 0 : continue;
1964 : : }
374 efujita@postgresql.o 1965 :CBC 4 : entry->have_prep_stmt = false;
1966 : 4 : entry->have_error = false;
1967 : :
1968 : : /* Reset the per-connection state if needed */
1969 [ - + ]: 4 : if (entry->state.pendingAreq)
374 efujita@postgresql.o 1970 :UBC 0 : memset(&entry->state, 0, sizeof(entry->state));
1971 : :
1972 : : /* We're done with this entry; unset the changing_xact_state flag */
374 efujita@postgresql.o 1973 :CBC 4 : entry->changing_xact_state = false;
1974 : 4 : pgfdw_reset_xact_state(entry, toplevel);
1975 : : }
1976 : : }
1977 : :
1978 : : /*
1979 : : * List active foreign server connections.
1980 : : *
1981 : : * This function takes no input parameter and returns setof record made of
1982 : : * following values:
1983 : : * - server_name - server name of active connection. In case the foreign server
1984 : : * is dropped but still the connection is active, then the server name will
1985 : : * be NULL in output.
1986 : : * - valid - true/false representing whether the connection is valid or not.
1987 : : * Note that the connections can get invalidated in pgfdw_inval_callback.
1988 : : *
1989 : : * No records are returned when there are no cached connections at all.
1990 : : */
1991 : : Datum
1182 fujii@postgresql.org 1992 : 11 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
1993 : : {
1994 : : #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1995 : 11 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1996 : : HASH_SEQ_STATUS scan;
1997 : : ConnCacheEntry *entry;
1998 : :
544 michael@paquier.xyz 1999 : 11 : InitMaterializedSRF(fcinfo, 0);
2000 : :
2001 : : /* If cache doesn't exist, we return no records */
1182 fujii@postgresql.org 2002 [ - + ]: 11 : if (!ConnectionHash)
1182 fujii@postgresql.org 2003 :UBC 0 : PG_RETURN_VOID();
2004 : :
1182 fujii@postgresql.org 2005 :CBC 11 : hash_seq_init(&scan, ConnectionHash);
2006 [ + + ]: 93 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2007 : : {
2008 : : ForeignServer *server;
638 peter@eisentraut.org 2009 : 82 : Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2010 : 82 : bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2011 : :
2012 : : /* We only look for open remote connections */
1182 fujii@postgresql.org 2013 [ + + ]: 82 : if (!entry->conn)
2014 : 71 : continue;
2015 : :
2016 : 11 : server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2017 : :
2018 : : /*
2019 : : * The foreign server may have been dropped in current explicit
2020 : : * transaction. It is not possible to drop the server from another
2021 : : * session when the connection associated with it is in use in the
2022 : : * current transaction, if tried so, the drop query in another session
2023 : : * blocks until the current transaction finishes.
2024 : : *
2025 : : * Even though the server is dropped in the current transaction, the
2026 : : * cache can still have associated active connection entry, say we
2027 : : * call such connections dangling. Since we can not fetch the server
2028 : : * name from system catalogs for dangling connections, instead we show
2029 : : * NULL value for server name in output.
2030 : : *
2031 : : * We could have done better by storing the server name in the cache
2032 : : * entry instead of server oid so that it could be used in the output.
2033 : : * But the server name in each cache entry requires 64 bytes of
2034 : : * memory, which is huge, when there are many cached connections and
2035 : : * the use case i.e. dropping the foreign server within the explicit
2036 : : * current transaction seems rare. So, we chose to show NULL value for
2037 : : * server name in output.
2038 : : *
2039 : : * Such dangling connections get closed either in next use or at the
2040 : : * end of current explicit transaction in pgfdw_xact_callback.
2041 : : */
2042 [ + + ]: 11 : if (!server)
2043 : : {
2044 : : /*
2045 : : * If the server has been dropped in the current explicit
2046 : : * transaction, then this entry would have been invalidated in
2047 : : * pgfdw_inval_callback at the end of drop server command. Note
2048 : : * that this connection would not have been closed in
2049 : : * pgfdw_inval_callback because it is still being used in the
2050 : : * current explicit transaction. So, assert that here.
2051 : : */
2052 [ + - + - : 1 : Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
- + ]
2053 : :
2054 : : /* Show null, if no server name was found */
2055 : 1 : nulls[0] = true;
2056 : : }
2057 : : else
2058 : 10 : values[0] = CStringGetTextDatum(server->servername);
2059 : :
2060 : 11 : values[1] = BoolGetDatum(!entry->invalidated);
2061 : :
768 michael@paquier.xyz 2062 : 11 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2063 : : }
2064 : :
1182 fujii@postgresql.org 2065 : 11 : PG_RETURN_VOID();
2066 : : }
2067 : :
2068 : : /*
2069 : : * Disconnect the specified cached connections.
2070 : : *
2071 : : * This function discards the open connections that are established by
2072 : : * postgres_fdw from the local session to the foreign server with
2073 : : * the given name. Note that there can be multiple connections to
2074 : : * the given server using different user mappings. If the connections
2075 : : * are used in the current local transaction, they are not disconnected
2076 : : * and warning messages are reported. This function returns true
2077 : : * if it disconnects at least one connection, otherwise false. If no
2078 : : * foreign server with the given name is found, an error is reported.
2079 : : */
2080 : : Datum
1174 2081 : 4 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
2082 : : {
2083 : : ForeignServer *server;
2084 : : char *servername;
2085 : :
2086 : 4 : servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2087 : 4 : server = GetForeignServerByName(servername, false);
2088 : :
2089 : 3 : PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
2090 : : }
2091 : :
2092 : : /*
2093 : : * Disconnect all the cached connections.
2094 : : *
2095 : : * This function discards all the open connections that are established by
2096 : : * postgres_fdw from the local session to the foreign servers.
2097 : : * If the connections are used in the current local transaction, they are
2098 : : * not disconnected and warning messages are reported. This function
2099 : : * returns true if it disconnects at least one connection, otherwise false.
2100 : : */
2101 : : Datum
2102 : 4 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
2103 : : {
2104 : 4 : PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
2105 : : }
2106 : :
2107 : : /*
2108 : : * Workhorse to disconnect cached connections.
2109 : : *
2110 : : * This function scans all the connection cache entries and disconnects
2111 : : * the open connections whose foreign server OID matches with
2112 : : * the specified one. If InvalidOid is specified, it disconnects all
2113 : : * the cached connections.
2114 : : *
2115 : : * This function emits a warning for each connection that's used in
2116 : : * the current transaction and doesn't close it. It returns true if
2117 : : * it disconnects at least one connection, otherwise false.
2118 : : *
2119 : : * Note that this function disconnects even the connections that are
2120 : : * established by other users in the same local session using different
2121 : : * user mappings. This leads even non-superuser to be able to close
2122 : : * the connections established by superusers in the same local session.
2123 : : *
2124 : : * XXX As of now we don't see any security risk doing this. But we should
2125 : : * set some restrictions on that, for example, prevent non-superuser
2126 : : * from closing the connections established by superusers even
2127 : : * in the same session?
2128 : : */
2129 : : static bool
2130 : 7 : disconnect_cached_connections(Oid serverid)
2131 : : {
2132 : : HASH_SEQ_STATUS scan;
2133 : : ConnCacheEntry *entry;
2134 : 7 : bool all = !OidIsValid(serverid);
2135 : 7 : bool result = false;
2136 : :
2137 : : /*
2138 : : * Connection cache hashtable has not been initialized yet in this
2139 : : * session, so return false.
2140 : : */
2141 [ - + ]: 7 : if (!ConnectionHash)
1174 fujii@postgresql.org 2142 :UBC 0 : return false;
2143 : :
1174 fujii@postgresql.org 2144 :CBC 7 : hash_seq_init(&scan, ConnectionHash);
2145 [ + + ]: 57 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2146 : : {
2147 : : /* Ignore cache entry if no open connection right now. */
2148 [ + + ]: 50 : if (!entry->conn)
2149 : 39 : continue;
2150 : :
2151 [ + + + + ]: 11 : if (all || entry->serverid == serverid)
2152 : : {
2153 : : /*
2154 : : * Emit a warning because the connection to close is used in the
2155 : : * current transaction and cannot be disconnected right now.
2156 : : */
2157 [ + + ]: 8 : if (entry->xact_depth > 0)
2158 : : {
2159 : : ForeignServer *server;
2160 : :
2161 : 3 : server = GetForeignServerExtended(entry->serverid,
2162 : : FSV_MISSING_OK);
2163 : :
2164 [ - + ]: 3 : if (!server)
2165 : : {
2166 : : /*
2167 : : * If the foreign server was dropped while its connection
2168 : : * was used in the current transaction, the connection
2169 : : * must have been marked as invalid by
2170 : : * pgfdw_inval_callback at the end of DROP SERVER command.
2171 : : */
1174 fujii@postgresql.org 2172 [ # # ]:UBC 0 : Assert(entry->invalidated);
2173 : :
2174 [ # # ]: 0 : ereport(WARNING,
2175 : : (errmsg("cannot close dropped server connection because it is still in use")));
2176 : : }
2177 : : else
1174 fujii@postgresql.org 2178 [ + - ]:CBC 3 : ereport(WARNING,
2179 : : (errmsg("cannot close connection for server \"%s\" because it is still in use",
2180 : : server->servername)));
2181 : : }
2182 : : else
2183 : : {
2184 [ - + ]: 5 : elog(DEBUG3, "discarding connection %p", entry->conn);
2185 : 5 : disconnect_pg_server(entry);
2186 : 5 : result = true;
2187 : : }
2188 : : }
2189 : : }
2190 : :
2191 : 7 : return result;
2192 : : }
|