Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * parallel_slot.c
4 : * Parallel support for front-end parallel database connections
5 : *
6 : *
7 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * src/fe_utils/parallel_slot.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 :
15 : #if defined(WIN32) && FD_SETSIZE < 1024
16 : #error FD_SETSIZE needs to have been increased
17 : #endif
18 :
19 : #include "postgres_fe.h"
20 :
21 : #include <sys/select.h>
22 :
23 : #include "common/logging.h"
24 : #include "fe_utils/cancel.h"
25 : #include "fe_utils/parallel_slot.h"
26 : #include "fe_utils/query_utils.h"
27 :
28 : #define ERRCODE_UNDEFINED_TABLE "42P01"
29 :
30 : static int select_loop(int maxFd, fd_set *workerset);
31 : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
32 :
33 : /*
34 : * Process (and delete) a query result. Returns true if there's no problem,
35 : * false otherwise. It's up to the handler to decide what constitutes a
36 : * problem.
793 rhaas 37 ECB : */
38 : static bool
793 rhaas 39 CBC 8433 : processQueryResult(ParallelSlot *slot, PGresult *result)
40 : {
793 rhaas 41 GIC 8433 : Assert(slot->handler != NULL);
793 rhaas 42 ECB :
43 : /* On failure, the handler should return NULL after freeing the result */
793 rhaas 44 GIC 8433 : if (!slot->handler(result, slot->connection, slot->handler_context))
45 7 : return false;
793 rhaas 46 ECB :
47 : /* Ok, we have to free it ourself */
793 rhaas 48 GIC 8426 : PQclear(result);
49 8426 : return true;
50 : }
51 :
52 : /*
53 : * Consume all the results generated for the given connection until
54 : * nothing remains. If at least one error is encountered, return false.
55 : * Note that this will block if the connection is busy.
793 rhaas 56 ECB : */
57 : static bool
793 rhaas 58 CBC 144 : consumeQueryResult(ParallelSlot *slot)
59 : {
793 rhaas 60 GIC 144 : bool ok = true;
793 rhaas 61 ECB : PGresult *result;
62 :
793 rhaas 63 GIC 144 : SetCancelConn(slot->connection);
793 rhaas 64 CBC 288 : while ((result = PQgetResult(slot->connection)) != NULL)
793 rhaas 65 ECB : {
793 rhaas 66 GIC 144 : if (!processQueryResult(slot, result))
793 rhaas 67 CBC 7 : ok = false;
793 rhaas 68 ECB : }
793 rhaas 69 GIC 144 : ResetCancelConn();
70 144 : return ok;
71 : }
72 :
73 : /*
74 : * Wait until a file descriptor from the given set becomes readable.
75 : *
76 : * Returns the number of ready descriptors, or -1 on failure (including
77 : * getting a cancel request).
1360 michael 78 ECB : */
79 : static int
1036 tgl 80 GIC 8335 : select_loop(int maxFd, fd_set *workerset)
1360 michael 81 ECB : {
82 : int i;
1360 michael 83 CBC 8335 : fd_set saveSet = *workerset;
1360 michael 84 EUB :
1360 michael 85 GIC 8335 : if (CancelRequested)
1360 michael 86 UIC 0 : return -1;
1360 michael 87 EUB :
88 : for (;;)
1360 michael 89 UIC 0 : {
90 : /*
91 : * On Windows, we need to check once in a while for cancel requests;
92 : * on other platforms we rely on select() returning when interrupted.
93 : */
94 : struct timeval *tvp;
95 : #ifdef WIN32
96 : struct timeval tv = {0, 1000000};
97 :
1360 michael 98 ECB : tvp = &tv;
99 : #else
1360 michael 100 GIC 8335 : tvp = NULL;
1360 michael 101 ECB : #endif
102 :
1360 michael 103 GIC 8335 : *workerset = saveSet;
104 8335 : i = select(maxFd + 1, workerset, NULL, NULL, tvp);
105 :
106 : #ifdef WIN32
107 : if (i == SOCKET_ERROR)
108 : {
109 : i = -1;
110 :
111 : if (WSAGetLastError() == WSAEINTR)
112 : errno = EINTR;
113 : }
1360 michael 114 ECB : #endif
1360 michael 115 EUB :
1360 michael 116 CBC 8335 : if (i < 0 && errno == EINTR)
1360 michael 117 UBC 0 : continue; /* ignore this */
1360 michael 118 CBC 8335 : if (i < 0 || CancelRequested)
1036 tgl 119 UBC 0 : return -1; /* but not this */
1360 michael 120 CBC 8335 : if (i == 0)
1360 michael 121 UIC 0 : continue; /* timeout (Win32 only) */
1360 michael 122 GIC 8335 : break;
1360 michael 123 ECB : }
124 :
1360 michael 125 GIC 8335 : return i;
126 : }
127 :
128 : /*
129 : * Return the offset of a suitable idle slot, or -1 if none are available. If
130 : * the given dbname is not null, only idle slots connected to the given
131 : * database are considered suitable, otherwise all idle connected slots are
132 : * considered suitable.
1360 michael 133 ECB : */
134 : static int
759 rhaas 135 GIC 16767 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
136 : {
1360 michael 137 ECB : int i;
138 :
759 rhaas 139 CBC 25229 : for (i = 0; i < sa->numslots; i++)
1360 michael 140 ECB : {
759 rhaas 141 GIC 16877 : if (sa->slots[i].inUse)
759 rhaas 142 CBC 8445 : continue;
759 rhaas 143 ECB :
759 rhaas 144 GIC 8432 : if (sa->slots[i].connection == NULL)
759 rhaas 145 CBC 6 : continue;
759 rhaas 146 ECB :
759 rhaas 147 CBC 8426 : if (dbname == NULL ||
759 rhaas 148 GIC 5825 : strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
759 rhaas 149 CBC 8415 : return i;
150 : }
759 rhaas 151 GIC 8352 : return -1;
152 : }
153 :
154 : /*
155 : * Return the offset of the first slot without a database connection, or -1 if
156 : * all slots are connected.
759 rhaas 157 ECB : */
158 : static int
759 rhaas 159 GIC 8454 : find_unconnected_slot(const ParallelSlotArray *sa)
160 : {
759 rhaas 161 ECB : int i;
162 :
759 rhaas 163 CBC 16875 : for (i = 0; i < sa->numslots; i++)
759 rhaas 164 ECB : {
759 rhaas 165 GIC 8529 : if (sa->slots[i].inUse)
759 rhaas 166 CBC 8410 : continue;
759 rhaas 167 ECB :
759 rhaas 168 GIC 119 : if (sa->slots[i].connection == NULL)
169 108 : return i;
759 rhaas 170 ECB : }
171 :
759 rhaas 172 GIC 8346 : return -1;
173 : }
174 :
175 : /*
176 : * Return the offset of the first idle slot, or -1 if all slots are busy.
759 rhaas 177 ECB : */
178 : static int
759 rhaas 179 GIC 8346 : find_any_idle_slot(const ParallelSlotArray *sa)
180 : {
759 rhaas 181 ECB : int i;
182 :
759 rhaas 183 CBC 16753 : for (i = 0; i < sa->numslots; i++)
759 rhaas 184 GIC 8418 : if (!sa->slots[i].inUse)
759 rhaas 185 CBC 11 : return i;
186 :
759 rhaas 187 GIC 8335 : return -1;
188 : }
189 :
190 : /*
191 : * Wait for any slot's connection to have query results, consume the results,
192 : * and update the slot's status as appropriate. Returns true on success,
193 : * false on cancellation, on error, or if no slots are connected.
759 rhaas 194 ECB : */
195 : static bool
759 rhaas 196 GIC 8335 : wait_on_slots(ParallelSlotArray *sa)
197 : {
759 rhaas 198 ECB : int i;
199 : fd_set slotset;
759 rhaas 200 GIC 8335 : int maxFd = 0;
201 8335 : PGconn *cancelconn = NULL;
759 rhaas 202 ECB :
203 : /* We must reconstruct the fd_set for each call to select_loop */
759 rhaas 204 CBC 141695 : FD_ZERO(&slotset);
205 :
759 rhaas 206 GIC 16742 : for (i = 0; i < sa->numslots; i++)
207 : {
208 : int sock;
759 rhaas 209 ECB :
210 : /* We shouldn't get here if we still have slots without connections */
759 rhaas 211 CBC 8407 : Assert(sa->slots[i].connection != NULL);
212 :
759 rhaas 213 GIC 8407 : sock = PQsocket(sa->slots[i].connection);
214 :
215 : /*
216 : * We don't really expect any connections to lose their sockets after
759 rhaas 217 ECB : * startup, but just in case, cope by ignoring them.
759 rhaas 218 EUB : */
759 rhaas 219 GIC 8407 : if (sock < 0)
759 rhaas 220 UIC 0 : continue;
759 rhaas 221 ECB :
222 : /* Keep track of the first valid connection we see. */
759 rhaas 223 GIC 8407 : if (cancelconn == NULL)
759 rhaas 224 CBC 8335 : cancelconn = sa->slots[i].connection;
759 rhaas 225 ECB :
759 rhaas 226 CBC 8407 : FD_SET(sock, &slotset);
759 rhaas 227 GIC 8407 : if (sock > maxFd)
228 8407 : maxFd = sock;
229 : }
230 :
231 : /*
232 : * If we get this far with no valid connections, processing cannot
759 rhaas 233 ECB : * continue.
1360 michael 234 EUB : */
759 rhaas 235 GIC 8335 : if (cancelconn == NULL)
759 rhaas 236 LBC 0 : return false;
759 rhaas 237 ECB :
225 michael 238 CBC 8335 : SetCancelConn(cancelconn);
759 rhaas 239 GIC 8335 : i = select_loop(maxFd, &slotset);
240 8335 : ResetCancelConn();
759 rhaas 241 ECB :
759 rhaas 242 EUB : /* failure? */
759 rhaas 243 GIC 8335 : if (i < 0)
759 rhaas 244 LBC 0 : return false;
245 :
759 rhaas 246 GIC 16742 : for (i = 0; i < sa->numslots; i++)
247 : {
759 rhaas 248 ECB : int sock;
249 :
759 rhaas 250 CBC 8407 : sock = PQsocket(sa->slots[i].connection);
251 :
759 rhaas 252 GIC 8407 : if (sock >= 0 && FD_ISSET(sock, &slotset))
1360 michael 253 ECB : {
254 : /* select() says input is available, so consume it */
759 rhaas 255 GIC 8335 : PQconsumeInput(sa->slots[i].connection);
256 : }
1360 michael 257 ECB :
258 : /* Collect result(s) as long as any are available */
759 rhaas 259 CBC 16696 : while (!PQisBusy(sa->slots[i].connection))
260 : {
261 16578 : PGresult *result = PQgetResult(sa->slots[i].connection);
262 :
759 rhaas 263 GIC 16578 : if (result != NULL)
1360 michael 264 ECB : {
759 rhaas 265 EUB : /* Handle and discard the command result */
759 rhaas 266 GIC 8289 : if (!processQueryResult(&sa->slots[i], result))
759 rhaas 267 UIC 0 : return false;
268 : }
269 : else
1360 michael 270 ECB : {
759 rhaas 271 : /* This connection has become idle */
759 rhaas 272 CBC 8289 : sa->slots[i].inUse = false;
759 rhaas 273 GIC 8289 : ParallelSlotClearHandler(&sa->slots[i]);
274 8289 : break;
275 : }
1360 michael 276 ECB : }
277 : }
759 rhaas 278 GIC 8335 : return true;
279 : }
280 :
281 : /*
282 : * Open a new database connection using the stored connection parameters and
283 : * optionally a given dbname if not null, execute the stored initial command if
284 : * any, and associate the new connection with the given slot.
759 rhaas 285 ECB : */
286 : static void
759 rhaas 287 GIC 17 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
759 rhaas 288 ECB : {
289 : const char *old_override;
759 rhaas 290 CBC 17 : ParallelSlot *slot = &sa->slots[slotno];
759 rhaas 291 ECB :
759 rhaas 292 CBC 17 : old_override = sa->cparams->override_dbname;
293 17 : if (dbname)
294 14 : sa->cparams->override_dbname = dbname;
759 rhaas 295 GIC 17 : slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
759 rhaas 296 CBC 17 : sa->cparams->override_dbname = old_override;
759 rhaas 297 EUB :
759 rhaas 298 GIC 17 : if (PQsocket(slot->connection) >= FD_SETSIZE)
366 tgl 299 UIC 0 : pg_fatal("too many jobs for this platform");
759 rhaas 300 ECB :
759 rhaas 301 EUB : /* Setup the connection using the supplied command, if any. */
759 rhaas 302 CBC 17 : if (sa->initcmd)
759 rhaas 303 UIC 0 : executeCommand(slot->connection, sa->initcmd, sa->echo);
1360 michael 304 GIC 17 : }
305 :
306 : /*
307 : * ParallelSlotsGetIdle
308 : * Return a connection slot that is ready to execute a command.
309 : *
310 : * The slot returned is chosen as follows:
311 : *
312 : * If any idle slot already has an open connection, and if either dbname is
313 : * null or the existing connection is to the given database, that slot will be
314 : * returned allowing the connection to be reused.
315 : *
316 : * Otherwise, if any idle slot is not yet connected to any database, the slot
317 : * will be returned with it's connection opened using the stored cparams and
318 : * optionally the given dbname if not null.
319 : *
320 : * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
321 : * after having it's connection disconnected and reconnected using the stored
322 : * cparams and optionally the given dbname if not null.
323 : *
324 : * Otherwise, if any slots have connections that are busy, we loop on select()
325 : * until one socket becomes available. When this happens, we read the whole
326 : * set and mark as free all sockets that become available. We then select a
327 : * slot using the same rules as above.
328 : *
329 : * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
330 : *
331 : * For any connection created, if the stored initcmd is not null, it will be
332 : * executed as a command on the newly formed connection before the slot is
333 : * returned.
334 : *
335 : * If an error occurs, NULL is returned.
1360 michael 336 ECB : */
337 : ParallelSlot *
759 rhaas 338 GIC 8432 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
339 : {
759 rhaas 340 ECB : int offset;
1360 michael 341 :
759 rhaas 342 GIC 8432 : Assert(sa);
343 8432 : Assert(sa->numslots > 0);
344 :
345 : while (1)
1360 michael 346 ECB : {
759 rhaas 347 : /* First choice: a slot already connected to the desired database. */
759 rhaas 348 GIC 16767 : offset = find_matching_idle_slot(sa, dbname);
759 rhaas 349 CBC 16767 : if (offset >= 0)
1360 michael 350 ECB : {
759 rhaas 351 GIC 8415 : sa->slots[offset].inUse = true;
352 8415 : return &sa->slots[offset];
353 : }
759 rhaas 354 ECB :
355 : /* Second choice: a slot not connected to any database. */
759 rhaas 356 GIC 8352 : offset = find_unconnected_slot(sa);
759 rhaas 357 CBC 8352 : if (offset >= 0)
759 rhaas 358 ECB : {
759 rhaas 359 CBC 6 : connect_slot(sa, offset, dbname);
759 rhaas 360 GIC 6 : sa->slots[offset].inUse = true;
361 6 : return &sa->slots[offset];
362 : }
1322 michael 363 ECB :
759 rhaas 364 : /* Third choice: a slot connected to the wrong database. */
759 rhaas 365 GIC 8346 : offset = find_any_idle_slot(sa);
759 rhaas 366 CBC 8346 : if (offset >= 0)
759 rhaas 367 ECB : {
759 rhaas 368 CBC 11 : disconnectDatabase(sa->slots[offset].connection);
369 11 : sa->slots[offset].connection = NULL;
370 11 : connect_slot(sa, offset, dbname);
759 rhaas 371 GIC 11 : sa->slots[offset].inUse = true;
372 11 : return &sa->slots[offset];
373 : }
374 :
375 : /*
376 : * Fourth choice: block until one or more slots become available. If
377 : * any slots hit a fatal error, we'll find out about that here and
759 rhaas 378 ECB : * return NULL.
759 rhaas 379 EUB : */
759 rhaas 380 GIC 8335 : if (!wait_on_slots(sa))
759 rhaas 381 UIC 0 : return NULL;
382 : }
383 : }
384 :
385 : /*
386 : * ParallelSlotsSetup
387 : * Prepare a set of parallel slots but do not connect to any database.
388 : *
389 : * This creates and initializes a set of slots, marking all parallel slots as
390 : * free and ready to use. Establishing connections is delayed until requesting
391 : * a free slot. The cparams, progname, echo, and initcmd are stored for later
392 : * use and must remain valid for the lifetime of the returned array.
759 rhaas 393 ECB : */
394 : ParallelSlotArray *
759 rhaas 395 GIC 105 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
396 : bool echo, const char *initcmd)
397 : {
759 rhaas 398 ECB : ParallelSlotArray *sa;
1360 michael 399 :
759 rhaas 400 CBC 105 : Assert(numslots > 0);
759 rhaas 401 GIC 105 : Assert(cparams != NULL);
759 rhaas 402 CBC 105 : Assert(progname != NULL);
759 rhaas 403 ECB :
759 rhaas 404 GIC 105 : sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
759 rhaas 405 CBC 105 : numslots * sizeof(ParallelSlot));
759 rhaas 406 ECB :
759 rhaas 407 CBC 105 : sa->numslots = numslots;
408 105 : sa->cparams = cparams;
409 105 : sa->progname = progname;
759 rhaas 410 GIC 105 : sa->echo = echo;
759 rhaas 411 CBC 105 : sa->initcmd = initcmd;
412 :
759 rhaas 413 GIC 105 : return sa;
414 : }
415 :
416 : /*
417 : * ParallelSlotsAdoptConn
418 : * Assign an open connection to the slots array for reuse.
419 : *
420 : * This turns over ownership of an open connection to a slots array. The
421 : * caller should not further use or close the connection. All the connection's
422 : * parameters (user, host, port, etc.) except possibly dbname should match
423 : * those of the slots array's cparams, as given in ParallelSlotsSetup. If
424 : * these parameters differ, subsequent behavior is undefined.
759 rhaas 425 ECB : */
426 : void
759 rhaas 427 GIC 102 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
428 : {
759 rhaas 429 ECB : int offset;
430 :
759 rhaas 431 CBC 102 : offset = find_unconnected_slot(sa);
759 rhaas 432 GIC 102 : if (offset >= 0)
759 rhaas 433 GBC 102 : sa->slots[offset].connection = conn;
759 rhaas 434 ECB : else
759 rhaas 435 UIC 0 : disconnectDatabase(conn);
1360 michael 436 GIC 102 : }
437 :
438 : /*
439 : * ParallelSlotsTerminate
440 : * Clean up a set of parallel slots
441 : *
442 : * Iterate through all connections in a given set of ParallelSlots and
443 : * terminate all connections.
1360 michael 444 ECB : */
445 : void
759 rhaas 446 GIC 105 : ParallelSlotsTerminate(ParallelSlotArray *sa)
447 : {
1360 michael 448 ECB : int i;
449 :
759 rhaas 450 CBC 213 : for (i = 0; i < sa->numslots; i++)
451 : {
452 108 : PGconn *conn = sa->slots[i].connection;
1360 michael 453 EUB :
1360 michael 454 GIC 108 : if (conn == NULL)
1360 michael 455 LBC 0 : continue;
456 :
1360 michael 457 CBC 108 : disconnectDatabase(conn);
458 : }
1360 michael 459 GIC 105 : }
460 :
461 : /*
462 : * ParallelSlotsWaitCompletion
463 : *
464 : * Wait for all connections to finish, returning false if at least one
465 : * error has been found on the way.
1360 michael 466 ECB : */
467 : bool
759 rhaas 468 GIC 140 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
469 : {
1360 michael 470 ECB : int i;
471 :
759 rhaas 472 CBC 277 : for (i = 0; i < sa->numslots; i++)
1360 michael 473 EUB : {
759 rhaas 474 CBC 144 : if (sa->slots[i].connection == NULL)
759 rhaas 475 LBC 0 : continue;
759 rhaas 476 GIC 144 : if (!consumeQueryResult(&sa->slots[i]))
793 rhaas 477 CBC 7 : return false;
478 : /* Mark connection as idle */
93 tgl 479 GNC 137 : sa->slots[i].inUse = false;
480 137 : ParallelSlotClearHandler(&sa->slots[i]);
793 rhaas 481 ECB : }
482 :
793 rhaas 483 GIC 133 : return true;
793 rhaas 484 ECB : }
485 :
486 : /*
487 : * TableCommandResultHandler
488 : *
489 : * ParallelSlotResultHandler for results of commands (not queries) against
490 : * tables.
491 : *
492 : * Requires that the result status is either PGRES_COMMAND_OK or an error about
493 : * a missing table. This is useful for utilities that compile a list of tables
494 : * to process and then run commands (vacuum, reindex, or whatever) against
495 : * those tables, as there is a race condition between the time the list is
496 : * compiled and the time the command attempts to open the table.
497 : *
498 : * For missing tables, logs an error but allows processing to continue.
499 : *
500 : * For all other errors, logs an error and terminates further processing.
501 : *
502 : * res: PGresult from the query executed on the slot's connection
503 : * conn: connection belonging to the slot
504 : * context: unused
505 : */
506 : bool
793 rhaas 507 GIC 2605 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
793 rhaas 508 ECB : {
759 rhaas 509 GIC 2605 : Assert(res != NULL);
759 rhaas 510 CBC 2605 : Assert(conn != NULL);
759 rhaas 511 ECB :
512 : /*
513 : * If it's an error, report it. Errors about a missing table are harmless
514 : * so we continue processing; but die for other errors.
515 : */
793 rhaas 516 GIC 2605 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
793 rhaas 517 ECB : {
793 rhaas 518 GIC 7 : char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
793 rhaas 519 ECB :
793 rhaas 520 GIC 7 : pg_log_error("processing of database \"%s\" failed: %s",
793 rhaas 521 ECB : PQdb(conn), PQerrorMessage(conn));
522 :
793 rhaas 523 GIC 7 : if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
793 rhaas 524 ECB : {
793 rhaas 525 GIC 7 : PQclear(res);
1360 michael 526 CBC 7 : return false;
793 rhaas 527 ECB : }
528 : }
529 :
1360 michael 530 GIC 2598 : return true;
1360 michael 531 ECB : }
|