LCOV - differential code coverage report
Current view: top level - src/fe_utils - parallel_slot.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 90.8 % 163 148 4 9 2 1 90 2 55 12 89
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 14 14 14 14
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  *  parallel_slot.c
       4                 :  *      Parallel support for front-end parallel database connections
       5                 :  *
       6                 :  *
       7                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       8                 :  * Portions Copyright (c) 1994, Regents of the University of California
       9                 :  *
      10                 :  * src/fe_utils/parallel_slot.c
      11                 :  *
      12                 :  *-------------------------------------------------------------------------
      13                 :  */
      14                 : 
      15                 : #if defined(WIN32) && FD_SETSIZE < 1024
      16                 : #error FD_SETSIZE needs to have been increased
      17                 : #endif
      18                 : 
      19                 : #include "postgres_fe.h"
      20                 : 
      21                 : #include <sys/select.h>
      22                 : 
      23                 : #include "common/logging.h"
      24                 : #include "fe_utils/cancel.h"
      25                 : #include "fe_utils/parallel_slot.h"
      26                 : #include "fe_utils/query_utils.h"
      27                 : 
      28                 : #define ERRCODE_UNDEFINED_TABLE  "42P01"
      29                 : 
      30                 : static int  select_loop(int maxFd, fd_set *workerset);
      31                 : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
      32                 : 
      33                 : /*
      34                 :  * Process (and delete) a query result.  Returns true if there's no problem,
      35                 :  * false otherwise. It's up to the handler to decide what constitutes a
      36                 :  * problem.
      37 ECB             :  */
      38                 : static bool
      39 CBC        8433 : processQueryResult(ParallelSlot *slot, PGresult *result)
      40                 : {
      41 GIC        8433 :     Assert(slot->handler != NULL);
      42 ECB             : 
      43                 :     /* On failure, the handler should return NULL after freeing the result */
      44 GIC        8433 :     if (!slot->handler(result, slot->connection, slot->handler_context))
      45               7 :         return false;
      46 ECB             : 
      47                 :     /* Ok, we have to free it ourself */
      48 GIC        8426 :     PQclear(result);
      49            8426 :     return true;
      50                 : }
      51                 : 
      52                 : /*
      53                 :  * Consume all the results generated for the given connection until
      54                 :  * nothing remains.  If at least one error is encountered, return false.
      55                 :  * Note that this will block if the connection is busy.
      56 ECB             :  */
      57                 : static bool
      58 CBC         144 : consumeQueryResult(ParallelSlot *slot)
      59                 : {
      60 GIC         144 :     bool        ok = true;
      61 ECB             :     PGresult   *result;
      62                 : 
      63 GIC         144 :     SetCancelConn(slot->connection);
      64 CBC         288 :     while ((result = PQgetResult(slot->connection)) != NULL)
      65 ECB             :     {
      66 GIC         144 :         if (!processQueryResult(slot, result))
      67 CBC           7 :             ok = false;
      68 ECB             :     }
      69 GIC         144 :     ResetCancelConn();
      70             144 :     return ok;
      71                 : }
      72                 : 
      73                 : /*
      74                 :  * Wait until a file descriptor from the given set becomes readable.
      75                 :  *
      76                 :  * Returns the number of ready descriptors, or -1 on failure (including
      77                 :  * getting a cancel request).
      78 ECB             :  */
      79                 : static int
      80 GIC        8335 : select_loop(int maxFd, fd_set *workerset)
      81 ECB             : {
      82                 :     int         i;
      83 CBC        8335 :     fd_set      saveSet = *workerset;
      84 EUB             : 
      85 GIC        8335 :     if (CancelRequested)
      86 UIC           0 :         return -1;
      87 EUB             : 
      88                 :     for (;;)
      89 UIC           0 :     {
      90                 :         /*
      91                 :          * On Windows, we need to check once in a while for cancel requests;
      92                 :          * on other platforms we rely on select() returning when interrupted.
      93                 :          */
      94                 :         struct timeval *tvp;
      95                 : #ifdef WIN32
      96                 :         struct timeval tv = {0, 1000000};
      97                 : 
      98 ECB             :         tvp = &tv;
      99                 : #else
     100 GIC        8335 :         tvp = NULL;
     101 ECB             : #endif
     102                 : 
     103 GIC        8335 :         *workerset = saveSet;
     104            8335 :         i = select(maxFd + 1, workerset, NULL, NULL, tvp);
     105                 : 
     106                 : #ifdef WIN32
     107                 :         if (i == SOCKET_ERROR)
     108                 :         {
     109                 :             i = -1;
     110                 : 
     111                 :             if (WSAGetLastError() == WSAEINTR)
     112                 :                 errno = EINTR;
     113                 :         }
     114 ECB             : #endif
     115 EUB             : 
     116 CBC        8335 :         if (i < 0 && errno == EINTR)
     117 UBC           0 :             continue;           /* ignore this */
     118 CBC        8335 :         if (i < 0 || CancelRequested)
     119 UBC           0 :             return -1;          /* but not this */
     120 CBC        8335 :         if (i == 0)
     121 UIC           0 :             continue;           /* timeout (Win32 only) */
     122 GIC        8335 :         break;
     123 ECB             :     }
     124                 : 
     125 GIC        8335 :     return i;
     126                 : }
     127                 : 
     128                 : /*
     129                 :  * Return the offset of a suitable idle slot, or -1 if none are available.  If
     130                 :  * the given dbname is not null, only idle slots connected to the given
     131                 :  * database are considered suitable, otherwise all idle connected slots are
     132                 :  * considered suitable.
     133 ECB             :  */
     134                 : static int
     135 GIC       16767 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
     136                 : {
     137 ECB             :     int         i;
     138                 : 
     139 CBC       25229 :     for (i = 0; i < sa->numslots; i++)
     140 ECB             :     {
     141 GIC       16877 :         if (sa->slots[i].inUse)
     142 CBC        8445 :             continue;
     143 ECB             : 
     144 GIC        8432 :         if (sa->slots[i].connection == NULL)
     145 CBC           6 :             continue;
     146 ECB             : 
     147 CBC        8426 :         if (dbname == NULL ||
     148 GIC        5825 :             strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
     149 CBC        8415 :             return i;
     150                 :     }
     151 GIC        8352 :     return -1;
     152                 : }
     153                 : 
     154                 : /*
     155                 :  * Return the offset of the first slot without a database connection, or -1 if
     156                 :  * all slots are connected.
     157 ECB             :  */
     158                 : static int
     159 GIC        8454 : find_unconnected_slot(const ParallelSlotArray *sa)
     160                 : {
     161 ECB             :     int         i;
     162                 : 
     163 CBC       16875 :     for (i = 0; i < sa->numslots; i++)
     164 ECB             :     {
     165 GIC        8529 :         if (sa->slots[i].inUse)
     166 CBC        8410 :             continue;
     167 ECB             : 
     168 GIC         119 :         if (sa->slots[i].connection == NULL)
     169             108 :             return i;
     170 ECB             :     }
     171                 : 
     172 GIC        8346 :     return -1;
     173                 : }
     174                 : 
     175                 : /*
     176                 :  * Return the offset of the first idle slot, or -1 if all slots are busy.
     177 ECB             :  */
     178                 : static int
     179 GIC        8346 : find_any_idle_slot(const ParallelSlotArray *sa)
     180                 : {
     181 ECB             :     int         i;
     182                 : 
     183 CBC       16753 :     for (i = 0; i < sa->numslots; i++)
     184 GIC        8418 :         if (!sa->slots[i].inUse)
     185 CBC          11 :             return i;
     186                 : 
     187 GIC        8335 :     return -1;
     188                 : }
     189                 : 
     190                 : /*
     191                 :  * Wait for any slot's connection to have query results, consume the results,
     192                 :  * and update the slot's status as appropriate.  Returns true on success,
     193                 :  * false on cancellation, on error, or if no slots are connected.
     194 ECB             :  */
     195                 : static bool
     196 GIC        8335 : wait_on_slots(ParallelSlotArray *sa)
     197                 : {
     198 ECB             :     int         i;
     199                 :     fd_set      slotset;
     200 GIC        8335 :     int         maxFd = 0;
     201            8335 :     PGconn     *cancelconn = NULL;
     202 ECB             : 
     203                 :     /* We must reconstruct the fd_set for each call to select_loop */
     204 CBC      141695 :     FD_ZERO(&slotset);
     205                 : 
     206 GIC       16742 :     for (i = 0; i < sa->numslots; i++)
     207                 :     {
     208                 :         int         sock;
     209 ECB             : 
     210                 :         /* We shouldn't get here if we still have slots without connections */
     211 CBC        8407 :         Assert(sa->slots[i].connection != NULL);
     212                 : 
     213 GIC        8407 :         sock = PQsocket(sa->slots[i].connection);
     214                 : 
     215                 :         /*
     216                 :          * We don't really expect any connections to lose their sockets after
     217 ECB             :          * startup, but just in case, cope by ignoring them.
     218 EUB             :          */
     219 GIC        8407 :         if (sock < 0)
     220 UIC           0 :             continue;
     221 ECB             : 
     222                 :         /* Keep track of the first valid connection we see. */
     223 GIC        8407 :         if (cancelconn == NULL)
     224 CBC        8335 :             cancelconn = sa->slots[i].connection;
     225 ECB             : 
     226 CBC        8407 :         FD_SET(sock, &slotset);
     227 GIC        8407 :         if (sock > maxFd)
     228            8407 :             maxFd = sock;
     229                 :     }
     230                 : 
     231                 :     /*
     232                 :      * If we get this far with no valid connections, processing cannot
     233 ECB             :      * continue.
     234 EUB             :      */
     235 GIC        8335 :     if (cancelconn == NULL)
     236 LBC           0 :         return false;
     237 ECB             : 
     238 CBC        8335 :     SetCancelConn(cancelconn);
     239 GIC        8335 :     i = select_loop(maxFd, &slotset);
     240            8335 :     ResetCancelConn();
     241 ECB             : 
     242 EUB             :     /* failure? */
     243 GIC        8335 :     if (i < 0)
     244 LBC           0 :         return false;
     245                 : 
     246 GIC       16742 :     for (i = 0; i < sa->numslots; i++)
     247                 :     {
     248 ECB             :         int         sock;
     249                 : 
     250 CBC        8407 :         sock = PQsocket(sa->slots[i].connection);
     251                 : 
     252 GIC        8407 :         if (sock >= 0 && FD_ISSET(sock, &slotset))
     253 ECB             :         {
     254                 :             /* select() says input is available, so consume it */
     255 GIC        8335 :             PQconsumeInput(sa->slots[i].connection);
     256                 :         }
     257 ECB             : 
     258                 :         /* Collect result(s) as long as any are available */
     259 CBC       16696 :         while (!PQisBusy(sa->slots[i].connection))
     260                 :         {
     261           16578 :             PGresult   *result = PQgetResult(sa->slots[i].connection);
     262                 : 
     263 GIC       16578 :             if (result != NULL)
     264 ECB             :             {
     265 EUB             :                 /* Handle and discard the command result */
     266 GIC        8289 :                 if (!processQueryResult(&sa->slots[i], result))
     267 UIC           0 :                     return false;
     268                 :             }
     269                 :             else
     270 ECB             :             {
     271                 :                 /* This connection has become idle */
     272 CBC        8289 :                 sa->slots[i].inUse = false;
     273 GIC        8289 :                 ParallelSlotClearHandler(&sa->slots[i]);
     274            8289 :                 break;
     275                 :             }
     276 ECB             :         }
     277                 :     }
     278 GIC        8335 :     return true;
     279                 : }
     280                 : 
     281                 : /*
     282                 :  * Open a new database connection using the stored connection parameters and
     283                 :  * optionally a given dbname if not null, execute the stored initial command if
     284                 :  * any, and associate the new connection with the given slot.
     285 ECB             :  */
     286                 : static void
     287 GIC          17 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
     288 ECB             : {
     289                 :     const char *old_override;
     290 CBC          17 :     ParallelSlot *slot = &sa->slots[slotno];
     291 ECB             : 
     292 CBC          17 :     old_override = sa->cparams->override_dbname;
     293              17 :     if (dbname)
     294              14 :         sa->cparams->override_dbname = dbname;
     295 GIC          17 :     slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
     296 CBC          17 :     sa->cparams->override_dbname = old_override;
     297 EUB             : 
     298 GIC          17 :     if (PQsocket(slot->connection) >= FD_SETSIZE)
     299 UIC           0 :         pg_fatal("too many jobs for this platform");
     300 ECB             : 
     301 EUB             :     /* Setup the connection using the supplied command, if any. */
     302 CBC          17 :     if (sa->initcmd)
     303 UIC           0 :         executeCommand(slot->connection, sa->initcmd, sa->echo);
     304 GIC          17 : }
     305                 : 
     306                 : /*
     307                 :  * ParallelSlotsGetIdle
     308                 :  *      Return a connection slot that is ready to execute a command.
     309                 :  *
     310                 :  * The slot returned is chosen as follows:
     311                 :  *
     312                 :  * If any idle slot already has an open connection, and if either dbname is
     313                 :  * null or the existing connection is to the given database, that slot will be
     314                 :  * returned allowing the connection to be reused.
     315                 :  *
     316                 :  * Otherwise, if any idle slot is not yet connected to any database, the slot
     317                 :  * will be returned with it's connection opened using the stored cparams and
     318                 :  * optionally the given dbname if not null.
     319                 :  *
     320                 :  * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
     321                 :  * after having it's connection disconnected and reconnected using the stored
     322                 :  * cparams and optionally the given dbname if not null.
     323                 :  *
     324                 :  * Otherwise, if any slots have connections that are busy, we loop on select()
     325                 :  * until one socket becomes available.  When this happens, we read the whole
     326                 :  * set and mark as free all sockets that become available.  We then select a
     327                 :  * slot using the same rules as above.
     328                 :  *
     329                 :  * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
     330                 :  *
     331                 :  * For any connection created, if the stored initcmd is not null, it will be
     332                 :  * executed as a command on the newly formed connection before the slot is
     333                 :  * returned.
     334                 :  *
     335                 :  * If an error occurs, NULL is returned.
     336 ECB             :  */
     337                 : ParallelSlot *
     338 GIC        8432 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
     339                 : {
     340 ECB             :     int         offset;
     341                 : 
     342 GIC        8432 :     Assert(sa);
     343            8432 :     Assert(sa->numslots > 0);
     344                 : 
     345                 :     while (1)
     346 ECB             :     {
     347                 :         /* First choice: a slot already connected to the desired database. */
     348 GIC       16767 :         offset = find_matching_idle_slot(sa, dbname);
     349 CBC       16767 :         if (offset >= 0)
     350 ECB             :         {
     351 GIC        8415 :             sa->slots[offset].inUse = true;
     352            8415 :             return &sa->slots[offset];
     353                 :         }
     354 ECB             : 
     355                 :         /* Second choice: a slot not connected to any database. */
     356 GIC        8352 :         offset = find_unconnected_slot(sa);
     357 CBC        8352 :         if (offset >= 0)
     358 ECB             :         {
     359 CBC           6 :             connect_slot(sa, offset, dbname);
     360 GIC           6 :             sa->slots[offset].inUse = true;
     361               6 :             return &sa->slots[offset];
     362                 :         }
     363 ECB             : 
     364                 :         /* Third choice: a slot connected to the wrong database. */
     365 GIC        8346 :         offset = find_any_idle_slot(sa);
     366 CBC        8346 :         if (offset >= 0)
     367 ECB             :         {
     368 CBC          11 :             disconnectDatabase(sa->slots[offset].connection);
     369              11 :             sa->slots[offset].connection = NULL;
     370              11 :             connect_slot(sa, offset, dbname);
     371 GIC          11 :             sa->slots[offset].inUse = true;
     372              11 :             return &sa->slots[offset];
     373                 :         }
     374                 : 
     375                 :         /*
     376                 :          * Fourth choice: block until one or more slots become available. If
     377                 :          * any slots hit a fatal error, we'll find out about that here and
     378 ECB             :          * return NULL.
     379 EUB             :          */
     380 GIC        8335 :         if (!wait_on_slots(sa))
     381 UIC           0 :             return NULL;
     382                 :     }
     383                 : }
     384                 : 
     385                 : /*
     386                 :  * ParallelSlotsSetup
     387                 :  *      Prepare a set of parallel slots but do not connect to any database.
     388                 :  *
     389                 :  * This creates and initializes a set of slots, marking all parallel slots as
     390                 :  * free and ready to use.  Establishing connections is delayed until requesting
     391                 :  * a free slot.  The cparams, progname, echo, and initcmd are stored for later
     392                 :  * use and must remain valid for the lifetime of the returned array.
     393 ECB             :  */
     394                 : ParallelSlotArray *
     395 GIC         105 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
     396                 :                    bool echo, const char *initcmd)
     397                 : {
     398 ECB             :     ParallelSlotArray *sa;
     399                 : 
     400 CBC         105 :     Assert(numslots > 0);
     401 GIC         105 :     Assert(cparams != NULL);
     402 CBC         105 :     Assert(progname != NULL);
     403 ECB             : 
     404 GIC         105 :     sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
     405 CBC         105 :                                        numslots * sizeof(ParallelSlot));
     406 ECB             : 
     407 CBC         105 :     sa->numslots = numslots;
     408             105 :     sa->cparams = cparams;
     409             105 :     sa->progname = progname;
     410 GIC         105 :     sa->echo = echo;
     411 CBC         105 :     sa->initcmd = initcmd;
     412                 : 
     413 GIC         105 :     return sa;
     414                 : }
     415                 : 
     416                 : /*
     417                 :  * ParallelSlotsAdoptConn
     418                 :  *      Assign an open connection to the slots array for reuse.
     419                 :  *
     420                 :  * This turns over ownership of an open connection to a slots array.  The
     421                 :  * caller should not further use or close the connection.  All the connection's
     422                 :  * parameters (user, host, port, etc.) except possibly dbname should match
     423                 :  * those of the slots array's cparams, as given in ParallelSlotsSetup.  If
     424                 :  * these parameters differ, subsequent behavior is undefined.
     425 ECB             :  */
     426                 : void
     427 GIC         102 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
     428                 : {
     429 ECB             :     int         offset;
     430                 : 
     431 CBC         102 :     offset = find_unconnected_slot(sa);
     432 GIC         102 :     if (offset >= 0)
     433 GBC         102 :         sa->slots[offset].connection = conn;
     434 ECB             :     else
     435 UIC           0 :         disconnectDatabase(conn);
     436 GIC         102 : }
     437                 : 
     438                 : /*
     439                 :  * ParallelSlotsTerminate
     440                 :  *      Clean up a set of parallel slots
     441                 :  *
     442                 :  * Iterate through all connections in a given set of ParallelSlots and
     443                 :  * terminate all connections.
     444 ECB             :  */
     445                 : void
     446 GIC         105 : ParallelSlotsTerminate(ParallelSlotArray *sa)
     447                 : {
     448 ECB             :     int         i;
     449                 : 
     450 CBC         213 :     for (i = 0; i < sa->numslots; i++)
     451                 :     {
     452             108 :         PGconn     *conn = sa->slots[i].connection;
     453 EUB             : 
     454 GIC         108 :         if (conn == NULL)
     455 LBC           0 :             continue;
     456                 : 
     457 CBC         108 :         disconnectDatabase(conn);
     458                 :     }
     459 GIC         105 : }
     460                 : 
     461                 : /*
     462                 :  * ParallelSlotsWaitCompletion
     463                 :  *
     464                 :  * Wait for all connections to finish, returning false if at least one
     465                 :  * error has been found on the way.
     466 ECB             :  */
     467                 : bool
     468 GIC         140 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
     469                 : {
     470 ECB             :     int         i;
     471                 : 
     472 CBC         277 :     for (i = 0; i < sa->numslots; i++)
     473 EUB             :     {
     474 CBC         144 :         if (sa->slots[i].connection == NULL)
     475 LBC           0 :             continue;
     476 GIC         144 :         if (!consumeQueryResult(&sa->slots[i]))
     477 CBC           7 :             return false;
     478                 :         /* Mark connection as idle */
     479 GNC         137 :         sa->slots[i].inUse = false;
     480             137 :         ParallelSlotClearHandler(&sa->slots[i]);
     481 ECB             :     }
     482                 : 
     483 GIC         133 :     return true;
     484 ECB             : }
     485                 : 
     486                 : /*
     487                 :  * TableCommandResultHandler
     488                 :  *
     489                 :  * ParallelSlotResultHandler for results of commands (not queries) against
     490                 :  * tables.
     491                 :  *
     492                 :  * Requires that the result status is either PGRES_COMMAND_OK or an error about
     493                 :  * a missing table.  This is useful for utilities that compile a list of tables
     494                 :  * to process and then run commands (vacuum, reindex, or whatever) against
     495                 :  * those tables, as there is a race condition between the time the list is
     496                 :  * compiled and the time the command attempts to open the table.
     497                 :  *
     498                 :  * For missing tables, logs an error but allows processing to continue.
     499                 :  *
     500                 :  * For all other errors, logs an error and terminates further processing.
     501                 :  *
     502                 :  * res: PGresult from the query executed on the slot's connection
     503                 :  * conn: connection belonging to the slot
     504                 :  * context: unused
     505                 :  */
     506                 : bool
     507 GIC        2605 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
     508 ECB             : {
     509 GIC        2605 :     Assert(res != NULL);
     510 CBC        2605 :     Assert(conn != NULL);
     511 ECB             : 
     512                 :     /*
     513                 :      * If it's an error, report it.  Errors about a missing table are harmless
     514                 :      * so we continue processing; but die for other errors.
     515                 :      */
     516 GIC        2605 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     517 ECB             :     {
     518 GIC           7 :         char       *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     519 ECB             : 
     520 GIC           7 :         pg_log_error("processing of database \"%s\" failed: %s",
     521 ECB             :                      PQdb(conn), PQerrorMessage(conn));
     522                 : 
     523 GIC           7 :         if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
     524 ECB             :         {
     525 GIC           7 :             PQclear(res);
     526 CBC           7 :             return false;
     527 ECB             :         }
     528                 :     }
     529                 : 
     530 GIC        2598 :     return true;
     531 ECB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a