Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * parallel.c
4 : *
5 : * Parallel support for pg_dump and pg_restore
6 : *
7 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/bin/pg_dump/parallel.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : /*
17 : * Parallel operation works like this:
18 : *
19 : * The original, leader process calls ParallelBackupStart(), which forks off
20 : * the desired number of worker processes, which each enter WaitForCommands().
21 : *
22 : * The leader process dispatches an individual work item to one of the worker
23 : * processes in DispatchJobForTocEntry(). We send a command string such as
24 : * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 : * The worker process receives and decodes the command and passes it to the
26 : * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 : * which are routines of the current archive format. That routine performs
28 : * the required action (dump or restore) and returns an integer status code.
29 : * This is passed back to the leader where we pass it to the
30 : * ParallelCompletionPtr callback function that was passed to
31 : * DispatchJobForTocEntry(). The callback function does state updating
32 : * for the leader control logic in pg_backup_archiver.c.
33 : *
34 : * In principle additional archive-format-specific information might be needed
35 : * in commands or worker status responses, but so far that hasn't proved
36 : * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 : * data structures. Remember that we have forked off the workers only after
38 : * we have read in the catalog. That's why our worker processes can also
39 : * access the catalog information. (In the Windows case, the workers are
40 : * threads in the same process. To avoid problems, they work with cloned
41 : * copies of the Archive data structure; see RunWorker().)
42 : *
43 : * In the leader process, the workerStatus field for each worker has one of
44 : * the following values:
45 : * WRKR_NOT_STARTED: we've not yet forked this worker
46 : * WRKR_IDLE: it's waiting for a command
47 : * WRKR_WORKING: it's working on a command
48 : * WRKR_TERMINATED: process ended
49 : * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 : * state, and must be NULL in other states.
51 : */
52 :
53 : #include "postgres_fe.h"
54 :
55 : #ifndef WIN32
56 : #include <sys/select.h>
57 : #include <sys/wait.h>
58 : #include <signal.h>
59 : #include <unistd.h>
60 : #include <fcntl.h>
61 : #endif
62 :
63 : #include "fe_utils/string_utils.h"
64 : #include "parallel.h"
65 : #include "pg_backup_utils.h"
66 : #include "port/pg_bswap.h"
67 :
/* Indexes into the two-element fd array that pipe(2) fills in */
#define PIPE_READ	0
#define PIPE_WRITE	1

/* Value GetIdleWorker() hands back when no worker is free */
#define NO_SLOT (-1)

/*
 * Lifecycle states of a worker, as tracked by the leader.
 */
typedef enum
{
	WRKR_NOT_STARTED = 0,		/* not yet forked/spawned */
	WRKR_IDLE = 1,				/* waiting for a command */
	WRKR_WORKING = 2,			/* executing a command */
	WRKR_TERMINATED = 3			/* process/thread has ended */
} T_WorkerStatus;

/* True while a worker is alive, whether busy or waiting */
#define WORKER_IS_RUNNING(workerStatus) \
	((workerStatus) == WRKR_IDLE || \
	 (workerStatus) == WRKR_WORKING)
85 :
86 : /*
87 : * Private per-parallel-worker state (typedef for this is in parallel.h).
88 : *
89 : * Much of this is valid only in the leader process (or, on Windows, should
90 : * be touched only by the leader thread). But the AH field should be touched
91 : * only by workers. The pipe descriptors are valid everywhere.
92 : */
93 : struct ParallelSlot
94 : {
95 : T_WorkerStatus workerStatus; /* see enum above */
96 :
97 : /* These fields are valid if workerStatus == WRKR_WORKING: */
98 : ParallelCompletionPtr callback; /* function to call on completion */
99 : void *callback_data; /* passthrough data for it */
100 :
101 : ArchiveHandle *AH; /* Archive data worker is using */
102 :
103 : int pipeRead; /* leader's end of the pipes */
104 : int pipeWrite;
105 : int pipeRevRead; /* child's end of the pipes */
106 : int pipeRevWrite;
107 :
108 : /* Child process/thread identity info: */
109 : #ifdef WIN32
110 : uintptr_t hThread;
111 : unsigned int threadId;
112 : #else
113 : pid_t pid;
114 : #endif
115 : };
116 :
#ifdef WIN32

/*
 * Argument bundle for the worker thread function: _beginthreadex() passes
 * only a single pointer, so both values are packed together here.
 */
typedef struct
{
	ArchiveHandle *AH;			/* leader's archive handle/connection */
	ParallelSlot *slot;			/* slot assigned to this worker */
} WorkerInfo;

/* On Windows the leader/worker channels are sockets, used via recv/send */
static int	pgpipe(int handles[2]);
#define piperead(a,b,c)		recv(a,b,c,0)
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Elsewhere these are plain aliases for the POSIX pipe calls */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

#endif							/* WIN32 */
142 :
143 : /*
144 : * State info for archive_close_connection() shutdown callback.
145 : */
146 : typedef struct ShutdownInformation
147 : {
148 : ParallelState *pstate;
149 : Archive *AHX;
150 : } ShutdownInformation;
151 :
152 : static ShutdownInformation shutdown_info;
153 :
154 : /*
155 : * State info for signal handling.
156 : * We assume signal_info initializes to zeroes.
157 : *
158 : * On Unix, myAH is the leader DB connection in the leader process, and the
159 : * worker's own connection in worker processes. On Windows, we have only one
160 : * instance of signal_info, so myAH is the leader connection and the worker
161 : * connections must be dug out of pstate->parallelSlot[].
162 : */
163 : typedef struct DumpSignalInformation
164 : {
165 : ArchiveHandle *myAH; /* database connection to issue cancel for */
166 : ParallelState *pstate; /* parallel state, if any */
167 : bool handler_set; /* signal handler set up in this process? */
168 : #ifndef WIN32
169 : bool am_worker; /* am I a worker process? */
170 : #endif
171 : } DumpSignalInformation;
172 :
173 : static volatile DumpSignalInformation signal_info;
174 :
175 : #ifdef WIN32
176 : static CRITICAL_SECTION signal_info_lock;
177 : #endif
178 :
/*
 * Emit a plain C string on stderr with a bare write(2), which is acceptable
 * inside a signal handler.  The result of write() is deliberately discarded,
 * since there is nothing useful to do on failure.  It is stored in a dummy
 * variable only to keep some compilers from warning about ignoring it.
 */
#define write_stderr(str) \
	do { \
		const char *str_ = (str); \
		int			rc_ = write(fileno(stderr), str_, strlen(str_)); \
		(void) rc_; \
	} while (0)
191 :
192 :
#ifdef WIN32
/* Private to this file: TLS slot consulted by getThreadLocalPQExpBuffer() */
static DWORD tls_index;

/* These two are global because exit_nicely needs to see them */
bool		parallel_init_done = false;
DWORD		mainThreadId;
#endif							/* WIN32 */
201 :
202 : /* Local function prototypes */
203 : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
204 : static void archive_close_connection(int code, void *arg);
205 : static void ShutdownWorkersHard(ParallelState *pstate);
206 : static void WaitForTerminatingWorkers(ParallelState *pstate);
207 : static void setup_cancel_handler(void);
208 : static void set_cancel_pstate(ParallelState *pstate);
209 : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
210 : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
211 : static int GetIdleWorker(ParallelState *pstate);
212 : static bool HasEveryWorkerTerminated(ParallelState *pstate);
213 : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
214 : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
215 : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
216 : bool do_wait);
217 : static char *getMessageFromLeader(int pipefd[2]);
218 : static void sendMessageToLeader(int pipefd[2], const char *str);
219 : static int select_loop(int maxFd, fd_set *workerset);
220 : static char *getMessageFromWorker(ParallelState *pstate,
221 : bool do_wait, int *worker);
222 : static void sendMessageToWorker(ParallelState *pstate,
223 : int worker, const char *str);
224 : static char *readMessageFromPipe(int fd);
225 :
/* True when the string msg begins with the string prefix */
#define messageStartsWith(msg, prefix) \
	(strncmp((msg), (prefix), strlen(prefix)) == 0)
228 :
229 :
/*
 * Initialize parallel dump support --- should be called early in process
 * startup.  (Currently, this is called whether or not we intend parallel
 * activity.)
 *
 * On Windows this allocates the thread-local-storage slot used by
 * getThreadLocalPQExpBuffer(), records the main thread's ID, and starts
 * Winsock.  Repeat calls are no-ops once parallel_init_done is set.  On
 * other platforms the function does nothing.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	if (!parallel_init_done)
	{
		WSADATA		wsaData;
		int			err;

		/* Prepare for threaded operation */
		tls_index = TlsAlloc();

		/*
		 * TlsAlloc() signals failure with TLS_OUT_OF_INDEXES.  Without this
		 * check, every later TlsGetValue/TlsSetValue on the bogus index
		 * would fail silently.
		 */
		if (tls_index == TLS_OUT_OF_INDEXES)
			pg_fatal("%s() failed: error code %lu", "TlsAlloc",
					 GetLastError());
		mainThreadId = GetCurrentThreadId();

		/* Initialize socket access */
		err = WSAStartup(MAKEWORD(2, 2), &wsaData);
		if (err != 0)
			pg_fatal("%s() failed: error code %d", "WSAStartup", err);

		parallel_init_done = true;
	}
#endif
}
257 :
258 : /*
259 : * Find the ParallelSlot for the current worker process or thread.
260 : *
261 : * Returns NULL if no matching slot is found (this implies we're the leader).
2507 tgl 262 EUB : */
263 : static ParallelSlot *
3668 andrew 264 UIC 0 : GetMyPSlot(ParallelState *pstate)
265 : {
3668 andrew 266 EUB : int i;
267 :
3668 andrew 268 UIC 0 : for (i = 0; i < pstate->numWorkers; i++)
269 : {
270 : #ifdef WIN32
3668 andrew 271 EUB : if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
272 : #else
3668 andrew 273 UBC 0 : if (pstate->parallelSlot[i].pid == getpid())
274 : #endif
3668 andrew 275 UIC 0 : return &(pstate->parallelSlot[i]);
2507 tgl 276 EUB : }
277 :
3668 andrew 278 UIC 0 : return NULL;
279 : }
280 :
/*
 * A thread-local version of getLocalPQExpBuffer().
 *
 * Non-reentrant but reduces memory leakage: we'll consume one buffer per
 * thread, which is much better than one per fmtId/fmtQualifiedId call.
 *
 * ParallelBackupStart() installs this as getLocalPQExpBuffer on Windows.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * The Tls code goes awry if we use a static var, so we provide for both
	 * static and auto, and omit any use of the static var when using Tls. We
	 * rely on TlsGetValue() to return 0 if the value is not yet set.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer id_return;

	if (parallel_init_done)
		id_return = (PQExpBuffer) TlsGetValue(tls_index);
	else
		id_return = s_id_return;

	if (id_return)				/* been here before? */
	{
		/* reuse the existing buffer; just wipe its contents */
		resetPQExpBuffer(id_return);
	}
	else
	{
		/* first time through: create a buffer and remember it */
		id_return = createPQExpBuffer();
		if (parallel_init_done)
			TlsSetValue(tls_index, id_return);
		else
			s_id_return = id_return;
	}

	return id_return;
}
#endif							/* WIN32 */
322 :
323 : /*
324 : * pg_dump and pg_restore call this to register the cleanup handler
325 : * as soon as they've created the ArchiveHandle.
3668 andrew 326 ECB : */
327 : void
3668 andrew 328 CBC 163 : on_exit_close_archive(Archive *AHX)
3668 andrew 329 ECB : {
3668 andrew 330 CBC 163 : shutdown_info.AHX = AHX;
3668 andrew 331 GIC 163 : on_exit_nicely(archive_close_connection, &shutdown_info);
332 163 : }
333 :
334 : /*
335 : * on_exit_nicely handler for shutting down database connections and
336 : * worker processes cleanly.
3668 andrew 337 ECB : */
338 : static void
3668 andrew 339 CBC 130 : archive_close_connection(int code, void *arg)
340 : {
341 130 : ShutdownInformation *si = (ShutdownInformation *) arg;
342 :
3668 andrew 343 GIC 130 : if (si->pstate)
3668 andrew 344 EUB : {
345 : /* In parallel mode, must figure out who we are */
3668 andrew 346 UBC 0 : ParallelSlot *slot = GetMyPSlot(si->pstate);
347 :
3668 andrew 348 UIC 0 : if (!slot)
349 : {
350 : /*
351 : * We're the leader. Forcibly shut down workers, then close our
2502 tgl 352 EUB : * own database connection, if any.
353 : */
2502 tgl 354 UBC 0 : ShutdownWorkersHard(si->pstate);
2502 tgl 355 EUB :
2510 tgl 356 UIC 0 : if (si->AHX)
357 0 : DisconnectDatabase(si->AHX);
358 : }
359 : else
360 : {
361 : /*
362 : * We're a worker. Shut down our own DB connection if any. On
363 : * Windows, we also have to close our communication sockets, to
364 : * emulate what will happen on Unix when the worker process exits.
365 : * (Without this, if this is a premature exit, the leader would
366 : * fail to detect it because there would be no EOF condition on
2510 tgl 367 EUB : * the other end of the pipe.)
368 : */
2385 tgl 369 UIC 0 : if (slot->AH)
370 0 : DisconnectDatabase(&(slot->AH->public));
371 :
372 : #ifdef WIN32
373 : closesocket(slot->pipeRevRead);
374 : closesocket(slot->pipeRevWrite);
375 : #endif
376 : }
377 : }
378 : else
2510 tgl 379 ECB : {
1029 andres 380 : /* Non-parallel operation: just kill the leader DB connection */
2510 tgl 381 GIC 130 : if (si->AHX)
2510 tgl 382 CBC 130 : DisconnectDatabase(si->AHX);
383 : }
3668 andrew 384 GIC 130 : }
385 :
386 : /*
387 : * Forcibly shut down any remaining workers, waiting for them to finish.
388 : *
389 : * Note that we don't expect to come here during normal exit (the workers
390 : * should be long gone, and the ParallelState too). We're only here in a
391 : * pg_fatal() situation, so intervening to cancel active commands is
392 : * appropriate.
3668 andrew 393 EUB : */
394 : static void
3668 andrew 395 UIC 0 : ShutdownWorkersHard(ParallelState *pstate)
396 : {
397 : int i;
398 :
399 : /*
400 : * Close our write end of the sockets so that any workers waiting for
401 : * commands know they can exit. (Note: some of the pipeWrite fields might
402 : * still be zero, if we failed to initialize all the workers. Hence, just
1164 tgl 403 EUB : * ignore errors here.)
3668 andrew 404 : */
3668 andrew 405 UIC 0 : for (i = 0; i < pstate->numWorkers; i++)
406 0 : closesocket(pstate->parallelSlot[i].pipeWrite);
407 :
408 : /*
409 : * Force early termination of any commands currently in progress.
410 : */
2509 tgl 411 EUB : #ifndef WIN32
412 : /* On non-Windows, send SIGTERM to each worker process. */
3668 andrew 413 UBC 0 : for (i = 0; i < pstate->numWorkers; i++)
414 : {
2502 tgl 415 0 : pid_t pid = pstate->parallelSlot[i].pid;
2502 tgl 416 EUB :
2502 tgl 417 UIC 0 : if (pid != 0)
418 0 : kill(pid, SIGTERM);
419 : }
420 : #else
421 :
422 : /*
423 : * On Windows, send query cancels directly to the workers' backends. Use
424 : * a critical section to ensure worker threads don't change state.
425 : */
426 : EnterCriticalSection(&signal_info_lock);
427 : for (i = 0; i < pstate->numWorkers; i++)
428 : {
429 : ArchiveHandle *AH = pstate->parallelSlot[i].AH;
430 : char errbuf[1];
431 :
432 : if (AH != NULL && AH->connCancel != NULL)
433 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
434 : }
435 : LeaveCriticalSection(&signal_info_lock);
436 : #endif
3668 andrew 437 EUB :
2502 tgl 438 : /* Now wait for them to terminate. */
3668 andrew 439 UIC 0 : WaitForTerminatingWorkers(pstate);
440 0 : }
441 :
442 : /*
443 : * Wait for all workers to terminate.
3668 andrew 444 ECB : */
445 : static void
3668 andrew 446 CBC 13 : WaitForTerminatingWorkers(ParallelState *pstate)
447 : {
448 41 : while (!HasEveryWorkerTerminated(pstate))
449 : {
3668 andrew 450 GIC 28 : ParallelSlot *slot = NULL;
451 : int j;
452 :
453 : #ifndef WIN32
2508 tgl 454 ECB : /* On non-Windows, use wait() to wait for next worker to end */
455 : int status;
3668 andrew 456 GIC 28 : pid_t pid = wait(&status);
3668 andrew 457 ECB :
458 : /* Find dead worker's slot, and clear the PID field */
3668 andrew 459 CBC 45 : for (j = 0; j < pstate->numWorkers; j++)
2508 tgl 460 ECB : {
2508 tgl 461 GIC 45 : slot = &(pstate->parallelSlot[j]);
2508 tgl 462 CBC 45 : if (slot->pid == pid)
2508 tgl 463 ECB : {
2508 tgl 464 GIC 28 : slot->pid = 0;
465 28 : break;
466 : }
467 : }
468 : #else /* WIN32 */
469 : /* On Windows, we must use WaitForMultipleObjects() */
470 : HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
471 : int nrun = 0;
472 : DWORD ret;
473 : uintptr_t hThread;
474 :
475 : for (j = 0; j < pstate->numWorkers; j++)
476 : {
477 : if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
478 : {
479 : lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
480 : nrun++;
481 : }
482 : }
483 : ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
484 : Assert(ret != WAIT_FAILED);
485 : hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
486 : free(lpHandles);
487 :
488 : /* Find dead worker's slot, and clear the hThread field */
489 : for (j = 0; j < pstate->numWorkers; j++)
490 : {
491 : slot = &(pstate->parallelSlot[j]);
492 : if (slot->hThread == hThread)
493 : {
494 : /* For cleanliness, close handles for dead threads */
495 : CloseHandle((HANDLE) slot->hThread);
496 : slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
497 : break;
498 : }
499 : }
500 : #endif /* WIN32 */
3668 andrew 501 ECB :
2385 tgl 502 : /* On all platforms, update workerStatus and te[] as well */
2508 tgl 503 CBC 28 : Assert(j < pstate->numWorkers);
3668 andrew 504 GIC 28 : slot->workerStatus = WRKR_TERMINATED;
2385 tgl 505 CBC 28 : pstate->te[j] = NULL;
506 : }
3668 andrew 507 GIC 13 : }
508 :
509 :
510 : /*
511 : * Code for responding to cancel interrupts (SIGINT, control-C, etc)
512 : *
513 : * This doesn't quite belong in this module, but it needs access to the
514 : * ParallelState data, so there's not really a better place either.
515 : *
516 : * When we get a cancel interrupt, we could just die, but in pg_restore that
517 : * could leave a SQL command (e.g., CREATE INDEX on a large table) running
518 : * for a long time. Instead, we try to send a cancel request and then die.
519 : * pg_dump probably doesn't really need this, but we might as well use it
520 : * there too. Note that sending the cancel directly from the signal handler
521 : * is safe because PQcancel() is written to make it so.
522 : *
523 : * In parallel operation on Unix, each process is responsible for canceling
524 : * its own connection (this must be so because nobody else has access to it).
525 : * Furthermore, the leader process should attempt to forward its signal to
526 : * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
527 : * needed because typing control-C at the console would deliver SIGINT to
528 : * every member of the terminal process group --- but in other scenarios it
529 : * might be that only the leader gets signaled.
530 : *
531 : * On Windows, the cancel handler runs in a separate thread, because that's
532 : * how SetConsoleCtrlHandler works. We make it stop worker threads, send
533 : * cancels on all active connections, and then return FALSE, which will allow
534 : * the process to die. For safety's sake, we use a critical section to
535 : * protect the PGcancel structures against being changed while the signal
536 : * thread runs.
537 : */
538 :
539 : #ifndef WIN32
540 :
541 : /*
542 : * Signal handler (Unix only)
2502 tgl 543 EUB : */
544 : static void
2507 tgl 545 UIC 0 : sigTermHandler(SIGNAL_ARGS)
546 : {
547 : int i;
548 : char errbuf[1];
549 :
550 : /*
551 : * Some platforms allow delivery of new signals to interrupt an active
552 : * signal handler. That could muck up our attempt to send PQcancel, so
2502 tgl 553 EUB : * disable the signals that setup_cancel_handler enabled.
554 : */
2502 tgl 555 UBC 0 : pqsignal(SIGINT, SIG_IGN);
2502 tgl 556 UIC 0 : pqsignal(SIGTERM, SIG_IGN);
557 0 : pqsignal(SIGQUIT, SIG_IGN);
558 :
559 : /*
560 : * If we're in the leader, forward signal to all workers. (It seems best
561 : * to do this before PQcancel; killing the leader transaction will result
562 : * in invalid-snapshot errors from active workers, which maybe we can
2502 tgl 563 EUB : * quiet by killing workers first.) Ignore any errors.
564 : */
2502 tgl 565 UBC 0 : if (signal_info.pstate != NULL)
566 : {
567 0 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
568 : {
569 0 : pid_t pid = signal_info.pstate->parallelSlot[i].pid;
2502 tgl 570 EUB :
2502 tgl 571 UIC 0 : if (pid != 0)
572 0 : kill(pid, SIGTERM);
573 : }
574 : }
575 :
576 : /*
577 : * Send QueryCancel if we have a connection to send to. Ignore errors,
2502 tgl 578 EUB : * there's not much we can do about them anyway.
579 : */
2502 tgl 580 UIC 0 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
581 0 : (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
582 :
583 : /*
584 : * Report we're quitting, using nothing more complicated than write(2).
1029 andres 585 EUB : * When in parallel operation, only the leader process should do this.
586 : */
2502 tgl 587 UBC 0 : if (!signal_info.am_worker)
588 : {
589 0 : if (progname)
2502 tgl 590 EUB : {
2502 tgl 591 UIC 0 : write_stderr(progname);
2502 tgl 592 UBC 0 : write_stderr(": ");
593 : }
2502 tgl 594 UIC 0 : write_stderr("terminated by user\n");
595 : }
596 :
597 : /*
598 : * And die, using _exit() not exit() because the latter will invoke atexit
1175 tgl 599 EUB : * handlers that can fail if we interrupted related code.
600 : */
1175 tgl 601 UIC 0 : _exit(1);
602 : }
603 :
604 : /*
605 : * Enable cancel interrupt handler, if not already done.
2502 tgl 606 ECB : */
607 : static void
2502 tgl 608 GIC 383 : setup_cancel_handler(void)
609 : {
610 : /*
611 : * When forking, signal_info.handler_set will propagate into the new
2502 tgl 612 ECB : * process, but that's fine because the signal handler state does too.
613 : */
2502 tgl 614 CBC 383 : if (!signal_info.handler_set)
615 : {
616 141 : signal_info.handler_set = true;
2502 tgl 617 ECB :
2502 tgl 618 CBC 141 : pqsignal(SIGINT, sigTermHandler);
2502 tgl 619 GIC 141 : pqsignal(SIGTERM, sigTermHandler);
2502 tgl 620 CBC 141 : pqsignal(SIGQUIT, sigTermHandler);
621 : }
2502 tgl 622 GIC 383 : }
623 :
624 : #else /* WIN32 */
625 :
626 : /*
627 : * Console interrupt handler --- runs in a newly-started thread.
628 : *
629 : * After stopping other threads and sending cancel requests on all open
630 : * connections, we return FALSE which will allow the default ExitProcess()
631 : * action to be taken.
632 : */
633 : static BOOL WINAPI
634 : consoleHandler(DWORD dwCtrlType)
635 : {
636 : int i;
637 : char errbuf[1];
638 :
639 : if (dwCtrlType == CTRL_C_EVENT ||
640 : dwCtrlType == CTRL_BREAK_EVENT)
641 : {
642 : /* Critical section prevents changing data we look at here */
643 : EnterCriticalSection(&signal_info_lock);
644 :
645 : /*
646 : * If in parallel mode, stop worker threads and send QueryCancel to
647 : * their connected backends. The main point of stopping the worker
648 : * threads is to keep them from reporting the query cancels as errors,
649 : * which would clutter the user's screen. We needn't stop the leader
650 : * thread since it won't be doing much anyway. Do this before
651 : * canceling the main transaction, else we might get invalid-snapshot
652 : * errors reported before we can stop the workers. Ignore errors,
653 : * there's not much we can do about them anyway.
654 : */
655 : if (signal_info.pstate != NULL)
656 : {
657 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
658 : {
659 : ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
660 : ArchiveHandle *AH = slot->AH;
661 : HANDLE hThread = (HANDLE) slot->hThread;
662 :
663 : /*
664 : * Using TerminateThread here may leave some resources leaked,
665 : * but it doesn't matter since we're about to end the whole
666 : * process.
667 : */
668 : if (hThread != INVALID_HANDLE_VALUE)
669 : TerminateThread(hThread, 0);
670 :
671 : if (AH != NULL && AH->connCancel != NULL)
672 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
673 : }
674 : }
675 :
676 : /*
677 : * Send QueryCancel to leader connection, if enabled. Ignore errors,
678 : * there's not much we can do about them anyway.
679 : */
680 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
681 : (void) PQcancel(signal_info.myAH->connCancel,
682 : errbuf, sizeof(errbuf));
683 :
684 : LeaveCriticalSection(&signal_info_lock);
685 :
686 : /*
687 : * Report we're quitting, using nothing more complicated than
688 : * write(2). (We might be able to get away with using pg_log_*()
689 : * here, but since we terminated other threads uncleanly above, it
690 : * seems better to assume as little as possible.)
691 : */
692 : if (progname)
693 : {
694 : write_stderr(progname);
695 : write_stderr(": ");
696 : }
697 : write_stderr("terminated by user\n");
698 : }
699 :
700 : /* Always return FALSE to allow signal handling to continue */
701 : return FALSE;
702 : }
703 :
704 : /*
705 : * Enable cancel interrupt handler, if not already done.
706 : */
707 : static void
708 : setup_cancel_handler(void)
709 : {
710 : if (!signal_info.handler_set)
711 : {
712 : signal_info.handler_set = true;
713 :
714 : InitializeCriticalSection(&signal_info_lock);
715 :
716 : SetConsoleCtrlHandler(consoleHandler, TRUE);
717 : }
718 : }
719 :
720 : #endif /* WIN32 */
721 :
722 :
723 : /*
724 : * set_archive_cancel_info
725 : *
726 : * Fill AH->connCancel with cancellation info for the specified database
727 : * connection; or clear it if conn is NULL.
2502 tgl 728 ECB : */
729 : void
2502 tgl 730 GIC 383 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
731 : {
732 : PGcancel *oldConnCancel;
733 :
734 : /*
735 : * Activate the interrupt handler if we didn't yet in this process. On
736 : * Windows, this also initializes signal_info_lock; therefore it's
737 : * important that this happen at least once before we fork off any
2502 tgl 738 ECB : * threads.
739 : */
2502 tgl 740 GIC 383 : setup_cancel_handler();
741 :
742 : /*
743 : * On Unix, we assume that storing a pointer value is atomic with respect
744 : * to any possible signal interrupt. On Windows, use a critical section.
745 : */
746 :
747 : #ifdef WIN32
748 : EnterCriticalSection(&signal_info_lock);
749 : #endif
2502 tgl 750 ECB :
751 : /* Free the old one if we have one */
2502 tgl 752 CBC 383 : oldConnCancel = AH->connCancel;
753 : /* be sure interrupt handler doesn't use pointer while freeing */
754 383 : AH->connCancel = NULL;
2502 tgl 755 ECB :
2502 tgl 756 GIC 383 : if (oldConnCancel != NULL)
757 193 : PQfreeCancel(oldConnCancel);
2502 tgl 758 ECB :
759 : /* Set the new one if specified */
2502 tgl 760 GIC 383 : if (conn)
761 195 : AH->connCancel = PQgetCancel(conn);
762 :
763 : /*
764 : * On Unix, there's only ever one active ArchiveHandle per process, so we
765 : * can just set signal_info.myAH unconditionally. On Windows, do that
766 : * only in the main thread; worker threads have to make sure their
767 : * ArchiveHandle appears in the pstate data, which is dealt with in
768 : * RunWorker().
2502 tgl 769 ECB : */
770 : #ifndef WIN32
2502 tgl 771 GIC 383 : signal_info.myAH = AH;
772 : #else
773 : if (mainThreadId == GetCurrentThreadId())
774 : signal_info.myAH = AH;
775 : #endif
776 :
777 : #ifdef WIN32
2502 tgl 778 ECB : LeaveCriticalSection(&signal_info_lock);
779 : #endif
2502 tgl 780 GIC 383 : }
781 :
782 : /*
783 : * set_cancel_pstate
784 : *
785 : * Set signal_info.pstate to point to the specified ParallelState, if any.
786 : * We need this mainly to have an interlock against Windows signal thread.
2502 tgl 787 ECB : */
788 : static void
2502 tgl 789 GIC 26 : set_cancel_pstate(ParallelState *pstate)
790 : {
791 : #ifdef WIN32
792 : EnterCriticalSection(&signal_info_lock);
2502 tgl 793 ECB : #endif
794 :
2502 tgl 795 GIC 26 : signal_info.pstate = pstate;
796 :
797 : #ifdef WIN32
2502 tgl 798 ECB : LeaveCriticalSection(&signal_info_lock);
799 : #endif
2502 tgl 800 GIC 26 : }
801 :
802 : /*
803 : * set_cancel_slot_archive
804 : *
805 : * Set ParallelSlot's AH field to point to the specified archive, if any.
806 : * We need this mainly to have an interlock against Windows signal thread.
2502 tgl 807 ECB : */
808 : static void
2502 tgl 809 GIC 56 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
810 : {
811 : #ifdef WIN32
812 : EnterCriticalSection(&signal_info_lock);
2502 tgl 813 ECB : #endif
814 :
2385 tgl 815 GIC 56 : slot->AH = AH;
816 :
817 : #ifdef WIN32
2502 tgl 818 ECB : LeaveCriticalSection(&signal_info_lock);
819 : #endif
2502 tgl 820 GIC 56 : }
821 :
822 :
823 : /*
824 : * This function is called by both Unix and Windows variants to set up
825 : * and run a worker process. Caller should exit the process (or thread)
826 : * upon return.
3668 andrew 827 ECB : */
828 : static void
2502 tgl 829 GIC 28 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
830 : {
831 : int pipefd[2];
2502 tgl 832 ECB :
833 : /* fetch child ends of pipes */
2502 tgl 834 GIC 28 : pipefd[PIPE_READ] = slot->pipeRevRead;
835 28 : pipefd[PIPE_WRITE] = slot->pipeRevWrite;
836 :
837 : /*
838 : * Clone the archive so that we have our own state to work with, and in
839 : * particular our own database connection.
840 : *
841 : * We clone on Unix as well as Windows, even though technically we don't
842 : * need to because fork() gives us a copy in our own address space
843 : * already. But CloneArchive resets the state information and also clones
2502 tgl 844 ECB : * the database connection which both seem kinda helpful.
845 : */
2502 tgl 846 GIC 28 : AH = CloneArchive(AH);
2502 tgl 847 ECB :
848 : /* Remember cloned archive where signal handler can find it */
2502 tgl 849 GIC 28 : set_cancel_slot_archive(slot, AH);
850 :
851 : /*
3668 andrew 852 ECB : * Call the setup worker function that's defined in the ArchiveHandle.
853 : */
2643 tgl 854 GIC 28 : (AH->SetupWorkerPtr) ((Archive *) AH);
855 :
856 : /*
2507 tgl 857 ECB : * Execute commands until done.
858 : */
2643 tgl 859 GIC 28 : WaitForCommands(AH, pipefd);
860 :
861 : /*
2502 tgl 862 ECB : * Disconnect from database and clean up.
863 : */
2502 tgl 864 CBC 28 : set_cancel_slot_archive(slot, NULL);
865 28 : DisconnectDatabase(&(AH->public));
2502 tgl 866 GIC 28 : DeCloneArchive(AH);
3668 andrew 867 28 : }
868 :
/*
 * Thread entry point for Windows workers, started via _beginthreadex().
 * Takes ownership of (and frees) the WorkerInfo it receives.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	ArchiveHandle *leaderAH = wi->AH;
	ParallelSlot *myslot = wi->slot;

	/* Both fields are copied out, so the bundle can be released now */
	free(wi);

	RunWorker(leaderAH, myslot);

	/* Terminate this thread */
	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
890 :
891 : /*
892 : * This function starts a parallel dump or restore by spawning off the worker
893 : * processes. For Windows, it creates a number of threads; on Unix the
894 : * workers are created with fork().
3668 andrew 895 ECB : */
896 : ParallelState *
2643 tgl 897 GIC 15 : ParallelBackupStart(ArchiveHandle *AH)
898 : {
899 : ParallelState *pstate;
3668 andrew 900 ECB : int i;
901 :
3668 andrew 902 CBC 15 : Assert(AH->public.numWorkers > 0);
903 :
904 15 : pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
3668 andrew 905 ECB :
3668 andrew 906 CBC 15 : pstate->numWorkers = AH->public.numWorkers;
2385 tgl 907 GIC 15 : pstate->te = NULL;
3668 andrew 908 CBC 15 : pstate->parallelSlot = NULL;
3668 andrew 909 ECB :
3668 andrew 910 GIC 15 : if (AH->public.numWorkers == 1)
911 2 : return pstate;
3668 andrew 912 ECB :
1164 tgl 913 : /* Create status arrays, being sure to initialize all fields to 0 */
2385 tgl 914 CBC 13 : pstate->te = (TocEntry **)
915 13 : pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
2385 tgl 916 GIC 13 : pstate->parallelSlot = (ParallelSlot *)
917 13 : pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
918 :
919 : #ifdef WIN32
920 : /* Make fmtId() and fmtQualifiedId() use thread-local storage */
921 : getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
922 : #endif
923 :
924 : /*
925 : * Set the pstate in shutdown_info, to tell the exit handler that it must
926 : * clean up workers as well as the main database connection. But we don't
927 : * set this in signal_info yet, because we don't want child processes to
2502 tgl 928 ECB : * inherit non-NULL signal_info.pstate.
929 : */
2502 tgl 930 GIC 13 : shutdown_info.pstate = pstate;
931 :
932 : /*
933 : * Temporarily disable query cancellation on the leader connection. This
934 : * ensures that child processes won't inherit valid AH->connCancel
935 : * settings and thus won't try to issue cancels against the leader's
936 : * connection. No harm is done if we fail while it's disabled, because
1029 andres 937 ECB : * the leader connection is idle at this point anyway.
938 : */
2502 tgl 939 GIC 13 : set_archive_cancel_info(AH, NULL);
2502 tgl 940 ECB :
941 : /* Ensure stdio state is quiesced before forking */
2502 tgl 942 GIC 13 : fflush(NULL);
2502 tgl 943 ECB :
944 : /* Create desired number of workers */
3668 andrew 945 GIC 41 : for (i = 0; i < pstate->numWorkers; i++)
946 : {
947 : #ifdef WIN32
948 : WorkerInfo *wi;
949 : uintptr_t handle;
950 : #else
3668 andrew 951 ECB : pid_t pid;
952 : #endif
2502 tgl 953 GIC 28 : ParallelSlot *slot = &(pstate->parallelSlot[i]);
954 : int pipeMW[2],
955 : pipeWM[2];
3668 andrew 956 ECB :
2507 tgl 957 EUB : /* Create communication pipes for this worker */
3668 andrew 958 GIC 28 : if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
366 tgl 959 UIC 0 : pg_fatal("could not create communication channels: %m");
3668 andrew 960 ECB :
1029 andres 961 : /* leader's ends of the pipes */
2502 tgl 962 GIC 28 : slot->pipeRead = pipeWM[PIPE_READ];
2502 tgl 963 CBC 28 : slot->pipeWrite = pipeMW[PIPE_WRITE];
2510 tgl 964 ECB : /* child's ends of the pipes */
2502 tgl 965 GIC 28 : slot->pipeRevRead = pipeMW[PIPE_READ];
966 28 : slot->pipeRevWrite = pipeWM[PIPE_WRITE];
967 :
968 : #ifdef WIN32
969 : /* Create transient structure to pass args to worker function */
970 : wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
971 :
972 : wi->AH = AH;
973 : wi->slot = slot;
974 :
975 : handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
976 : wi, 0, &(slot->threadId));
977 : slot->hThread = handle;
1164 tgl 978 ECB : slot->workerStatus = WRKR_IDLE;
2507 979 : #else /* !WIN32 */
3668 andrew 980 GIC 28 : pid = fork();
981 56 : if (pid == 0)
982 : {
983 : /* we are the worker */
984 : int j;
3292 rhaas 985 ECB :
986 : /* this is needed for GetMyPSlot() */
2502 tgl 987 GIC 28 : slot->pid = getpid();
3668 andrew 988 ECB :
989 : /* instruct signal handler that we're in a worker now */
2502 tgl 990 GIC 28 : signal_info.am_worker = true;
3668 andrew 991 ECB :
992 : /* close read end of Worker -> Leader */
3668 andrew 993 CBC 28 : closesocket(pipeWM[PIPE_READ]);
994 : /* close write end of Leader -> Worker */
3668 andrew 995 GIC 28 : closesocket(pipeMW[PIPE_WRITE]);
996 :
997 : /*
998 : * Close all inherited fds for communication of the leader with
2510 tgl 999 ECB : * previously-forked workers.
1000 : */
3668 andrew 1001 CBC 45 : for (j = 0; j < i; j++)
3668 andrew 1002 ECB : {
3668 andrew 1003 GIC 17 : closesocket(pstate->parallelSlot[j].pipeRead);
1004 17 : closesocket(pstate->parallelSlot[j].pipeWrite);
1005 : }
3668 andrew 1006 ECB :
1007 : /* Run the worker ... */
2502 tgl 1008 GIC 28 : RunWorker(AH, slot);
2507 tgl 1009 ECB :
1010 : /* We can just exit(0) when done */
3668 andrew 1011 CBC 28 : exit(0);
1012 : }
3668 andrew 1013 GIC 28 : else if (pid < 0)
2507 tgl 1014 EUB : {
1015 : /* fork failed */
366 tgl 1016 UIC 0 : pg_fatal("could not create worker process: %m");
1017 : }
3668 andrew 1018 ECB :
1029 andres 1019 : /* In Leader after successful fork */
2502 tgl 1020 GIC 28 : slot->pid = pid;
1164 1021 28 : slot->workerStatus = WRKR_IDLE;
3668 andrew 1022 ECB :
1023 : /* close read end of Leader -> Worker */
3668 andrew 1024 CBC 28 : closesocket(pipeMW[PIPE_READ]);
1025 : /* close write end of Worker -> Leader */
3668 andrew 1026 GIC 28 : closesocket(pipeWM[PIPE_WRITE]);
1027 : #endif /* WIN32 */
1028 : }
1029 :
1030 : /*
1031 : * Having forked off the workers, disable SIGPIPE so that leader isn't
1032 : * killed if it tries to send a command to a dead worker. We don't want
1033 : * the workers to inherit this setting, though.
2510 tgl 1034 ECB : */
1035 : #ifndef WIN32
2502 tgl 1036 GIC 13 : pqsignal(SIGPIPE, SIG_IGN);
1037 : #endif
1038 :
1039 : /*
1029 andres 1040 ECB : * Re-establish query cancellation on the leader connection.
1041 : */
2502 tgl 1042 GIC 13 : set_archive_cancel_info(AH, AH->connection);
1043 :
1044 : /*
1045 : * Tell the cancel signal handler to forward signals to worker processes,
1046 : * too. (As with query cancel, we did not need this earlier because the
1047 : * workers have not yet been given anything to do; if we die before this
2502 tgl 1048 ECB : * point, any already-started workers will see EOF and quit promptly.)
1049 : */
2502 tgl 1050 CBC 13 : set_cancel_pstate(pstate);
1051 :
3668 andrew 1052 GIC 13 : return pstate;
1053 : }
1054 :
1055 : /*
1056 : * Close down a parallel dump or restore.
3668 andrew 1057 ECB : */
1058 : void
3668 andrew 1059 GIC 15 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1060 : {
1061 : int i;
3668 andrew 1062 ECB :
2507 tgl 1063 : /* No work if non-parallel */
3668 andrew 1064 GIC 15 : if (pstate->numWorkers == 1)
1065 2 : return;
3668 andrew 1066 ECB :
1067 : /* There should not be any unfinished jobs */
3668 andrew 1068 GIC 13 : Assert(IsEveryWorkerIdle(pstate));
3668 andrew 1069 ECB :
1070 : /* Close the sockets so that the workers know they can exit */
3668 andrew 1071 CBC 41 : for (i = 0; i < pstate->numWorkers; i++)
3668 andrew 1072 ECB : {
3668 andrew 1073 GIC 28 : closesocket(pstate->parallelSlot[i].pipeRead);
1074 28 : closesocket(pstate->parallelSlot[i].pipeWrite);
1075 : }
2507 tgl 1076 ECB :
1077 : /* Wait for them to exit */
3668 andrew 1078 GIC 13 : WaitForTerminatingWorkers(pstate);
1079 :
1080 : /*
1081 : * Unlink pstate from shutdown_info, so the exit handler will not try to
2502 tgl 1082 ECB : * use it; and likewise unlink from signal_info.
3668 andrew 1083 : */
3668 andrew 1084 GIC 13 : shutdown_info.pstate = NULL;
2502 tgl 1085 13 : set_cancel_pstate(NULL);
3668 andrew 1086 ECB :
2507 tgl 1087 : /* Release state (mere neatnik-ism, since we're about to terminate) */
2385 tgl 1088 CBC 13 : free(pstate->te);
3668 andrew 1089 GIC 13 : free(pstate->parallelSlot);
1090 13 : free(pstate);
1091 : }
1092 :
1093 : /*
1094 : * These next four functions handle construction and parsing of the command
1095 : * strings and response strings for parallel workers.
1096 : *
1097 : * Currently, these can be the same regardless of which archive format we are
1098 : * processing. In future, we might want to let format modules override these
1099 : * functions to add format-specific data to a command or response.
1100 : */
1101 :
1102 : /*
1103 : * buildWorkerCommand: format a command string to send to a worker.
1104 : *
1105 : * The string is built in the caller-supplied buffer of size buflen.
2385 tgl 1106 ECB : */
1107 : static void
2385 tgl 1108 GIC 168 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
2385 tgl 1109 ECB : char *buf, int buflen)
1110 : {
2385 tgl 1111 CBC 168 : if (act == ACT_DUMP)
1112 122 : snprintf(buf, buflen, "DUMP %d", te->dumpId);
2385 tgl 1113 GIC 46 : else if (act == ACT_RESTORE)
2385 tgl 1114 GBC 46 : snprintf(buf, buflen, "RESTORE %d", te->dumpId);
2385 tgl 1115 ECB : else
2385 tgl 1116 UIC 0 : Assert(false);
2385 tgl 1117 GIC 168 : }
1118 :
1119 : /*
1120 : * parseWorkerCommand: interpret a command string in a worker.
2385 tgl 1121 ECB : */
1122 : static void
2385 tgl 1123 GIC 168 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1124 : const char *msg)
1125 : {
1126 : DumpId dumpId;
2385 tgl 1127 ECB : int nBytes;
1128 :
2385 tgl 1129 CBC 168 : if (messageStartsWith(msg, "DUMP "))
2385 tgl 1130 ECB : {
2385 tgl 1131 CBC 122 : *act = ACT_DUMP;
1132 122 : sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1133 122 : Assert(nBytes == strlen(msg));
2385 tgl 1134 GIC 122 : *te = getTocEntryByDumpId(AH, dumpId);
2385 tgl 1135 CBC 122 : Assert(*te != NULL);
1136 : }
1137 46 : else if (messageStartsWith(msg, "RESTORE "))
2385 tgl 1138 ECB : {
2385 tgl 1139 CBC 46 : *act = ACT_RESTORE;
1140 46 : sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1141 46 : Assert(nBytes == strlen(msg));
2385 tgl 1142 GIC 46 : *te = getTocEntryByDumpId(AH, dumpId);
1143 46 : Assert(*te != NULL);
2385 tgl 1144 EUB : }
1145 : else
366 tgl 1146 LBC 0 : pg_fatal("unrecognized command received from leader: \"%s\"",
1147 : msg);
2385 tgl 1148 GIC 168 : }
1149 :
1150 : /*
1151 : * buildWorkerResponse: format a response string to send to the leader.
1152 : *
1153 : * The string is built in the caller-supplied buffer of size buflen.
2385 tgl 1154 ECB : */
1155 : static void
2385 tgl 1156 GIC 168 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
2385 tgl 1157 ECB : char *buf, int buflen)
1158 : {
2385 tgl 1159 GIC 168 : snprintf(buf, buflen, "OK %d %d %d",
1160 : te->dumpId,
2385 tgl 1161 ECB : status,
1162 : status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
2385 tgl 1163 GIC 168 : }
1164 :
1165 : /*
1166 : * parseWorkerResponse: parse the status message returned by a worker.
1167 : *
1168 : * Returns the integer status code, and may update fields of AH and/or te.
2385 tgl 1169 ECB : */
1170 : static int
2385 tgl 1171 GIC 168 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1172 : const char *msg)
1173 : {
1174 : DumpId dumpId;
2385 tgl 1175 ECB : int nBytes,
1176 : n_errors;
2385 tgl 1177 CBC 168 : int status = 0;
1178 :
1179 168 : if (messageStartsWith(msg, "OK "))
1180 : {
1181 168 : sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
2385 tgl 1182 ECB :
2385 tgl 1183 GIC 168 : Assert(dumpId == te->dumpId);
2385 tgl 1184 CBC 168 : Assert(nBytes == strlen(msg));
1185 :
2385 tgl 1186 GIC 168 : AH->public.n_errors += n_errors;
2385 tgl 1187 EUB : }
1188 : else
366 tgl 1189 UIC 0 : pg_fatal("invalid message received from worker: \"%s\"",
366 tgl 1190 ECB : msg);
1191 :
2385 tgl 1192 GIC 168 : return status;
1193 : }
1194 :
1195 : /*
1196 : * Dispatch a job to some free worker.
1197 : *
1198 : * te is the TocEntry to be processed, act is the action to be taken on it.
1199 : * callback is the function to call on completion of the job.
1200 : *
1201 : * If no worker is currently available, this will block, and previously
1202 : * registered callback functions may be called.
3668 andrew 1203 ECB : */
1204 : void
2385 tgl 1205 GIC 168 : DispatchJobForTocEntry(ArchiveHandle *AH,
1206 : ParallelState *pstate,
1207 : TocEntry *te,
1208 : T_Action act,
1209 : ParallelCompletionPtr callback,
1210 : void *callback_data)
1211 : {
1212 : int worker;
1213 : char buf[256];
3668 andrew 1214 ECB :
2385 tgl 1215 : /* Get a worker, waiting if none are idle */
2385 tgl 1216 GIC 273 : while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1217 105 : WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
3668 andrew 1218 ECB :
1219 : /* Construct and send command string */
2385 tgl 1220 CBC 168 : buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1221 :
2385 tgl 1222 GIC 168 : sendMessageToWorker(pstate, worker, buf);
2507 tgl 1223 ECB :
1224 : /* Remember worker is busy, and which TocEntry it's working on */
3668 andrew 1225 CBC 168 : pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
2385 tgl 1226 168 : pstate->parallelSlot[worker].callback = callback;
1227 168 : pstate->parallelSlot[worker].callback_data = callback_data;
2385 tgl 1228 GIC 168 : pstate->te[worker] = te;
3668 andrew 1229 168 : }
1230 :
1231 : /*
1232 : * Find an idle worker and return its slot number.
1233 : * Return NO_SLOT if none are idle.
3668 andrew 1234 ECB : */
1235 : static int
3668 andrew 1236 GIC 423 : GetIdleWorker(ParallelState *pstate)
1237 : {
3668 andrew 1238 ECB : int i;
1239 :
3668 andrew 1240 CBC 1083 : for (i = 0; i < pstate->numWorkers; i++)
2507 tgl 1241 ECB : {
3668 andrew 1242 GIC 838 : if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
3668 andrew 1243 CBC 178 : return i;
1244 : }
3668 andrew 1245 GIC 245 : return NO_SLOT;
1246 : }
1247 :
1248 : /*
1249 : * Return true iff no worker is running.
3668 andrew 1250 ECB : */
1251 : static bool
3668 andrew 1252 GIC 41 : HasEveryWorkerTerminated(ParallelState *pstate)
1253 : {
3668 andrew 1254 ECB : int i;
1255 :
3668 andrew 1256 CBC 76 : for (i = 0; i < pstate->numWorkers; i++)
2507 tgl 1257 ECB : {
1164 tgl 1258 GIC 63 : if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
3668 andrew 1259 CBC 28 : return false;
1260 : }
3668 andrew 1261 GIC 13 : return true;
1262 : }
1263 :
1264 : /*
1265 : * Return true iff every worker is in the WRKR_IDLE state.
3668 andrew 1266 ECB : */
1267 : bool
3668 andrew 1268 GIC 71 : IsEveryWorkerIdle(ParallelState *pstate)
1269 : {
3668 andrew 1270 ECB : int i;
1271 :
3668 andrew 1272 CBC 156 : for (i = 0; i < pstate->numWorkers; i++)
2507 tgl 1273 ECB : {
3668 andrew 1274 GIC 122 : if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
3668 andrew 1275 CBC 37 : return false;
1276 : }
3668 andrew 1277 GIC 34 : return true;
1278 : }
1279 :
1280 : /*
1281 : * Acquire lock on a table to be dumped by a worker process.
1282 : *
1283 : * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1284 : * it's no problem for a worker to get one too, but if anything else besides
1285 : * pg_dump is running, there's a possible deadlock:
1286 : *
1287 : * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1288 : * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1289 : * because the leader holds a conflicting ACCESS SHARE lock).
1290 : * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1291 : * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1292 : * 4) Now we have a deadlock, since the leader is effectively waiting for
1293 : * the worker. The server cannot detect that, however.
1294 : *
1295 : * To prevent an infinite wait, prior to touching a table in a worker, request
1296 : * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1297 : * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1298 : * so we have a deadlock. We must fail the backup in that case.
3668 andrew 1299 ECB : */
1300 : static void
2507 tgl 1301 GIC 122 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1302 : {
1303 : const char *qualId;
1304 : PQExpBuffer query;
1305 : PGresult *res;
3668 andrew 1306 ECB :
2507 tgl 1307 : /* Nothing to do for BLOBS */
2507 tgl 1308 GIC 122 : if (strcmp(te->desc, "BLOBS") == 0)
2507 tgl 1309 CBC 4 : return;
1310 :
1311 118 : query = createPQExpBuffer();
1312 :
1696 1313 118 : qualId = fmtQualifiedId(te->namespace, te->tag);
1314 :
3668 andrew 1315 GIC 118 : appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
3668 andrew 1316 ECB : qualId);
1317 :
3668 andrew 1318 CBC 118 : res = PQexec(AH->connection, query->data);
3668 andrew 1319 EUB :
3668 andrew 1320 GIC 118 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
366 tgl 1321 UIC 0 : pg_fatal("could not obtain lock on relation \"%s\"\n"
1322 : "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1323 : "on the table after the pg_dump parent process had gotten the "
366 tgl 1324 ECB : "initial ACCESS SHARE lock on the table.", qualId);
3668 andrew 1325 :
3668 andrew 1326 GIC 118 : PQclear(res);
1327 118 : destroyPQExpBuffer(query);
1328 : }
1329 :
1330 : /*
1331 : * WaitForCommands: main routine for a worker process.
1332 : *
1333 : * Read and execute commands from the leader until we see EOF on the pipe.
3668 andrew 1334 ECB : */
1335 : static void
2643 tgl 1336 GIC 28 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1337 : {
1338 : char *command;
3668 andrew 1339 ECB : TocEntry *te;
1340 : T_Action act;
2385 tgl 1341 GIC 28 : int status = 0;
1342 : char buf[256];
1343 :
3668 andrew 1344 ECB : for (;;)
1345 : {
1029 andres 1346 GIC 196 : if (!(command = getMessageFromLeader(pipefd)))
3668 andrew 1347 ECB : {
1348 : /* EOF, so done */
3668 andrew 1349 GIC 28 : return;
1350 : }
3668 andrew 1351 ECB :
1352 : /* Decode the command */
2385 tgl 1353 CBC 168 : parseWorkerCommand(AH, &te, &act, command);
1354 :
2385 tgl 1355 GIC 168 : if (act == ACT_DUMP)
2385 tgl 1356 ECB : {
1357 : /* Acquire lock on this table within the worker's session */
2507 tgl 1358 GIC 122 : lockTableForWorker(AH, te);
3668 andrew 1359 ECB :
1360 : /* Perform the dump command */
2385 tgl 1361 CBC 122 : status = (AH->WorkerJobDumpPtr) (AH, te);
1362 : }
2385 tgl 1363 GIC 46 : else if (act == ACT_RESTORE)
3668 andrew 1364 ECB : {
1365 : /* Perform the restore command */
2385 tgl 1366 GIC 46 : status = (AH->WorkerJobRestorePtr) (AH, te);
3668 andrew 1367 EUB : }
1368 : else
2385 tgl 1369 UIC 0 : Assert(false);
2385 tgl 1370 ECB :
1371 : /* Return status to leader */
2385 tgl 1372 CBC 168 : buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1373 :
1029 andres 1374 GIC 168 : sendMessageToLeader(pipefd, buf);
3556 sfrost 1375 ECB :
1376 : /* command was pg_malloc'd and we are responsible for free()ing it. */
3556 sfrost 1377 GIC 168 : free(command);
1378 : }
1379 : }
1380 :
1381 : /*
1382 : * Check for status messages from workers.
1383 : *
1384 : * If do_wait is true, wait to get a status message; otherwise, just return
1385 : * immediately if there is none available.
1386 : *
1387 : * When we get a status message, we pass the status code to the callback
1388 : * function that was specified to DispatchJobForTocEntry, then reset the
1389 : * worker status to IDLE.
1390 : *
1391 : * Returns true if we collected a status message, else false.
1392 : *
1393 : * XXX is it worth checking for more than one status message per call?
1394 : * It seems somewhat unlikely that multiple workers would finish at exactly
1395 : * the same time.
3668 andrew 1396 ECB : */
1397 : static bool
3668 andrew 1398 GIC 327 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1399 : {
1400 : int worker;
1401 : char *msg;
3668 andrew 1402 ECB :
1403 : /* Try to collect a status message */
3668 andrew 1404 CBC 327 : msg = getMessageFromWorker(pstate, do_wait, &worker);
1405 :
3668 andrew 1406 GIC 327 : if (!msg)
3668 andrew 1407 ECB : {
2507 tgl 1408 EUB : /* If do_wait is true, we must have detected EOF on some socket */
3668 andrew 1409 CBC 159 : if (do_wait)
366 tgl 1410 UIC 0 : pg_fatal("a worker process died unexpectedly");
2385 tgl 1411 GIC 159 : return false;
1412 : }
3668 andrew 1413 ECB :
1414 : /* Process it and update our idea of the worker's status */
3668 andrew 1415 CBC 168 : if (messageStartsWith(msg, "OK "))
3668 andrew 1416 ECB : {
2385 tgl 1417 GIC 168 : ParallelSlot *slot = &pstate->parallelSlot[worker];
1418 168 : TocEntry *te = pstate->te[worker];
2385 tgl 1419 ECB : int status;
3668 andrew 1420 :
2385 tgl 1421 CBC 168 : status = parseWorkerResponse(AH, te, msg);
1422 168 : slot->callback(AH, te, status, slot->callback_data);
2385 tgl 1423 GIC 168 : slot->workerStatus = WRKR_IDLE;
1424 168 : pstate->te[worker] = NULL;
3668 andrew 1425 EUB : }
1426 : else
366 tgl 1427 UIC 0 : pg_fatal("invalid message received from worker: \"%s\"",
1428 : msg);
3668 andrew 1429 ECB :
1430 : /* Free the string returned from getMessageFromWorker */
3668 andrew 1431 CBC 168 : free(msg);
1432 :
2385 tgl 1433 GIC 168 : return true;
1434 : }
1435 :
1436 : /*
1437 : * Check for status results from workers, waiting if necessary.
1438 : *
1439 : * Available wait modes are:
1440 : * WFW_NO_WAIT: reap any available status, but don't block
1441 : * WFW_GOT_STATUS: wait for at least one more worker to finish
1442 : * WFW_ONE_IDLE: wait for at least one worker to be idle
1443 : * WFW_ALL_IDLE: wait for all workers to be idle
1444 : *
1445 : * Any received results are passed to the callback specified to
1446 : * DispatchJobForTocEntry.
1447 : *
1448 : * This function is executed in the leader process.
3668 andrew 1449 ECB : */
1450 : void
2385 tgl 1451 CBC 170 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1452 : {
2385 tgl 1453 GIC 170 : bool do_wait = false;
1454 :
1455 : /*
1456 : * In GOT_STATUS mode, always block waiting for a message, since we can't
1457 : * return till we get something. In other modes, we don't block the first
2385 tgl 1458 ECB : * time through the loop.
1459 : */
2385 tgl 1460 GIC 170 : if (mode == WFW_GOT_STATUS)
3668 andrew 1461 ECB : {
2385 tgl 1462 : /* Assert that caller knows what it's doing */
2385 tgl 1463 GIC 10 : Assert(!IsEveryWorkerIdle(pstate));
1464 10 : do_wait = true;
1465 : }
1466 :
1467 : for (;;)
1468 : {
1469 : /*
1470 : * Check for status messages, even if we don't need to block. We do
1471 : * not try very hard to reap all available messages, though, since
2385 tgl 1472 ECB : * there's unlikely to be more than one.
1473 : */
2385 tgl 1474 GIC 327 : if (ListenToWorkers(AH, pstate, do_wait))
1475 : {
1476 : /*
1477 : * If we got a message, we are done by definition for GOT_STATUS
1478 : * mode, and we can also be certain that there's at least one idle
2385 tgl 1479 ECB : * worker. So we're done in all but ALL_IDLE mode.
1480 : */
2385 tgl 1481 GIC 168 : if (mode != WFW_ALL_IDLE)
1482 151 : return;
1483 : }
3668 andrew 1484 ECB :
1485 : /* Check whether we must wait for new status messages */
2385 tgl 1486 GBC 176 : switch (mode)
2385 tgl 1487 EUB : {
2385 tgl 1488 UBC 0 : case WFW_NO_WAIT:
1489 0 : return; /* never wait */
2385 tgl 1490 UIC 0 : case WFW_GOT_STATUS:
2385 tgl 1491 LBC 0 : Assert(false); /* can't get here, because we waited */
2385 tgl 1492 ECB : break;
2385 tgl 1493 CBC 150 : case WFW_ONE_IDLE:
1494 150 : if (GetIdleWorker(pstate) != NO_SLOT)
1495 10 : return;
1496 140 : break;
1497 26 : case WFW_ALL_IDLE:
1498 26 : if (IsEveryWorkerIdle(pstate))
2385 tgl 1499 GIC 9 : return;
1500 17 : break;
1501 : }
3668 andrew 1502 ECB :
1503 : /* Loop back, and this time wait for something to happen */
2385 tgl 1504 GIC 157 : do_wait = true;
1505 : }
1506 : }
1507 :
1508 : /*
1509 : * Read one command message from the leader, blocking if necessary
1510 : * until one is available, and return it as a malloc'd string.
1511 : * On EOF, return NULL.
1512 : *
1513 : * This function is executed in worker processes.
3668 andrew 1514 ECB : */
1515 : static char *
1029 andres 1516 CBC 196 : getMessageFromLeader(int pipefd[2])
1517 : {
3668 andrew 1518 GIC 196 : return readMessageFromPipe(pipefd[PIPE_READ]);
1519 : }
1520 :
1521 : /*
1522 : * Send a status message to the leader.
1523 : *
1524 : * This function is executed in worker processes.
3668 andrew 1525 ECB : */
1526 : static void
1029 andres 1527 CBC 168 : sendMessageToLeader(int pipefd[2], const char *str)
1528 : {
3668 andrew 1529 168 : int len = strlen(str) + 1;
3668 andrew 1530 EUB :
3668 andrew 1531 CBC 168 : if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
366 tgl 1532 UIC 0 : pg_fatal("could not write to the communication channel: %m");
3668 andrew 1533 GIC 168 : }
1534 :
/*
 * Block until at least one descriptor in *workerset is readable.
 *
 * *workerset is overwritten with the set of readable descriptors.  A select()
 * interrupted by a signal is retried with the original set.  Returns the
 * number of readable descriptors, or -1 on error.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;
	int			nready;

	do
	{
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);
	}
#ifndef WIN32
	while (nready < 0 && errno == EINTR);
#else
	while (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif

	return nready;
}
1562 :
1563 :
1564 : /*
1565 : * Check for messages from worker processes.
1566 : *
1567 : * If a message is available, return it as a malloc'd string, and put the
1568 : * index of the sending worker in *worker.
1569 : *
1570 : * If nothing is available, wait if "do_wait" is true, else return NULL.
1571 : *
1572 : * If we detect EOF on any socket, we'll return NULL. It's not great that
1573 : * that's hard to distinguish from the no-data-available case, but for now
1574 : * our one caller is okay with that.
1575 : *
1576 : * This function is executed in the leader process.
3668 andrew 1577 ECB : */
1578 : static char *
3668 andrew 1579 GIC 327 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1580 : {
3668 andrew 1581 ECB : int i;
1582 : fd_set workerset;
3668 andrew 1583 GIC 327 : int maxFd = -1;
1584 327 : struct timeval nowait = {0, 0};
3668 andrew 1585 ECB :
2507 tgl 1586 : /* construct bitmap of socket descriptors for select() */
3668 andrew 1587 GIC 5559 : FD_ZERO(&workerset);
3668 andrew 1588 CBC 1069 : for (i = 0; i < pstate->numWorkers; i++)
3668 andrew 1589 EUB : {
1164 tgl 1590 CBC 742 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
3668 andrew 1591 LBC 0 : continue;
3668 andrew 1592 CBC 742 : FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
3668 andrew 1593 GIC 742 : if (pstate->parallelSlot[i].pipeRead > maxFd)
1594 742 : maxFd = pstate->parallelSlot[i].pipeRead;
3668 andrew 1595 ECB : }
1596 :
3668 andrew 1597 CBC 327 : if (do_wait)
3668 andrew 1598 ECB : {
3668 andrew 1599 GIC 167 : i = select_loop(maxFd, &workerset);
1600 167 : Assert(i != 0);
1601 : }
3668 andrew 1602 ECB : else
1603 : {
3668 andrew 1604 GIC 160 : if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1605 159 : return NULL;
3668 andrew 1606 ECB : }
3668 andrew 1607 EUB :
3668 andrew 1608 GIC 168 : if (i < 0)
366 tgl 1609 LBC 0 : pg_fatal("%s() failed: %m", "select");
1610 :
3668 andrew 1611 GIC 289 : for (i = 0; i < pstate->numWorkers; i++)
1612 : {
3668 andrew 1613 ECB : char *msg;
3668 andrew 1614 EUB :
1164 tgl 1615 CBC 289 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1164 tgl 1616 LBC 0 : continue;
3668 andrew 1617 GIC 289 : if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1618 121 : continue;
1619 :
1620 : /*
1621 : * Read the message if any. If the socket is ready because of EOF,
1622 : * we'll return NULL instead (and the socket will stay ready, so the
1623 : * condition will persist).
1624 : *
1625 : * Note: because this is a blocking read, we'll wait if only part of
1626 : * the message is available. Waiting a long time would be bad, but
1627 : * since worker status messages are short and are always sent in one
2507 tgl 1628 ECB : * operation, it shouldn't be a problem in practice.
1629 : */
3668 andrew 1630 CBC 168 : msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
3668 andrew 1631 GIC 168 : *worker = i;
3668 andrew 1632 GBC 168 : return msg;
1633 : }
3668 andrew 1634 UIC 0 : Assert(false);
1635 : return NULL;
1636 : }
1637 :
1638 : /*
1639 : * Send a command message to the specified worker process.
1640 : *
1641 : * This function is executed in the leader process.
3668 andrew 1642 ECB : */
1643 : static void
3668 andrew 1644 CBC 168 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1645 : {
1646 168 : int len = strlen(str) + 1;
1647 :
3668 andrew 1648 GBC 168 : if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1649 : {
366 tgl 1650 LBC 0 : pg_fatal("could not write to the communication channel: %m");
1651 : }
3668 andrew 1652 GIC 168 : }
1653 :
1654 : /*
1655 : * Read one message from the specified pipe (fd), blocking if necessary
1656 : * until one is available, and return it as a malloc'd string.
1657 : * On EOF, return NULL.
1658 : *
1659 : * A "message" on the channel is just a null-terminated string.
3668 andrew 1660 ECB : */
1661 : static char *
3668 andrew 1662 GIC 364 : readMessageFromPipe(int fd)
1663 : {
1664 : char *msg;
1665 : int msgsize,
1666 : bufsize;
1667 : int ret;
1668 :
1669 : /*
1670 : * In theory, if we let piperead() read multiple bytes, it might give us
1671 : * back fragments of multiple messages. (That can't actually occur, since
1672 : * neither leader nor workers send more than one message without waiting
1673 : * for a reply, but we don't wish to assume that here.) For simplicity,
1674 : * read a byte at a time until we get the terminating '\0'. This method
1675 : * is a bit inefficient, but since this is only used for relatively short
2507 tgl 1676 ECB : * command and status strings, it shouldn't matter.
3668 andrew 1677 : */
3668 andrew 1678 CBC 364 : bufsize = 64; /* could be any number */
3668 andrew 1679 GIC 364 : msg = (char *) pg_malloc(bufsize);
1680 364 : msgsize = 0;
3668 andrew 1681 ECB : for (;;)
1682 : {
2507 tgl 1683 CBC 3862 : Assert(msgsize < bufsize);
3668 andrew 1684 3862 : ret = piperead(fd, msg + msgsize, 1);
3668 andrew 1685 GIC 3862 : if (ret <= 0)
2507 tgl 1686 CBC 28 : break; /* error or connection closure */
1687 :
3668 andrew 1688 3834 : Assert(ret == 1);
3668 andrew 1689 ECB :
3668 andrew 1690 GIC 3834 : if (msg[msgsize] == '\0')
2507 tgl 1691 CBC 336 : return msg; /* collected whole message */
3668 andrew 1692 ECB :
3668 andrew 1693 GIC 3498 : msgsize++;
2507 tgl 1694 GBC 3498 : if (msgsize == bufsize) /* enlarge buffer if needed */
3668 andrew 1695 EUB : {
2507 tgl 1696 UIC 0 : bufsize += 16; /* could be any number */
3064 1697 0 : msg = (char *) pg_realloc(msg, bufsize);
1698 : }
1699 : }
3362 sfrost 1700 ECB :
2507 tgl 1701 : /* Other end has closed the connection */
3064 tgl 1702 GIC 28 : pg_free(msg);
3362 sfrost 1703 28 : return NULL;
1704 : }
1705 :
#ifdef WIN32

/*
 * Windows replacement for pipe(2) that yields handles usable with select().
 *
 * A temporary loopback TCP listener is created on an ephemeral port.  A
 * second socket connects to it, and the accepted connection becomes the
 * read end.  On success, handles[0] is the accepted socket and handles[1]
 * the connecting socket, and 0 is returned.  On failure, an error is logged,
 * both handles are -1, and -1 is returned.
 *
 * The pipe must be read and written only through piperead()/pipewrite().
 *
 * The handles are declared "int" for consistency with Unix.  This is safe
 * even on WIN64 because system handles are at most 32 bits wide, but it
 * requires some casting.
 */
static int
pgpipe(int handles[2])
{
	pgsocket	listener;
	pgsocket	sock;
	struct sockaddr_in addr;
	int			addrlen = sizeof(addr);

	/* Use the Unix invalid-descriptor value until both ends exist */
	handles[0] = handles[1] = -1;

	/* Listening socket on an ephemeral loopback port */
	listener = socket(AF_INET, SOCK_STREAM, 0);
	if (listener == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create socket: error code %d",
					 WSAGetLastError());
		return -1;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = pg_hton16(0);
	addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);

	if (bind(listener, (SOCKADDR *) &addr, addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not bind: error code %d",
					 WSAGetLastError());
		goto fail_listener;
	}
	if (listen(listener, 1) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not listen: error code %d",
					 WSAGetLastError());
		goto fail_listener;
	}
	/* Learn which port was actually assigned */
	if (getsockname(listener, (SOCKADDR *) &addr, &addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
					 WSAGetLastError());
		goto fail_listener;
	}

	/* Write end: a client socket that connects to the listener */
	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create second socket: error code %d",
					 WSAGetLastError());
		goto fail_listener;
	}
	handles[1] = (int) sock;

	if (connect(handles[1], (SOCKADDR *) &addr, addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not connect socket: error code %d",
					 WSAGetLastError());
		goto fail_client;
	}

	/* Read end: the server side of that connection */
	sock = accept(listener, (SOCKADDR *) &addr, &addrlen);
	if (sock == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not accept connection: error code %d",
					 WSAGetLastError());
		goto fail_client;
	}
	handles[0] = (int) sock;

	/* The listener has served its purpose */
	closesocket(listener);
	return 0;

fail_client:
	closesocket(handles[1]);
	handles[1] = -1;
fail_listener:
	closesocket(listener);
	return -1;
}

#endif							/* WIN32 */
|