LCOV - differential code coverage report
Current view: top level - src/fe_utils - parallel_slot.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128 Lines: 89.8 % 166 149 17 149
Current Date: 2024-04-14 14:21:10 Functions: 100.0 % 14 14 14
Baseline: 16@8cea358b128 Branches: 73.7 % 114 84 30 84
Baseline Date: 2024-04-14 14:21:09 Line coverage date bins:
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed (180,240] days: 40.0 % 5 2 3 2
(240..) days: 91.3 % 161 147 14 147
Function coverage date bins:
(240..) days: 100.0 % 14 14 14
Branch coverage date bins:
(180,240] days: 50.0 % 2 1 1 1
(240..) days: 74.1 % 112 83 29 83

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  *  parallel_slot.c
                                  4                 :                :  *      Parallel support for front-end parallel database connections
                                  5                 :                :  *
                                  6                 :                :  *
                                  7                 :                :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
                                  8                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                  9                 :                :  *
                                 10                 :                :  * src/fe_utils/parallel_slot.c
                                 11                 :                :  *
                                 12                 :                :  *-------------------------------------------------------------------------
                                 13                 :                :  */
                                 14                 :                : 
                                 15                 :                : #if defined(WIN32) && FD_SETSIZE < 1024
                                 16                 :                : #error FD_SETSIZE needs to have been increased
                                 17                 :                : #endif
                                 18                 :                : 
                                 19                 :                : #include "postgres_fe.h"
                                 20                 :                : 
                                 21                 :                : #include <sys/select.h>
                                 22                 :                : 
                                 23                 :                : #include "common/logging.h"
                                 24                 :                : #include "fe_utils/cancel.h"
                                 25                 :                : #include "fe_utils/parallel_slot.h"
                                 26                 :                : #include "fe_utils/query_utils.h"
                                 27                 :                : 
                                 28                 :                : #define ERRCODE_UNDEFINED_TABLE  "42P01"
                                 29                 :                : 
                                 30                 :                : static int  select_loop(int maxFd, fd_set *workerset);
                                 31                 :                : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
                                 32                 :                : 
                                 33                 :                : /*
                                 34                 :                :  * Process (and delete) a query result.  Returns true if there's no problem,
                                 35                 :                :  * false otherwise. It's up to the handler to decide what constitutes a
                                 36                 :                :  * problem.
                                 37                 :                :  */
                                 38                 :                : static bool
 1164 rhaas@postgresql.org       39                 :CBC       11319 : processQueryResult(ParallelSlot *slot, PGresult *result)
                                 40                 :                : {
                                 41         [ -  + ]:          11319 :     Assert(slot->handler != NULL);
                                 42                 :                : 
                                 43                 :                :     /* On failure, the handler should return NULL after freeing the result */
                                 44         [ +  + ]:          11319 :     if (!slot->handler(result, slot->connection, slot->handler_context))
                                 45                 :              6 :         return false;
                                 46                 :                : 
                                 47                 :                :     /* Ok, we have to free it ourself */
                                 48                 :          11313 :     PQclear(result);
                                 49                 :          11313 :     return true;
                                 50                 :                : }
                                 51                 :                : 
                                 52                 :                : /*
                                 53                 :                :  * Consume all the results generated for the given connection until
                                 54                 :                :  * nothing remains.  If at least one error is encountered, return false.
                                 55                 :                :  * Note that this will block if the connection is busy.
                                 56                 :                :  */
                                 57                 :                : static bool
                                 58                 :            203 : consumeQueryResult(ParallelSlot *slot)
                                 59                 :                : {
                                 60                 :            203 :     bool        ok = true;
                                 61                 :                :     PGresult   *result;
                                 62                 :                : 
                                 63                 :            203 :     SetCancelConn(slot->connection);
                                 64         [ +  + ]:            406 :     while ((result = PQgetResult(slot->connection)) != NULL)
                                 65                 :                :     {
                                 66         [ +  + ]:            203 :         if (!processQueryResult(slot, result))
                                 67                 :              6 :             ok = false;
                                 68                 :                :     }
                                 69                 :            203 :     ResetCancelConn();
                                 70                 :            203 :     return ok;
                                 71                 :                : }
                                 72                 :                : 
                                 73                 :                : /*
                                 74                 :                :  * Wait until a file descriptor from the given set becomes readable.
                                 75                 :                :  *
                                 76                 :                :  * Returns the number of ready descriptors, or -1 on failure (including
                                 77                 :                :  * getting a cancel request).
                                 78                 :                :  */
                                 79                 :                : static int
 1407 tgl@sss.pgh.pa.us          80                 :          11164 : select_loop(int maxFd, fd_set *workerset)
                                 81                 :                : {
                                 82                 :                :     int         i;
 1731 michael@paquier.xyz        83                 :          11164 :     fd_set      saveSet = *workerset;
                                 84                 :                : 
                                 85         [ -  + ]:          11164 :     if (CancelRequested)
 1731 michael@paquier.xyz        86                 :UBC           0 :         return -1;
                                 87                 :                : 
                                 88                 :                :     for (;;)
                                 89                 :              0 :     {
                                 90                 :                :         /*
                                 91                 :                :          * On Windows, we need to check once in a while for cancel requests;
                                 92                 :                :          * on other platforms we rely on select() returning when interrupted.
                                 93                 :                :          */
                                 94                 :                :         struct timeval *tvp;
                                 95                 :                : #ifdef WIN32
                                 96                 :                :         struct timeval tv = {0, 1000000};
                                 97                 :                : 
                                 98                 :                :         tvp = &tv;
                                 99                 :                : #else
 1731 michael@paquier.xyz       100                 :CBC       11164 :         tvp = NULL;
                                101                 :                : #endif
                                102                 :                : 
                                103                 :          11164 :         *workerset = saveSet;
                                104                 :          11164 :         i = select(maxFd + 1, workerset, NULL, NULL, tvp);
                                105                 :                : 
                                106                 :                : #ifdef WIN32
                                107                 :                :         if (i == SOCKET_ERROR)
                                108                 :                :         {
                                109                 :                :             i = -1;
                                110                 :                : 
                                111                 :                :             if (WSAGetLastError() == WSAEINTR)
                                112                 :                :                 errno = EINTR;
                                113                 :                :         }
                                114                 :                : #endif
                                115                 :                : 
                                116   [ -  +  -  - ]:          11164 :         if (i < 0 && errno == EINTR)
 1731 michael@paquier.xyz       117                 :UBC           0 :             continue;           /* ignore this */
 1731 michael@paquier.xyz       118   [ +  -  -  + ]:CBC       11164 :         if (i < 0 || CancelRequested)
 1407 tgl@sss.pgh.pa.us         119                 :UBC           0 :             return -1;          /* but not this */
 1731 michael@paquier.xyz       120         [ -  + ]:CBC       11164 :         if (i == 0)
 1731 michael@paquier.xyz       121                 :UBC           0 :             continue;           /* timeout (Win32 only) */
 1731 michael@paquier.xyz       122                 :CBC       11164 :         break;
                                123                 :                :     }
                                124                 :                : 
                                125                 :          11164 :     return i;
                                126                 :                : }
                                127                 :                : 
                                128                 :                : /*
                                129                 :                :  * Return the offset of a suitable idle slot, or -1 if none are available.  If
                                130                 :                :  * the given dbname is not null, only idle slots connected to the given
                                131                 :                :  * database are considered suitable, otherwise all idle connected slots are
                                132                 :                :  * considered suitable.
                                133                 :                :  */
                                134                 :                : static int
 1130 rhaas@postgresql.org      135                 :          22482 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
                                136                 :                : {
                                137                 :                :     int         i;
                                138                 :                : 
                                139         [ +  + ]:          33773 :     for (i = 0; i < sa->numslots; i++)
                                140                 :                :     {
                                141         [ +  + ]:          22588 :         if (sa->slots[i].inUse)
                                142                 :          11270 :             continue;
                                143                 :                : 
                                144         [ +  + ]:          11318 :         if (sa->slots[i].connection == NULL)
                                145                 :              7 :             continue;
                                146                 :                : 
                                147         [ +  + ]:          11311 :         if (dbname == NULL ||
                                148         [ +  + ]:           7779 :             strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
                                149                 :          11297 :             return i;
                                150                 :                :     }
                                151                 :          11185 :     return -1;
                                152                 :                : }
                                153                 :                : 
                                154                 :                : /*
                                155                 :                :  * Return the offset of the first slot without a database connection, or -1 if
                                156                 :                :  * all slots are connected.
                                157                 :                :  */
                                158                 :                : static int
                                159                 :          11327 : find_unconnected_slot(const ParallelSlotArray *sa)
                                160                 :                : {
                                161                 :                :     int         i;
                                162                 :                : 
                                163         [ +  + ]:          22580 :     for (i = 0; i < sa->numslots; i++)
                                164                 :                :     {
                                165         [ +  + ]:          11402 :         if (sa->slots[i].inUse)
                                166                 :          11239 :             continue;
                                167                 :                : 
                                168         [ +  + ]:            163 :         if (sa->slots[i].connection == NULL)
                                169                 :            149 :             return i;
                                170                 :                :     }
                                171                 :                : 
                                172                 :          11178 :     return -1;
                                173                 :                : }
                                174                 :                : 
                                175                 :                : /*
                                176                 :                :  * Return the offset of the first idle slot, or -1 if all slots are busy.
                                177                 :                :  */
                                178                 :                : static int
                                179                 :          11178 : find_any_idle_slot(const ParallelSlotArray *sa)
                                180                 :                : {
                                181                 :                :     int         i;
                                182                 :                : 
                                183         [ +  + ]:          22413 :     for (i = 0; i < sa->numslots; i++)
                                184         [ +  + ]:          11249 :         if (!sa->slots[i].inUse)
                                185                 :             14 :             return i;
                                186                 :                : 
                                187                 :          11164 :     return -1;
                                188                 :                : }
                                189                 :                : 
                                190                 :                : /*
                                191                 :                :  * Wait for any slot's connection to have query results, consume the results,
                                192                 :                :  * and update the slot's status as appropriate.  Returns true on success,
                                193                 :                :  * false on cancellation, on error, or if no slots are connected.
                                194                 :                :  */
                                195                 :                : static bool
                                196                 :          11164 : wait_on_slots(ParallelSlotArray *sa)
                                197                 :                : {
                                198                 :                :     int         i;
                                199                 :                :     fd_set      slotset;
                                200                 :          11164 :     int         maxFd = 0;
                                201                 :          11164 :     PGconn     *cancelconn = NULL;
                                202                 :                : 
                                203                 :                :     /* We must reconstruct the fd_set for each call to select_loop */
                                204         [ +  + ]:         189788 :     FD_ZERO(&slotset);
                                205                 :                : 
                                206         [ +  + ]:          22399 :     for (i = 0; i < sa->numslots; i++)
                                207                 :                :     {
                                208                 :                :         int         sock;
                                209                 :                : 
                                210                 :                :         /* We shouldn't get here if we still have slots without connections */
                                211         [ -  + ]:          11235 :         Assert(sa->slots[i].connection != NULL);
                                212                 :                : 
                                213                 :          11235 :         sock = PQsocket(sa->slots[i].connection);
                                214                 :                : 
                                215                 :                :         /*
                                216                 :                :          * We don't really expect any connections to lose their sockets after
                                217                 :                :          * startup, but just in case, cope by ignoring them.
                                218                 :                :          */
                                219         [ -  + ]:          11235 :         if (sock < 0)
 1130 rhaas@postgresql.org      220                 :UBC           0 :             continue;
                                221                 :                : 
                                222                 :                :         /* Keep track of the first valid connection we see. */
 1130 rhaas@postgresql.org      223         [ +  + ]:CBC       11235 :         if (cancelconn == NULL)
                                224                 :          11164 :             cancelconn = sa->slots[i].connection;
                                225                 :                : 
                                226                 :          11235 :         FD_SET(sock, &slotset);
                                227         [ +  - ]:          11235 :         if (sock > maxFd)
                                228                 :          11235 :             maxFd = sock;
                                229                 :                :     }
                                230                 :                : 
                                231                 :                :     /*
                                232                 :                :      * If we get this far with no valid connections, processing cannot
                                233                 :                :      * continue.
                                234                 :                :      */
                                235         [ -  + ]:          11164 :     if (cancelconn == NULL)
 1130 rhaas@postgresql.org      236                 :UBC           0 :         return false;
                                237                 :                : 
  596 michael@paquier.xyz       238                 :CBC       11164 :     SetCancelConn(cancelconn);
 1130 rhaas@postgresql.org      239                 :          11164 :     i = select_loop(maxFd, &slotset);
                                240                 :          11164 :     ResetCancelConn();
                                241                 :                : 
                                242                 :                :     /* failure? */
                                243         [ -  + ]:          11164 :     if (i < 0)
 1130 rhaas@postgresql.org      244                 :UBC           0 :         return false;
                                245                 :                : 
 1130 rhaas@postgresql.org      246         [ +  + ]:CBC       22399 :     for (i = 0; i < sa->numslots; i++)
                                247                 :                :     {
                                248                 :                :         int         sock;
                                249                 :                : 
                                250                 :          11235 :         sock = PQsocket(sa->slots[i].connection);
                                251                 :                : 
                                252   [ +  -  +  + ]:          11235 :         if (sock >= 0 && FD_ISSET(sock, &slotset))
                                253                 :                :         {
                                254                 :                :             /* select() says input is available, so consume it */
                                255                 :          11165 :             PQconsumeInput(sa->slots[i].connection);
                                256                 :                :         }
                                257                 :                : 
                                258                 :                :         /* Collect result(s) as long as any are available */
                                259         [ +  + ]:          22351 :         while (!PQisBusy(sa->slots[i].connection))
                                260                 :                :         {
                                261                 :          22232 :             PGresult   *result = PQgetResult(sa->slots[i].connection);
                                262                 :                : 
                                263         [ +  + ]:          22232 :             if (result != NULL)
                                264                 :                :             {
                                265                 :                :                 /* Handle and discard the command result */
                                266         [ -  + ]:          11116 :                 if (!processQueryResult(&sa->slots[i], result))
 1130 rhaas@postgresql.org      267                 :UBC           0 :                     return false;
                                268                 :                :             }
                                269                 :                :             else
                                270                 :                :             {
                                271                 :                :                 /* This connection has become idle */
 1130 rhaas@postgresql.org      272                 :CBC       11116 :                 sa->slots[i].inUse = false;
                                273                 :          11116 :                 ParallelSlotClearHandler(&sa->slots[i]);
                                274                 :          11116 :                 break;
                                275                 :                :             }
                                276                 :                :         }
                                277                 :                :     }
                                278                 :          11164 :     return true;
                                279                 :                : }
                                280                 :                : 
                                281                 :                : /*
                                282                 :                :  * Open a new database connection using the stored connection parameters and
                                283                 :                :  * optionally a given dbname if not null, execute the stored initial command if
                                284                 :                :  * any, and associate the new connection with the given slot.
                                285                 :                :  */
                                286                 :                : static void
                                287                 :             21 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
                                288                 :                : {
                                289                 :                :     const char *old_override;
                                290                 :             21 :     ParallelSlot *slot = &sa->slots[slotno];
                                291                 :                : 
                                292                 :             21 :     old_override = sa->cparams->override_dbname;
                                293         [ +  + ]:             21 :     if (dbname)
                                294                 :             17 :         sa->cparams->override_dbname = dbname;
                                295                 :             21 :     slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
                                296                 :             21 :     sa->cparams->override_dbname = old_override;
                                297                 :                : 
                                298                 :                :     /*
                                299                 :                :      * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
                                300                 :                :      * FD_SET() and allied macros.  Windows defines it as a ceiling on the
                                301                 :                :      * count of file descriptors in the set, not a ceiling on the value of
                                302                 :                :      * each file descriptor; see
                                303                 :                :      * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
                                304                 :                :      * and
                                305                 :                :      * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
                                306                 :                :      * We can't ignore that, because Windows starts file descriptors at a
                                307                 :                :      * higher value, delays reuse, and skips values.  With less than ten
                                308                 :                :      * concurrent file descriptors, opened and closed rapidly, one can reach
                                309                 :                :      * file descriptor 1024.
                                310                 :                :      *
                                311                 :                :      * Doing a hard exit here is a bit grotty, but it doesn't seem worth
                                312                 :                :      * complicating the API to make it less grotty.
                                313                 :                :      */
                                314                 :                : #ifdef WIN32
                                315                 :                :     if (slotno >= FD_SETSIZE)
                                316                 :                :     {
                                317                 :                :         pg_log_error("too many jobs for this platform: %d", slotno);
                                318                 :                :         exit(1);
                                319                 :                :     }
                                320                 :                : #else
                                321                 :                :     {
  183 noah@leadboat.com         322                 :             21 :         int         fd = PQsocket(slot->connection);
                                323                 :                : 
                                324         [ -  + ]:             21 :         if (fd >= FD_SETSIZE)
                                325                 :                :         {
  183 noah@leadboat.com         326                 :UBC           0 :             pg_log_error("socket file descriptor out of range for select(): %d",
                                327                 :                :                          fd);
                                328                 :              0 :             pg_log_error_hint("Try fewer jobs.");
                                329                 :              0 :             exit(1);
                                330                 :                :         }
                                331                 :                :     }
                                332                 :                : #endif
                                333                 :                : 
                                334                 :                :     /* Setup the connection using the supplied command, if any. */
 1130 rhaas@postgresql.org      335         [ -  + ]:CBC          21 :     if (sa->initcmd)
 1130 rhaas@postgresql.org      336                 :UBC           0 :         executeCommand(slot->connection, sa->initcmd, sa->echo);
 1731 michael@paquier.xyz       337                 :CBC          21 : }
                                338                 :                : 
                                339                 :                : /*
                                340                 :                :  * ParallelSlotsGetIdle
                                341                 :                :  *      Return a connection slot that is ready to execute a command.
                                342                 :                :  *
                                343                 :                :  * The slot returned is chosen as follows:
                                344                 :                :  *
                                345                 :                :  * If any idle slot already has an open connection, and if either dbname is
                                346                 :                :  * null or the existing connection is to the given database, that slot will be
                                347                 :                :  * returned allowing the connection to be reused.
                                348                 :                :  *
                                349                 :                :  * Otherwise, if any idle slot is not yet connected to any database, the slot
                                350                 :                :  * will be returned with it's connection opened using the stored cparams and
                                351                 :                :  * optionally the given dbname if not null.
                                352                 :                :  *
                                353                 :                :  * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
                                354                 :                :  * after having it's connection disconnected and reconnected using the stored
                                355                 :                :  * cparams and optionally the given dbname if not null.
                                356                 :                :  *
                                357                 :                :  * Otherwise, if any slots have connections that are busy, we loop on select()
                                358                 :                :  * until one socket becomes available.  When this happens, we read the whole
                                359                 :                :  * set and mark as free all sockets that become available.  We then select a
                                360                 :                :  * slot using the same rules as above.
                                361                 :                :  *
                                362                 :                :  * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
                                363                 :                :  *
                                364                 :                :  * For any connection created, if the stored initcmd is not null, it will be
                                365                 :                :  * executed as a command on the newly formed connection before the slot is
                                366                 :                :  * returned.
                                367                 :                :  *
                                368                 :                :  * If an error occurs, NULL is returned.
                                369                 :                :  */
                                370                 :                : ParallelSlot *
 1130 rhaas@postgresql.org      371                 :          11318 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
                                372                 :                : {
                                373                 :                :     int         offset;
                                374                 :                : 
                                375         [ -  + ]:          11318 :     Assert(sa);
                                376         [ +  - ]:          11318 :     Assert(sa->numslots > 0);
                                377                 :                : 
                                378                 :                :     while (1)
                                379                 :                :     {
                                380                 :                :         /* First choice: a slot already connected to the desired database. */
                                381                 :          22482 :         offset = find_matching_idle_slot(sa, dbname);
                                382         [ +  + ]:          22482 :         if (offset >= 0)
                                383                 :                :         {
                                384                 :          11297 :             sa->slots[offset].inUse = true;
                                385                 :          11297 :             return &sa->slots[offset];
                                386                 :                :         }
                                387                 :                : 
                                388                 :                :         /* Second choice: a slot not connected to any database. */
                                389                 :          11185 :         offset = find_unconnected_slot(sa);
                                390         [ +  + ]:          11185 :         if (offset >= 0)
                                391                 :                :         {
                                392                 :              7 :             connect_slot(sa, offset, dbname);
                                393                 :              7 :             sa->slots[offset].inUse = true;
                                394                 :              7 :             return &sa->slots[offset];
                                395                 :                :         }
                                396                 :                : 
                                397                 :                :         /* Third choice: a slot connected to the wrong database. */
                                398                 :          11178 :         offset = find_any_idle_slot(sa);
                                399         [ +  + ]:          11178 :         if (offset >= 0)
                                400                 :                :         {
                                401                 :             14 :             disconnectDatabase(sa->slots[offset].connection);
                                402                 :             14 :             sa->slots[offset].connection = NULL;
                                403                 :             14 :             connect_slot(sa, offset, dbname);
                                404                 :             14 :             sa->slots[offset].inUse = true;
                                405                 :             14 :             return &sa->slots[offset];
                                406                 :                :         }
                                407                 :                : 
                                408                 :                :         /*
                                409                 :                :          * Fourth choice: block until one or more slots become available. If
                                410                 :                :          * any slots hit a fatal error, we'll find out about that here and
                                411                 :                :          * return NULL.
                                412                 :                :          */
                                413         [ -  + ]:          11164 :         if (!wait_on_slots(sa))
 1130 rhaas@postgresql.org      414                 :UBC           0 :             return NULL;
                                415                 :                :     }
                                416                 :                : }
                                417                 :                : 
                                418                 :                : /*
                                419                 :                :  * ParallelSlotsSetup
                                420                 :                :  *      Prepare a set of parallel slots but do not connect to any database.
                                421                 :                :  *
                                422                 :                :  * This creates and initializes a set of slots, marking all parallel slots as
                                423                 :                :  * free and ready to use.  Establishing connections is delayed until requesting
                                424                 :                :  * a free slot.  The cparams, progname, echo, and initcmd are stored for later
                                425                 :                :  * use and must remain valid for the lifetime of the returned array.
                                426                 :                :  */
                                427                 :                : ParallelSlotArray *
 1130 rhaas@postgresql.org      428                 :CBC         145 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
                                429                 :                :                    bool echo, const char *initcmd)
                                430                 :                : {
                                431                 :                :     ParallelSlotArray *sa;
                                432                 :                : 
                                433         [ -  + ]:            145 :     Assert(numslots > 0);
                                434         [ -  + ]:            145 :     Assert(cparams != NULL);
                                435         [ -  + ]:            145 :     Assert(progname != NULL);
                                436                 :                : 
                                437                 :            145 :     sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
                                438                 :            145 :                                        numslots * sizeof(ParallelSlot));
                                439                 :                : 
                                440                 :            145 :     sa->numslots = numslots;
                                441                 :            145 :     sa->cparams = cparams;
                                442                 :            145 :     sa->progname = progname;
                                443                 :            145 :     sa->echo = echo;
                                444                 :            145 :     sa->initcmd = initcmd;
                                445                 :                : 
                                446                 :            145 :     return sa;
                                447                 :                : }
                                448                 :                : 
                                449                 :                : /*
                                450                 :                :  * ParallelSlotsAdoptConn
                                451                 :                :  *      Assign an open connection to the slots array for reuse.
                                452                 :                :  *
                                453                 :                :  * This turns over ownership of an open connection to a slots array.  The
                                454                 :                :  * caller should not further use or close the connection.  All the connection's
                                455                 :                :  * parameters (user, host, port, etc.) except possibly dbname should match
                                456                 :                :  * those of the slots array's cparams, as given in ParallelSlotsSetup.  If
                                457                 :                :  * these parameters differ, subsequent behavior is undefined.
                                458                 :                :  */
                                459                 :                : void
                                460                 :            142 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
                                461                 :                : {
                                462                 :                :     int         offset;
                                463                 :                : 
                                464                 :            142 :     offset = find_unconnected_slot(sa);
                                465         [ +  - ]:            142 :     if (offset >= 0)
                                466                 :            142 :         sa->slots[offset].connection = conn;
                                467                 :                :     else
 1130 rhaas@postgresql.org      468                 :UBC           0 :         disconnectDatabase(conn);
 1731 michael@paquier.xyz       469                 :CBC         142 : }
                                470                 :                : 
                                471                 :                : /*
                                472                 :                :  * ParallelSlotsTerminate
                                473                 :                :  *      Clean up a set of parallel slots
                                474                 :                :  *
                                475                 :                :  * Iterate through all connections in a given set of ParallelSlots and
                                476                 :                :  * terminate all connections.
                                477                 :                :  */
                                478                 :                : void
 1130 rhaas@postgresql.org      479                 :            145 : ParallelSlotsTerminate(ParallelSlotArray *sa)
                                480                 :                : {
                                481                 :                :     int         i;
                                482                 :                : 
                                483         [ +  + ]:            294 :     for (i = 0; i < sa->numslots; i++)
                                484                 :                :     {
                                485                 :            149 :         PGconn     *conn = sa->slots[i].connection;
                                486                 :                : 
 1731 michael@paquier.xyz       487         [ -  + ]:            149 :         if (conn == NULL)
 1731 michael@paquier.xyz       488                 :UBC           0 :             continue;
                                489                 :                : 
 1731 michael@paquier.xyz       490                 :CBC         149 :         disconnectDatabase(conn);
                                491                 :                :     }
                                492                 :            145 : }
                                493                 :                : 
                                494                 :                : /*
                                495                 :                :  * ParallelSlotsWaitCompletion
                                496                 :                :  *
                                497                 :                :  * Wait for all connections to finish, returning false if at least one
                                498                 :                :  * error has been found on the way.
                                499                 :                :  */
                                500                 :                : bool
 1130 rhaas@postgresql.org      501                 :            198 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
                                502                 :                : {
                                503                 :                :     int         i;
                                504                 :                : 
                                505         [ +  + ]:            395 :     for (i = 0; i < sa->numslots; i++)
                                506                 :                :     {
                                507         [ -  + ]:            203 :         if (sa->slots[i].connection == NULL)
 1130 rhaas@postgresql.org      508                 :UBC           0 :             continue;
 1130 rhaas@postgresql.org      509         [ +  + ]:CBC         203 :         if (!consumeQueryResult(&sa->slots[i]))
 1164                           510                 :              6 :             return false;
                                511                 :                :         /* Mark connection as idle */
  464 tgl@sss.pgh.pa.us         512                 :            197 :         sa->slots[i].inUse = false;
                                513                 :            197 :         ParallelSlotClearHandler(&sa->slots[i]);
                                514                 :                :     }
                                515                 :                : 
 1164 rhaas@postgresql.org      516                 :            192 :     return true;
                                517                 :                : }
                                518                 :                : 
                                519                 :                : /*
                                520                 :                :  * TableCommandResultHandler
                                521                 :                :  *
                                522                 :                :  * ParallelSlotResultHandler for results of commands (not queries) against
                                523                 :                :  * tables.
                                524                 :                :  *
                                525                 :                :  * Requires that the result status is either PGRES_COMMAND_OK or an error about
                                526                 :                :  * a missing table.  This is useful for utilities that compile a list of tables
                                527                 :                :  * to process and then run commands (vacuum, reindex, or whatever) against
                                528                 :                :  * those tables, as there is a race condition between the time the list is
                                529                 :                :  * compiled and the time the command attempts to open the table.
                                530                 :                :  *
                                531                 :                :  * For missing tables, logs an error but allows processing to continue.
                                532                 :                :  *
                                533                 :                :  * For all other errors, logs an error and terminates further processing.
                                534                 :                :  *
                                535                 :                :  * res: PGresult from the query executed on the slot's connection
                                536                 :                :  * conn: connection belonging to the slot
                                537                 :                :  * context: unused
                                538                 :                :  */
                                539                 :                : bool
                                540                 :           3537 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
                                541                 :                : {
 1130                           542         [ -  + ]:           3537 :     Assert(res != NULL);
                                543         [ -  + ]:           3537 :     Assert(conn != NULL);
                                544                 :                : 
                                545                 :                :     /*
                                546                 :                :      * If it's an error, report it.  Errors about a missing table are harmless
                                547                 :                :      * so we continue processing; but die for other errors.
                                548                 :                :      */
 1164                           549         [ +  + ]:           3537 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                550                 :                :     {
                                551                 :              6 :         char       *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
                                552                 :                : 
                                553                 :              6 :         pg_log_error("processing of database \"%s\" failed: %s",
                                554                 :                :                      PQdb(conn), PQerrorMessage(conn));
                                555                 :                : 
                                556   [ +  -  +  - ]:              6 :         if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
                                557                 :                :         {
                                558                 :              6 :             PQclear(res);
 1731 michael@paquier.xyz       559                 :              6 :             return false;
                                560                 :                :         }
                                561                 :                :     }
                                562                 :                : 
                                563                 :           3531 :     return true;
                                564                 :                : }
        

Generated by: LCOV version 2.1-beta2-3-g6141622