LCOV - differential code coverage report
Current view: top level - src/fe_utils - parallel_slot.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC GNC CBC EUB ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 90.8 % 163 148 4 9 2 1 90 2 55 12 89
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 14 14 14 14
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (60,120] days: 100.0 % 2 2 2
Legend: Lines: hit not hit (180,240] days: 100.0 % 1 1 1
(240..) days: 90.6 % 160 145 4 9 2 1 90 54 12 89
Function coverage date bins:
(240..) days: 50.0 % 28 14 14 14

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  *  parallel_slot.c
                                  4                 :  *      Parallel support for front-end parallel database connections
                                  5                 :  *
                                  6                 :  *
                                  7                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  8                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                  9                 :  *
                                 10                 :  * src/fe_utils/parallel_slot.c
                                 11                 :  *
                                 12                 :  *-------------------------------------------------------------------------
                                 13                 :  */
                                 14                 : 
                                 15                 : #if defined(WIN32) && FD_SETSIZE < 1024
                                 16                 : #error FD_SETSIZE needs to have been increased
                                 17                 : #endif
                                 18                 : 
                                 19                 : #include "postgres_fe.h"
                                 20                 : 
                                 21                 : #include <sys/select.h>
                                 22                 : 
                                 23                 : #include "common/logging.h"
                                 24                 : #include "fe_utils/cancel.h"
                                 25                 : #include "fe_utils/parallel_slot.h"
                                 26                 : #include "fe_utils/query_utils.h"
                                 27                 : 
                                 28                 : #define ERRCODE_UNDEFINED_TABLE  "42P01"
                                 29                 : 
                                 30                 : static int  select_loop(int maxFd, fd_set *workerset);
                                 31                 : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
                                 32                 : 
                                 33                 : /*
                                 34                 :  * Process (and delete) a query result.  Returns true if there's no problem,
                                 35                 :  * false otherwise. It's up to the handler to decide what constitutes a
                                 36                 :  * problem.
  793 rhaas                      37 ECB             :  */
                                 38                 : static bool
  793 rhaas                      39 CBC        8433 : processQueryResult(ParallelSlot *slot, PGresult *result)
                                 40                 : {
  793 rhaas                      41 GIC        8433 :     Assert(slot->handler != NULL);
  793 rhaas                      42 ECB             : 
                                  43                 :     /* On failure, the handler should return false after freeing the result */
  793 rhaas                      44 GIC        8433 :     if (!slot->handler(result, slot->connection, slot->handler_context))
                                 45               7 :         return false;
  793 rhaas                      46 ECB             : 
                                  47                 :     /* Ok, we have to free it ourselves */
  793 rhaas                      48 GIC        8426 :     PQclear(result);
                                 49            8426 :     return true;
                                 50                 : }
                                 51                 : 
                                 52                 : /*
                                 53                 :  * Consume all the results generated for the given connection until
                                 54                 :  * nothing remains.  If at least one error is encountered, return false.
                                 55                 :  * Note that this will block if the connection is busy.
  793 rhaas                      56 ECB             :  */
                                 57                 : static bool
  793 rhaas                      58 CBC         144 : consumeQueryResult(ParallelSlot *slot)
                                 59                 : {
  793 rhaas                      60 GIC         144 :     bool        ok = true;
  793 rhaas                      61 ECB             :     PGresult   *result;
                                 62                 : 
  793 rhaas                      63 GIC         144 :     SetCancelConn(slot->connection);
  793 rhaas                      64 CBC         288 :     while ((result = PQgetResult(slot->connection)) != NULL)
  793 rhaas                      65 ECB             :     {
  793 rhaas                      66 GIC         144 :         if (!processQueryResult(slot, result))
  793 rhaas                      67 CBC           7 :             ok = false;
  793 rhaas                      68 ECB             :     }
  793 rhaas                      69 GIC         144 :     ResetCancelConn();
                                 70             144 :     return ok;
                                 71                 : }
                                 72                 : 
                                 73                 : /*
                                 74                 :  * Wait until a file descriptor from the given set becomes readable.
                                 75                 :  *
                                 76                 :  * Returns the number of ready descriptors, or -1 on failure (including
                                 77                 :  * getting a cancel request).
 1360 michael                    78 ECB             :  */
                                 79                 : static int
 1036 tgl                        80 GIC        8335 : select_loop(int maxFd, fd_set *workerset)
 1360 michael                    81 ECB             : {
                                 82                 :     int         i;
 1360 michael                    83 CBC        8335 :     fd_set      saveSet = *workerset;
 1360 michael                    84 EUB             : 
 1360 michael                    85 GIC        8335 :     if (CancelRequested)
 1360 michael                    86 UIC           0 :         return -1;
 1360 michael                    87 EUB             : 
                                 88                 :     for (;;)
 1360 michael                    89 UIC           0 :     {
                                 90                 :         /*
                                 91                 :          * On Windows, we need to check once in a while for cancel requests;
                                 92                 :          * on other platforms we rely on select() returning when interrupted.
                                 93                 :          */
                                 94                 :         struct timeval *tvp;
                                 95                 : #ifdef WIN32
                                 96                 :         struct timeval tv = {0, 1000000};
                                 97                 : 
 1360 michael                    98 ECB             :         tvp = &tv;
                                 99                 : #else
 1360 michael                   100 GIC        8335 :         tvp = NULL;
 1360 michael                   101 ECB             : #endif
                                102                 : 
 1360 michael                   103 GIC        8335 :         *workerset = saveSet;
                                104            8335 :         i = select(maxFd + 1, workerset, NULL, NULL, tvp);
                                105                 : 
                                106                 : #ifdef WIN32
                                107                 :         if (i == SOCKET_ERROR)
                                108                 :         {
                                109                 :             i = -1;
                                110                 : 
                                111                 :             if (WSAGetLastError() == WSAEINTR)
                                112                 :                 errno = EINTR;
                                113                 :         }
 1360 michael                   114 ECB             : #endif
 1360 michael                   115 EUB             : 
 1360 michael                   116 CBC        8335 :         if (i < 0 && errno == EINTR)
 1360 michael                   117 UBC           0 :             continue;           /* ignore this */
 1360 michael                   118 CBC        8335 :         if (i < 0 || CancelRequested)
 1036 tgl                       119 UBC           0 :             return -1;          /* but not this */
 1360 michael                   120 CBC        8335 :         if (i == 0)
 1360 michael                   121 UIC           0 :             continue;           /* timeout (Win32 only) */
 1360 michael                   122 GIC        8335 :         break;
 1360 michael                   123 ECB             :     }
                                124                 : 
 1360 michael                   125 GIC        8335 :     return i;
                                126                 : }
                                127                 : 
                                128                 : /*
                                129                 :  * Return the offset of a suitable idle slot, or -1 if none are available.  If
                                130                 :  * the given dbname is not null, only idle slots connected to the given
                                131                 :  * database are considered suitable, otherwise all idle connected slots are
                                132                 :  * considered suitable.
 1360 michael                   133 ECB             :  */
                                134                 : static int
  759 rhaas                     135 GIC       16767 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
                                136                 : {
 1360 michael                   137 ECB             :     int         i;
                                138                 : 
  759 rhaas                     139 CBC       25229 :     for (i = 0; i < sa->numslots; i++)
 1360 michael                   140 ECB             :     {
  759 rhaas                     141 GIC       16877 :         if (sa->slots[i].inUse)
  759 rhaas                     142 CBC        8445 :             continue;
  759 rhaas                     143 ECB             : 
  759 rhaas                     144 GIC        8432 :         if (sa->slots[i].connection == NULL)
  759 rhaas                     145 CBC           6 :             continue;
  759 rhaas                     146 ECB             : 
  759 rhaas                     147 CBC        8426 :         if (dbname == NULL ||
  759 rhaas                     148 GIC        5825 :             strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
  759 rhaas                     149 CBC        8415 :             return i;
                                150                 :     }
  759 rhaas                     151 GIC        8352 :     return -1;
                                152                 : }
                                153                 : 
                                154                 : /*
                                155                 :  * Return the offset of the first slot without a database connection, or -1 if
                                156                 :  * all slots are connected.
  759 rhaas                     157 ECB             :  */
                                158                 : static int
  759 rhaas                     159 GIC        8454 : find_unconnected_slot(const ParallelSlotArray *sa)
                                160                 : {
  759 rhaas                     161 ECB             :     int         i;
                                162                 : 
  759 rhaas                     163 CBC       16875 :     for (i = 0; i < sa->numslots; i++)
  759 rhaas                     164 ECB             :     {
  759 rhaas                     165 GIC        8529 :         if (sa->slots[i].inUse)
  759 rhaas                     166 CBC        8410 :             continue;
  759 rhaas                     167 ECB             : 
  759 rhaas                     168 GIC         119 :         if (sa->slots[i].connection == NULL)
                                169             108 :             return i;
  759 rhaas                     170 ECB             :     }
                                171                 : 
  759 rhaas                     172 GIC        8346 :     return -1;
                                173                 : }
                                174                 : 
                                175                 : /*
                                176                 :  * Return the offset of the first idle slot, or -1 if all slots are busy.
  759 rhaas                     177 ECB             :  */
                                178                 : static int
  759 rhaas                     179 GIC        8346 : find_any_idle_slot(const ParallelSlotArray *sa)
                                180                 : {
  759 rhaas                     181 ECB             :     int         i;
                                182                 : 
  759 rhaas                     183 CBC       16753 :     for (i = 0; i < sa->numslots; i++)
  759 rhaas                     184 GIC        8418 :         if (!sa->slots[i].inUse)
  759 rhaas                     185 CBC          11 :             return i;
                                186                 : 
  759 rhaas                     187 GIC        8335 :     return -1;
                                188                 : }
                                189                 : 
                                190                 : /*
                                191                 :  * Wait for any slot's connection to have query results, consume the results,
                                192                 :  * and update the slot's status as appropriate.  Returns true on success,
                                193                 :  * false on cancellation, on error, or if no slots are connected.
  759 rhaas                     194 ECB             :  */
                                195                 : static bool
  759 rhaas                     196 GIC        8335 : wait_on_slots(ParallelSlotArray *sa)
                                197                 : {
  759 rhaas                     198 ECB             :     int         i;
                                199                 :     fd_set      slotset;
  759 rhaas                     200 GIC        8335 :     int         maxFd = 0;
                                201            8335 :     PGconn     *cancelconn = NULL;
  759 rhaas                     202 ECB             : 
                                203                 :     /* We must reconstruct the fd_set for each call to select_loop */
  759 rhaas                     204 CBC      141695 :     FD_ZERO(&slotset);
                                205                 : 
  759 rhaas                     206 GIC       16742 :     for (i = 0; i < sa->numslots; i++)
                                207                 :     {
                                208                 :         int         sock;
  759 rhaas                     209 ECB             : 
                                210                 :         /* We shouldn't get here if we still have slots without connections */
  759 rhaas                     211 CBC        8407 :         Assert(sa->slots[i].connection != NULL);
                                212                 : 
  759 rhaas                     213 GIC        8407 :         sock = PQsocket(sa->slots[i].connection);
                                214                 : 
                                215                 :         /*
                                216                 :          * We don't really expect any connections to lose their sockets after
  759 rhaas                     217 ECB             :          * startup, but just in case, cope by ignoring them.
  759 rhaas                     218 EUB             :          */
  759 rhaas                     219 GIC        8407 :         if (sock < 0)
  759 rhaas                     220 UIC           0 :             continue;
  759 rhaas                     221 ECB             : 
                                222                 :         /* Keep track of the first valid connection we see. */
  759 rhaas                     223 GIC        8407 :         if (cancelconn == NULL)
  759 rhaas                     224 CBC        8335 :             cancelconn = sa->slots[i].connection;
  759 rhaas                     225 ECB             : 
  759 rhaas                     226 CBC        8407 :         FD_SET(sock, &slotset);
  759 rhaas                     227 GIC        8407 :         if (sock > maxFd)
                                228            8407 :             maxFd = sock;
                                229                 :     }
                                230                 : 
                                231                 :     /*
                                232                 :      * If we get this far with no valid connections, processing cannot
  759 rhaas                     233 ECB             :      * continue.
 1360 michael                   234 EUB             :      */
  759 rhaas                     235 GIC        8335 :     if (cancelconn == NULL)
  759 rhaas                     236 LBC           0 :         return false;
  759 rhaas                     237 ECB             : 
  225 michael                   238 CBC        8335 :     SetCancelConn(cancelconn);
  759 rhaas                     239 GIC        8335 :     i = select_loop(maxFd, &slotset);
                                240            8335 :     ResetCancelConn();
  759 rhaas                     241 ECB             : 
  759 rhaas                     242 EUB             :     /* failure? */
  759 rhaas                     243 GIC        8335 :     if (i < 0)
  759 rhaas                     244 LBC           0 :         return false;
                                245                 : 
  759 rhaas                     246 GIC       16742 :     for (i = 0; i < sa->numslots; i++)
                                247                 :     {
  759 rhaas                     248 ECB             :         int         sock;
                                249                 : 
  759 rhaas                     250 CBC        8407 :         sock = PQsocket(sa->slots[i].connection);
                                251                 : 
  759 rhaas                     252 GIC        8407 :         if (sock >= 0 && FD_ISSET(sock, &slotset))
 1360 michael                   253 ECB             :         {
                                254                 :             /* select() says input is available, so consume it */
  759 rhaas                     255 GIC        8335 :             PQconsumeInput(sa->slots[i].connection);
                                256                 :         }
 1360 michael                   257 ECB             : 
                                258                 :         /* Collect result(s) as long as any are available */
  759 rhaas                     259 CBC       16696 :         while (!PQisBusy(sa->slots[i].connection))
                                260                 :         {
                                261           16578 :             PGresult   *result = PQgetResult(sa->slots[i].connection);
                                262                 : 
  759 rhaas                     263 GIC       16578 :             if (result != NULL)
 1360 michael                   264 ECB             :             {
  759 rhaas                     265 EUB             :                 /* Handle and discard the command result */
  759 rhaas                     266 GIC        8289 :                 if (!processQueryResult(&sa->slots[i], result))
  759 rhaas                     267 UIC           0 :                     return false;
                                268                 :             }
                                269                 :             else
 1360 michael                   270 ECB             :             {
  759 rhaas                     271                 :                 /* This connection has become idle */
  759 rhaas                     272 CBC        8289 :                 sa->slots[i].inUse = false;
  759 rhaas                     273 GIC        8289 :                 ParallelSlotClearHandler(&sa->slots[i]);
                                274            8289 :                 break;
                                275                 :             }
 1360 michael                   276 ECB             :         }
                                277                 :     }
  759 rhaas                     278 GIC        8335 :     return true;
                                279                 : }
                                280                 : 
                                281                 : /*
                                282                 :  * Open a new database connection using the stored connection parameters and
                                283                 :  * optionally a given dbname if not null, execute the stored initial command if
                                284                 :  * any, and associate the new connection with the given slot.
  759 rhaas                     285 ECB             :  */
                                286                 : static void
  759 rhaas                     287 GIC          17 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
  759 rhaas                     288 ECB             : {
                                289                 :     const char *old_override;
  759 rhaas                     290 CBC          17 :     ParallelSlot *slot = &sa->slots[slotno];
  759 rhaas                     291 ECB             : 
  759 rhaas                     292 CBC          17 :     old_override = sa->cparams->override_dbname;
                                293              17 :     if (dbname)
                                294              14 :         sa->cparams->override_dbname = dbname;
  759 rhaas                     295 GIC          17 :     slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
  759 rhaas                     296 CBC          17 :     sa->cparams->override_dbname = old_override;
  759 rhaas                     297 EUB             : 
  759 rhaas                     298 GIC          17 :     if (PQsocket(slot->connection) >= FD_SETSIZE)
  366 tgl                       299 UIC           0 :         pg_fatal("too many jobs for this platform");
  759 rhaas                     300 ECB             : 
  759 rhaas                     301 EUB             :     /* Setup the connection using the supplied command, if any. */
  759 rhaas                     302 CBC          17 :     if (sa->initcmd)
  759 rhaas                     303 UIC           0 :         executeCommand(slot->connection, sa->initcmd, sa->echo);
 1360 michael                   304 GIC          17 : }
                                305                 : 
                                306                 : /*
                                307                 :  * ParallelSlotsGetIdle
                                308                 :  *      Return a connection slot that is ready to execute a command.
                                309                 :  *
                                310                 :  * The slot returned is chosen as follows:
                                311                 :  *
                                312                 :  * If any idle slot already has an open connection, and if either dbname is
                                313                 :  * null or the existing connection is to the given database, that slot will be
                                314                 :  * returned allowing the connection to be reused.
                                315                 :  *
                                316                 :  * Otherwise, if any idle slot is not yet connected to any database, the slot
                                 317                 :  * will be returned with its connection opened using the stored cparams and
                                318                 :  * optionally the given dbname if not null.
                                319                 :  *
                                320                 :  * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
                                 321                 :  * after having its connection disconnected and reconnected using the stored
                                322                 :  * cparams and optionally the given dbname if not null.
                                323                 :  *
                                324                 :  * Otherwise, if any slots have connections that are busy, we loop on select()
                                325                 :  * until one socket becomes available.  When this happens, we read the whole
                                326                 :  * set and mark as free all sockets that become available.  We then select a
                                327                 :  * slot using the same rules as above.
                                328                 :  *
                                329                 :  * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
                                330                 :  *
                                331                 :  * For any connection created, if the stored initcmd is not null, it will be
                                332                 :  * executed as a command on the newly formed connection before the slot is
                                333                 :  * returned.
                                334                 :  *
                                335                 :  * If an error occurs, NULL is returned.
 1360 michael                   336 ECB             :  */
                                337                 : ParallelSlot *
  759 rhaas                     338 GIC        8432 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
                                339                 : {
  759 rhaas                     340 ECB             :     int         offset;
 1360 michael                   341                 : 
  759 rhaas                     342 GIC        8432 :     Assert(sa);
                                343            8432 :     Assert(sa->numslots > 0);
                                344                 : 
                                345                 :     while (1)
 1360 michael                   346 ECB             :     {
  759 rhaas                     347                 :         /* First choice: a slot already connected to the desired database. */
  759 rhaas                     348 GIC       16767 :         offset = find_matching_idle_slot(sa, dbname);
  759 rhaas                     349 CBC       16767 :         if (offset >= 0)
 1360 michael                   350 ECB             :         {
  759 rhaas                     351 GIC        8415 :             sa->slots[offset].inUse = true;
                                352            8415 :             return &sa->slots[offset];
                                353                 :         }
  759 rhaas                     354 ECB             : 
                                355                 :         /* Second choice: a slot not connected to any database. */
  759 rhaas                     356 GIC        8352 :         offset = find_unconnected_slot(sa);
  759 rhaas                     357 CBC        8352 :         if (offset >= 0)
  759 rhaas                     358 ECB             :         {
  759 rhaas                     359 CBC           6 :             connect_slot(sa, offset, dbname);
  759 rhaas                     360 GIC           6 :             sa->slots[offset].inUse = true;
                                361               6 :             return &sa->slots[offset];
                                362                 :         }
 1322 michael                   363 ECB             : 
  759 rhaas                     364                 :         /* Third choice: a slot connected to the wrong database. */
  759 rhaas                     365 GIC        8346 :         offset = find_any_idle_slot(sa);
  759 rhaas                     366 CBC        8346 :         if (offset >= 0)
  759 rhaas                     367 ECB             :         {
  759 rhaas                     368 CBC          11 :             disconnectDatabase(sa->slots[offset].connection);
                                369              11 :             sa->slots[offset].connection = NULL;
                                370              11 :             connect_slot(sa, offset, dbname);
  759 rhaas                     371 GIC          11 :             sa->slots[offset].inUse = true;
                                372              11 :             return &sa->slots[offset];
                                373                 :         }
                                374                 : 
                                375                 :         /*
                                376                 :          * Fourth choice: block until one or more slots become available. If
                                377                 :          * any slots hit a fatal error, we'll find out about that here and
  759 rhaas                     378 ECB             :          * return NULL.
  759 rhaas                     379 EUB             :          */
  759 rhaas                     380 GIC        8335 :         if (!wait_on_slots(sa))
  759 rhaas                     381 UIC           0 :             return NULL;
                                382                 :     }
                                383                 : }
                                384                 : 
                                385                 : /*
                                386                 :  * ParallelSlotsSetup
                                387                 :  *      Prepare a set of parallel slots but do not connect to any database.
                                388                 :  *
                                389                 :  * This creates and initializes a set of slots, marking all parallel slots as
                                390                 :  * free and ready to use.  Establishing connections is delayed until requesting
                                391                 :  * a free slot.  The cparams, progname, echo, and initcmd are stored for later
                                392                 :  * use and must remain valid for the lifetime of the returned array.
  759 rhaas                     393 ECB             :  */
                                394                 : ParallelSlotArray *
  759 rhaas                     395 GIC         105 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
                                396                 :                    bool echo, const char *initcmd)
                                397                 : {
  759 rhaas                     398 ECB             :     ParallelSlotArray *sa;
 1360 michael                   399                 : 
  759 rhaas                     400 CBC         105 :     Assert(numslots > 0);
  759 rhaas                     401 GIC         105 :     Assert(cparams != NULL);
  759 rhaas                     402 CBC         105 :     Assert(progname != NULL);
  759 rhaas                     403 ECB             : 
  759 rhaas                     404 GIC         105 :     sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
  759 rhaas                     405 CBC         105 :                                        numslots * sizeof(ParallelSlot));
  759 rhaas                     406 ECB             : 
  759 rhaas                     407 CBC         105 :     sa->numslots = numslots;
                                408             105 :     sa->cparams = cparams;
                                409             105 :     sa->progname = progname;
  759 rhaas                     410 GIC         105 :     sa->echo = echo;
  759 rhaas                     411 CBC         105 :     sa->initcmd = initcmd;
                                412                 : 
  759 rhaas                     413 GIC         105 :     return sa;
                                414                 : }
                                415                 : 
                                416                 : /*
                                417                 :  * ParallelSlotsAdoptConn
                                418                 :  *      Assign an open connection to the slots array for reuse.
                                419                 :  *
                                420                 :  * This turns over ownership of an open connection to a slots array.  The
                                421                 :  * caller should not further use or close the connection.  All the connection's
                                422                 :  * parameters (user, host, port, etc.) except possibly dbname should match
                                423                 :  * those of the slots array's cparams, as given in ParallelSlotsSetup.  If
                                424                 :  * these parameters differ, subsequent behavior is undefined.
  759 rhaas                     425 ECB             :  */
                                426                 : void
  759 rhaas                     427 GIC         102 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
                                428                 : {
  759 rhaas                     429 ECB             :     int         offset;
                                430                 : 
  759 rhaas                     431 CBC         102 :     offset = find_unconnected_slot(sa);
  759 rhaas                     432 GIC         102 :     if (offset >= 0)
  759 rhaas                     433 GBC         102 :         sa->slots[offset].connection = conn;
  759 rhaas                     434 ECB             :     else
  759 rhaas                     435 UIC           0 :         disconnectDatabase(conn);
 1360 michael                   436 GIC         102 : }
                                437                 : 
                                438                 : /*
                                439                 :  * ParallelSlotsTerminate
                                440                 :  *      Clean up a set of parallel slots
                                441                 :  *
                                442                 :  * Iterate through all connections in a given set of ParallelSlots and
                                443                 :  * terminate all connections.
 1360 michael                   444 ECB             :  */
                                445                 : void
  759 rhaas                     446 GIC         105 : ParallelSlotsTerminate(ParallelSlotArray *sa)
                                447                 : {
 1360 michael                   448 ECB             :     int         i;
                                449                 : 
  759 rhaas                     450 CBC         213 :     for (i = 0; i < sa->numslots; i++)
                                451                 :     {
                                452             108 :         PGconn     *conn = sa->slots[i].connection;
 1360 michael                   453 EUB             : 
 1360 michael                   454 GIC         108 :         if (conn == NULL)
 1360 michael                   455 LBC           0 :             continue;
                                456                 : 
 1360 michael                   457 CBC         108 :         disconnectDatabase(conn);
                                458                 :     }
 1360 michael                   459 GIC         105 : }
                                460                 : 
                                461                 : /*
                                462                 :  * ParallelSlotsWaitCompletion
                                463                 :  *
                                464                 :  * Wait for all connections to finish, returning false if at least one
                                465                 :  * error has been found on the way.
 1360 michael                   466 ECB             :  */
                                467                 : bool
  759 rhaas                     468 GIC         140 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
                                469                 : {
 1360 michael                   470 ECB             :     int         i;
                                471                 : 
  759 rhaas                     472 CBC         277 :     for (i = 0; i < sa->numslots; i++)
 1360 michael                   473 EUB             :     {
  759 rhaas                     474 CBC         144 :         if (sa->slots[i].connection == NULL)
  759 rhaas                     475 LBC           0 :             continue;
  759 rhaas                     476 GIC         144 :         if (!consumeQueryResult(&sa->slots[i]))
  793 rhaas                     477 CBC           7 :             return false;
                                478                 :         /* Mark connection as idle */
   93 tgl                       479 GNC         137 :         sa->slots[i].inUse = false;
                                480             137 :         ParallelSlotClearHandler(&sa->slots[i]);
  793 rhaas                     481 ECB             :     }
                                482                 : 
  793 rhaas                     483 GIC         133 :     return true;
  793 rhaas                     484 ECB             : }
                                485                 : 
                                486                 : /*
                                487                 :  * TableCommandResultHandler
                                488                 :  *
                                489                 :  * ParallelSlotResultHandler for results of commands (not queries) against
                                490                 :  * tables.
                                491                 :  *
                                492                 :  * Requires that the result status is either PGRES_COMMAND_OK or an error about
                                493                 :  * a missing table.  This is useful for utilities that compile a list of tables
                                494                 :  * to process and then run commands (vacuum, reindex, or whatever) against
                                495                 :  * those tables, as there is a race condition between the time the list is
                                496                 :  * compiled and the time the command attempts to open the table.
                                497                 :  *
                                498                 :  * For missing tables, logs an error but allows processing to continue.
                                499                 :  *
                                500                 :  * For all other errors, logs an error and terminates further processing.
                                501                 :  *
                                502                 :  * res: PGresult from the query executed on the slot's connection
                                503                 :  * conn: connection belonging to the slot
                                504                 :  * context: unused
                                505                 :  */
                                506                 : bool
  793 rhaas                     507 GIC        2605 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
  793 rhaas                     508 ECB             : {
  759 rhaas                     509 GIC        2605 :     Assert(res != NULL);
  759 rhaas                     510 CBC        2605 :     Assert(conn != NULL);
  759 rhaas                     511 ECB             : 
                                512                 :     /*
                                513                 :      * If it's an error, report it.  Errors about a missing table are harmless
                                514                 :      * so we continue processing; but die for other errors.
                                515                 :      */
  793 rhaas                     516 GIC        2605 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
  793 rhaas                     517 ECB             :     {
  793 rhaas                     518 GIC           7 :         char       *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
  793 rhaas                     519 ECB             : 
  793 rhaas                     520 GIC           7 :         pg_log_error("processing of database \"%s\" failed: %s",
  793 rhaas                     521 ECB             :                      PQdb(conn), PQerrorMessage(conn));
                                522                 : 
  793 rhaas                     523 GIC           7 :         if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
  793 rhaas                     524 ECB             :         {
  793 rhaas                     525 GIC           7 :             PQclear(res);
 1360 michael                   526 CBC           7 :             return false;
  793 rhaas                     527 ECB             :         }
                                528                 :     }
                                529                 : 
 1360 michael                   530 GIC        2598 :     return true;
 1360 michael                   531 ECB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a