Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * parallel_slot.c
4 : : * Parallel support for front-end parallel database connections
5 : : *
6 : : *
7 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8 : : * Portions Copyright (c) 1994, Regents of the University of California
9 : : *
10 : : * src/fe_utils/parallel_slot.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #if defined(WIN32) && FD_SETSIZE < 1024
16 : : #error FD_SETSIZE needs to have been increased
17 : : #endif
18 : :
19 : : #include "postgres_fe.h"
20 : :
21 : : #include <sys/select.h>
22 : :
23 : : #include "common/logging.h"
24 : : #include "fe_utils/cancel.h"
25 : : #include "fe_utils/parallel_slot.h"
26 : : #include "fe_utils/query_utils.h"
27 : :
28 : : #define ERRCODE_UNDEFINED_TABLE "42P01"
29 : :
30 : : static int select_loop(int maxFd, fd_set *workerset);
31 : : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
32 : :
33 : : /*
34 : : * Process (and delete) a query result. Returns true if there's no problem,
35 : : * false otherwise. It's up to the handler to decide what constitutes a
36 : : * problem.
37 : : */
38 : : static bool
1164 rhaas@postgresql.org 39 :CBC 11319 : processQueryResult(ParallelSlot *slot, PGresult *result)
40 : : {
41 [ - + ]: 11319 : Assert(slot->handler != NULL);
42 : :
43 : : /* On failure, the handler should return NULL after freeing the result */
44 [ + + ]: 11319 : if (!slot->handler(result, slot->connection, slot->handler_context))
45 : 6 : return false;
46 : :
47 : : /* Ok, we have to free it ourself */
48 : 11313 : PQclear(result);
49 : 11313 : return true;
50 : : }
51 : :
52 : : /*
53 : : * Consume all the results generated for the given connection until
54 : : * nothing remains. If at least one error is encountered, return false.
55 : : * Note that this will block if the connection is busy.
56 : : */
57 : : static bool
58 : 203 : consumeQueryResult(ParallelSlot *slot)
59 : : {
60 : 203 : bool ok = true;
61 : : PGresult *result;
62 : :
63 : 203 : SetCancelConn(slot->connection);
64 [ + + ]: 406 : while ((result = PQgetResult(slot->connection)) != NULL)
65 : : {
66 [ + + ]: 203 : if (!processQueryResult(slot, result))
67 : 6 : ok = false;
68 : : }
69 : 203 : ResetCancelConn();
70 : 203 : return ok;
71 : : }
72 : :
73 : : /*
74 : : * Wait until a file descriptor from the given set becomes readable.
75 : : *
76 : : * Returns the number of ready descriptors, or -1 on failure (including
77 : : * getting a cancel request).
78 : : */
79 : : static int
1407 tgl@sss.pgh.pa.us 80 : 11164 : select_loop(int maxFd, fd_set *workerset)
81 : : {
82 : : int i;
1731 michael@paquier.xyz 83 : 11164 : fd_set saveSet = *workerset;
84 : :
85 [ - + ]: 11164 : if (CancelRequested)
1731 michael@paquier.xyz 86 :UBC 0 : return -1;
87 : :
88 : : for (;;)
89 : 0 : {
90 : : /*
91 : : * On Windows, we need to check once in a while for cancel requests;
92 : : * on other platforms we rely on select() returning when interrupted.
93 : : */
94 : : struct timeval *tvp;
95 : : #ifdef WIN32
96 : : struct timeval tv = {0, 1000000};
97 : :
98 : : tvp = &tv;
99 : : #else
1731 michael@paquier.xyz 100 :CBC 11164 : tvp = NULL;
101 : : #endif
102 : :
103 : 11164 : *workerset = saveSet;
104 : 11164 : i = select(maxFd + 1, workerset, NULL, NULL, tvp);
105 : :
106 : : #ifdef WIN32
107 : : if (i == SOCKET_ERROR)
108 : : {
109 : : i = -1;
110 : :
111 : : if (WSAGetLastError() == WSAEINTR)
112 : : errno = EINTR;
113 : : }
114 : : #endif
115 : :
116 [ - + - - ]: 11164 : if (i < 0 && errno == EINTR)
1731 michael@paquier.xyz 117 :UBC 0 : continue; /* ignore this */
1731 michael@paquier.xyz 118 [ + - - + ]:CBC 11164 : if (i < 0 || CancelRequested)
1407 tgl@sss.pgh.pa.us 119 :UBC 0 : return -1; /* but not this */
1731 michael@paquier.xyz 120 [ - + ]:CBC 11164 : if (i == 0)
1731 michael@paquier.xyz 121 :UBC 0 : continue; /* timeout (Win32 only) */
1731 michael@paquier.xyz 122 :CBC 11164 : break;
123 : : }
124 : :
125 : 11164 : return i;
126 : : }
127 : :
128 : : /*
129 : : * Return the offset of a suitable idle slot, or -1 if none are available. If
130 : : * the given dbname is not null, only idle slots connected to the given
131 : : * database are considered suitable, otherwise all idle connected slots are
132 : : * considered suitable.
133 : : */
134 : : static int
1130 rhaas@postgresql.org 135 : 22482 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
136 : : {
137 : : int i;
138 : :
139 [ + + ]: 33773 : for (i = 0; i < sa->numslots; i++)
140 : : {
141 [ + + ]: 22588 : if (sa->slots[i].inUse)
142 : 11270 : continue;
143 : :
144 [ + + ]: 11318 : if (sa->slots[i].connection == NULL)
145 : 7 : continue;
146 : :
147 [ + + ]: 11311 : if (dbname == NULL ||
148 [ + + ]: 7779 : strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
149 : 11297 : return i;
150 : : }
151 : 11185 : return -1;
152 : : }
153 : :
154 : : /*
155 : : * Return the offset of the first slot without a database connection, or -1 if
156 : : * all slots are connected.
157 : : */
158 : : static int
159 : 11327 : find_unconnected_slot(const ParallelSlotArray *sa)
160 : : {
161 : : int i;
162 : :
163 [ + + ]: 22580 : for (i = 0; i < sa->numslots; i++)
164 : : {
165 [ + + ]: 11402 : if (sa->slots[i].inUse)
166 : 11239 : continue;
167 : :
168 [ + + ]: 163 : if (sa->slots[i].connection == NULL)
169 : 149 : return i;
170 : : }
171 : :
172 : 11178 : return -1;
173 : : }
174 : :
175 : : /*
176 : : * Return the offset of the first idle slot, or -1 if all slots are busy.
177 : : */
178 : : static int
179 : 11178 : find_any_idle_slot(const ParallelSlotArray *sa)
180 : : {
181 : : int i;
182 : :
183 [ + + ]: 22413 : for (i = 0; i < sa->numslots; i++)
184 [ + + ]: 11249 : if (!sa->slots[i].inUse)
185 : 14 : return i;
186 : :
187 : 11164 : return -1;
188 : : }
189 : :
190 : : /*
191 : : * Wait for any slot's connection to have query results, consume the results,
192 : : * and update the slot's status as appropriate. Returns true on success,
193 : : * false on cancellation, on error, or if no slots are connected.
194 : : */
195 : : static bool
196 : 11164 : wait_on_slots(ParallelSlotArray *sa)
197 : : {
198 : : int i;
199 : : fd_set slotset;
200 : 11164 : int maxFd = 0;
201 : 11164 : PGconn *cancelconn = NULL;
202 : :
203 : : /* We must reconstruct the fd_set for each call to select_loop */
204 [ + + ]: 189788 : FD_ZERO(&slotset);
205 : :
206 [ + + ]: 22399 : for (i = 0; i < sa->numslots; i++)
207 : : {
208 : : int sock;
209 : :
210 : : /* We shouldn't get here if we still have slots without connections */
211 [ - + ]: 11235 : Assert(sa->slots[i].connection != NULL);
212 : :
213 : 11235 : sock = PQsocket(sa->slots[i].connection);
214 : :
215 : : /*
216 : : * We don't really expect any connections to lose their sockets after
217 : : * startup, but just in case, cope by ignoring them.
218 : : */
219 [ - + ]: 11235 : if (sock < 0)
1130 rhaas@postgresql.org 220 :UBC 0 : continue;
221 : :
222 : : /* Keep track of the first valid connection we see. */
1130 rhaas@postgresql.org 223 [ + + ]:CBC 11235 : if (cancelconn == NULL)
224 : 11164 : cancelconn = sa->slots[i].connection;
225 : :
226 : 11235 : FD_SET(sock, &slotset);
227 [ + - ]: 11235 : if (sock > maxFd)
228 : 11235 : maxFd = sock;
229 : : }
230 : :
231 : : /*
232 : : * If we get this far with no valid connections, processing cannot
233 : : * continue.
234 : : */
235 [ - + ]: 11164 : if (cancelconn == NULL)
1130 rhaas@postgresql.org 236 :UBC 0 : return false;
237 : :
596 michael@paquier.xyz 238 :CBC 11164 : SetCancelConn(cancelconn);
1130 rhaas@postgresql.org 239 : 11164 : i = select_loop(maxFd, &slotset);
240 : 11164 : ResetCancelConn();
241 : :
242 : : /* failure? */
243 [ - + ]: 11164 : if (i < 0)
1130 rhaas@postgresql.org 244 :UBC 0 : return false;
245 : :
1130 rhaas@postgresql.org 246 [ + + ]:CBC 22399 : for (i = 0; i < sa->numslots; i++)
247 : : {
248 : : int sock;
249 : :
250 : 11235 : sock = PQsocket(sa->slots[i].connection);
251 : :
252 [ + - + + ]: 11235 : if (sock >= 0 && FD_ISSET(sock, &slotset))
253 : : {
254 : : /* select() says input is available, so consume it */
255 : 11165 : PQconsumeInput(sa->slots[i].connection);
256 : : }
257 : :
258 : : /* Collect result(s) as long as any are available */
259 [ + + ]: 22351 : while (!PQisBusy(sa->slots[i].connection))
260 : : {
261 : 22232 : PGresult *result = PQgetResult(sa->slots[i].connection);
262 : :
263 [ + + ]: 22232 : if (result != NULL)
264 : : {
265 : : /* Handle and discard the command result */
266 [ - + ]: 11116 : if (!processQueryResult(&sa->slots[i], result))
1130 rhaas@postgresql.org 267 :UBC 0 : return false;
268 : : }
269 : : else
270 : : {
271 : : /* This connection has become idle */
1130 rhaas@postgresql.org 272 :CBC 11116 : sa->slots[i].inUse = false;
273 : 11116 : ParallelSlotClearHandler(&sa->slots[i]);
274 : 11116 : break;
275 : : }
276 : : }
277 : : }
278 : 11164 : return true;
279 : : }
280 : :
281 : : /*
282 : : * Open a new database connection using the stored connection parameters and
283 : : * optionally a given dbname if not null, execute the stored initial command if
284 : : * any, and associate the new connection with the given slot.
285 : : */
286 : : static void
287 : 21 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
288 : : {
289 : : const char *old_override;
290 : 21 : ParallelSlot *slot = &sa->slots[slotno];
291 : :
292 : 21 : old_override = sa->cparams->override_dbname;
293 [ + + ]: 21 : if (dbname)
294 : 17 : sa->cparams->override_dbname = dbname;
295 : 21 : slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
296 : 21 : sa->cparams->override_dbname = old_override;
297 : :
298 : : /*
299 : : * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
300 : : * FD_SET() and allied macros. Windows defines it as a ceiling on the
301 : : * count of file descriptors in the set, not a ceiling on the value of
302 : : * each file descriptor; see
303 : : * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
304 : : * and
305 : : * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
306 : : * We can't ignore that, because Windows starts file descriptors at a
307 : : * higher value, delays reuse, and skips values. With less than ten
308 : : * concurrent file descriptors, opened and closed rapidly, one can reach
309 : : * file descriptor 1024.
310 : : *
311 : : * Doing a hard exit here is a bit grotty, but it doesn't seem worth
312 : : * complicating the API to make it less grotty.
313 : : */
314 : : #ifdef WIN32
315 : : if (slotno >= FD_SETSIZE)
316 : : {
317 : : pg_log_error("too many jobs for this platform: %d", slotno);
318 : : exit(1);
319 : : }
320 : : #else
321 : : {
183 noah@leadboat.com 322 : 21 : int fd = PQsocket(slot->connection);
323 : :
324 [ - + ]: 21 : if (fd >= FD_SETSIZE)
325 : : {
183 noah@leadboat.com 326 :UBC 0 : pg_log_error("socket file descriptor out of range for select(): %d",
327 : : fd);
328 : 0 : pg_log_error_hint("Try fewer jobs.");
329 : 0 : exit(1);
330 : : }
331 : : }
332 : : #endif
333 : :
334 : : /* Setup the connection using the supplied command, if any. */
1130 rhaas@postgresql.org 335 [ - + ]:CBC 21 : if (sa->initcmd)
1130 rhaas@postgresql.org 336 :UBC 0 : executeCommand(slot->connection, sa->initcmd, sa->echo);
1731 michael@paquier.xyz 337 :CBC 21 : }
338 : :
339 : : /*
340 : : * ParallelSlotsGetIdle
341 : : * Return a connection slot that is ready to execute a command.
342 : : *
343 : : * The slot returned is chosen as follows:
344 : : *
345 : : * If any idle slot already has an open connection, and if either dbname is
346 : : * null or the existing connection is to the given database, that slot will be
347 : : * returned allowing the connection to be reused.
348 : : *
349 : : * Otherwise, if any idle slot is not yet connected to any database, the slot
350 : : * will be returned with it's connection opened using the stored cparams and
351 : : * optionally the given dbname if not null.
352 : : *
353 : : * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
354 : : * after having it's connection disconnected and reconnected using the stored
355 : : * cparams and optionally the given dbname if not null.
356 : : *
357 : : * Otherwise, if any slots have connections that are busy, we loop on select()
358 : : * until one socket becomes available. When this happens, we read the whole
359 : : * set and mark as free all sockets that become available. We then select a
360 : : * slot using the same rules as above.
361 : : *
362 : : * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
363 : : *
364 : : * For any connection created, if the stored initcmd is not null, it will be
365 : : * executed as a command on the newly formed connection before the slot is
366 : : * returned.
367 : : *
368 : : * If an error occurs, NULL is returned.
369 : : */
370 : : ParallelSlot *
1130 rhaas@postgresql.org 371 : 11318 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
372 : : {
373 : : int offset;
374 : :
375 [ - + ]: 11318 : Assert(sa);
376 [ + - ]: 11318 : Assert(sa->numslots > 0);
377 : :
378 : : while (1)
379 : : {
380 : : /* First choice: a slot already connected to the desired database. */
381 : 22482 : offset = find_matching_idle_slot(sa, dbname);
382 [ + + ]: 22482 : if (offset >= 0)
383 : : {
384 : 11297 : sa->slots[offset].inUse = true;
385 : 11297 : return &sa->slots[offset];
386 : : }
387 : :
388 : : /* Second choice: a slot not connected to any database. */
389 : 11185 : offset = find_unconnected_slot(sa);
390 [ + + ]: 11185 : if (offset >= 0)
391 : : {
392 : 7 : connect_slot(sa, offset, dbname);
393 : 7 : sa->slots[offset].inUse = true;
394 : 7 : return &sa->slots[offset];
395 : : }
396 : :
397 : : /* Third choice: a slot connected to the wrong database. */
398 : 11178 : offset = find_any_idle_slot(sa);
399 [ + + ]: 11178 : if (offset >= 0)
400 : : {
401 : 14 : disconnectDatabase(sa->slots[offset].connection);
402 : 14 : sa->slots[offset].connection = NULL;
403 : 14 : connect_slot(sa, offset, dbname);
404 : 14 : sa->slots[offset].inUse = true;
405 : 14 : return &sa->slots[offset];
406 : : }
407 : :
408 : : /*
409 : : * Fourth choice: block until one or more slots become available. If
410 : : * any slots hit a fatal error, we'll find out about that here and
411 : : * return NULL.
412 : : */
413 [ - + ]: 11164 : if (!wait_on_slots(sa))
1130 rhaas@postgresql.org 414 :UBC 0 : return NULL;
415 : : }
416 : : }
417 : :
418 : : /*
419 : : * ParallelSlotsSetup
420 : : * Prepare a set of parallel slots but do not connect to any database.
421 : : *
422 : : * This creates and initializes a set of slots, marking all parallel slots as
423 : : * free and ready to use. Establishing connections is delayed until requesting
424 : : * a free slot. The cparams, progname, echo, and initcmd are stored for later
425 : : * use and must remain valid for the lifetime of the returned array.
426 : : */
427 : : ParallelSlotArray *
1130 rhaas@postgresql.org 428 :CBC 145 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
429 : : bool echo, const char *initcmd)
430 : : {
431 : : ParallelSlotArray *sa;
432 : :
433 [ - + ]: 145 : Assert(numslots > 0);
434 [ - + ]: 145 : Assert(cparams != NULL);
435 [ - + ]: 145 : Assert(progname != NULL);
436 : :
437 : 145 : sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
438 : 145 : numslots * sizeof(ParallelSlot));
439 : :
440 : 145 : sa->numslots = numslots;
441 : 145 : sa->cparams = cparams;
442 : 145 : sa->progname = progname;
443 : 145 : sa->echo = echo;
444 : 145 : sa->initcmd = initcmd;
445 : :
446 : 145 : return sa;
447 : : }
448 : :
449 : : /*
450 : : * ParallelSlotsAdoptConn
451 : : * Assign an open connection to the slots array for reuse.
452 : : *
453 : : * This turns over ownership of an open connection to a slots array. The
454 : : * caller should not further use or close the connection. All the connection's
455 : : * parameters (user, host, port, etc.) except possibly dbname should match
456 : : * those of the slots array's cparams, as given in ParallelSlotsSetup. If
457 : : * these parameters differ, subsequent behavior is undefined.
458 : : */
459 : : void
460 : 142 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
461 : : {
462 : : int offset;
463 : :
464 : 142 : offset = find_unconnected_slot(sa);
465 [ + - ]: 142 : if (offset >= 0)
466 : 142 : sa->slots[offset].connection = conn;
467 : : else
1130 rhaas@postgresql.org 468 :UBC 0 : disconnectDatabase(conn);
1731 michael@paquier.xyz 469 :CBC 142 : }
470 : :
471 : : /*
472 : : * ParallelSlotsTerminate
473 : : * Clean up a set of parallel slots
474 : : *
475 : : * Iterate through all connections in a given set of ParallelSlots and
476 : : * terminate all connections.
477 : : */
478 : : void
1130 rhaas@postgresql.org 479 : 145 : ParallelSlotsTerminate(ParallelSlotArray *sa)
480 : : {
481 : : int i;
482 : :
483 [ + + ]: 294 : for (i = 0; i < sa->numslots; i++)
484 : : {
485 : 149 : PGconn *conn = sa->slots[i].connection;
486 : :
1731 michael@paquier.xyz 487 [ - + ]: 149 : if (conn == NULL)
1731 michael@paquier.xyz 488 :UBC 0 : continue;
489 : :
1731 michael@paquier.xyz 490 :CBC 149 : disconnectDatabase(conn);
491 : : }
492 : 145 : }
493 : :
494 : : /*
495 : : * ParallelSlotsWaitCompletion
496 : : *
497 : : * Wait for all connections to finish, returning false if at least one
498 : : * error has been found on the way.
499 : : */
500 : : bool
1130 rhaas@postgresql.org 501 : 198 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
502 : : {
503 : : int i;
504 : :
505 [ + + ]: 395 : for (i = 0; i < sa->numslots; i++)
506 : : {
507 [ - + ]: 203 : if (sa->slots[i].connection == NULL)
1130 rhaas@postgresql.org 508 :UBC 0 : continue;
1130 rhaas@postgresql.org 509 [ + + ]:CBC 203 : if (!consumeQueryResult(&sa->slots[i]))
1164 510 : 6 : return false;
511 : : /* Mark connection as idle */
464 tgl@sss.pgh.pa.us 512 : 197 : sa->slots[i].inUse = false;
513 : 197 : ParallelSlotClearHandler(&sa->slots[i]);
514 : : }
515 : :
1164 rhaas@postgresql.org 516 : 192 : return true;
517 : : }
518 : :
519 : : /*
520 : : * TableCommandResultHandler
521 : : *
522 : : * ParallelSlotResultHandler for results of commands (not queries) against
523 : : * tables.
524 : : *
525 : : * Requires that the result status is either PGRES_COMMAND_OK or an error about
526 : : * a missing table. This is useful for utilities that compile a list of tables
527 : : * to process and then run commands (vacuum, reindex, or whatever) against
528 : : * those tables, as there is a race condition between the time the list is
529 : : * compiled and the time the command attempts to open the table.
530 : : *
531 : : * For missing tables, logs an error but allows processing to continue.
532 : : *
533 : : * For all other errors, logs an error and terminates further processing.
534 : : *
535 : : * res: PGresult from the query executed on the slot's connection
536 : : * conn: connection belonging to the slot
537 : : * context: unused
538 : : */
539 : : bool
540 : 3537 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
541 : : {
1130 542 [ - + ]: 3537 : Assert(res != NULL);
543 [ - + ]: 3537 : Assert(conn != NULL);
544 : :
545 : : /*
546 : : * If it's an error, report it. Errors about a missing table are harmless
547 : : * so we continue processing; but die for other errors.
548 : : */
1164 549 [ + + ]: 3537 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
550 : : {
551 : 6 : char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
552 : :
553 : 6 : pg_log_error("processing of database \"%s\" failed: %s",
554 : : PQdb(conn), PQerrorMessage(conn));
555 : :
556 [ + - + - ]: 6 : if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
557 : : {
558 : 6 : PQclear(res);
1731 michael@paquier.xyz 559 : 6 : return false;
560 : : }
561 : : }
562 : :
563 : 3531 : return true;
564 : : }
|