Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * libpq-be-fe-helpers.h
4 : : * Helper functions for using libpq in extensions
5 : : *
6 : : * Code built directly into the backend is not allowed to link to libpq
7 : : * directly. Extension code is allowed to use libpq however. However, libpq
8 : : * used in extensions has to be careful not to block inside libpq, otherwise
9 : : * interrupts will not be processed, leading to issues like unresolvable
10 : : * deadlocks. Backend code also needs to take care to acquire/release an
11 : : * external fd for the connection, otherwise fd.c's accounting of fd's is
12 : : * broken.
13 : : *
14 : : * This file provides helper functions to make it easier to comply with these
15 : : * rules. It is a header only library as it needs to be linked into each
16 : : * extension using libpq, and it seems too small to be worth adding a
17 : : * dedicated static library for.
18 : : *
19 : : * TODO: For historical reasons the connections established here are not put
20 : : * into non-blocking mode. That can lead to blocking even when only the async
21 : : * libpq functions are used. This should be fixed.
22 : : *
23 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
24 : : * Portions Copyright (c) 1994, Regents of the University of California
25 : : *
26 : : * src/include/libpq/libpq-be-fe-helpers.h
27 : : *
28 : : *-------------------------------------------------------------------------
29 : : */
30 : : #ifndef LIBPQ_BE_FE_HELPERS_H
31 : : #define LIBPQ_BE_FE_HELPERS_H
32 : :
33 : : /*
34 : : * Despite the name, BUILDING_DLL is set only when building code directly part
35 : : * of the backend. Which also is where libpq isn't allowed to be
36 : : * used. Obviously this doesn't protect against libpq-fe.h getting included
37 : : * otherwise, but perhaps still protects against a few mistakes...
38 : : */
39 : : #ifdef BUILDING_DLL
40 : : #error "libpq may not be used code directly built into the backend"
41 : : #endif
42 : :
43 : : #include "libpq-fe.h"
44 : : #include "miscadmin.h"
45 : : #include "storage/fd.h"
46 : : #include "storage/latch.h"
47 : : #include "utils/timestamp.h"
48 : : #include "utils/wait_event.h"
49 : :
50 : :
51 : : static inline void libpqsrv_connect_prepare(void);
52 : : static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
53 : : static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
54 : : static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
55 : :
56 : :
57 : : /*
58 : : * PQconnectdb() wrapper that reserves a file descriptor and processes
59 : : * interrupts during connection establishment.
60 : : *
61 : : * Throws an error if AcquireExternalFD() fails, but does not throw if
62 : : * connection establishment itself fails. Callers need to use PQstatus() to
63 : : * check if connection establishment succeeded.
64 : : */
65 : : static inline PGconn *
447 andres@anarazel.de 66 :CBC 24 : libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
67 : : {
68 : 24 : PGconn *conn = NULL;
69 : :
70 : 24 : libpqsrv_connect_prepare();
71 : :
72 : 24 : conn = PQconnectStart(conninfo);
73 : :
74 : 24 : libpqsrv_connect_internal(conn, wait_event_info);
75 : :
76 : 24 : return conn;
77 : : }
78 : :
79 : : /*
80 : : * Like libpqsrv_connect(), except that this is a wrapper for
81 : : * PQconnectdbParams().
82 : : */
83 : : static inline PGconn *
84 : 66 : libpqsrv_connect_params(const char *const *keywords,
85 : : const char *const *values,
86 : : int expand_dbname,
87 : : uint32 wait_event_info)
88 : : {
89 : 66 : PGconn *conn = NULL;
90 : :
91 : 66 : libpqsrv_connect_prepare();
92 : :
93 : 66 : conn = PQconnectStartParams(keywords, values, expand_dbname);
94 : :
95 : 66 : libpqsrv_connect_internal(conn, wait_event_info);
96 : :
97 : 66 : return conn;
98 : : }
99 : :
100 : : /*
101 : : * PQfinish() wrapper that additionally releases the reserved file descriptor.
102 : : *
103 : : * It is allowed to call this with a NULL pgconn iff NULL was returned by
104 : : * libpqsrv_connect*.
105 : : */
106 : : static inline void
107 : 88 : libpqsrv_disconnect(PGconn *conn)
108 : : {
109 : : /*
110 : : * If no connection was established, we haven't reserved an FD for it (or
111 : : * already released it). This rule makes it easier to write PG_CATCH()
112 : : * handlers for this facility's users.
113 : : *
114 : : * See also libpqsrv_connect_internal().
115 : : */
116 [ + + ]: 88 : if (conn == NULL)
117 : 4 : return;
118 : :
119 : 84 : ReleaseExternalFD();
120 : 84 : PQfinish(conn);
121 : : }
122 : :
123 : :
124 : : /* internal helper functions follow */
125 : :
126 : :
127 : : /*
128 : : * Helper function for all connection establishment functions.
129 : : */
130 : : static inline void
131 : 90 : libpqsrv_connect_prepare(void)
132 : : {
133 : : /*
134 : : * We must obey fd.c's limit on non-virtual file descriptors. Assume that
135 : : * a PGconn represents one long-lived FD. (Doing this here also ensures
136 : : * that VFDs are closed if needed to make room.)
137 : : */
138 [ - + ]: 90 : if (!AcquireExternalFD())
139 : : {
140 : : #ifndef WIN32 /* can't write #if within ereport() macro */
447 andres@anarazel.de 141 [ # # ]:UBC 0 : ereport(ERROR,
142 : : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
143 : : errmsg("could not establish connection"),
144 : : errdetail("There are too many open files on the local server."),
145 : : errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
146 : : #else
147 : : ereport(ERROR,
148 : : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
149 : : errmsg("could not establish connection"),
150 : : errdetail("There are too many open files on the local server."),
151 : : errhint("Raise the server's max_files_per_process setting.")));
152 : : #endif
153 : : }
447 andres@anarazel.de 154 :CBC 90 : }
155 : :
156 : : /*
157 : : * Helper function for all connection establishment functions.
158 : : */
159 : : static inline void
160 : 90 : libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
161 : : {
162 : : /*
163 : : * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
164 : : * that here.
165 : : */
166 [ - + ]: 90 : if (conn == NULL)
167 : : {
447 andres@anarazel.de 168 :UBC 0 : ReleaseExternalFD();
169 : 0 : return;
170 : : }
171 : :
172 : : /*
173 : : * Can't wait without a socket. Note that we don't want to close the libpq
174 : : * connection yet, so callers can emit a useful error.
175 : : */
447 andres@anarazel.de 176 [ + + ]:CBC 90 : if (PQstatus(conn) == CONNECTION_BAD)
177 : 2 : return;
178 : :
179 : : /*
180 : : * WaitLatchOrSocket() can conceivably fail, handle that case here instead
181 : : * of requiring all callers to do so.
182 : : */
183 [ + - ]: 88 : PG_TRY();
184 : : {
185 : : PostgresPollingStatusType status;
186 : :
187 : : /*
188 : : * Poll connection until we have OK or FAILED status.
189 : : *
190 : : * Per spec for PQconnectPoll, first wait till socket is write-ready.
191 : : */
192 : 88 : status = PGRES_POLLING_WRITING;
193 [ + + + + ]: 390 : while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
194 : : {
195 : : int io_flag;
196 : : int rc;
197 : :
198 [ + + ]: 214 : if (status == PGRES_POLLING_READING)
199 : 108 : io_flag = WL_SOCKET_READABLE;
200 : : #ifdef WIN32
201 : :
202 : : /*
203 : : * Windows needs a different test while waiting for
204 : : * connection-made
205 : : */
206 : : else if (PQstatus(conn) == CONNECTION_STARTED)
207 : : io_flag = WL_SOCKET_CONNECTED;
208 : : #endif
209 : : else
210 : 106 : io_flag = WL_SOCKET_WRITEABLE;
211 : :
212 : 214 : rc = WaitLatchOrSocket(MyLatch,
213 : : WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
214 : : PQsocket(conn),
215 : : 0,
216 : : wait_event_info);
217 : :
218 : : /* Interrupted? */
219 [ - + ]: 214 : if (rc & WL_LATCH_SET)
220 : : {
447 andres@anarazel.de 221 :LBC (2) : ResetLatch(MyLatch);
222 [ # # ]: (2) : CHECK_FOR_INTERRUPTS();
223 : : }
224 : :
225 : : /* If socket is ready, advance the libpq state machine */
447 andres@anarazel.de 226 [ + - ]:CBC 214 : if (rc & io_flag)
227 : 214 : status = PQconnectPoll(conn);
228 : : }
229 : : }
447 andres@anarazel.de 230 :UBC 0 : PG_CATCH();
231 : : {
232 : : /*
233 : : * If an error is thrown here, the callers won't call
234 : : * libpqsrv_disconnect() with a conn, so release resources
235 : : * immediately.
236 : : */
237 : 0 : ReleaseExternalFD();
238 : 0 : PQfinish(conn);
239 : :
240 : 0 : PG_RE_THROW();
241 : : }
447 andres@anarazel.de 242 [ - + ]:CBC 88 : PG_END_TRY();
243 : : }
244 : :
245 : : /*
246 : : * PQexec() wrapper that processes interrupts.
247 : : *
248 : : * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
249 : : * interrupts while pushing the query text to the server. Consider that
250 : : * setting if query strings can be long relative to TCP buffer size.
251 : : *
252 : : * This has the preconditions of PQsendQuery(), not those of PQexec(). Most
253 : : * notably, PQexec() would silently discard any prior query results.
254 : : */
255 : : static inline PGresult *
97 noah@leadboat.com 256 :GNC 62 : libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
257 : : {
258 [ - + ]: 62 : if (!PQsendQuery(conn, query))
97 noah@leadboat.com 259 :UNC 0 : return NULL;
97 noah@leadboat.com 260 :GNC 62 : return libpqsrv_get_result_last(conn, wait_event_info);
261 : : }
262 : :
263 : : /*
264 : : * PQexecParams() wrapper that processes interrupts.
265 : : *
266 : : * See notes at libpqsrv_exec().
267 : : */
268 : : static inline PGresult *
269 : : libpqsrv_exec_params(PGconn *conn,
270 : : const char *command,
271 : : int nParams,
272 : : const Oid *paramTypes,
273 : : const char *const *paramValues,
274 : : const int *paramLengths,
275 : : const int *paramFormats,
276 : : int resultFormat,
277 : : uint32 wait_event_info)
278 : : {
279 : : if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
280 : : paramLengths, paramFormats, resultFormat))
281 : : return NULL;
282 : : return libpqsrv_get_result_last(conn, wait_event_info);
283 : : }
284 : :
285 : : /*
286 : : * Like PQexec(), loop over PQgetResult() until it returns NULL or another
287 : : * terminal state. Return the last non-NULL result or the terminal state.
288 : : */
289 : : static inline PGresult *
290 : 7886 : libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
291 : : {
292 : 7886 : PGresult *volatile lastResult = NULL;
293 : :
294 : : /* In what follows, do not leak any PGresults on an error. */
295 [ + + ]: 7886 : PG_TRY();
296 : : {
297 : : for (;;)
298 : 7885 : {
299 : : /* Wait for, and collect, the next PGresult. */
300 : : PGresult *result;
301 : :
302 : 15771 : result = libpqsrv_get_result(conn, wait_event_info);
303 [ + + ]: 15770 : if (result == NULL)
304 : 7883 : break; /* query is complete, or failure */
305 : :
306 : : /*
307 : : * Emulate PQexec()'s behavior of returning the last result when
308 : : * there are many.
309 : : */
310 : 7887 : PQclear(lastResult);
311 : 7887 : lastResult = result;
312 : :
313 [ + - ]: 7887 : if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
314 [ + - ]: 7887 : PQresultStatus(lastResult) == PGRES_COPY_OUT ||
315 [ + - ]: 7887 : PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
316 [ + + ]: 7887 : PQstatus(conn) == CONNECTION_BAD)
317 : : break;
318 : : }
319 : : }
320 : 1 : PG_CATCH();
321 : : {
322 : 1 : PQclear(lastResult);
323 : 1 : PG_RE_THROW();
324 : : }
325 [ - + ]: 7885 : PG_END_TRY();
326 : :
327 : 7885 : return lastResult;
328 : : }
329 : :
330 : : /*
331 : : * Perform the equivalent of PQgetResult(), but watch for interrupts.
332 : : */
333 : : static inline PGresult *
334 : 15959 : libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
335 : : {
336 : : /*
337 : : * Collect data until PQgetResult is ready to get the result without
338 : : * blocking.
339 : : */
340 [ + + ]: 23789 : while (PQisBusy(conn))
341 : : {
342 : : int rc;
343 : :
344 : 7833 : rc = WaitLatchOrSocket(MyLatch,
345 : : WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
346 : : WL_SOCKET_READABLE,
347 : : PQsocket(conn),
348 : : 0,
349 : : wait_event_info);
350 : :
351 : : /* Interrupted? */
352 [ + + ]: 7833 : if (rc & WL_LATCH_SET)
353 : : {
354 : 3 : ResetLatch(MyLatch);
355 [ + + ]: 3 : CHECK_FOR_INTERRUPTS();
356 : : }
357 : :
358 : : /* Consume whatever data is available from the socket */
359 [ + + ]: 7832 : if (PQconsumeInput(conn) == 0)
360 : : {
361 : : /* trouble; expect PQgetResult() to return NULL */
362 : 2 : break;
363 : : }
364 : : }
365 : :
366 : : /* Now we can collect and return the next PGresult */
367 : 15958 : return PQgetResult(conn);
368 : : }
369 : :
370 : : /*
371 : : * Submit a cancel request to the given connection, waiting only until
372 : : * the given time.
373 : : *
374 : : * We sleep interruptibly until we receive confirmation that the cancel
375 : : * request has been accepted, and if it is, return NULL; if the cancel
376 : : * request fails, return an error message string (which is not to be
377 : : * freed).
378 : : *
379 : : * For other problems (to wit: OOM when strdup'ing an error message from
380 : : * libpq), this function can ereport(ERROR).
381 : : *
382 : : * Note: this function leaks a string's worth of memory when reporting
383 : : * libpq errors. Make sure to call it in a transient memory context.
384 : : */
385 : : static inline const char *
17 alvherre@alvh.no-ip. 386 : 3 : libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
387 : : {
388 : : PGcancelConn *cancel_conn;
9 389 : 3 : const char *error = NULL;
390 : :
17 391 : 3 : cancel_conn = PQcancelCreate(conn);
392 [ - + ]: 3 : if (cancel_conn == NULL)
17 alvherre@alvh.no-ip. 393 :UNC 0 : return "out of memory";
394 : :
395 : : /* In what follows, do not leak any PGcancelConn on any errors. */
396 : :
17 alvherre@alvh.no-ip. 397 [ + - ]:GNC 3 : PG_TRY();
398 : : {
399 [ - + ]: 3 : if (!PQcancelStart(cancel_conn))
400 : : {
17 alvherre@alvh.no-ip. 401 :UNC 0 : error = pchomp(PQcancelErrorMessage(cancel_conn));
402 : 0 : goto exit;
403 : : }
404 : :
405 : : for (;;)
17 alvherre@alvh.no-ip. 406 :GNC 3 : {
407 : : PostgresPollingStatusType pollres;
408 : : TimestampTz now;
409 : : long cur_timeout;
410 : 6 : int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
411 : :
412 : 6 : pollres = PQcancelPoll(cancel_conn);
413 [ + + ]: 6 : if (pollres == PGRES_POLLING_OK)
414 : 3 : break; /* success! */
415 : :
416 : : /* If timeout has expired, give up, else get sleep time. */
417 : 3 : now = GetCurrentTimestamp();
418 : 3 : cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
419 [ - + ]: 3 : if (cur_timeout <= 0)
420 : : {
17 alvherre@alvh.no-ip. 421 :UNC 0 : error = "cancel request timed out";
422 : 0 : break;
423 : : }
424 : :
17 alvherre@alvh.no-ip. 425 [ + - - ]:GNC 3 : switch (pollres)
426 : : {
427 : 3 : case PGRES_POLLING_READING:
428 : 3 : waitEvents |= WL_SOCKET_READABLE;
429 : 3 : break;
17 alvherre@alvh.no-ip. 430 :UNC 0 : case PGRES_POLLING_WRITING:
431 : 0 : waitEvents |= WL_SOCKET_WRITEABLE;
432 : 0 : break;
433 : 0 : default:
434 : 0 : error = pchomp(PQcancelErrorMessage(cancel_conn));
435 : 0 : goto exit;
436 : : }
437 : :
438 : : /* Sleep until there's something to do */
17 alvherre@alvh.no-ip. 439 :GNC 3 : WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
440 : : cur_timeout, PG_WAIT_CLIENT);
441 : :
442 : 3 : ResetLatch(MyLatch);
443 : :
444 [ - + ]: 3 : CHECK_FOR_INTERRUPTS();
445 : : }
446 : 3 : exit: ;
447 : : }
17 alvherre@alvh.no-ip. 448 :UNC 0 : PG_FINALLY();
449 : : {
17 alvherre@alvh.no-ip. 450 :GNC 3 : PQcancelFinish(cancel_conn);
451 : : }
452 [ - + ]: 3 : PG_END_TRY();
453 : :
454 : 3 : return error;
455 : : }
456 : :
457 : : #endif /* LIBPQ_BE_FE_HELPERS_H */
|