LCOV - differential code coverage report
Current view: top level - src/bin/pg_dump - parallel.c (source / functions) Coverage Total Hit LBC UIC UBC GBC GIC CBC EUB ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 82.3 % 339 279 6 40 14 5 178 96 41 177
Current Date: 2023-04-08 15:15:32 Functions: 90.6 % 32 29 3 29 3 29
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * parallel.c
       4                 :  *
       5                 :  *  Parallel support for pg_dump and pg_restore
       6                 :  *
       7                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       8                 :  * Portions Copyright (c) 1994, Regents of the University of California
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *      src/bin/pg_dump/parallel.c
      12                 :  *
      13                 :  *-------------------------------------------------------------------------
      14                 :  */
      15                 : 
      16                 : /*
      17                 :  * Parallel operation works like this:
      18                 :  *
      19                 :  * The original, leader process calls ParallelBackupStart(), which forks off
      20                 :  * the desired number of worker processes, which each enter WaitForCommands().
      21                 :  *
      22                 :  * The leader process dispatches an individual work item to one of the worker
      23                 :  * processes in DispatchJobForTocEntry().  We send a command string such as
      24                 :  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
      25                 :  * The worker process receives and decodes the command and passes it to the
      26                 :  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
      27                 :  * which are routines of the current archive format.  That routine performs
      28                 :  * the required action (dump or restore) and returns an integer status code.
      29                 :  * This is passed back to the leader where we pass it to the
      30                 :  * ParallelCompletionPtr callback function that was passed to
      31                 :  * DispatchJobForTocEntry().  The callback function does state updating
      32                 :  * for the leader control logic in pg_backup_archiver.c.
      33                 :  *
      34                 :  * In principle additional archive-format-specific information might be needed
      35                 :  * in commands or worker status responses, but so far that hasn't proved
      36                 :  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
      37                 :  * data structures.  Remember that we have forked off the workers only after
      38                 :  * we have read in the catalog.  That's why our worker processes can also
      39                 :  * access the catalog information.  (In the Windows case, the workers are
      40                 :  * threads in the same process.  To avoid problems, they work with cloned
      41                 :  * copies of the Archive data structure; see RunWorker().)
      42                 :  *
      43                 :  * In the leader process, the workerStatus field for each worker has one of
      44                 :  * the following values:
      45                 :  *      WRKR_NOT_STARTED: we've not yet forked this worker
      46                 :  *      WRKR_IDLE: it's waiting for a command
      47                 :  *      WRKR_WORKING: it's working on a command
      48                 :  *      WRKR_TERMINATED: process ended
      49                 :  * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
      50                 :  * state, and must be NULL in other states.
      51                 :  */
      52                 : 
      53                 : #include "postgres_fe.h"
      54                 : 
      55                 : #ifndef WIN32
      56                 : #include <sys/select.h>
      57                 : #include <sys/wait.h>
      58                 : #include <signal.h>
      59                 : #include <unistd.h>
      60                 : #include <fcntl.h>
      61                 : #endif
      62                 : 
      63                 : #include "fe_utils/string_utils.h"
      64                 : #include "parallel.h"
      65                 : #include "pg_backup_utils.h"
      66                 : #include "port/pg_bswap.h"
      67                 : 
      68                 : /* Mnemonic macros for indexing the fd array returned by pipe(2) */
      69                 : #define PIPE_READ                           0
      70                 : #define PIPE_WRITE                          1
      71                 : 
      72                 : #define NO_SLOT (-1)            /* Failure result for GetIdleWorker() */
      73                 : 
      74                 : /* Worker process statuses, as stored in ParallelSlot.workerStatus */
      75                 : typedef enum
      76                 : {
      77                 :     WRKR_NOT_STARTED = 0,   /* we've not yet forked this worker */
      78                 :     WRKR_IDLE,              /* worker is waiting for a command */
      79                 :     WRKR_WORKING,           /* worker is working on a command */
      80                 :     WRKR_TERMINATED         /* worker process has ended */
      81                 : } T_WorkerStatus;
      82                 : 
      83                 : #define WORKER_IS_RUNNING(workerStatus) \
      84                 :     ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING) /* true for a started, not-yet-terminated worker; evaluates its argument twice */
      85                 : 
      86                 : /*
      87                 :  * Private per-parallel-worker state (typedef for this is in parallel.h).
      88                 :  *
      89                 :  * Much of this is valid only in the leader process (or, on Windows, should
      90                 :  * be touched only by the leader thread).  But the AH field should be touched
      91                 :  * only by workers.  The pipe descriptors are valid everywhere.
      92                 :  */
      93                 : struct ParallelSlot
      94                 : {
      95                 :     T_WorkerStatus workerStatus;    /* current state; one of the T_WorkerStatus values */
      96                 : 
      97                 :     /* These fields are valid if workerStatus == WRKR_WORKING: */
      98                 :     ParallelCompletionPtr callback; /* function to call on completion */
      99                 :     void       *callback_data;  /* passthrough data for it */
     100                 : 
     101                 :     ArchiveHandle *AH;          /* Archive data worker is using */
     102                 : 
     103                 :     int         pipeRead;       /* leader's end of the pipes: read side */
     104                 :     int         pipeWrite;      /* leader's end of the pipes: write side (closed by ShutdownWorkersHard()) */
     105                 :     int         pipeRevRead;    /* child's end of the pipes: read side */
     106                 :     int         pipeRevWrite;   /* child's end of the pipes: write side */
     107                 : 
     108                 :     /* Child process/thread identity info: */
     109                 : #ifdef WIN32
     110                 :     uintptr_t   hThread;        /* thread handle; waited on and closed in WaitForTerminatingWorkers() */
     111                 :     unsigned int threadId;      /* compared with GetCurrentThreadId() in GetMyPSlot() */
     112                 : #else
     113                 :     pid_t       pid;            /* worker's PID; reset to 0 by WaitForTerminatingWorkers() once reaped */
     114                 : #endif
     115                 : };
     116                 : 
     117                 : #ifdef WIN32
     118                 : 
     119                 : /*
     120                 :  * Structure to hold info passed by _beginthreadex() to the function it calls
     121                 :  * via its single allowed argument (Windows builds only).
     122                 :  */
     123                 : typedef struct
     124                 : {
     125                 :     ArchiveHandle *AH;          /* leader database connection */
     126                 :     ParallelSlot *slot;         /* this worker's parallel slot */
     127                 : } WorkerInfo;
     128                 : 
     129                 : /* Windows implementation of pipe access: pgpipe() is a function in this file, and reads/writes go through recv()/send() with flags 0 */
     130                 : static int  pgpipe(int handles[2]);
     131                 : #define piperead(a,b,c)     recv(a,b,c,0)
     132                 : #define pipewrite(a,b,c)    send(a,b,c,0)
     133                 : 
     134                 : #else                           /* !WIN32 */
     135                 : 
     136                 : /* Non-Windows implementation of pipe access: plain pipe(2) descriptors used with read()/write() */
     137                 : #define pgpipe(a)           pipe(a)
     138                 : #define piperead(a,b,c)     read(a,b,c)
     139                 : #define pipewrite(a,b,c)    write(a,b,c)
     140                 : 
     141                 : #endif                          /* WIN32 */
     142                 : 
     143                 : /*
     144                 :  * State info for archive_close_connection() shutdown callback.  on_exit_close_archive() passes the address of the single static instance below to on_exit_nicely().
     145                 :  */
     146                 : typedef struct ShutdownInformation
     147                 : {
     148                 :     ParallelState *pstate;      /* non-NULL selects the parallel-mode shutdown path */
     149                 :     Archive    *AHX;            /* leader's archive; its DB connection is closed at exit if set */
     150                 : } ShutdownInformation;
     151                 : 
     152                 : static ShutdownInformation shutdown_info;   /* AHX is filled in by on_exit_close_archive() */
     153                 : 
     154                 : /*
     155                 :  * State info for signal handling.
     156                 :  * We assume signal_info initializes to zeroes (it has static storage duration).
     157                 :  *
     158                 :  * On Unix, myAH is the leader DB connection in the leader process, and the
     159                 :  * worker's own connection in worker processes.  On Windows, we have only one
     160                 :  * instance of signal_info, so myAH is the leader connection and the worker
     161                 :  * connections must be dug out of pstate->parallelSlot[].
     162                 :  */
     163                 : typedef struct DumpSignalInformation
     164                 : {
     165                 :     ArchiveHandle *myAH;        /* database connection to issue cancel for */
     166                 :     ParallelState *pstate;      /* parallel state, if any (sigTermHandler() tests it for NULL) */
     167                 :     bool        handler_set;    /* signal handler set up in this process? */
     168                 : #ifndef WIN32
     169                 :     bool        am_worker;      /* am I a worker process? */
     170                 : #endif
     171                 : } DumpSignalInformation;
     172                 : 
     173                 : static volatile DumpSignalInformation signal_info;  /* read from sigTermHandler() on Unix */
     174                 : 
     175                 : #ifdef WIN32
     176                 : static CRITICAL_SECTION signal_info_lock;   /* held by ShutdownWorkersHard() while it reads the workers' connCancel fields */
     177                 : #endif
     177                 : #endif
     178                 : 
     179                 : /*
     180                 :  * Write a simple string to stderr --- must be safe in a signal handler.
     181                 :  * We ignore the write() result since there's not much we could do about it.
     182                 :  * Certain compilers make that harder than it ought to be, hence the rc_ variable and the (void) cast.  The argument is evaluated once (copied into str_) and its length is taken with strlen().
     183                 :  */
     184                 : #define write_stderr(str) \
     185                 :     do { \
     186                 :         const char *str_ = (str); \
     187                 :         int     rc_; \
     188                 :         rc_ = write(fileno(stderr), str_, strlen(str_)); \
     189                 :         (void) rc_; \
     190                 :     } while (0)
     191                 : 
     192                 : 
     193                 : #ifdef WIN32
     194                 : /* file-scope variables */
     195                 : static DWORD tls_index;         /* TLS index from TlsAlloc(); holds each thread's buffer for getThreadLocalPQExpBuffer() */
     196                 : 
     197                 : /* globally visible variables (needed by exit_nicely) */
     198                 : bool        parallel_init_done = false; /* set to true by init_parallel_dump_utils() */
     199                 : DWORD       mainThreadId;       /* ID of the thread that ran init_parallel_dump_utils() */
     200                 : #endif                          /* WIN32 */
     201                 : 
     202                 : /* Local function prototypes (all static, i.e. private to this file) */
     203                 : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
     204                 : static void archive_close_connection(int code, void *arg);
     205                 : static void ShutdownWorkersHard(ParallelState *pstate);
     206                 : static void WaitForTerminatingWorkers(ParallelState *pstate);
     207                 : static void setup_cancel_handler(void);
     208                 : static void set_cancel_pstate(ParallelState *pstate);
     209                 : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
     210                 : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
     211                 : static int  GetIdleWorker(ParallelState *pstate);
     212                 : static bool HasEveryWorkerTerminated(ParallelState *pstate);
     213                 : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
     214                 : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
     215                 : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
     216                 :                             bool do_wait);
     217                 : static char *getMessageFromLeader(int pipefd[2]);
     218                 : static void sendMessageToLeader(int pipefd[2], const char *str);
     219                 : static int  select_loop(int maxFd, fd_set *workerset);
     220                 : static char *getMessageFromWorker(ParallelState *pstate,
     221                 :                                   bool do_wait, int *worker);
     222                 : static void sendMessageToWorker(ParallelState *pstate,
     223                 :                                 int worker, const char *str);
     224                 : static char *readMessageFromPipe(int fd);
     225                 : 
     226                 : #define messageStartsWith(msg, prefix) \
     227                 :     (strncmp(msg, prefix, strlen(prefix)) == 0) /* true iff msg begins with prefix; note prefix is evaluated twice */
     228                 : 
     229                 : 
     230                 : /*
     231                 :  * Initialize parallel dump support --- should be called early in process
     232                 :  * startup.  (Currently, this is called whether or not we intend parallel
     233                 :  * activity.)  On Windows, the first call allocates the TLS index, records the main thread's ID and starts Winsock (requesting version 2.2, pg_fatal() on failure); repeat calls do nothing.  On other platforms the body is empty.
     234 ECB             :  */
     235                 : void
     236 GIC         227 : init_parallel_dump_utils(void)
     237                 : {
     238                 : #ifdef WIN32
     239                 :     if (!parallel_init_done)
     240                 :     {
     241                 :         WSADATA     wsaData;
     242                 :         int         err;
     243                 : 
     244                 :         /* Prepare for threaded operation */
     245                 :         tls_index = TlsAlloc();
     246                 :         mainThreadId = GetCurrentThreadId();
     247                 : 
     248                 :         /* Initialize socket access */
     249                 :         err = WSAStartup(MAKEWORD(2, 2), &wsaData);
     250                 :         if (err != 0)
     251                 :             pg_fatal("%s() failed: error code %d", "WSAStartup", err);
     252                 : 
     253                 :         parallel_init_done = true;  /* makes later calls no-ops and switches getThreadLocalPQExpBuffer() to TLS */
     254 ECB             :     }
     255                 : #endif
     256 GIC         227 : }
     257                 : 
     258                 : /*
     259                 :  * Find the ParallelSlot for the current worker process or thread, by scanning pstate->parallelSlot[0 .. numWorkers-1] for a matching thread ID (Windows) or PID (elsewhere).
     260                 :  *
     261                 :  * Returns NULL if no matching slot is found (this implies we're the leader).
     262 EUB             :  */
     263                 : static ParallelSlot *
     264 UIC           0 : GetMyPSlot(ParallelState *pstate)
     265                 : {
     266 EUB             :     int         i;
     267                 : 
     268 UIC           0 :     for (i = 0; i < pstate->numWorkers; i++)
     269                 :     {
     270                 : #ifdef WIN32
     271 EUB             :         if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
     272                 : #else
     273 UBC           0 :         if (pstate->parallelSlot[i].pid == getpid())
     274                 : #endif
     275 UIC           0 :             return &(pstate->parallelSlot[i]);  /* pointer into pstate's own array, not a copy */
     276 EUB             :     }
     277                 : 
     278 UIC           0 :     return NULL;
     279                 : }
     280                 : 
     281                 : /*
     282                 :  * A thread-local version of getLocalPQExpBuffer().  Returns an empty buffer: an existing one is reset, otherwise one is created and remembered for later calls.
     283                 :  *
     284                 :  * Non-reentrant but reduces memory leakage: we'll consume one buffer per
     285                 :  * thread, which is much better than one per fmtId/fmtQualifiedId call.
     286                 :  */
     287                 : #ifdef WIN32
     288                 : static PQExpBuffer
     289                 : getThreadLocalPQExpBuffer(void)
     290                 : {
     291                 :     /*
     292                 :      * The Tls code goes awry if we use a static var, so we provide for both
     293                 :      * static and auto, and omit any use of the static var when using Tls. We
     294                 :      * rely on TlsGetValue() to return 0 if the value is not yet set.
     295                 :      */
     296                 :     static PQExpBuffer s_id_return = NULL;
     297                 :     PQExpBuffer id_return;
     298                 : 
     299                 :     if (parallel_init_done)
     300                 :         id_return = (PQExpBuffer) TlsGetValue(tls_index);
     301                 :     else
     302                 :         id_return = s_id_return;
     303                 : 
     304                 :     if (id_return)              /* already have a buffer from an earlier call? */
     305                 :     {
     306                 :         /* same buffer, just wipe contents */
     307                 :         resetPQExpBuffer(id_return);
     308                 :     }
     309                 :     else
     310                 :     {
     311                 :         /* first time through: make a new buffer and remember it in TLS or the static */
     312                 :         id_return = createPQExpBuffer();
     313                 :         if (parallel_init_done)
     314                 :             TlsSetValue(tls_index, id_return);
     315                 :         else
     316                 :             s_id_return = id_return;
     317                 :     }
     318                 : 
     319                 :     return id_return;
     320                 : }
     321                 : #endif                          /* WIN32 */
     322                 : 
     323                 : /*
     324                 :  * pg_dump and pg_restore call this to register the cleanup handler
     325                 :  * as soon as they've created the ArchiveHandle.  It records AHX in shutdown_info and registers archive_close_connection() with on_exit_nicely().
     326 ECB             :  */
     327                 : void
     328 CBC         163 : on_exit_close_archive(Archive *AHX)
     329 ECB             : {
     330 CBC         163 :     shutdown_info.AHX = AHX;
     331 GIC         163 :     on_exit_nicely(archive_close_connection, &shutdown_info);   /* &shutdown_info arrives as the handler's "arg" */
     332             163 : }
     333                 : 
     334                 : /*
     335                 :  * on_exit_nicely handler for shutting down database connections and
     336                 :  * worker processes cleanly.  "arg" is the ShutdownInformation registered by on_exit_close_archive(); "code" is not used here.
     337 ECB             :  */
     338                 : static void
     339 CBC         130 : archive_close_connection(int code, void *arg)
     340                 : {
     341             130 :     ShutdownInformation *si = (ShutdownInformation *) arg;
     342                 : 
     343 GIC         130 :     if (si->pstate)
     344 EUB             :     {
     345                 :         /* In parallel mode, must figure out who we are: GetMyPSlot() returns NULL for the leader */
     346 UBC           0 :         ParallelSlot *slot = GetMyPSlot(si->pstate);
     347                 : 
     348 UIC           0 :         if (!slot)
     349                 :         {
     350                 :             /*
     351                 :              * We're the leader.  Forcibly shut down workers, then close our
     352 EUB             :              * own database connection, if any.
     353                 :              */
     354 UBC           0 :             ShutdownWorkersHard(si->pstate);
     355 EUB             : 
     356 UIC           0 :             if (si->AHX)
     357               0 :                 DisconnectDatabase(si->AHX);
     358                 :         }
     359                 :         else
     360                 :         {
     361                 :             /*
     362                 :              * We're a worker.  Shut down our own DB connection if any.  On
     363                 :              * Windows, we also have to close our communication sockets, to
     364                 :              * emulate what will happen on Unix when the worker process exits.
     365                 :              * (Without this, if this is a premature exit, the leader would
     366                 :              * fail to detect it because there would be no EOF condition on
     367 EUB             :              * the other end of the pipe.)
     368                 :              */
     369 UIC           0 :             if (slot->AH)
     370               0 :                 DisconnectDatabase(&(slot->AH->public));
     371                 : 
     372                 : #ifdef WIN32
     373                 :             closesocket(slot->pipeRevRead);
     374                 :             closesocket(slot->pipeRevWrite);
     375                 : #endif
     376                 :         }
     377                 :     }
     378                 :     else
     379 ECB             :     {
     380                 :         /* Non-parallel operation (no ParallelState registered): just kill the leader DB connection */
     381 GIC         130 :         if (si->AHX)
     382 CBC         130 :             DisconnectDatabase(si->AHX);
     383                 :     }
     384 GIC         130 : }
     385                 : 
     386                 : /*
     387                 :  * Forcibly shut down any remaining workers, waiting for them to finish.
     388                 :  *
     389                 :  * Note that we don't expect to come here during normal exit (the workers
     390                 :  * should be long gone, and the ParallelState too).  We're only here in a
     391                 :  * pg_fatal() situation, so intervening to cancel active commands is
     392                 :  * appropriate.  Steps: close the leader's write ends, interrupt the workers (SIGTERM on Unix, PQcancel() on Windows), then block in WaitForTerminatingWorkers().
     393 EUB             :  */
     394                 : static void
     395 UIC           0 : ShutdownWorkersHard(ParallelState *pstate)
     396                 : {
     397                 :     int         i;
     398                 : 
     399                 :     /*
     400                 :      * Close our write end of the sockets so that any workers waiting for
     401                 :      * commands know they can exit.  (Note: some of the pipeWrite fields might
     402                 :      * still be zero, if we failed to initialize all the workers.  Hence, just
     403 EUB             :      * ignore errors here.)
     404                 :      */
     405 UIC           0 :     for (i = 0; i < pstate->numWorkers; i++)
     406               0 :         closesocket(pstate->parallelSlot[i].pipeWrite);
     407                 : 
     408                 :     /*
     409                 :      * Force early termination of any commands currently in progress.
     410                 :      */
     411 EUB             : #ifndef WIN32
     412                 :     /* On non-Windows, send SIGTERM to each worker process; kill() results are ignored. */
     413 UBC           0 :     for (i = 0; i < pstate->numWorkers; i++)
     414                 :     {
     415               0 :         pid_t       pid = pstate->parallelSlot[i].pid;
     416 EUB             : 
     417 UIC           0 :         if (pid != 0)           /* 0 means no live child here, e.g. already reaped by WaitForTerminatingWorkers() */
     418               0 :             kill(pid, SIGTERM);
     419                 :     }
     420                 : #else
     421                 : 
     422                 :     /*
     423                 :      * On Windows, send query cancels directly to the workers' backends.  Use
     424                 :      * a critical section to ensure worker threads don't change state.
     425                 :      */
     426                 :     EnterCriticalSection(&signal_info_lock);
     427                 :     for (i = 0; i < pstate->numWorkers; i++)
     428                 :     {
     429                 :         ArchiveHandle *AH = pstate->parallelSlot[i].AH;
     430                 :         char        errbuf[1];  /* minimal buffer: the PQcancel() result and message are discarded */
     431                 : 
     432                 :         if (AH != NULL && AH->connCancel != NULL)
     433                 :             (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
     434                 :     }
     435                 :     LeaveCriticalSection(&signal_info_lock);
     436                 : #endif
     437 EUB             : 
     438                 :     /* Now wait for them to terminate. */
     439 UIC           0 :     WaitForTerminatingWorkers(pstate);
     440               0 : }
     441                 : 
     442                 : /*
     443                 :  * Wait for all workers to terminate.  Each loop iteration blocks until one worker exits, locates its slot, marks it WRKR_TERMINATED and clears its pstate->te[] entry; the loop ends when HasEveryWorkerTerminated() says so.
     444 ECB             :  */
     445                 : static void
     446 CBC          13 : WaitForTerminatingWorkers(ParallelState *pstate)
     447                 : {
     448              41 :     while (!HasEveryWorkerTerminated(pstate))
     449                 :     {
     450 GIC          28 :         ParallelSlot *slot = NULL;
     451                 :         int         j;
     452                 : 
     453                 : #ifndef WIN32
     454 ECB             :         /* On non-Windows, use wait() to wait for next worker to end; the exit status itself is not examined */
     455                 :         int         status;
     456 GIC          28 :         pid_t       pid = wait(&status);
     457 ECB             : 
     458                 :         /* Find dead worker's slot, and clear the PID field.  NOTE(review): if wait() fails (-1) or reports a PID that is in no slot, the loop ends with j == numWorkers and only the Assert below catches it. */
     459 CBC          45 :         for (j = 0; j < pstate->numWorkers; j++)
     460 ECB             :         {
     461 GIC          45 :             slot = &(pstate->parallelSlot[j]);
     462 CBC          45 :             if (slot->pid == pid)
     463 ECB             :             {
     464 GIC          28 :                 slot->pid = 0;
     465              28 :                 break;
     466                 :             }
     467                 :         }
     468                 : #else                           /* WIN32 */
     469                 :         /* On Windows, we must use WaitForMultipleObjects() on the handles of the still-running workers */
     470                 :         HANDLE     *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
     471                 :         int         nrun = 0;
     472                 :         DWORD       ret;
     473                 :         uintptr_t   hThread;
     474                 : 
     475                 :         for (j = 0; j < pstate->numWorkers; j++)
     476                 :         {
     477                 :             if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
     478                 :             {
     479                 :                 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
     480                 :                 nrun++;
     481                 :             }
     482                 :         }
     483                 :         ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
     484                 :         Assert(ret != WAIT_FAILED); /* NOTE(review): without assertions, a WAIT_FAILED result would be used as an index on the next line */
     485                 :         hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
     486                 :         free(lpHandles);
     487                 : 
     488                 :         /* Find dead worker's slot, and clear the hThread field */
     489                 :         for (j = 0; j < pstate->numWorkers; j++)
     490                 :         {
     491                 :             slot = &(pstate->parallelSlot[j]);
     492                 :             if (slot->hThread == hThread)
     493                 :             {
     494                 :                 /* For cleanliness, close handles for dead threads */
     495                 :                 CloseHandle((HANDLE) slot->hThread);
     496                 :                 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
     497                 :                 break;
     498                 :             }
     499                 :         }
     500                 : #endif                          /* WIN32 */
     501 ECB             : 
     502                 :         /* On all platforms, update workerStatus and te[] as well; j indexes the slot found above */
     503 CBC          28 :         Assert(j < pstate->numWorkers);
     504 GIC          28 :         slot->workerStatus = WRKR_TERMINATED;
     505 CBC          28 :         pstate->te[j] = NULL;
     506                 :     }
     507 GIC          13 : }
     508                 : 
     509                 : 
     510                 : /*
     511                 :  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
     512                 :  *
     513                 :  * This doesn't quite belong in this module, but it needs access to the
     514                 :  * ParallelState data, so there's not really a better place either.
     515                 :  *
     516                 :  * When we get a cancel interrupt, we could just die, but in pg_restore that
     517                 :  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
     518                 :  * for a long time.  Instead, we try to send a cancel request and then die.
     519                 :  * pg_dump probably doesn't really need this, but we might as well use it
     520                 :  * there too.  Note that sending the cancel directly from the signal handler
     521                 :  * is safe because PQcancel() is written to make it so.
     522                 :  *
     523                 :  * In parallel operation on Unix, each process is responsible for canceling
     524                 :  * its own connection (this must be so because nobody else has access to it).
     525                 :  * Furthermore, the leader process should attempt to forward its signal to
     526                 :  * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
     527                 :  * needed because typing control-C at the console would deliver SIGINT to
     528                 :  * every member of the terminal process group --- but in other scenarios it
     529                 :  * might be that only the leader gets signaled.
     530                 :  *
     531                 :  * On Windows, the cancel handler runs in a separate thread, because that's
     532                 :  * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
     533                 :  * cancels on all active connections, and then return FALSE, which will allow
     534                 :  * the process to die.  For safety's sake, we use a critical section to
     535                 :  * protect the PGcancel structures against being changed while the signal
     536                 :  * thread runs.
     537                 :  */
     538                 : 
     539                 : #ifndef WIN32
     540                 : 
     541                 : /*
     542                 :  * Signal handler (Unix only)
     543 EUB             :  */
     544                 : static void
     545 UIC           0 : sigTermHandler(SIGNAL_ARGS)
     546                 : {
     547                 :     int         i;
     548                 :     char        errbuf[1];
     549                 : 
     550                 :     /*
     551                 :      * Some platforms allow delivery of new signals to interrupt an active
     552                 :      * signal handler.  That could muck up our attempt to send PQcancel, so
     553 EUB             :      * disable the signals that setup_cancel_handler enabled.
     554                 :      */
     555 UBC           0 :     pqsignal(SIGINT, SIG_IGN);
     556 UIC           0 :     pqsignal(SIGTERM, SIG_IGN);
     557               0 :     pqsignal(SIGQUIT, SIG_IGN);
     558                 : 
     559                 :     /*
     560                 :      * If we're in the leader, forward signal to all workers.  (It seems best
     561                 :      * to do this before PQcancel; killing the leader transaction will result
     562                 :      * in invalid-snapshot errors from active workers, which maybe we can
     563 EUB             :      * quiet by killing workers first.)  Ignore any errors.
     564                 :      */
     565 UBC           0 :     if (signal_info.pstate != NULL)
     566                 :     {
     567               0 :         for (i = 0; i < signal_info.pstate->numWorkers; i++)
     568                 :         {
     569               0 :             pid_t       pid = signal_info.pstate->parallelSlot[i].pid;
     570 EUB             : 
     571 UIC           0 :             if (pid != 0)
     572               0 :                 kill(pid, SIGTERM);
     573                 :         }
     574                 :     }
     575                 : 
     576                 :     /*
     577                 :      * Send QueryCancel if we have a connection to send to.  Ignore errors,
     578 EUB             :      * there's not much we can do about them anyway.
     579                 :      */
     580 UIC           0 :     if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
     581               0 :         (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
     582                 : 
     583                 :     /*
     584                 :      * Report we're quitting, using nothing more complicated than write(2).
     585 EUB             :      * When in parallel operation, only the leader process should do this.
     586                 :      */
     587 UBC           0 :     if (!signal_info.am_worker)
     588                 :     {
     589               0 :         if (progname)
     590 EUB             :         {
     591 UIC           0 :             write_stderr(progname);
     592 UBC           0 :             write_stderr(": ");
     593                 :         }
     594 UIC           0 :         write_stderr("terminated by user\n");
     595                 :     }
     596                 : 
     597                 :     /*
     598                 :      * And die, using _exit() not exit() because the latter will invoke atexit
     599 EUB             :      * handlers that can fail if we interrupted related code.
     600                 :      */
     601 UIC           0 :     _exit(1);
     602                 : }
     603                 : 
     604                 : /*
     605                 :  * Enable cancel interrupt handler, if not already done.
     606 ECB             :  */
     607                 : static void
     608 GIC         383 : setup_cancel_handler(void)
     609                 : {
     610                 :     /*
     611                 :      * When forking, signal_info.handler_set will propagate into the new
     612 ECB             :      * process, but that's fine because the signal handler state does too.
     613                 :      */
     614 CBC         383 :     if (!signal_info.handler_set)
     615                 :     {
     616             141 :         signal_info.handler_set = true;
     617 ECB             : 
     618 CBC         141 :         pqsignal(SIGINT, sigTermHandler);
     619 GIC         141 :         pqsignal(SIGTERM, sigTermHandler);
     620 CBC         141 :         pqsignal(SIGQUIT, sigTermHandler);
     621                 :     }
     622 GIC         383 : }
     623                 : 
     624                 : #else                           /* WIN32 */
     625                 : 
     626                 : /*
     627                 :  * Console interrupt handler --- runs in a newly-started thread.
     628                 :  *
     629                 :  * After stopping other threads and sending cancel requests on all open
     630                 :  * connections, we return FALSE which will allow the default ExitProcess()
     631                 :  * action to be taken.
     632                 :  */
     633                 : static BOOL WINAPI
     634                 : consoleHandler(DWORD dwCtrlType)
     635                 : {
     636                 :     int         i;
     637                 :     char        errbuf[1];
     638                 : 
     639                 :     if (dwCtrlType == CTRL_C_EVENT ||
     640                 :         dwCtrlType == CTRL_BREAK_EVENT)
     641                 :     {
     642                 :         /* Critical section prevents changing data we look at here */
     643                 :         EnterCriticalSection(&signal_info_lock);
     644                 : 
     645                 :         /*
     646                 :          * If in parallel mode, stop worker threads and send QueryCancel to
     647                 :          * their connected backends.  The main point of stopping the worker
     648                 :          * threads is to keep them from reporting the query cancels as errors,
     649                 :          * which would clutter the user's screen.  We needn't stop the leader
     650                 :          * thread since it won't be doing much anyway.  Do this before
     651                 :          * canceling the main transaction, else we might get invalid-snapshot
     652                 :          * errors reported before we can stop the workers.  Ignore errors,
     653                 :          * there's not much we can do about them anyway.
     654                 :          */
     655                 :         if (signal_info.pstate != NULL)
     656                 :         {
     657                 :             for (i = 0; i < signal_info.pstate->numWorkers; i++)
     658                 :             {
     659                 :                 ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
     660                 :                 ArchiveHandle *AH = slot->AH;
     661                 :                 HANDLE      hThread = (HANDLE) slot->hThread;
     662                 : 
     663                 :                 /*
     664                 :                  * Using TerminateThread here may leave some resources leaked,
     665                 :                  * but it doesn't matter since we're about to end the whole
     666                 :                  * process.
     667                 :                  */
     668                 :                 if (hThread != INVALID_HANDLE_VALUE)
     669                 :                     TerminateThread(hThread, 0);
     670                 : 
     671                 :                 if (AH != NULL && AH->connCancel != NULL)
     672                 :                     (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
     673                 :             }
     674                 :         }
     675                 : 
     676                 :         /*
     677                 :          * Send QueryCancel to leader connection, if enabled.  Ignore errors,
     678                 :          * there's not much we can do about them anyway.
     679                 :          */
     680                 :         if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
     681                 :             (void) PQcancel(signal_info.myAH->connCancel,
     682                 :                             errbuf, sizeof(errbuf));
     683                 : 
     684                 :         LeaveCriticalSection(&signal_info_lock);
     685                 : 
     686                 :         /*
     687                 :          * Report we're quitting, using nothing more complicated than
     688                 :          * write(2).  (We might be able to get away with using pg_log_*()
     689                 :          * here, but since we terminated other threads uncleanly above, it
     690                 :          * seems better to assume as little as possible.)
     691                 :          */
     692                 :         if (progname)
     693                 :         {
     694                 :             write_stderr(progname);
     695                 :             write_stderr(": ");
     696                 :         }
     697                 :         write_stderr("terminated by user\n");
     698                 :     }
     699                 : 
     700                 :     /* Always return FALSE to allow signal handling to continue */
     701                 :     return FALSE;
     702                 : }
     703                 : 
     704                 : /*
     705                 :  * Enable cancel interrupt handler, if not already done.
     706                 :  */
     707                 : static void
     708                 : setup_cancel_handler(void)
     709                 : {
     710                 :     if (!signal_info.handler_set)
     711                 :     {
     712                 :         signal_info.handler_set = true;
     713                 : 
     714                 :         InitializeCriticalSection(&signal_info_lock);
     715                 : 
     716                 :         SetConsoleCtrlHandler(consoleHandler, TRUE);
     717                 :     }
     718                 : }
     719                 : 
     720                 : #endif                          /* WIN32 */
     721                 : 
     722                 : 
     723                 : /*
     724                 :  * set_archive_cancel_info
     725                 :  *
     726                 :  * Fill AH->connCancel with cancellation info for the specified database
     727                 :  * connection; or clear it if conn is NULL.
     728 ECB             :  */
     729                 : void
     730 GIC         383 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
     731                 : {
     732                 :     PGcancel   *oldConnCancel;
     733                 : 
     734                 :     /*
     735                 :      * Activate the interrupt handler if we didn't yet in this process.  On
     736                 :      * Windows, this also initializes signal_info_lock; therefore it's
     737                 :      * important that this happen at least once before we fork off any
     738 ECB             :      * threads.
     739                 :      */
     740 GIC         383 :     setup_cancel_handler();
     741                 : 
     742                 :     /*
     743                 :      * On Unix, we assume that storing a pointer value is atomic with respect
     744                 :      * to any possible signal interrupt.  On Windows, use a critical section.
     745                 :      */
     746                 : 
     747                 : #ifdef WIN32
     748                 :     EnterCriticalSection(&signal_info_lock);
     749                 : #endif
     750 ECB             : 
     751                 :     /* Free the old one if we have one */
     752 CBC         383 :     oldConnCancel = AH->connCancel;
     753                 :     /* be sure interrupt handler doesn't use pointer while freeing */
     754             383 :     AH->connCancel = NULL;
     755 ECB             : 
     756 GIC         383 :     if (oldConnCancel != NULL)
     757             193 :         PQfreeCancel(oldConnCancel);
     758 ECB             : 
     759                 :     /* Set the new one if specified */
     760 GIC         383 :     if (conn)
     761             195 :         AH->connCancel = PQgetCancel(conn);
     762                 : 
     763                 :     /*
     764                 :      * On Unix, there's only ever one active ArchiveHandle per process, so we
     765                 :      * can just set signal_info.myAH unconditionally.  On Windows, do that
     766                 :      * only in the main thread; worker threads have to make sure their
     767                 :      * ArchiveHandle appears in the pstate data, which is dealt with in
     768                 :      * RunWorker().
     769 ECB             :      */
     770                 : #ifndef WIN32
     771 GIC         383 :     signal_info.myAH = AH;
     772                 : #else
     773                 :     if (mainThreadId == GetCurrentThreadId())
     774                 :         signal_info.myAH = AH;
     775                 : #endif
     776                 : 
     777                 : #ifdef WIN32
     778 ECB             :     LeaveCriticalSection(&signal_info_lock);
     779                 : #endif
     780 GIC         383 : }
     781                 : 
     782                 : /*
     783                 :  * set_cancel_pstate
     784                 :  *
     785                 :  * Set signal_info.pstate to point to the specified ParallelState, if any.
     786                 :  * We need this mainly to have an interlock against Windows signal thread.
     787 ECB             :  */
     788                 : static void
     789 GIC          26 : set_cancel_pstate(ParallelState *pstate)
     790                 : {
     791                 : #ifdef WIN32
     792                 :     EnterCriticalSection(&signal_info_lock);
     793 ECB             : #endif
     794                 : 
     795 GIC          26 :     signal_info.pstate = pstate;
     796                 : 
     797                 : #ifdef WIN32
     798 ECB             :     LeaveCriticalSection(&signal_info_lock);
     799                 : #endif
     800 GIC          26 : }
     801                 : 
     802                 : /*
     803                 :  * set_cancel_slot_archive
     804                 :  *
     805                 :  * Set ParallelSlot's AH field to point to the specified archive, if any.
     806                 :  * We need this mainly to have an interlock against Windows signal thread.
     807 ECB             :  */
     808                 : static void
     809 GIC          56 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
     810                 : {
     811                 : #ifdef WIN32
     812                 :     EnterCriticalSection(&signal_info_lock);
     813 ECB             : #endif
     814                 : 
     815 GIC          56 :     slot->AH = AH;
     816                 : 
     817                 : #ifdef WIN32
     818 ECB             :     LeaveCriticalSection(&signal_info_lock);
     819                 : #endif
     820 GIC          56 : }
     821                 : 
     822                 : 
     823                 : /*
     824                 :  * This function is called by both Unix and Windows variants to set up
     825                 :  * and run a worker process.  Caller should exit the process (or thread)
     826                 :  * upon return.
     827 ECB             :  */
     828                 : static void
     829 GIC          28 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
     830                 : {
     831                 :     int         pipefd[2];
     832 ECB             : 
     833                 :     /* fetch child ends of pipes */
     834 GIC          28 :     pipefd[PIPE_READ] = slot->pipeRevRead;
     835              28 :     pipefd[PIPE_WRITE] = slot->pipeRevWrite;
     836                 : 
     837                 :     /*
     838                 :      * Clone the archive so that we have our own state to work with, and in
     839                 :      * particular our own database connection.
     840                 :      *
     841                 :      * We clone on Unix as well as Windows, even though technically we don't
     842                 :      * need to because fork() gives us a copy in our own address space
     843                 :      * already.  But CloneArchive resets the state information and also clones
     844 ECB             :      * the database connection which both seem kinda helpful.
     845                 :      */
     846 GIC          28 :     AH = CloneArchive(AH);
     847 ECB             : 
     848                 :     /* Remember cloned archive where signal handler can find it */
     849 GIC          28 :     set_cancel_slot_archive(slot, AH);
     850                 : 
     851                 :     /*
     852 ECB             :      * Call the setup worker function that's defined in the ArchiveHandle.
     853                 :      */
     854 GIC          28 :     (AH->SetupWorkerPtr) ((Archive *) AH);
     855                 : 
     856                 :     /*
     857 ECB             :      * Execute commands until done.
     858                 :      */
     859 GIC          28 :     WaitForCommands(AH, pipefd);
     860                 : 
     861                 :     /*
     862 ECB             :      * Disconnect from database and clean up.
     863                 :      */
     864 CBC          28 :     set_cancel_slot_archive(slot, NULL);
     865              28 :     DisconnectDatabase(&(AH->public));
     866 GIC          28 :     DeCloneArchive(AH);
     867              28 : }
     868                 : 
     869                 : /*
     870                 :  * Thread base function for Windows
     871                 :  */
     872                 : #ifdef WIN32
     873                 : static unsigned __stdcall
     874                 : init_spawned_worker_win32(WorkerInfo *wi)
     875                 : {
     876                 :     ArchiveHandle *AH = wi->AH;
     877                 :     ParallelSlot *slot = wi->slot;
     878                 : 
     879                 :     /* Don't need WorkerInfo anymore */
     880                 :     free(wi);
     881                 : 
     882                 :     /* Run the worker ... */
     883                 :     RunWorker(AH, slot);
     884                 : 
     885                 :     /* Exit the thread */
     886                 :     _endthreadex(0);
     887                 :     return 0;
     888                 : }
     889                 : #endif                          /* WIN32 */
     890                 : 
     891                 : /*
     892                 :  * This function starts a parallel dump or restore by spawning off the worker
     893                 :  * processes.  For Windows, it creates a number of threads; on Unix the
     894                 :  * workers are created with fork().
     895 ECB             :  */
     896                 : ParallelState *
     897 GIC          15 : ParallelBackupStart(ArchiveHandle *AH)
     898                 : {
     899                 :     ParallelState *pstate;
     900 ECB             :     int         i;
     901                 : 
     902 CBC          15 :     Assert(AH->public.numWorkers > 0);
     903                 : 
     904              15 :     pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
     905 ECB             : 
     906 CBC          15 :     pstate->numWorkers = AH->public.numWorkers;
     907 GIC          15 :     pstate->te = NULL;
     908 CBC          15 :     pstate->parallelSlot = NULL;
     909 ECB             : 
     910 GIC          15 :     if (AH->public.numWorkers == 1)
     911               2 :         return pstate;
     912 ECB             : 
     913                 :     /* Create status arrays, being sure to initialize all fields to 0 */
     914 CBC          13 :     pstate->te = (TocEntry **)
     915              13 :         pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
     916 GIC          13 :     pstate->parallelSlot = (ParallelSlot *)
     917              13 :         pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
     918                 : 
     919                 : #ifdef WIN32
     920                 :     /* Make fmtId() and fmtQualifiedId() use thread-local storage */
     921                 :     getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
     922                 : #endif
     923                 : 
     924                 :     /*
     925                 :      * Set the pstate in shutdown_info, to tell the exit handler that it must
     926                 :      * clean up workers as well as the main database connection.  But we don't
     927                 :      * set this in signal_info yet, because we don't want child processes to
     928 ECB             :      * inherit non-NULL signal_info.pstate.
     929                 :      */
     930 GIC          13 :     shutdown_info.pstate = pstate;
     931                 : 
     932                 :     /*
     933                 :      * Temporarily disable query cancellation on the leader connection.  This
     934                 :      * ensures that child processes won't inherit valid AH->connCancel
     935                 :      * settings and thus won't try to issue cancels against the leader's
     936                 :      * connection.  No harm is done if we fail while it's disabled, because
     937 ECB             :      * the leader connection is idle at this point anyway.
     938                 :      */
     939 GIC          13 :     set_archive_cancel_info(AH, NULL);
     940 ECB             : 
     941                 :     /* Ensure stdio state is quiesced before forking */
     942 GIC          13 :     fflush(NULL);
     943 ECB             : 
     944                 :     /* Create desired number of workers */
     945 GIC          41 :     for (i = 0; i < pstate->numWorkers; i++)
     946                 :     {
     947                 : #ifdef WIN32
     948                 :         WorkerInfo *wi;
     949                 :         uintptr_t   handle;
     950                 : #else
     951 ECB             :         pid_t       pid;
     952                 : #endif
     953 GIC          28 :         ParallelSlot *slot = &(pstate->parallelSlot[i]);
     954                 :         int         pipeMW[2],
     955                 :                     pipeWM[2];
     956 ECB             : 
     957 EUB             :         /* Create communication pipes for this worker */
     958 GIC          28 :         if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
     959 UIC           0 :             pg_fatal("could not create communication channels: %m");
     960 ECB             : 
     961                 :         /* leader's ends of the pipes */
     962 GIC          28 :         slot->pipeRead = pipeWM[PIPE_READ];
     963 CBC          28 :         slot->pipeWrite = pipeMW[PIPE_WRITE];
     964 ECB             :         /* child's ends of the pipes */
     965 GIC          28 :         slot->pipeRevRead = pipeMW[PIPE_READ];
     966              28 :         slot->pipeRevWrite = pipeWM[PIPE_WRITE];
     967                 : 
     968                 : #ifdef WIN32
     969                 :         /* Create transient structure to pass args to worker function */
     970                 :         wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
     971                 : 
     972                 :         wi->AH = AH;
     973                 :         wi->slot = slot;
     974                 : 
     975                 :         handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
     976                 :                                 wi, 0, &(slot->threadId));
     977                 :         slot->hThread = handle;
     978 ECB             :         slot->workerStatus = WRKR_IDLE;
     979                 : #else                           /* !WIN32 */
     980 GIC          28 :         pid = fork();
     981              56 :         if (pid == 0)
     982                 :         {
     983                 :             /* we are the worker */
     984                 :             int         j;
     985 ECB             : 
     986                 :             /* this is needed for GetMyPSlot() */
     987 GIC          28 :             slot->pid = getpid();
     988 ECB             : 
     989                 :             /* instruct signal handler that we're in a worker now */
     990 GIC          28 :             signal_info.am_worker = true;
     991 ECB             : 
     992                 :             /* close read end of Worker -> Leader */
     993 CBC          28 :             closesocket(pipeWM[PIPE_READ]);
     994                 :             /* close write end of Leader -> Worker */
     995 GIC          28 :             closesocket(pipeMW[PIPE_WRITE]);
     996                 : 
     997                 :             /*
     998                 :              * Close all inherited fds for communication of the leader with
     999 ECB             :              * previously-forked workers.
    1000                 :              */
    1001 CBC          45 :             for (j = 0; j < i; j++)
    1002 ECB             :             {
    1003 GIC          17 :                 closesocket(pstate->parallelSlot[j].pipeRead);
    1004              17 :                 closesocket(pstate->parallelSlot[j].pipeWrite);
    1005                 :             }
    1006 ECB             : 
    1007                 :             /* Run the worker ... */
    1008 GIC          28 :             RunWorker(AH, slot);
    1009 ECB             : 
    1010                 :             /* We can just exit(0) when done */
    1011 CBC          28 :             exit(0);
    1012                 :         }
    1013 GIC          28 :         else if (pid < 0)
    1014 EUB             :         {
    1015                 :             /* fork failed */
    1016 UIC           0 :             pg_fatal("could not create worker process: %m");
    1017                 :         }
    1018 ECB             : 
    1019                 :         /* In Leader after successful fork */
    1020 GIC          28 :         slot->pid = pid;
    1021              28 :         slot->workerStatus = WRKR_IDLE;
    1022 ECB             : 
    1023                 :         /* close read end of Leader -> Worker */
    1024 CBC          28 :         closesocket(pipeMW[PIPE_READ]);
    1025                 :         /* close write end of Worker -> Leader */
    1026 GIC          28 :         closesocket(pipeWM[PIPE_WRITE]);
    1027                 : #endif                          /* WIN32 */
    1028                 :     }
    1029                 : 
    1030                 :     /*
    1031                 :      * Having forked off the workers, disable SIGPIPE so that leader isn't
    1032                 :      * killed if it tries to send a command to a dead worker.  We don't want
    1033                 :      * the workers to inherit this setting, though.
    1034 ECB             :      */
    1035                 : #ifndef WIN32
    1036 GIC          13 :     pqsignal(SIGPIPE, SIG_IGN);
    1037                 : #endif
    1038                 : 
    1039                 :     /*
    1040 ECB             :      * Re-establish query cancellation on the leader connection.
    1041                 :      */
    1042 GIC          13 :     set_archive_cancel_info(AH, AH->connection);
    1043                 : 
    1044                 :     /*
    1045                 :      * Tell the cancel signal handler to forward signals to worker processes,
    1046                 :      * too.  (As with query cancel, we did not need this earlier because the
    1047                 :      * workers have not yet been given anything to do; if we die before this
    1048 ECB             :      * point, any already-started workers will see EOF and quit promptly.)
    1049                 :      */
    1050 CBC          13 :     set_cancel_pstate(pstate);
    1051                 : 
    1052 GIC          13 :     return pstate;
    1053                 : }
    1054                 : 
    1055                 : /*
    1056                 :  * Close down a parallel dump or restore.
    1057 ECB             :  */
    1058                 : void
    1059 GIC          15 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
    1060                 : {
    1061                 :     int         i;
    1062 ECB             : 
    1063                 :     /* No work if non-parallel */
    1064 GIC          15 :     if (pstate->numWorkers == 1)
    1065               2 :         return;
    1066 ECB             : 
    1067                 :     /* There should not be any unfinished jobs */
    1068 GIC          13 :     Assert(IsEveryWorkerIdle(pstate));
    1069 ECB             : 
    1070                 :     /* Close the sockets so that the workers know they can exit */
    1071 CBC          41 :     for (i = 0; i < pstate->numWorkers; i++)
    1072 ECB             :     {
    1073 GIC          28 :         closesocket(pstate->parallelSlot[i].pipeRead);
    1074              28 :         closesocket(pstate->parallelSlot[i].pipeWrite);
    1075                 :     }
    1076 ECB             : 
    1077                 :     /* Wait for them to exit */
    1078 GIC          13 :     WaitForTerminatingWorkers(pstate);
    1079                 : 
    1080                 :     /*
    1081                 :      * Unlink pstate from shutdown_info, so the exit handler will not try to
    1082 ECB             :      * use it; and likewise unlink from signal_info.
    1083                 :      */
    1084 GIC          13 :     shutdown_info.pstate = NULL;
    1085              13 :     set_cancel_pstate(NULL);
    1086 ECB             : 
    1087                 :     /* Release state (mere neatnik-ism, since we're about to terminate) */
    1088 CBC          13 :     free(pstate->te);
    1089 GIC          13 :     free(pstate->parallelSlot);
    1090              13 :     free(pstate);
    1091                 : }
    1092                 : 
    1093                 : /*
    1094                 :  * These next four functions handle construction and parsing of the command
    1095                 :  * strings and response strings for parallel workers.
    1096                 :  *
    1097                 :  * Currently, these can be the same regardless of which archive format we are
    1098                 :  * processing.  In future, we might want to let format modules override these
    1099                 :  * functions to add format-specific data to a command or response.
    1100                 :  */
    1101                 : 
    1102                 : /*
    1103                 :  * buildWorkerCommand: format a command string to send to a worker.
    1104                 :  *
    1105                 :  * The string is built in the caller-supplied buffer of size buflen.
    1106 ECB             :  */
    1107                 : static void
    1108 GIC         168 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
    1109 ECB             :                    char *buf, int buflen)
    1110                 : {
    1111 CBC         168 :     if (act == ACT_DUMP)
    1112             122 :         snprintf(buf, buflen, "DUMP %d", te->dumpId);
    1113 GIC          46 :     else if (act == ACT_RESTORE)
    1114 GBC          46 :         snprintf(buf, buflen, "RESTORE %d", te->dumpId);
    1115 ECB             :     else
    1116 UIC           0 :         Assert(false);
    1117 GIC         168 : }
    1118                 : 
    1119                 : /*
    1120                 :  * parseWorkerCommand: interpret a command string in a worker.
    1121 ECB             :  */
    1122                 : static void
    1123 GIC         168 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
    1124                 :                    const char *msg)
    1125                 : {
    1126                 :     DumpId      dumpId;
    1127 ECB             :     int         nBytes;
    1128                 : 
    1129 CBC         168 :     if (messageStartsWith(msg, "DUMP "))
    1130 ECB             :     {
    1131 CBC         122 :         *act = ACT_DUMP;
    1132             122 :         sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
    1133             122 :         Assert(nBytes == strlen(msg));
    1134 GIC         122 :         *te = getTocEntryByDumpId(AH, dumpId);
    1135 CBC         122 :         Assert(*te != NULL);
    1136                 :     }
    1137              46 :     else if (messageStartsWith(msg, "RESTORE "))
    1138 ECB             :     {
    1139 CBC          46 :         *act = ACT_RESTORE;
    1140              46 :         sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
    1141              46 :         Assert(nBytes == strlen(msg));
    1142 GIC          46 :         *te = getTocEntryByDumpId(AH, dumpId);
    1143              46 :         Assert(*te != NULL);
    1144 EUB             :     }
    1145                 :     else
    1146 LBC           0 :         pg_fatal("unrecognized command received from leader: \"%s\"",
    1147                 :                  msg);
    1148 GIC         168 : }
    1149                 : 
    1150                 : /*
    1151                 :  * buildWorkerResponse: format a response string to send to the leader.
    1152                 :  *
    1153                 :  * The string is built in the caller-supplied buffer of size buflen.
    1154 ECB             :  */
    1155                 : static void
    1156 GIC         168 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
    1157 ECB             :                     char *buf, int buflen)
    1158                 : {
    1159 GIC         168 :     snprintf(buf, buflen, "OK %d %d %d",
    1160                 :              te->dumpId,
    1161 ECB             :              status,
    1162                 :              status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
    1163 GIC         168 : }
    1164                 : 
    1165                 : /*
    1166                 :  * parseWorkerResponse: parse the status message returned by a worker.
    1167                 :  *
    1168                 :  * Returns the integer status code, and may update fields of AH and/or te.
    1169 ECB             :  */
    1170                 : static int
    1171 GIC         168 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
    1172                 :                     const char *msg)
    1173                 : {
    1174                 :     DumpId      dumpId;
    1175 ECB             :     int         nBytes,
    1176                 :                 n_errors;
    1177 CBC         168 :     int         status = 0;
    1178                 : 
    1179             168 :     if (messageStartsWith(msg, "OK "))
    1180                 :     {
    1181             168 :         sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
    1182 ECB             : 
    1183 GIC         168 :         Assert(dumpId == te->dumpId);
    1184 CBC         168 :         Assert(nBytes == strlen(msg));
    1185                 : 
    1186 GIC         168 :         AH->public.n_errors += n_errors;
    1187 EUB             :     }
    1188                 :     else
    1189 UIC           0 :         pg_fatal("invalid message received from worker: \"%s\"",
    1190 ECB             :                  msg);
    1191                 : 
    1192 GIC         168 :     return status;
    1193                 : }
    1194                 : 
    1195                 : /*
    1196                 :  * Dispatch a job to some free worker.
    1197                 :  *
    1198                 :  * te is the TocEntry to be processed, act is the action to be taken on it.
    1199                 :  * callback is the function to call on completion of the job.
    1200                 :  *
    1201                 :  * If no worker is currently available, this will block, and previously
    1202                 :  * registered callback functions may be called.
    1203 ECB             :  */
    1204                 : void
    1205 GIC         168 : DispatchJobForTocEntry(ArchiveHandle *AH,
    1206                 :                        ParallelState *pstate,
    1207                 :                        TocEntry *te,
    1208                 :                        T_Action act,
    1209                 :                        ParallelCompletionPtr callback,
    1210                 :                        void *callback_data)
    1211                 : {
    1212                 :     int         worker;
    1213                 :     char        buf[256];
    1214 ECB             : 
    1215                 :     /* Get a worker, waiting if none are idle */
    1216 GIC         273 :     while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
    1217             105 :         WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
    1218 ECB             : 
    1219                 :     /* Construct and send command string */
    1220 CBC         168 :     buildWorkerCommand(AH, te, act, buf, sizeof(buf));
    1221                 : 
    1222 GIC         168 :     sendMessageToWorker(pstate, worker, buf);
    1223 ECB             : 
    1224                 :     /* Remember worker is busy, and which TocEntry it's working on */
    1225 CBC         168 :     pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
    1226             168 :     pstate->parallelSlot[worker].callback = callback;
    1227             168 :     pstate->parallelSlot[worker].callback_data = callback_data;
    1228 GIC         168 :     pstate->te[worker] = te;
    1229             168 : }
    1230                 : 
    1231                 : /*
    1232                 :  * Find an idle worker and return its slot number.
    1233                 :  * Return NO_SLOT if none are idle.
    1234 ECB             :  */
    1235                 : static int
    1236 GIC         423 : GetIdleWorker(ParallelState *pstate)
    1237                 : {
    1238 ECB             :     int         i;
    1239                 : 
    1240 CBC        1083 :     for (i = 0; i < pstate->numWorkers; i++)
    1241 ECB             :     {
    1242 GIC         838 :         if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
    1243 CBC         178 :             return i;
    1244                 :     }
    1245 GIC         245 :     return NO_SLOT;
    1246                 : }
    1247                 : 
    1248                 : /*
    1249                 :  * Return true iff no worker is running.
    1250 ECB             :  */
    1251                 : static bool
    1252 GIC          41 : HasEveryWorkerTerminated(ParallelState *pstate)
    1253                 : {
    1254 ECB             :     int         i;
    1255                 : 
    1256 CBC          76 :     for (i = 0; i < pstate->numWorkers; i++)
    1257 ECB             :     {
    1258 GIC          63 :         if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1259 CBC          28 :             return false;
    1260                 :     }
    1261 GIC          13 :     return true;
    1262                 : }
    1263                 : 
    1264                 : /*
    1265                 :  * Return true iff every worker is in the WRKR_IDLE state.
    1266 ECB             :  */
    1267                 : bool
    1268 GIC          71 : IsEveryWorkerIdle(ParallelState *pstate)
    1269                 : {
    1270 ECB             :     int         i;
    1271                 : 
    1272 CBC         156 :     for (i = 0; i < pstate->numWorkers; i++)
    1273 ECB             :     {
    1274 GIC         122 :         if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
    1275 CBC          37 :             return false;
    1276                 :     }
    1277 GIC          34 :     return true;
    1278                 : }
    1279                 : 
    1280                 : /*
    1281                 :  * Acquire lock on a table to be dumped by a worker process.
    1282                 :  *
    1283                 :  * The leader process is already holding an ACCESS SHARE lock.  Ordinarily
    1284                 :  * it's no problem for a worker to get one too, but if anything else besides
    1285                 :  * pg_dump is running, there's a possible deadlock:
    1286                 :  *
    1287                 :  * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
    1288                 :  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
    1289                 :  *    because the leader holds a conflicting ACCESS SHARE lock).
    1290                 :  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
    1291                 :  *    The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
    1292                 :  * 4) Now we have a deadlock, since the leader is effectively waiting for
    1293                 :  *    the worker.  The server cannot detect that, however.
    1294                 :  *
    1295                 :  * To prevent an infinite wait, prior to touching a table in a worker, request
    1296                 :  * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
    1297                 :  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
    1298                 :  * so we have a deadlock.  We must fail the backup in that case.
    1299 ECB             :  */
    1300                 : static void
    1301 GIC         122 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
    1302                 : {
    1303                 :     const char *qualId;
    1304                 :     PQExpBuffer query;
    1305                 :     PGresult   *res;
    1306 ECB             : 
    1307                 :     /* Nothing to do for BLOBS */
    1308 GIC         122 :     if (strcmp(te->desc, "BLOBS") == 0)
    1309 CBC           4 :         return;
    1310                 : 
    1311             118 :     query = createPQExpBuffer();
    1312                 : 
    1313             118 :     qualId = fmtQualifiedId(te->namespace, te->tag);
    1314                 : 
    1315 GIC         118 :     appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
    1316 ECB             :                       qualId);
    1317                 : 
    1318 CBC         118 :     res = PQexec(AH->connection, query->data);
    1319 EUB             : 
    1320 GIC         118 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
    1321 UIC           0 :         pg_fatal("could not obtain lock on relation \"%s\"\n"
    1322                 :                  "This usually means that someone requested an ACCESS EXCLUSIVE lock "
    1323                 :                  "on the table after the pg_dump parent process had gotten the "
    1324 ECB             :                  "initial ACCESS SHARE lock on the table.", qualId);
    1325                 : 
    1326 GIC         118 :     PQclear(res);
    1327             118 :     destroyPQExpBuffer(query);
    1328                 : }
    1329                 : 
    1330                 : /*
    1331                 :  * WaitForCommands: main routine for a worker process.
    1332                 :  *
    1333                 :  * Read and execute commands from the leader until we see EOF on the pipe.
    1334 ECB             :  */
    1335                 : static void
    1336 GIC          28 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
    1337                 : {
    1338                 :     char       *command;
    1339 ECB             :     TocEntry   *te;
    1340                 :     T_Action    act;
    1341 GIC          28 :     int         status = 0;
    1342                 :     char        buf[256];
    1343                 : 
    1344 ECB             :     for (;;)
    1345                 :     {
    1346 GIC         196 :         if (!(command = getMessageFromLeader(pipefd)))
    1347 ECB             :         {
    1348                 :             /* EOF, so done */
    1349 GIC          28 :             return;
    1350                 :         }
    1351 ECB             : 
    1352                 :         /* Decode the command */
    1353 CBC         168 :         parseWorkerCommand(AH, &te, &act, command);
    1354                 : 
    1355 GIC         168 :         if (act == ACT_DUMP)
    1356 ECB             :         {
    1357                 :             /* Acquire lock on this table within the worker's session */
    1358 GIC         122 :             lockTableForWorker(AH, te);
    1359 ECB             : 
    1360                 :             /* Perform the dump command */
    1361 CBC         122 :             status = (AH->WorkerJobDumpPtr) (AH, te);
    1362                 :         }
    1363 GIC          46 :         else if (act == ACT_RESTORE)
    1364 ECB             :         {
    1365                 :             /* Perform the restore command */
    1366 GIC          46 :             status = (AH->WorkerJobRestorePtr) (AH, te);
    1367 EUB             :         }
    1368                 :         else
    1369 UIC           0 :             Assert(false);
    1370 ECB             : 
    1371                 :         /* Return status to leader */
    1372 CBC         168 :         buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
    1373                 : 
    1374 GIC         168 :         sendMessageToLeader(pipefd, buf);
    1375 ECB             : 
    1376                 :         /* command was pg_malloc'd and we are responsible for free()ing it. */
    1377 GIC         168 :         free(command);
    1378                 :     }
    1379                 : }
    1380                 : 
    1381                 : /*
    1382                 :  * Check for status messages from workers.
    1383                 :  *
    1384                 :  * If do_wait is true, wait to get a status message; otherwise, just return
    1385                 :  * immediately if there is none available.
    1386                 :  *
    1387                 :  * When we get a status message, we pass the status code to the callback
    1388                 :  * function that was specified to DispatchJobForTocEntry, then reset the
    1389                 :  * worker status to IDLE.
    1390                 :  *
    1391                 :  * Returns true if we collected a status message, else false.
    1392                 :  *
    1393                 :  * XXX is it worth checking for more than one status message per call?
    1394                 :  * It seems somewhat unlikely that multiple workers would finish at exactly
    1395                 :  * the same time.
    1396 ECB             :  */
    1397                 : static bool
    1398 GIC         327 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
    1399                 : {
    1400                 :     int         worker;
    1401                 :     char       *msg;
    1402 ECB             : 
    1403                 :     /* Try to collect a status message */
    1404 CBC         327 :     msg = getMessageFromWorker(pstate, do_wait, &worker);
    1405                 : 
    1406 GIC         327 :     if (!msg)
    1407 ECB             :     {
    1408 EUB             :         /* If do_wait is true, we must have detected EOF on some socket */
    1409 CBC         159 :         if (do_wait)
    1410 UIC           0 :             pg_fatal("a worker process died unexpectedly");
    1411 GIC         159 :         return false;
    1412                 :     }
    1413 ECB             : 
    1414                 :     /* Process it and update our idea of the worker's status */
    1415 CBC         168 :     if (messageStartsWith(msg, "OK "))
    1416 ECB             :     {
    1417 GIC         168 :         ParallelSlot *slot = &pstate->parallelSlot[worker];
    1418             168 :         TocEntry   *te = pstate->te[worker];
    1419 ECB             :         int         status;
    1420                 : 
    1421 CBC         168 :         status = parseWorkerResponse(AH, te, msg);
    1422             168 :         slot->callback(AH, te, status, slot->callback_data);
    1423 GIC         168 :         slot->workerStatus = WRKR_IDLE;
    1424             168 :         pstate->te[worker] = NULL;
    1425 EUB             :     }
    1426                 :     else
    1427 UIC           0 :         pg_fatal("invalid message received from worker: \"%s\"",
    1428                 :                  msg);
    1429 ECB             : 
    1430                 :     /* Free the string returned from getMessageFromWorker */
    1431 CBC         168 :     free(msg);
    1432                 : 
    1433 GIC         168 :     return true;
    1434                 : }
    1435                 : 
    1436                 : /*
    1437                 :  * Check for status results from workers, waiting if necessary.
    1438                 :  *
    1439                 :  * Available wait modes are:
    1440                 :  * WFW_NO_WAIT: reap any available status, but don't block
    1441                 :  * WFW_GOT_STATUS: wait for at least one more worker to finish
    1442                 :  * WFW_ONE_IDLE: wait for at least one worker to be idle
    1443                 :  * WFW_ALL_IDLE: wait for all workers to be idle
    1444                 :  *
    1445                 :  * Any received results are passed to the callback specified to
    1446                 :  * DispatchJobForTocEntry.
    1447                 :  *
    1448                 :  * This function is executed in the leader process.
    1449 ECB             :  */
    1450                 : void
    1451 CBC         170 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
    1452                 : {
    1453 GIC         170 :     bool        do_wait = false;
    1454                 : 
    1455                 :     /*
    1456                 :      * In GOT_STATUS mode, always block waiting for a message, since we can't
    1457                 :      * return till we get something.  In other modes, we don't block the first
    1458 ECB             :      * time through the loop.
    1459                 :      */
    1460 GIC         170 :     if (mode == WFW_GOT_STATUS)
    1461 ECB             :     {
    1462                 :         /* Assert that caller knows what it's doing */
    1463 GIC          10 :         Assert(!IsEveryWorkerIdle(pstate));
    1464              10 :         do_wait = true;
    1465                 :     }
    1466                 : 
    1467                 :     for (;;)
    1468                 :     {
    1469                 :         /*
    1470                 :          * Check for status messages, even if we don't need to block.  We do
    1471                 :          * not try very hard to reap all available messages, though, since
    1472 ECB             :          * there's unlikely to be more than one.
    1473                 :          */
    1474 GIC         327 :         if (ListenToWorkers(AH, pstate, do_wait))
    1475                 :         {
    1476                 :             /*
    1477                 :              * If we got a message, we are done by definition for GOT_STATUS
    1478                 :              * mode, and we can also be certain that there's at least one idle
    1479 ECB             :              * worker.  So we're done in all but ALL_IDLE mode.
    1480                 :              */
    1481 GIC         168 :             if (mode != WFW_ALL_IDLE)
    1482             151 :                 return;
    1483                 :         }
    1484 ECB             : 
    1485                 :         /* Check whether we must wait for new status messages */
    1486 GBC         176 :         switch (mode)
    1487 EUB             :         {
    1488 UBC           0 :             case WFW_NO_WAIT:
    1489               0 :                 return;         /* never wait */
    1490 UIC           0 :             case WFW_GOT_STATUS:
    1491 LBC           0 :                 Assert(false);  /* can't get here, because we waited */
    1492 ECB             :                 break;
    1493 CBC         150 :             case WFW_ONE_IDLE:
    1494             150 :                 if (GetIdleWorker(pstate) != NO_SLOT)
    1495              10 :                     return;
    1496             140 :                 break;
    1497              26 :             case WFW_ALL_IDLE:
    1498              26 :                 if (IsEveryWorkerIdle(pstate))
    1499 GIC           9 :                     return;
    1500              17 :                 break;
    1501                 :         }
    1502 ECB             : 
    1503                 :         /* Loop back, and this time wait for something to happen */
    1504 GIC         157 :         do_wait = true;
    1505                 :     }
    1506                 : }
    1507                 : 
    1508                 : /*
    1509                 :  * Read one command message from the leader, blocking if necessary
    1510                 :  * until one is available, and return it as a malloc'd string.
    1511                 :  * On EOF, return NULL.
    1512                 :  *
    1513                 :  * This function is executed in worker processes.
    1514 ECB             :  */
    1515                 : static char *
    1516 CBC         196 : getMessageFromLeader(int pipefd[2])
    1517                 : {
    1518 GIC         196 :     return readMessageFromPipe(pipefd[PIPE_READ]);
    1519                 : }
    1520                 : 
    1521                 : /*
    1522                 :  * Send a status message to the leader.
    1523                 :  *
    1524                 :  * This function is executed in worker processes.
    1525 ECB             :  */
    1526                 : static void
    1527 CBC         168 : sendMessageToLeader(int pipefd[2], const char *str)
    1528                 : {
    1529             168 :     int         len = strlen(str) + 1;
    1530 EUB             : 
    1531 CBC         168 :     if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
    1532 UIC           0 :         pg_fatal("could not write to the communication channel: %m");
    1533 GIC         168 : }
    1534                 : 
    1535                 : /*
    1536                 :  * Wait until some descriptor in "workerset" becomes readable.
    1537                 :  * Returns -1 on error, else the number of readable descriptors.
    1538 ECB             :  */
    1539                 : static int
    1540 GIC         167 : select_loop(int maxFd, fd_set *workerset)
    1541 ECB             : {
    1542                 :     int         i;
    1543 GIC         167 :     fd_set      saveSet = *workerset;
    1544                 : 
    1545 ECB             :     for (;;)
    1546                 :     {
    1547 GIC         167 :         *workerset = saveSet;
    1548             167 :         i = select(maxFd + 1, workerset, NULL, NULL, NULL);
    1549 ECB             : 
    1550 EUB             : #ifndef WIN32
    1551 GIC         167 :         if (i < 0 && errno == EINTR)
    1552 UIC           0 :             continue;
    1553                 : #else
    1554                 :         if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
    1555 ECB             :             continue;
    1556                 : #endif
    1557 GIC         167 :         break;
    1558 ECB             :     }
    1559                 : 
    1560 GIC         167 :     return i;
    1561                 : }
    1562                 : 
    1563                 : 
    1564                 : /*
    1565                 :  * Check for messages from worker processes.
    1566                 :  *
    1567                 :  * If a message is available, return it as a malloc'd string, and put the
    1568                 :  * index of the sending worker in *worker.
    1569                 :  *
    1570                 :  * If nothing is available, wait if "do_wait" is true, else return NULL.
    1571                 :  *
    1572                 :  * If we detect EOF on any socket, we'll return NULL.  It's not great that
    1573                 :  * that's hard to distinguish from the no-data-available case, but for now
    1574                 :  * our one caller is okay with that.
    1575                 :  *
    1576                 :  * This function is executed in the leader process.
    1577 ECB             :  */
    1578                 : static char *
    1579 GIC         327 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
    1580                 : {
    1581 ECB             :     int         i;
    1582                 :     fd_set      workerset;
    1583 GIC         327 :     int         maxFd = -1;
    1584             327 :     struct timeval nowait = {0, 0};
    1585 ECB             : 
    1586                 :     /* construct bitmap of socket descriptors for select() */
    1587 GIC        5559 :     FD_ZERO(&workerset);
    1588 CBC        1069 :     for (i = 0; i < pstate->numWorkers; i++)
    1589 EUB             :     {
    1590 CBC         742 :         if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1591 LBC           0 :             continue;
    1592 CBC         742 :         FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
    1593 GIC         742 :         if (pstate->parallelSlot[i].pipeRead > maxFd)
    1594             742 :             maxFd = pstate->parallelSlot[i].pipeRead;
    1595 ECB             :     }
    1596                 : 
    1597 CBC         327 :     if (do_wait)
    1598 ECB             :     {
    1599 GIC         167 :         i = select_loop(maxFd, &workerset);
    1600             167 :         Assert(i != 0);
    1601                 :     }
    1602 ECB             :     else
    1603                 :     {
    1604 GIC         160 :         if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
    1605             159 :             return NULL;
    1606 ECB             :     }
    1607 EUB             : 
    1608 GIC         168 :     if (i < 0)
    1609 LBC           0 :         pg_fatal("%s() failed: %m", "select");
    1610                 : 
    1611 GIC         289 :     for (i = 0; i < pstate->numWorkers; i++)
    1612                 :     {
    1613 ECB             :         char       *msg;
    1614 EUB             : 
    1615 CBC         289 :         if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1616 LBC           0 :             continue;
    1617 GIC         289 :         if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
    1618             121 :             continue;
    1619                 : 
    1620                 :         /*
    1621                 :          * Read the message if any.  If the socket is ready because of EOF,
    1622                 :          * we'll return NULL instead (and the socket will stay ready, so the
    1623                 :          * condition will persist).
    1624                 :          *
    1625                 :          * Note: because this is a blocking read, we'll wait if only part of
    1626                 :          * the message is available.  Waiting a long time would be bad, but
    1627                 :          * since worker status messages are short and are always sent in one
    1628 ECB             :          * operation, it shouldn't be a problem in practice.
    1629                 :          */
    1630 CBC         168 :         msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
    1631 GIC         168 :         *worker = i;
    1632 GBC         168 :         return msg;
    1633                 :     }
    1634 UIC           0 :     Assert(false);
    1635                 :     return NULL;
    1636                 : }
    1637                 : 
    1638                 : /*
    1639                 :  * Send a command message to the specified worker process.
    1640                 :  *
    1641                 :  * This function is executed in the leader process.
    1642 ECB             :  */
    1643                 : static void
    1644 CBC         168 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
    1645                 : {
    1646             168 :     int         len = strlen(str) + 1;
    1647                 : 
    1648 GBC         168 :     if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
    1649                 :     {
    1650 LBC           0 :         pg_fatal("could not write to the communication channel: %m");
    1651                 :     }
    1652 GIC         168 : }
    1653                 : 
    1654                 : /*
    1655                 :  * Read one message from the specified pipe (fd), blocking if necessary
    1656                 :  * until one is available, and return it as a malloc'd string.
    1657                 :  * On EOF or read error (piperead() returning <= 0), return NULL.
    1658                 :  *
    1659                 :  * A "message" on the channel is just a null-terminated string.
    1660 ECB             :  */
    1661                 : static char *
    1662 GIC         364 : readMessageFromPipe(int fd)
    1663                 : {
    1664                 :     char       *msg;
    1665                 :     int         msgsize,
    1666                 :                 bufsize;
    1667                 :     int         ret;
    1668                 : 
    1669                 :     /*
    1670                 :      * In theory, if we let piperead() read multiple bytes, it might give us
    1671                 :      * back fragments of multiple messages.  (That can't actually occur, since
    1672                 :      * neither leader nor workers send more than one message without waiting
    1673                 :      * for a reply, but we don't wish to assume that here.)  For simplicity,
    1674                 :      * read a byte at a time until we get the terminating '\0'.  This method
    1675                 :      * is a bit inefficient, but since this is only used for relatively short
    1676 ECB             :      * command and status strings, it shouldn't matter.
    1677                 :      */
    1678 CBC         364 :     bufsize = 64;               /* could be any number */
    1679 GIC         364 :     msg = (char *) pg_malloc(bufsize);
    1680             364 :     msgsize = 0;
    1681 ECB             :     for (;;)
    1682                 :     {
    1683 CBC        3862 :         Assert(msgsize < bufsize);  /* there is room for the next byte */
    1684            3862 :         ret = piperead(fd, msg + msgsize, 1);  /* read exactly one byte into the next free slot */
    1685 GIC        3862 :         if (ret <= 0)
    1686 CBC          28 :             break;              /* error or connection closure */
    1687                 : 
    1688            3834 :         Assert(ret == 1);
    1689 ECB             : 
    1690 GIC        3834 :         if (msg[msgsize] == '\0')
    1691 CBC         336 :             return msg;         /* collected whole message, terminator included */
    1692 ECB             : 
    1693 GIC        3498 :         msgsize++;
    1694 GBC        3498 :         if (msgsize == bufsize) /* enlarge buffer if needed */
    1695 EUB             :         {
    1696 UIC           0 :             bufsize += 16;      /* could be any number */
    1697               0 :             msg = (char *) pg_realloc(msg, bufsize);
    1698                 :         }
    1699                 :     }
    1700 ECB             : 
    1701                 :     /* Read failed or other end has closed the connection; discard any partial message */
    1702 GIC          28 :     pg_free(msg);
    1703              28 :     return NULL;
    1704                 : }
    1705                 : 
    1706                 : #ifdef WIN32
    1707                 : 
    1708                 : /*
    1709                 :  * This is a replacement version of pipe(2) for Windows which allows the pipe
    1710                 :  * handles to be used in select().
    1711                 :  *
    1712                 :  * Reads and writes on the pipe must go through piperead()/pipewrite().
    1713                 :  *
    1714                 :  * For consistency with Unix we declare the returned handles as "int".
    1715                 :  * This is okay even on WIN64 because system handles are not more than
    1716                 :  * 32 bits wide, but we do have to do some casting.
    1717                 :  */
    1718                 : static int
    1719                 : pgpipe(int handles[2])      /* returns 0 on success, -1 on failure (both handles[] entries are then -1) */
    1720                 : {
    1721                 :     pgsocket    s,
    1722                 :                 tmp_sock;
    1723                 :     struct sockaddr_in serv_addr;
    1724                 :     int         len = sizeof(serv_addr);    /* address length; also passed by pointer to getsockname()/accept() */
    1725                 : 
    1726                 :     /* We have to use the Unix socket invalid file descriptor value here. */
    1727                 :     handles[0] = handles[1] = -1;
    1728                 : 
    1729                 :     /*
    1730                 :      * setup listen socket
    1731                 :      */
    1732                 :     if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
    1733                 :     {
    1734                 :         pg_log_error("pgpipe: could not create socket: error code %d",
    1735                 :                      WSAGetLastError());
    1736                 :         return -1;
    1737                 :     }
    1738                 : 
    1739                 :     memset(&serv_addr, 0, sizeof(serv_addr));
    1740                 :     serv_addr.sin_family = AF_INET;
    1741                 :     serv_addr.sin_port = pg_hton16(0);  /* port 0: presumably lets the system pick a free port -- confirm against bind() docs */
    1742                 :     serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
    1743                 :     if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
    1744                 :     {
    1745                 :         pg_log_error("pgpipe: could not bind: error code %d",
    1746                 :                      WSAGetLastError());
    1747                 :         closesocket(s);
    1748                 :         return -1;
    1749                 :     }
    1750                 :     if (listen(s, 1) == SOCKET_ERROR)   /* backlog of 1: only the single connect() below is expected */
    1751                 :     {
    1752                 :         pg_log_error("pgpipe: could not listen: error code %d",
    1753                 :                      WSAGetLastError());
    1754                 :         closesocket(s);
    1755                 :         return -1;
    1756                 :     }
    1757                 :     if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)  /* refill serv_addr with the bound address, used by connect() below */
    1758                 :     {
    1759                 :         pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
    1760                 :                      WSAGetLastError());
    1761                 :         closesocket(s);
    1762                 :         return -1;
    1763                 :     }
    1764                 : 
    1765                 :     /*
    1766                 :      * setup pipe handles
    1767                 :      */
    1768                 :     if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
    1769                 :     {
    1770                 :         pg_log_error("pgpipe: could not create second socket: error code %d",
    1771                 :                      WSAGetLastError());
    1772                 :         closesocket(s);
    1773                 :         return -1;
    1774                 :     }
    1775                 :     handles[1] = (int) tmp_sock;    /* handles[1] is the connecting end */
    1776                 : 
    1777                 :     if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
    1778                 :     {
    1779                 :         pg_log_error("pgpipe: could not connect socket: error code %d",
    1780                 :                      WSAGetLastError());
    1781                 :         closesocket(handles[1]);
    1782                 :         handles[1] = -1;
    1783                 :         closesocket(s);
    1784                 :         return -1;
    1785                 :     }
    1786                 :     if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
    1787                 :     {
    1788                 :         pg_log_error("pgpipe: could not accept connection: error code %d",
    1789                 :                      WSAGetLastError());
    1790                 :         closesocket(handles[1]);
    1791                 :         handles[1] = -1;
    1792                 :         closesocket(s);
    1793                 :         return -1;
    1794                 :     }
    1795                 :     handles[0] = (int) tmp_sock;    /* handles[0] is the accepted end */
    1796                 : 
    1797                 :     closesocket(s);     /* listen socket is no longer needed once both ends exist */
    1798                 :     return 0;
    1799                 : }
    1800                 : 
    1801                 : #endif                          /* WIN32 */
        

Generated by: LCOV version v1.16-55-g56c0a2a