Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * parallel.c
4 : : *
5 : : * Parallel support for pg_dump and pg_restore
6 : : *
7 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8 : : * Portions Copyright (c) 1994, Regents of the University of California
9 : : *
10 : : * IDENTIFICATION
11 : : * src/bin/pg_dump/parallel.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : :
16 : : /*
17 : : * Parallel operation works like this:
18 : : *
19 : : * The original, leader process calls ParallelBackupStart(), which forks off
20 : : * the desired number of worker processes, which each enter WaitForCommands().
21 : : *
22 : : * The leader process dispatches an individual work item to one of the worker
23 : : * processes in DispatchJobForTocEntry(). We send a command string such as
24 : : * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 : : * The worker process receives and decodes the command and passes it to the
26 : : * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 : : * which are routines of the current archive format. That routine performs
28 : : * the required action (dump or restore) and returns an integer status code.
29 : : * This is passed back to the leader where we pass it to the
30 : : * ParallelCompletionPtr callback function that was passed to
31 : : * DispatchJobForTocEntry(). The callback function does state updating
32 : : * for the leader control logic in pg_backup_archiver.c.
33 : : *
34 : : * In principle additional archive-format-specific information might be needed
35 : : * in commands or worker status responses, but so far that hasn't proved
36 : : * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 : : * data structures. Remember that we have forked off the workers only after
38 : : * we have read in the catalog. That's why our worker processes can also
39 : : * access the catalog information. (In the Windows case, the workers are
40 : : * threads in the same process. To avoid problems, they work with cloned
41 : : * copies of the Archive data structure; see RunWorker().)
42 : : *
43 : : * In the leader process, the workerStatus field for each worker has one of
44 : : * the following values:
45 : : * WRKR_NOT_STARTED: we've not yet forked this worker
46 : : * WRKR_IDLE: it's waiting for a command
47 : : * WRKR_WORKING: it's working on a command
48 : : * WRKR_TERMINATED: process ended
49 : : * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 : : * state, and must be NULL in other states.
51 : : */
52 : :
53 : : #include "postgres_fe.h"
54 : :
55 : : #ifndef WIN32
56 : : #include <sys/select.h>
57 : : #include <sys/wait.h>
58 : : #include <signal.h>
59 : : #include <unistd.h>
60 : : #include <fcntl.h>
61 : : #endif
62 : :
63 : : #include "fe_utils/string_utils.h"
64 : : #include "parallel.h"
65 : : #include "pg_backup_utils.h"
66 : : #include "port/pg_bswap.h"
67 : :
68 : : /* Mnemonic macros for indexing the two-element fd array filled in by pipe(2) */
69 : : #define PIPE_READ 0 /* index of the end we read from */
70 : : #define PIPE_WRITE 1 /* index of the end we write to */
71 : :
72 : : #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
73 : :
74 : : /* Worker process statuses, as tracked by the leader (see the file header comment) */
75 : : typedef enum
76 : : {
77 : : WRKR_NOT_STARTED = 0, /* not yet forked; zero so that zero-filled slots start here */
78 : : WRKR_IDLE, /* waiting for a command */
79 : : WRKR_WORKING, /* working on a command */
80 : : WRKR_TERMINATED, /* process ended */
81 : : } T_WorkerStatus;
82 : : /* True while a worker is alive, i.e. either idle or busy with a command */
83 : : #define WORKER_IS_RUNNING(workerStatus) \
84 : : ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
85 : :
86 : : /*
87 : : * Private per-parallel-worker state (typedef for this is in parallel.h).
88 : : *
89 : : * Much of this is valid only in the leader process (or, on Windows, should
90 : : * be touched only by the leader thread). But the AH field should be touched
91 : : * only by workers. The pipe descriptors are valid everywhere.
92 : : */
93 : : struct ParallelSlot
94 : : {
95 : : T_WorkerStatus workerStatus; /* lifecycle state; see T_WorkerStatus above */
96 : :
97 : : /* These fields are valid if workerStatus == WRKR_WORKING: */
98 : : ParallelCompletionPtr callback; /* function to call on completion */
99 : : void *callback_data; /* passthrough data for it */
100 : :
101 : : ArchiveHandle *AH; /* worker's cloned archive; set and cleared via set_cancel_slot_archive() */
102 : :
103 : : int pipeRead; /* leader's end of the pipes: read side ... */
104 : : int pipeWrite; /* ... and write side (closed by ShutdownWorkersHard) */
105 : : int pipeRevRead; /* child's end of the pipes: read side ... */
106 : : int pipeRevWrite; /* ... and write side */
107 : :
108 : : /* Child process/thread identity info: */
109 : : #ifdef WIN32
110 : : uintptr_t hThread; /* thread handle; set to INVALID_HANDLE_VALUE once reaped */
111 : : unsigned int threadId; /* compared with GetCurrentThreadId() in GetMyPSlot() */
112 : : #else
113 : : pid_t pid; /* worker's PID; reset to 0 once the worker has been reaped */
114 : : #endif
115 : : };
116 : :
117 : : #ifdef WIN32
118 : :
119 : : /*
120 : : * Structure to hold info passed by _beginthreadex() to the function it calls
121 : : * via its single allowed argument. The thread function
122 : : * (init_spawned_worker_win32) copies the fields out and then frees the struct.
123 : : */
123 : : typedef struct
124 : : {
125 : : ArchiveHandle *AH; /* leader database connection */
126 : : ParallelSlot *slot; /* this worker's parallel slot */
127 : : } WorkerInfo;
128 : :
129 : : /* Windows implementation of pipe access: the "pipes" are read with recv() and written with send() */
130 : : static int pgpipe(int handles[2]);
131 : : #define piperead(a,b,c) recv(a,b,c,0)
132 : : #define pipewrite(a,b,c) send(a,b,c,0)
133 : :
134 : : #else /* !WIN32 */
135 : :
136 : : /* Non-Windows implementation of pipe access: plain pipe(2) with read(2)/write(2) */
137 : : #define pgpipe(a) pipe(a)
138 : : #define piperead(a,b,c) read(a,b,c)
139 : : #define pipewrite(a,b,c) write(a,b,c)
140 : :
141 : : #endif /* WIN32 */
142 : :
143 : : /*
144 : : * State info for archive_close_connection() shutdown callback.
145 : : */
146 : : typedef struct ShutdownInformation
147 : : {
148 : : ParallelState *pstate; /* parallel state; NULL means non-parallel operation */
149 : : Archive *AHX; /* leader's archive, recorded by on_exit_close_archive() */
150 : : } ShutdownInformation;
151 : : /* Single file-level instance; its address is what gets registered with on_exit_nicely() */
152 : : static ShutdownInformation shutdown_info;
153 : :
154 : : /*
155 : : * State info for signal handling.
156 : : * We assume signal_info initializes to zeroes.
157 : : *
158 : : * On Unix, myAH is the leader DB connection in the leader process, and the
159 : : * worker's own connection in worker processes. On Windows, we have only one
160 : : * instance of signal_info, so myAH is the leader connection and the worker
161 : : * connections must be dug out of pstate->parallelSlot[].
162 : : */
163 : : typedef struct DumpSignalInformation
164 : : {
165 : : ArchiveHandle *myAH; /* database connection to issue cancel for */
166 : : ParallelState *pstate; /* parallel state, if any */
167 : : bool handler_set; /* signal handler set up in this process? */
168 : : #ifndef WIN32
169 : : bool am_worker; /* am I a worker process? */
170 : : #endif
171 : : } DumpSignalInformation;
172 : : /* volatile because it is read from the interrupt handler */
173 : : static volatile DumpSignalInformation signal_info;
174 : : /* Windows only: initialized in set_cancel_handler(); held while reading or changing cancel-related state */
175 : : #ifdef WIN32
176 : : static CRITICAL_SECTION signal_info_lock;
177 : : #endif
178 : :
179 : : /*
180 : : * Write a simple string to stderr --- must be safe in a signal handler.
181 : : * We ignore the write() result since there's not much we could do about it.
182 : : * Certain compilers make that harder than it ought to be, hence the rc_ variable that is assigned and then cast to void. The argument is evaluated once (it is copied into str_).
183 : : */
184 : : #define write_stderr(str) \
185 : : do { \
186 : : const char *str_ = (str); \
187 : : int rc_; \
188 : : rc_ = write(fileno(stderr), str_, strlen(str_)); \
189 : : (void) rc_; \
190 : : } while (0)
191 : :
192 : :
193 : : #ifdef WIN32
194 : : /* file-scope variables */
195 : : static DWORD tls_index; /* TLS slot from TlsAlloc(); holds each thread's PQExpBuffer */
196 : :
197 : : /* globally visible variables (needed by exit_nicely) */
198 : : bool parallel_init_done = false; /* set once init_parallel_dump_utils() has completed */
199 : : DWORD mainThreadId; /* thread that ran init_parallel_dump_utils() */
200 : : #endif /* WIN32 */
201 : :
202 : : /* Local function prototypes (all file-private) */
203 : : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
204 : : static void archive_close_connection(int code, void *arg);
205 : : static void ShutdownWorkersHard(ParallelState *pstate);
206 : : static void WaitForTerminatingWorkers(ParallelState *pstate);
207 : : static void set_cancel_handler(void);
208 : : static void set_cancel_pstate(ParallelState *pstate);
209 : : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
210 : : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
211 : : static int GetIdleWorker(ParallelState *pstate);
212 : : static bool HasEveryWorkerTerminated(ParallelState *pstate);
213 : : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
214 : : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
215 : : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
216 : : bool do_wait);
217 : : static char *getMessageFromLeader(int pipefd[2]);
218 : : static void sendMessageToLeader(int pipefd[2], const char *str);
219 : : static int select_loop(int maxFd, fd_set *workerset);
220 : : static char *getMessageFromWorker(ParallelState *pstate,
221 : : bool do_wait, int *worker);
222 : : static void sendMessageToWorker(ParallelState *pstate,
223 : : int worker, const char *str);
224 : : static char *readMessageFromPipe(int fd);
225 : : /* True if msg begins with prefix; note that prefix is evaluated twice */
226 : : #define messageStartsWith(msg, prefix) \
227 : : (strncmp(msg, prefix, strlen(prefix)) == 0)
228 : :
229 : :
/*
 * Initialize parallel dump support --- should be called early in process
 * startup.  (Currently, this is called whether or not we intend parallel
 * activity.)
 *
 * On Windows this allocates the thread-local-storage slot used by
 * getThreadLocalPQExpBuffer(), remembers the calling thread as the main
 * thread, and initializes socket access via WSAStartup().  The work is done
 * at most once per process (guarded by parallel_init_done); any failure is
 * reported with pg_fatal().  On other platforms this function does nothing.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	if (!parallel_init_done)
	{
		WSADATA		wsaData;
		int			err;

		/*
		 * Prepare for threaded operation.  TlsAlloc() reports failure by
		 * returning TLS_OUT_OF_INDEXES; don't go on to use that as an index.
		 */
		tls_index = TlsAlloc();
		if (tls_index == TLS_OUT_OF_INDEXES)
			pg_fatal("%s() failed: error code %d", "TlsAlloc",
					 (int) GetLastError());
		mainThreadId = GetCurrentThreadId();

		/* Initialize socket access */
		err = WSAStartup(MAKEWORD(2, 2), &wsaData);
		if (err != 0)
			pg_fatal("%s() failed: error code %d", "WSAStartup", err);

		parallel_init_done = true;
	}
#endif
}
257 : :
258 : : /*
259 : : * Find the ParallelSlot for the current worker process or thread.
260 : : *
261 : : * Returns NULL if no matching slot is found (this implies we're the leader).
262 : : */
263 : : static ParallelSlot *
4039 andrew@dunslane.net 264 :UBC 0 : GetMyPSlot(ParallelState *pstate)
265 : : {
266 : : int i;
267 : :
268 [ # # ]: 0 : for (i = 0; i < pstate->numWorkers; i++)
269 : : {
270 : : #ifdef WIN32
271 : : if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
272 : : #else
273 [ # # ]: 0 : if (pstate->parallelSlot[i].pid == getpid())
274 : : #endif
275 : 0 : return &(pstate->parallelSlot[i]);
276 : : }
277 : :
278 : 0 : return NULL;
279 : : }
280 : :
/*
 * A thread-local version of getLocalPQExpBuffer().
 *
 * Non-reentrant but reduces memory leakage: each thread ends up owning a
 * single buffer that is wiped and handed out again on every call, rather
 * than a new buffer per fmtId/fmtQualifiedId call.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * Before parallel_init_done is set there is no TLS slot to use, so a
	 * plain static pointer serves instead.  Once TLS is available the static
	 * is never consulted, since mixing the two goes awry.  We rely on
	 * TlsGetValue() to return 0 if the value is not yet set.
	 */
	static PQExpBuffer fallback_buf = NULL;
	PQExpBuffer buf;

	if (parallel_init_done)
		buf = (PQExpBuffer) TlsGetValue(tls_index);
	else
		buf = fallback_buf;

	if (!buf)
	{
		/* first call for this thread: make a buffer and remember it */
		buf = createPQExpBuffer();
		if (parallel_init_done)
			TlsSetValue(tls_index, buf);
		else
			fallback_buf = buf;
		return buf;
	}

	/* later calls: same buffer, just wipe contents */
	resetPQExpBuffer(buf);
	return buf;
}
#endif							/* WIN32 */
322 : :
323 : : /*
324 : : * pg_dump and pg_restore call this to register the cleanup handler
325 : : * as soon as they've created the ArchiveHandle.
326 : : */
327 : : void
4039 andrew@dunslane.net 328 :CBC 215 : on_exit_close_archive(Archive *AHX)
329 : : {
330 : 215 : shutdown_info.AHX = AHX;
331 : 215 : on_exit_nicely(archive_close_connection, &shutdown_info);
332 : 215 : }
333 : :
334 : : /*
335 : : * on_exit_nicely handler for shutting down database connections and
336 : : * worker processes cleanly.
337 : : */
338 : : static void
339 : 172 : archive_close_connection(int code, void *arg)
340 : : {
341 : 172 : ShutdownInformation *si = (ShutdownInformation *) arg;
342 : :
343 [ - + ]: 172 : if (si->pstate)
344 : : {
345 : : /* In parallel mode, must figure out who we are */
4039 andrew@dunslane.net 346 :UBC 0 : ParallelSlot *slot = GetMyPSlot(si->pstate);
347 : :
348 [ # # ]: 0 : if (!slot)
349 : : {
350 : : /*
351 : : * We're the leader. Forcibly shut down workers, then close our
352 : : * own database connection, if any.
353 : : */
2873 tgl@sss.pgh.pa.us 354 : 0 : ShutdownWorkersHard(si->pstate);
355 : :
2881 356 [ # # ]: 0 : if (si->AHX)
357 : 0 : DisconnectDatabase(si->AHX);
358 : : }
359 : : else
360 : : {
361 : : /*
362 : : * We're a worker. Shut down our own DB connection if any. On
363 : : * Windows, we also have to close our communication sockets, to
364 : : * emulate what will happen on Unix when the worker process exits.
365 : : * (Without this, if this is a premature exit, the leader would
366 : : * fail to detect it because there would be no EOF condition on
367 : : * the other end of the pipe.)
368 : : */
2756 369 [ # # ]: 0 : if (slot->AH)
370 : 0 : DisconnectDatabase(&(slot->AH->public));
371 : :
372 : : #ifdef WIN32
373 : : closesocket(slot->pipeRevRead);
374 : : closesocket(slot->pipeRevWrite);
375 : : #endif
376 : : }
377 : : }
378 : : else
379 : : {
380 : : /* Non-parallel operation: just kill the leader DB connection */
2881 tgl@sss.pgh.pa.us 381 [ + - ]:CBC 172 : if (si->AHX)
382 : 172 : DisconnectDatabase(si->AHX);
383 : : }
4039 andrew@dunslane.net 384 : 172 : }
385 : :
386 : : /*
387 : : * Forcibly shut down any remaining workers, waiting for them to finish.
388 : : *
389 : : * Note that we don't expect to come here during normal exit (the workers
390 : : * should be long gone, and the ParallelState too). We're only here in a
391 : : * pg_fatal() situation, so intervening to cancel active commands is
392 : : * appropriate.
393 : : */
394 : : static void
4039 andrew@dunslane.net 395 :UBC 0 : ShutdownWorkersHard(ParallelState *pstate)
396 : : {
397 : : int i;
398 : :
399 : : /*
400 : : * Close our write end of the sockets so that any workers waiting for
401 : : * commands know they can exit. (Note: some of the pipeWrite fields might
402 : : * still be zero, if we failed to initialize all the workers. Hence, just
403 : : * ignore errors here.)
404 : : */
405 [ # # ]: 0 : for (i = 0; i < pstate->numWorkers; i++)
406 : 0 : closesocket(pstate->parallelSlot[i].pipeWrite);
407 : :
408 : : /*
409 : : * Force early termination of any commands currently in progress.
410 : : */
411 : : #ifndef WIN32
412 : : /* On non-Windows, send SIGTERM to each worker process. */
413 [ # # ]: 0 : for (i = 0; i < pstate->numWorkers; i++)
414 : : {
2873 tgl@sss.pgh.pa.us 415 : 0 : pid_t pid = pstate->parallelSlot[i].pid;
416 : :
417 [ # # ]: 0 : if (pid != 0)
418 : 0 : kill(pid, SIGTERM);
419 : : }
420 : : #else
421 : :
422 : : /*
423 : : * On Windows, send query cancels directly to the workers' backends. Use
424 : : * a critical section to ensure worker threads don't change state.
425 : : */
426 : : EnterCriticalSection(&signal_info_lock);
427 : : for (i = 0; i < pstate->numWorkers; i++)
428 : : {
429 : : ArchiveHandle *AH = pstate->parallelSlot[i].AH;
430 : : char errbuf[1];
431 : :
432 : : if (AH != NULL && AH->connCancel != NULL)
433 : : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
434 : : }
435 : : LeaveCriticalSection(&signal_info_lock);
436 : : #endif
437 : :
438 : : /* Now wait for them to terminate. */
4039 andrew@dunslane.net 439 : 0 : WaitForTerminatingWorkers(pstate);
440 : 0 : }
441 : :
442 : : /*
443 : : * Wait for all workers to terminate.
444 : : */
445 : : static void
4039 andrew@dunslane.net 446 :CBC 13 : WaitForTerminatingWorkers(ParallelState *pstate)
447 : : {
448 [ + + ]: 41 : while (!HasEveryWorkerTerminated(pstate))
449 : : {
450 : 28 : ParallelSlot *slot = NULL;
451 : : int j;
452 : :
453 : : #ifndef WIN32
454 : : /* On non-Windows, use wait() to wait for next worker to end */
455 : : int status;
456 : 28 : pid_t pid = wait(&status);
457 : :
458 : : /* Find dead worker's slot, and clear the PID field */
459 [ + - ]: 45 : for (j = 0; j < pstate->numWorkers; j++)
460 : : {
2879 tgl@sss.pgh.pa.us 461 : 45 : slot = &(pstate->parallelSlot[j]);
462 [ + + ]: 45 : if (slot->pid == pid)
463 : : {
464 : 28 : slot->pid = 0;
465 : 28 : break;
466 : : }
467 : : }
468 : : #else /* WIN32 */
469 : : /* On Windows, we must use WaitForMultipleObjects() */
470 : : HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
471 : : int nrun = 0;
472 : : DWORD ret;
473 : : uintptr_t hThread;
474 : :
475 : : for (j = 0; j < pstate->numWorkers; j++)
476 : : {
477 : : if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
478 : : {
479 : : lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
480 : : nrun++;
481 : : }
482 : : }
483 : : ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
484 : : Assert(ret != WAIT_FAILED);
485 : : hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
486 : : free(lpHandles);
487 : :
488 : : /* Find dead worker's slot, and clear the hThread field */
489 : : for (j = 0; j < pstate->numWorkers; j++)
490 : : {
491 : : slot = &(pstate->parallelSlot[j]);
492 : : if (slot->hThread == hThread)
493 : : {
494 : : /* For cleanliness, close handles for dead threads */
495 : : CloseHandle((HANDLE) slot->hThread);
496 : : slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
497 : : break;
498 : : }
499 : : }
500 : : #endif /* WIN32 */
501 : :
502 : : /* On all platforms, update workerStatus and te[] as well */
503 [ - + ]: 28 : Assert(j < pstate->numWorkers);
4039 andrew@dunslane.net 504 : 28 : slot->workerStatus = WRKR_TERMINATED;
2756 tgl@sss.pgh.pa.us 505 : 28 : pstate->te[j] = NULL;
506 : : }
4039 andrew@dunslane.net 507 : 13 : }
508 : :
509 : :
510 : : /*
511 : : * Code for responding to cancel interrupts (SIGINT, control-C, etc)
512 : : *
513 : : * This doesn't quite belong in this module, but it needs access to the
514 : : * ParallelState data, so there's not really a better place either.
515 : : *
516 : : * When we get a cancel interrupt, we could just die, but in pg_restore that
517 : : * could leave a SQL command (e.g., CREATE INDEX on a large table) running
518 : : * for a long time. Instead, we try to send a cancel request and then die.
519 : : * pg_dump probably doesn't really need this, but we might as well use it
520 : : * there too. Note that sending the cancel directly from the signal handler
521 : : * is safe because PQcancel() is written to make it so.
522 : : *
523 : : * In parallel operation on Unix, each process is responsible for canceling
524 : : * its own connection (this must be so because nobody else has access to it).
525 : : * Furthermore, the leader process should attempt to forward its signal to
526 : : * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
527 : : * needed because typing control-C at the console would deliver SIGINT to
528 : : * every member of the terminal process group --- but in other scenarios it
529 : : * might be that only the leader gets signaled.
530 : : *
531 : : * On Windows, the cancel handler runs in a separate thread, because that's
532 : : * how SetConsoleCtrlHandler works. We make it stop worker threads, send
533 : : * cancels on all active connections, and then return FALSE, which will allow
534 : : * the process to die. For safety's sake, we use a critical section to
535 : : * protect the PGcancel structures against being changed while the signal
536 : : * thread runs.
537 : : */
538 : :
539 : : #ifndef WIN32
540 : :
541 : : /*
542 : : * Signal handler (Unix only), installed for SIGINT, SIGTERM and SIGQUIT by set_cancel_handler(). In order, it: ignores further signals, forwards SIGTERM to any workers, cancels this process's own query, reports (leader only), and _exit()s. It never returns.
543 : : */
544 : : static void
2878 tgl@sss.pgh.pa.us 545 :UBC 0 : sigTermHandler(SIGNAL_ARGS)
546 : : {
547 : : int i;
548 : : char errbuf[1]; /* PQcancel() requires an error buffer; we never look at it */
549 : :
550 : : /*
551 : : * Some platforms allow delivery of new signals to interrupt an active
552 : : * signal handler. That could muck up our attempt to send PQcancel, so
553 : : * disable the signals that set_cancel_handler enabled.
554 : : */
2873 555 : 0 : pqsignal(SIGINT, SIG_IGN);
556 : 0 : pqsignal(SIGTERM, SIG_IGN);
557 : 0 : pqsignal(SIGQUIT, SIG_IGN);
558 : :
559 : : /*
560 : : * If we're in the leader, forward signal to all workers. (It seems best
561 : : * to do this before PQcancel; killing the leader transaction will result
562 : : * in invalid-snapshot errors from active workers, which maybe we can
563 : : * quiet by killing workers first.) Ignore any errors.
564 : : */
565 [ # # ]: 0 : if (signal_info.pstate != NULL)
566 : : {
567 [ # # ]: 0 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
568 : : {
569 : 0 : pid_t pid = signal_info.pstate->parallelSlot[i].pid;
570 : : /* pid 0 marks a slot with no live child process to signal */
571 [ # # ]: 0 : if (pid != 0)
572 : 0 : kill(pid, SIGTERM);
573 : : }
574 : : }
575 : :
576 : : /*
577 : : * Send QueryCancel if we have a connection to send to. Ignore errors,
578 : : * there's not much we can do about them anyway.
579 : : */
580 [ # # # # ]: 0 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
581 : 0 : (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
582 : :
583 : : /*
584 : : * Report we're quitting, using nothing more complicated than write(2).
585 : : * When in parallel operation, only the leader process should do this.
586 : : */
587 [ # # ]: 0 : if (!signal_info.am_worker)
588 : : {
589 [ # # ]: 0 : if (progname)
590 : : {
591 : 0 : write_stderr(progname);
592 : 0 : write_stderr(": ");
593 : : }
594 : 0 : write_stderr("terminated by user\n");
595 : : }
596 : :
597 : : /*
598 : : * And die, using _exit() not exit() because the latter will invoke atexit
599 : : * handlers that can fail if we interrupted related code.
600 : : */
1546 601 : 0 : _exit(1);
602 : : }
603 : :
604 : : /*
605 : : * Enable cancel interrupt handler, if not already done.
606 : : */
607 : : static void
67 dgustafsson@postgres 608 :GNC 481 : set_cancel_handler(void)
609 : : {
610 : : /*
611 : : * When forking, signal_info.handler_set will propagate into the new
612 : : * process, but that's fine because the signal handler state does too.
613 : : */
2873 tgl@sss.pgh.pa.us 614 [ + + ]:CBC 481 : if (!signal_info.handler_set)
615 : : {
616 : 186 : signal_info.handler_set = true;
617 : :
618 : 186 : pqsignal(SIGINT, sigTermHandler);
619 : 186 : pqsignal(SIGTERM, sigTermHandler);
620 : 186 : pqsignal(SIGQUIT, sigTermHandler);
621 : : }
622 : 481 : }
623 : :
624 : : #else /* WIN32 */
625 : :
626 : : /*
627 : : * Console interrupt handler --- runs in a newly-started thread.
628 : : *
629 : : * After stopping other threads and sending cancel requests on all open
630 : : * connections, we return FALSE which will allow the default ExitProcess()
631 : : * action to be taken. Only CTRL_C_EVENT and CTRL_BREAK_EVENT trigger that work; for any other dwCtrlType we just return FALSE.
632 : : */
633 : : static BOOL WINAPI
634 : : consoleHandler(DWORD dwCtrlType)
635 : : {
636 : : int i;
637 : : char errbuf[1]; /* PQcancel() requires an error buffer; we never look at it */
638 : :
639 : : if (dwCtrlType == CTRL_C_EVENT ||
640 : : dwCtrlType == CTRL_BREAK_EVENT)
641 : : {
642 : : /* Critical section prevents changing data we look at here */
643 : : EnterCriticalSection(&signal_info_lock);
644 : :
645 : : /*
646 : : * If in parallel mode, stop worker threads and send QueryCancel to
647 : : * their connected backends. The main point of stopping the worker
648 : : * threads is to keep them from reporting the query cancels as errors,
649 : : * which would clutter the user's screen. We needn't stop the leader
650 : : * thread since it won't be doing much anyway. Do this before
651 : : * canceling the main transaction, else we might get invalid-snapshot
652 : : * errors reported before we can stop the workers. Ignore errors,
653 : : * there's not much we can do about them anyway.
654 : : */
655 : : if (signal_info.pstate != NULL)
656 : : {
657 : : for (i = 0; i < signal_info.pstate->numWorkers; i++)
658 : : {
659 : : ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
660 : : ArchiveHandle *AH = slot->AH;
661 : : HANDLE hThread = (HANDLE) slot->hThread;
662 : :
663 : : /*
664 : : * Using TerminateThread here may leave some resources leaked,
665 : : * but it doesn't matter since we're about to end the whole
666 : : * process.
667 : : */
668 : : if (hThread != INVALID_HANDLE_VALUE)
669 : : TerminateThread(hThread, 0);
670 : : /* the worker's cloned archive, if it has one, carries the cancel info for its backend */
671 : : if (AH != NULL && AH->connCancel != NULL)
672 : : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
673 : : }
674 : : }
675 : :
676 : : /*
677 : : * Send QueryCancel to leader connection, if enabled. Ignore errors,
678 : : * there's not much we can do about them anyway.
679 : : */
680 : : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
681 : : (void) PQcancel(signal_info.myAH->connCancel,
682 : : errbuf, sizeof(errbuf));
683 : :
684 : : LeaveCriticalSection(&signal_info_lock);
685 : :
686 : : /*
687 : : * Report we're quitting, using nothing more complicated than
688 : : * write(2). (We might be able to get away with using pg_log_*()
689 : : * here, but since we terminated other threads uncleanly above, it
690 : : * seems better to assume as little as possible.)
691 : : */
692 : : if (progname)
693 : : {
694 : : write_stderr(progname);
695 : : write_stderr(": ");
696 : : }
697 : : write_stderr("terminated by user\n");
698 : : }
699 : :
700 : : /* Always return FALSE to allow signal handling to continue */
701 : : return FALSE;
702 : : }
703 : :
704 : : /*
705 : : * Enable cancel interrupt handler, if not already done.
706 : : */
707 : : static void
708 : : set_cancel_handler(void)
709 : : {
710 : : if (!signal_info.handler_set)
711 : : {
712 : : signal_info.handler_set = true;
713 : :
714 : : InitializeCriticalSection(&signal_info_lock);
715 : :
716 : : SetConsoleCtrlHandler(consoleHandler, TRUE);
717 : : }
718 : : }
719 : :
720 : : #endif /* WIN32 */
721 : :
722 : :
723 : : /*
724 : : * set_archive_cancel_info
725 : : *
726 : : * Fill AH->connCancel with cancellation info for the specified database
727 : : * connection; or clear it if conn is NULL.
728 : : */
729 : : void
730 : 481 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
731 : : {
732 : : PGcancel *oldConnCancel;
733 : :
734 : : /*
735 : : * Activate the interrupt handler if we didn't yet in this process. On
736 : : * Windows, this also initializes signal_info_lock; therefore it's
737 : : * important that this happen at least once before we fork off any
738 : : * threads.
739 : : */
67 dgustafsson@postgres 740 :GNC 481 : set_cancel_handler();
741 : :
742 : : /*
743 : : * On Unix, we assume that storing a pointer value is atomic with respect
744 : : * to any possible signal interrupt. On Windows, use a critical section.
745 : : */
746 : :
747 : : #ifdef WIN32
748 : : EnterCriticalSection(&signal_info_lock);
749 : : #endif
750 : :
751 : : /* Free the old one if we have one */
2873 tgl@sss.pgh.pa.us 752 :CBC 481 : oldConnCancel = AH->connCancel;
753 : : /* be sure interrupt handler doesn't use pointer while freeing */
754 : 481 : AH->connCancel = NULL;
755 : :
756 [ + + ]: 481 : if (oldConnCancel != NULL)
757 : 246 : PQfreeCancel(oldConnCancel);
758 : :
759 : : /* Set the new one if specified */
760 [ + + ]: 481 : if (conn)
761 : 248 : AH->connCancel = PQgetCancel(conn);
762 : :
763 : : /*
764 : : * On Unix, there's only ever one active ArchiveHandle per process, so we
765 : : * can just set signal_info.myAH unconditionally. On Windows, do that
766 : : * only in the main thread; worker threads have to make sure their
767 : : * ArchiveHandle appears in the pstate data, which is dealt with in
768 : : * RunWorker().
769 : : */
770 : : #ifndef WIN32
771 : 481 : signal_info.myAH = AH;
772 : : #else
773 : : if (mainThreadId == GetCurrentThreadId())
774 : : signal_info.myAH = AH;
775 : : #endif
776 : :
777 : : #ifdef WIN32
778 : : LeaveCriticalSection(&signal_info_lock);
779 : : #endif
780 : 481 : }
781 : :
782 : : /*
783 : : * set_cancel_pstate
784 : : *
785 : : * Set signal_info.pstate to point to the specified ParallelState, if any.
786 : : * We need this mainly to have an interlock against Windows signal thread.
787 : : */
788 : : static void
789 : 26 : set_cancel_pstate(ParallelState *pstate)
790 : : {
791 : : #ifdef WIN32
792 : : EnterCriticalSection(&signal_info_lock);
793 : : #endif
794 : :
795 : 26 : signal_info.pstate = pstate;
796 : :
797 : : #ifdef WIN32
798 : : LeaveCriticalSection(&signal_info_lock);
799 : : #endif
800 : 26 : }
801 : :
802 : : /*
803 : : * set_cancel_slot_archive
804 : : *
805 : : * Set ParallelSlot's AH field to point to the specified archive, if any.
806 : : * We need this mainly to have an interlock against Windows signal thread.
807 : : */
808 : : static void
809 : 56 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
810 : : {
811 : : #ifdef WIN32
812 : : EnterCriticalSection(&signal_info_lock);
813 : : #endif
814 : :
2756 815 : 56 : slot->AH = AH;
816 : :
817 : : #ifdef WIN32
818 : : LeaveCriticalSection(&signal_info_lock);
819 : : #endif
2873 820 : 56 : }
821 : :
822 : :
823 : : /*
824 : : * This function is called by both Unix and Windows variants to set up
825 : : * and run a worker process. Caller should exit the process (or thread)
826 : : * upon return.
827 : : */
828 : : static void
829 : 28 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
830 : : {
831 : : int pipefd[2];
832 : :
833 : : /* fetch child ends of pipes */
834 : 28 : pipefd[PIPE_READ] = slot->pipeRevRead;
835 : 28 : pipefd[PIPE_WRITE] = slot->pipeRevWrite;
836 : :
837 : : /*
838 : : * Clone the archive so that we have our own state to work with, and in
839 : : * particular our own database connection.
840 : : *
841 : : * We clone on Unix as well as Windows, even though technically we don't
842 : : * need to because fork() gives us a copy in our own address space
843 : : * already. But CloneArchive resets the state information and also clones
844 : : * the database connection which both seem kinda helpful.
845 : : */
846 : 28 : AH = CloneArchive(AH);
847 : :
848 : : /* Remember cloned archive where signal handler can find it */
849 : 28 : set_cancel_slot_archive(slot, AH);
850 : :
851 : : /*
852 : : * Call the setup worker function that's defined in the ArchiveHandle.
853 : : */
3014 854 : 28 : (AH->SetupWorkerPtr) ((Archive *) AH);
855 : :
856 : : /*
857 : : * Execute commands until done.
858 : : */
859 : 28 : WaitForCommands(AH, pipefd);
860 : :
861 : : /*
862 : : * Disconnect from database and clean up.
863 : : */
2873 864 : 28 : set_cancel_slot_archive(slot, NULL);
865 : 28 : DisconnectDatabase(&(AH->public));
866 : 28 : DeCloneArchive(AH);
4039 andrew@dunslane.net 867 : 28 : }
868 : :
/*
 * Thread base function for Windows.
 *
 * Takes ownership of the WorkerInfo it is handed: the contents are copied
 * out and the struct is freed before the worker starts running.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	WorkerInfo	info = *wi;

	/* Don't need the caller's WorkerInfo anymore */
	free(wi);

	/* Run the worker ... */
	RunWorker(info.AH, info.slot);

	/* Exit the thread */
	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
890 : :
891 : : /*
892 : : * This function starts a parallel dump or restore by spawning off the worker
893 : : * processes. For Windows, it creates a number of threads; on Unix the
894 : : * workers are created with fork().
895 : : */
896 : : ParallelState *
3014 tgl@sss.pgh.pa.us 897 : 15 : ParallelBackupStart(ArchiveHandle *AH)
898 : : {
899 : : ParallelState *pstate;
900 : : int i;
901 : :
4039 andrew@dunslane.net 902 [ - + ]: 15 : Assert(AH->public.numWorkers > 0);
903 : :
904 : 15 : pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
905 : :
906 : 15 : pstate->numWorkers = AH->public.numWorkers;
2756 tgl@sss.pgh.pa.us 907 : 15 : pstate->te = NULL;
4039 andrew@dunslane.net 908 : 15 : pstate->parallelSlot = NULL;
909 : :
910 [ + + ]: 15 : if (AH->public.numWorkers == 1)
911 : 2 : return pstate;
912 : :
913 : : /* Create status arrays, being sure to initialize all fields to 0 */
2756 tgl@sss.pgh.pa.us 914 : 13 : pstate->te = (TocEntry **)
915 : 13 : pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
916 : 13 : pstate->parallelSlot = (ParallelSlot *)
917 : 13 : pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
918 : :
919 : : #ifdef WIN32
920 : : /* Make fmtId() and fmtQualifiedId() use thread-local storage */
921 : : getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
922 : : #endif
923 : :
924 : : /*
925 : : * Set the pstate in shutdown_info, to tell the exit handler that it must
926 : : * clean up workers as well as the main database connection. But we don't
927 : : * set this in signal_info yet, because we don't want child processes to
928 : : * inherit non-NULL signal_info.pstate.
929 : : */
2873 930 : 13 : shutdown_info.pstate = pstate;
931 : :
932 : : /*
933 : : * Temporarily disable query cancellation on the leader connection. This
934 : : * ensures that child processes won't inherit valid AH->connCancel
935 : : * settings and thus won't try to issue cancels against the leader's
936 : : * connection. No harm is done if we fail while it's disabled, because
937 : : * the leader connection is idle at this point anyway.
938 : : */
939 : 13 : set_archive_cancel_info(AH, NULL);
940 : :
941 : : /* Ensure stdio state is quiesced before forking */
942 : 13 : fflush(NULL);
943 : :
944 : : /* Create desired number of workers */
4039 andrew@dunslane.net 945 [ + + ]: 41 : for (i = 0; i < pstate->numWorkers; i++)
946 : : {
947 : : #ifdef WIN32
948 : : WorkerInfo *wi;
949 : : uintptr_t handle;
950 : : #else
951 : : pid_t pid;
952 : : #endif
2873 tgl@sss.pgh.pa.us 953 : 28 : ParallelSlot *slot = &(pstate->parallelSlot[i]);
954 : : int pipeMW[2],
955 : : pipeWM[2];
956 : :
957 : : /* Create communication pipes for this worker */
4039 andrew@dunslane.net 958 [ + - - + ]: 28 : if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
737 tgl@sss.pgh.pa.us 959 :UBC 0 : pg_fatal("could not create communication channels: %m");
960 : :
961 : : /* leader's ends of the pipes */
2873 tgl@sss.pgh.pa.us 962 :CBC 28 : slot->pipeRead = pipeWM[PIPE_READ];
963 : 28 : slot->pipeWrite = pipeMW[PIPE_WRITE];
964 : : /* child's ends of the pipes */
965 : 28 : slot->pipeRevRead = pipeMW[PIPE_READ];
966 : 28 : slot->pipeRevWrite = pipeWM[PIPE_WRITE];
967 : :
968 : : #ifdef WIN32
969 : : /* Create transient structure to pass args to worker function */
970 : : wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
971 : :
972 : : wi->AH = AH;
973 : : wi->slot = slot;
974 : :
975 : : handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
976 : : wi, 0, &(slot->threadId));
977 : : slot->hThread = handle;
978 : : slot->workerStatus = WRKR_IDLE;
979 : : #else /* !WIN32 */
4039 andrew@dunslane.net 980 : 28 : pid = fork();
981 [ + + ]: 56 : if (pid == 0)
982 : : {
983 : : /* we are the worker */
984 : : int j;
985 : :
986 : : /* this is needed for GetMyPSlot() */
2873 tgl@sss.pgh.pa.us 987 : 28 : slot->pid = getpid();
988 : :
989 : : /* instruct signal handler that we're in a worker now */
990 : 28 : signal_info.am_worker = true;
991 : :
992 : : /* close read end of Worker -> Leader */
4039 andrew@dunslane.net 993 : 28 : closesocket(pipeWM[PIPE_READ]);
994 : : /* close write end of Leader -> Worker */
995 : 28 : closesocket(pipeMW[PIPE_WRITE]);
996 : :
997 : : /*
998 : : * Close all inherited fds for communication of the leader with
999 : : * previously-forked workers.
1000 : : */
1001 [ + + ]: 45 : for (j = 0; j < i; j++)
1002 : : {
1003 : 17 : closesocket(pstate->parallelSlot[j].pipeRead);
1004 : 17 : closesocket(pstate->parallelSlot[j].pipeWrite);
1005 : : }
1006 : :
1007 : : /* Run the worker ... */
2873 tgl@sss.pgh.pa.us 1008 : 28 : RunWorker(AH, slot);
1009 : :
1010 : : /* We can just exit(0) when done */
4039 andrew@dunslane.net 1011 : 28 : exit(0);
1012 : : }
1013 [ - + ]: 28 : else if (pid < 0)
1014 : : {
1015 : : /* fork failed */
737 tgl@sss.pgh.pa.us 1016 :UBC 0 : pg_fatal("could not create worker process: %m");
1017 : : }
1018 : :
1019 : : /* In Leader after successful fork */
2873 tgl@sss.pgh.pa.us 1020 :CBC 28 : slot->pid = pid;
1535 1021 : 28 : slot->workerStatus = WRKR_IDLE;
1022 : :
1023 : : /* close read end of Leader -> Worker */
4039 andrew@dunslane.net 1024 : 28 : closesocket(pipeMW[PIPE_READ]);
1025 : : /* close write end of Worker -> Leader */
1026 : 28 : closesocket(pipeWM[PIPE_WRITE]);
1027 : : #endif /* WIN32 */
1028 : : }
1029 : :
1030 : : /*
1031 : : * Having forked off the workers, disable SIGPIPE so that leader isn't
1032 : : * killed if it tries to send a command to a dead worker. We don't want
1033 : : * the workers to inherit this setting, though.
1034 : : */
1035 : : #ifndef WIN32
2873 tgl@sss.pgh.pa.us 1036 : 13 : pqsignal(SIGPIPE, SIG_IGN);
1037 : : #endif
1038 : :
1039 : : /*
1040 : : * Re-establish query cancellation on the leader connection.
1041 : : */
1042 : 13 : set_archive_cancel_info(AH, AH->connection);
1043 : :
1044 : : /*
1045 : : * Tell the cancel signal handler to forward signals to worker processes,
1046 : : * too. (As with query cancel, we did not need this earlier because the
1047 : : * workers have not yet been given anything to do; if we die before this
1048 : : * point, any already-started workers will see EOF and quit promptly.)
1049 : : */
1050 : 13 : set_cancel_pstate(pstate);
1051 : :
4039 andrew@dunslane.net 1052 : 13 : return pstate;
1053 : : }
1054 : :
1055 : : /*
1056 : : * Close down a parallel dump or restore.
1057 : : */
1058 : : void
1059 : 15 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1060 : : {
1061 : : int i;
1062 : :
1063 : : /* No work if non-parallel */
1064 [ + + ]: 15 : if (pstate->numWorkers == 1)
1065 : 2 : return;
1066 : :
1067 : : /* There should not be any unfinished jobs */
1068 [ - + ]: 13 : Assert(IsEveryWorkerIdle(pstate));
1069 : :
1070 : : /* Close the sockets so that the workers know they can exit */
1071 [ + + ]: 41 : for (i = 0; i < pstate->numWorkers; i++)
1072 : : {
1073 : 28 : closesocket(pstate->parallelSlot[i].pipeRead);
1074 : 28 : closesocket(pstate->parallelSlot[i].pipeWrite);
1075 : : }
1076 : :
1077 : : /* Wait for them to exit */
1078 : 13 : WaitForTerminatingWorkers(pstate);
1079 : :
1080 : : /*
1081 : : * Unlink pstate from shutdown_info, so the exit handler will not try to
1082 : : * use it; and likewise unlink from signal_info.
1083 : : */
1084 : 13 : shutdown_info.pstate = NULL;
2873 tgl@sss.pgh.pa.us 1085 : 13 : set_cancel_pstate(NULL);
1086 : :
1087 : : /* Release state (mere neatnik-ism, since we're about to terminate) */
2756 1088 : 13 : free(pstate->te);
4039 andrew@dunslane.net 1089 : 13 : free(pstate->parallelSlot);
1090 : 13 : free(pstate);
1091 : : }
1092 : :
1093 : : /*
1094 : : * These next four functions handle construction and parsing of the command
1095 : : * strings and response strings for parallel workers.
1096 : : *
1097 : : * Currently, these can be the same regardless of which archive format we are
1098 : : * processing. In future, we might want to let format modules override these
1099 : : * functions to add format-specific data to a command or response.
1100 : : */
1101 : :
1102 : : /*
1103 : : * buildWorkerCommand: format a command string to send to a worker.
1104 : : *
1105 : : * The string is built in the caller-supplied buffer of size buflen.
1106 : : */
1107 : : static void
2756 tgl@sss.pgh.pa.us 1108 : 196 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1109 : : char *buf, int buflen)
1110 : : {
1111 [ + + ]: 196 : if (act == ACT_DUMP)
1112 : 150 : snprintf(buf, buflen, "DUMP %d", te->dumpId);
1113 [ + - ]: 46 : else if (act == ACT_RESTORE)
1114 : 46 : snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1115 : : else
2756 tgl@sss.pgh.pa.us 1116 :UBC 0 : Assert(false);
2756 tgl@sss.pgh.pa.us 1117 :CBC 196 : }
1118 : :
1119 : : /*
1120 : : * parseWorkerCommand: interpret a command string in a worker.
1121 : : */
1122 : : static void
1123 : 196 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1124 : : const char *msg)
1125 : : {
1126 : : DumpId dumpId;
1127 : : int nBytes;
1128 : :
1129 [ + + ]: 196 : if (messageStartsWith(msg, "DUMP "))
1130 : : {
1131 : 150 : *act = ACT_DUMP;
1132 : 150 : sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1133 [ - + ]: 150 : Assert(nBytes == strlen(msg));
1134 : 150 : *te = getTocEntryByDumpId(AH, dumpId);
1135 [ - + ]: 150 : Assert(*te != NULL);
1136 : : }
1137 [ + - ]: 46 : else if (messageStartsWith(msg, "RESTORE "))
1138 : : {
1139 : 46 : *act = ACT_RESTORE;
1140 : 46 : sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1141 [ - + ]: 46 : Assert(nBytes == strlen(msg));
1142 : 46 : *te = getTocEntryByDumpId(AH, dumpId);
1143 [ - + ]: 46 : Assert(*te != NULL);
1144 : : }
1145 : : else
737 tgl@sss.pgh.pa.us 1146 :UBC 0 : pg_fatal("unrecognized command received from leader: \"%s\"",
1147 : : msg);
2756 tgl@sss.pgh.pa.us 1148 :CBC 196 : }
1149 : :
1150 : : /*
1151 : : * buildWorkerResponse: format a response string to send to the leader.
1152 : : *
1153 : : * The string is built in the caller-supplied buffer of size buflen.
1154 : : */
1155 : : static void
1156 : 196 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1157 : : char *buf, int buflen)
1158 : : {
1159 [ - + ]: 196 : snprintf(buf, buflen, "OK %d %d %d",
1160 : : te->dumpId,
1161 : : status,
1162 : : status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1163 : 196 : }
1164 : :
1165 : : /*
1166 : : * parseWorkerResponse: parse the status message returned by a worker.
1167 : : *
1168 : : * Returns the integer status code, and may update fields of AH and/or te.
1169 : : */
1170 : : static int
1171 : 196 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1172 : : const char *msg)
1173 : : {
1174 : : DumpId dumpId;
1175 : : int nBytes,
1176 : : n_errors;
1177 : 196 : int status = 0;
1178 : :
1179 [ + - ]: 196 : if (messageStartsWith(msg, "OK "))
1180 : : {
1181 : 196 : sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1182 : :
1183 [ - + ]: 196 : Assert(dumpId == te->dumpId);
1184 [ - + ]: 196 : Assert(nBytes == strlen(msg));
1185 : :
1186 : 196 : AH->public.n_errors += n_errors;
1187 : : }
1188 : : else
737 tgl@sss.pgh.pa.us 1189 :UBC 0 : pg_fatal("invalid message received from worker: \"%s\"",
1190 : : msg);
1191 : :
2756 tgl@sss.pgh.pa.us 1192 :CBC 196 : return status;
1193 : : }
1194 : :
1195 : : /*
1196 : : * Dispatch a job to some free worker.
1197 : : *
1198 : : * te is the TocEntry to be processed, act is the action to be taken on it.
1199 : : * callback is the function to call on completion of the job.
1200 : : *
1201 : : * If no worker is currently available, this will block, and previously
1202 : : * registered callback functions may be called.
1203 : : */
1204 : : void
1205 : 196 : DispatchJobForTocEntry(ArchiveHandle *AH,
1206 : : ParallelState *pstate,
1207 : : TocEntry *te,
1208 : : T_Action act,
1209 : : ParallelCompletionPtr callback,
1210 : : void *callback_data)
1211 : : {
1212 : : int worker;
1213 : : char buf[256];
1214 : :
1215 : : /* Get a worker, waiting if none are idle */
1216 [ + + ]: 329 : while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1217 : 133 : WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1218 : :
1219 : : /* Construct and send command string */
1220 : 196 : buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1221 : :
1222 : 196 : sendMessageToWorker(pstate, worker, buf);
1223 : :
1224 : : /* Remember worker is busy, and which TocEntry it's working on */
4039 andrew@dunslane.net 1225 : 196 : pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
2756 tgl@sss.pgh.pa.us 1226 : 196 : pstate->parallelSlot[worker].callback = callback;
1227 : 196 : pstate->parallelSlot[worker].callback_data = callback_data;
1228 : 196 : pstate->te[worker] = te;
4039 andrew@dunslane.net 1229 : 196 : }
1230 : :
1231 : : /*
1232 : : * Find an idle worker and return its slot number.
1233 : : * Return NO_SLOT if none are idle.
1234 : : */
1235 : : static int
1236 : 505 : GetIdleWorker(ParallelState *pstate)
1237 : : {
1238 : : int i;
1239 : :
1240 [ + + ]: 1255 : for (i = 0; i < pstate->numWorkers; i++)
1241 : : {
1242 [ + + ]: 958 : if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1243 : 208 : return i;
1244 : : }
1245 : 297 : return NO_SLOT;
1246 : : }
1247 : :
1248 : : /*
1249 : : * Return true iff no worker is running.
1250 : : */
1251 : : static bool
1252 : 41 : HasEveryWorkerTerminated(ParallelState *pstate)
1253 : : {
1254 : : int i;
1255 : :
1256 [ + + ]: 82 : for (i = 0; i < pstate->numWorkers; i++)
1257 : : {
1535 tgl@sss.pgh.pa.us 1258 [ + + - + ]: 69 : if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
4039 andrew@dunslane.net 1259 : 28 : return false;
1260 : : }
1261 : 13 : return true;
1262 : : }
1263 : :
1264 : : /*
1265 : : * Return true iff every worker is in the WRKR_IDLE state.
1266 : : */
1267 : : bool
1268 : 74 : IsEveryWorkerIdle(ParallelState *pstate)
1269 : : {
1270 : : int i;
1271 : :
1272 [ + + ]: 159 : for (i = 0; i < pstate->numWorkers; i++)
1273 : : {
1274 [ + + ]: 125 : if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1275 : 40 : return false;
1276 : : }
1277 : 34 : return true;
1278 : : }
1279 : :
1280 : : /*
1281 : : * Acquire lock on a table to be dumped by a worker process.
1282 : : *
1283 : : * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1284 : : * it's no problem for a worker to get one too, but if anything else besides
1285 : : * pg_dump is running, there's a possible deadlock:
1286 : : *
1287 : : * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1288 : : * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1289 : : * because the leader holds a conflicting ACCESS SHARE lock).
1290 : : * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1291 : : * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1292 : : * 4) Now we have a deadlock, since the leader is effectively waiting for
1293 : : * the worker. The server cannot detect that, however.
1294 : : *
1295 : : * To prevent an infinite wait, prior to touching a table in a worker, request
1296 : : * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1297 : : * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1298 : : * so we have a deadlock. We must fail the backup in that case.
1299 : : */
1300 : : static void
2878 tgl@sss.pgh.pa.us 1301 : 150 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1302 : : {
1303 : : const char *qualId;
1304 : : PQExpBuffer query;
1305 : : PGresult *res;
1306 : :
1307 : : /* Nothing to do for BLOBS */
1308 [ + + ]: 150 : if (strcmp(te->desc, "BLOBS") == 0)
1309 : 8 : return;
1310 : :
1311 : 142 : query = createPQExpBuffer();
1312 : :
2067 1313 : 142 : qualId = fmtQualifiedId(te->namespace, te->tag);
1314 : :
4039 andrew@dunslane.net 1315 : 142 : appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1316 : : qualId);
1317 : :
1318 : 142 : res = PQexec(AH->connection, query->data);
1319 : :
1320 [ + - - + ]: 142 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
737 tgl@sss.pgh.pa.us 1321 :UBC 0 : pg_fatal("could not obtain lock on relation \"%s\"\n"
1322 : : "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1323 : : "on the table after the pg_dump parent process had gotten the "
1324 : : "initial ACCESS SHARE lock on the table.", qualId);
1325 : :
4039 andrew@dunslane.net 1326 :CBC 142 : PQclear(res);
1327 : 142 : destroyPQExpBuffer(query);
1328 : : }
1329 : :
1330 : : /*
1331 : : * WaitForCommands: main routine for a worker process.
1332 : : *
1333 : : * Read and execute commands from the leader until we see EOF on the pipe.
1334 : : */
1335 : : static void
3014 tgl@sss.pgh.pa.us 1336 : 28 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1337 : : {
1338 : : char *command;
1339 : : TocEntry *te;
1340 : : T_Action act;
2756 1341 : 28 : int status = 0;
1342 : : char buf[256];
1343 : :
1344 : : for (;;)
1345 : : {
1400 andres@anarazel.de 1346 [ + + ]: 224 : if (!(command = getMessageFromLeader(pipefd)))
1347 : : {
1348 : : /* EOF, so done */
4039 andrew@dunslane.net 1349 : 28 : return;
1350 : : }
1351 : :
1352 : : /* Decode the command */
2756 tgl@sss.pgh.pa.us 1353 : 196 : parseWorkerCommand(AH, &te, &act, command);
1354 : :
1355 [ + + ]: 196 : if (act == ACT_DUMP)
1356 : : {
1357 : : /* Acquire lock on this table within the worker's session */
2878 1358 : 150 : lockTableForWorker(AH, te);
1359 : :
1360 : : /* Perform the dump command */
2756 1361 : 150 : status = (AH->WorkerJobDumpPtr) (AH, te);
1362 : : }
1363 [ + - ]: 46 : else if (act == ACT_RESTORE)
1364 : : {
1365 : : /* Perform the restore command */
1366 : 46 : status = (AH->WorkerJobRestorePtr) (AH, te);
1367 : : }
1368 : : else
2756 tgl@sss.pgh.pa.us 1369 :UBC 0 : Assert(false);
1370 : :
1371 : : /* Return status to leader */
2756 tgl@sss.pgh.pa.us 1372 :CBC 196 : buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1373 : :
1400 andres@anarazel.de 1374 : 196 : sendMessageToLeader(pipefd, buf);
1375 : :
1376 : : /* command was pg_malloc'd and we are responsible for free()ing it. */
3927 sfrost@snowman.net 1377 : 196 : free(command);
1378 : : }
1379 : : }
1380 : :
1381 : : /*
1382 : : * Check for status messages from workers.
1383 : : *
1384 : : * If do_wait is true, wait to get a status message; otherwise, just return
1385 : : * immediately if there is none available.
1386 : : *
1387 : : * When we get a status message, we pass the status code to the callback
1388 : : * function that was specified to DispatchJobForTocEntry, then reset the
1389 : : * worker status to IDLE.
1390 : : *
1391 : : * Returns true if we collected a status message, else false.
1392 : : *
1393 : : * XXX is it worth checking for more than one status message per call?
1394 : : * It seems somewhat unlikely that multiple workers would finish at exactly
1395 : : * the same time.
1396 : : */
1397 : : static bool
4039 andrew@dunslane.net 1398 : 380 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1399 : : {
1400 : : int worker;
1401 : : char *msg;
1402 : :
1403 : : /* Try to collect a status message */
1404 : 380 : msg = getMessageFromWorker(pstate, do_wait, &worker);
1405 : :
1406 [ + + ]: 380 : if (!msg)
1407 : : {
1408 : : /* If do_wait is true, we must have detected EOF on some socket */
1409 [ - + ]: 184 : if (do_wait)
737 tgl@sss.pgh.pa.us 1410 :UBC 0 : pg_fatal("a worker process died unexpectedly");
2756 tgl@sss.pgh.pa.us 1411 :CBC 184 : return false;
1412 : : }
1413 : :
1414 : : /* Process it and update our idea of the worker's status */
4039 andrew@dunslane.net 1415 [ + - ]: 196 : if (messageStartsWith(msg, "OK "))
1416 : : {
2756 tgl@sss.pgh.pa.us 1417 : 196 : ParallelSlot *slot = &pstate->parallelSlot[worker];
1418 : 196 : TocEntry *te = pstate->te[worker];
1419 : : int status;
1420 : :
1421 : 196 : status = parseWorkerResponse(AH, te, msg);
1422 : 196 : slot->callback(AH, te, status, slot->callback_data);
1423 : 196 : slot->workerStatus = WRKR_IDLE;
1424 : 196 : pstate->te[worker] = NULL;
1425 : : }
1426 : : else
737 tgl@sss.pgh.pa.us 1427 :UBC 0 : pg_fatal("invalid message received from worker: \"%s\"",
1428 : : msg);
1429 : :
1430 : : /* Free the string returned from getMessageFromWorker */
4039 andrew@dunslane.net 1431 :CBC 196 : free(msg);
1432 : :
2756 tgl@sss.pgh.pa.us 1433 : 196 : return true;
1434 : : }
1435 : :
1436 : : /*
1437 : : * Check for status results from workers, waiting if necessary.
1438 : : *
1439 : : * Available wait modes are:
1440 : : * WFW_NO_WAIT: reap any available status, but don't block
1441 : : * WFW_GOT_STATUS: wait for at least one more worker to finish
1442 : : * WFW_ONE_IDLE: wait for at least one worker to be idle
1443 : : * WFW_ALL_IDLE: wait for all workers to be idle
1444 : : *
1445 : : * Any received results are passed to the callback specified to
1446 : : * DispatchJobForTocEntry.
1447 : : *
1448 : : * This function is executed in the leader process.
1449 : : */
1450 : : void
1451 : 200 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1452 : : {
1453 : 200 : bool do_wait = false;
1454 : :
1455 : : /*
1456 : : * In GOT_STATUS mode, always block waiting for a message, since we can't
1457 : : * return till we get something. In other modes, we don't block the first
1458 : : * time through the loop.
1459 : : */
1460 [ + + ]: 200 : if (mode == WFW_GOT_STATUS)
1461 : : {
1462 : : /* Assert that caller knows what it's doing */
1463 [ - + ]: 12 : Assert(!IsEveryWorkerIdle(pstate));
1464 : 12 : do_wait = true;
1465 : : }
1466 : :
1467 : : for (;;)
1468 : : {
1469 : : /*
1470 : : * Check for status messages, even if we don't need to block. We do
1471 : : * not try very hard to reap all available messages, though, since
1472 : : * there's unlikely to be more than one.
1473 : : */
1474 [ + + ]: 380 : if (ListenToWorkers(AH, pstate, do_wait))
1475 : : {
1476 : : /*
1477 : : * If we got a message, we are done by definition for GOT_STATUS
1478 : : * mode, and we can also be certain that there's at least one idle
1479 : : * worker. So we're done in all but ALL_IDLE mode.
1480 : : */
1481 [ + + ]: 196 : if (mode != WFW_ALL_IDLE)
1482 : 179 : return;
1483 : : }
1484 : :
1485 : : /* Check whether we must wait for new status messages */
1486 [ - - + + : 201 : switch (mode)
- ]
1487 : : {
2756 tgl@sss.pgh.pa.us 1488 :UBC 0 : case WFW_NO_WAIT:
1489 : 0 : return; /* never wait */
1490 : 0 : case WFW_GOT_STATUS:
1491 : 0 : Assert(false); /* can't get here, because we waited */
1492 : : break;
2756 tgl@sss.pgh.pa.us 1493 :CBC 176 : case WFW_ONE_IDLE:
1494 [ + + ]: 176 : if (GetIdleWorker(pstate) != NO_SLOT)
1495 : 12 : return;
1496 : 164 : break;
1497 : 25 : case WFW_ALL_IDLE:
1498 [ + + ]: 25 : if (IsEveryWorkerIdle(pstate))
1499 : 9 : return;
1500 : 16 : break;
1501 : : }
1502 : :
1503 : : /* Loop back, and this time wait for something to happen */
1504 : 180 : do_wait = true;
1505 : : }
1506 : : }
1507 : :
1508 : : /*
1509 : : * Read one command message from the leader, blocking if necessary
1510 : : * until one is available, and return it as a malloc'd string.
1511 : : * On EOF, return NULL.
1512 : : *
1513 : : * This function is executed in worker processes.
1514 : : */
1515 : : static char *
1400 andres@anarazel.de 1516 : 224 : getMessageFromLeader(int pipefd[2])
1517 : : {
4039 andrew@dunslane.net 1518 : 224 : return readMessageFromPipe(pipefd[PIPE_READ]);
1519 : : }
1520 : :
1521 : : /*
1522 : : * Send a status message to the leader.
1523 : : *
1524 : : * This function is executed in worker processes.
1525 : : */
1526 : : static void
1400 andres@anarazel.de 1527 : 196 : sendMessageToLeader(int pipefd[2], const char *str)
1528 : : {
4039 andrew@dunslane.net 1529 : 196 : int len = strlen(str) + 1;
1530 : :
1531 [ - + ]: 196 : if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
737 tgl@sss.pgh.pa.us 1532 :UBC 0 : pg_fatal("could not write to the communication channel: %m");
4039 andrew@dunslane.net 1533 :CBC 196 : }
1534 : :
/*
 * Block in select() until at least one descriptor in "workerset" is
 * readable, restarting the call whenever it is interrupted (EINTR, or
 * WSAEINTR on Windows).
 *
 * maxFd is the highest descriptor in the set.  On return, *workerset holds
 * whatever the final select() call left in it.  The return value is that
 * call's result: -1 on error, else the number of readable descriptors.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	fd_set		wanted = *workerset;
	int			nready;

	for (;;)
	{
		/* Start every attempt from the caller's original set */
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

		/* Stop unless the call was merely interrupted */
#ifndef WIN32
		if (nready >= 0 || errno != EINTR)
			break;
#else
		if (nready != SOCKET_ERROR || WSAGetLastError() != WSAEINTR)
			break;
#endif
	}

	return nready;
}
1562 : :
1563 : :
1564 : : /*
1565 : : * Check for messages from worker processes.
1566 : : *
1567 : : * If a message is available, return it as a malloc'd string, and put the
1568 : : * index of the sending worker in *worker.
1569 : : *
1570 : : * If nothing is available, wait if "do_wait" is true, else return NULL.
1571 : : *
1572 : : * If we detect EOF on any socket, we'll return NULL. It's not great that
1573 : : * that's hard to distinguish from the no-data-available case, but for now
1574 : : * our one caller is okay with that.
1575 : : *
1576 : : * This function is executed in the leader process.
1577 : : */
1578 : : static char *
1579 : 380 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1580 : : {
1581 : : int i;
1582 : : fd_set workerset;
1583 : 380 : int maxFd = -1;
1584 : 380 : struct timeval nowait = {0, 0};
1585 : :
1586 : : /* construct bitmap of socket descriptors for select() */
1587 [ + + ]: 6460 : FD_ZERO(&workerset);
1588 [ + + ]: 1226 : for (i = 0; i < pstate->numWorkers; i++)
1589 : : {
1535 tgl@sss.pgh.pa.us 1590 [ + + - + ]: 846 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
4039 andrew@dunslane.net 1591 :UBC 0 : continue;
4039 andrew@dunslane.net 1592 :CBC 846 : FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1593 [ + - ]: 846 : if (pstate->parallelSlot[i].pipeRead > maxFd)
1594 : 846 : maxFd = pstate->parallelSlot[i].pipeRead;
1595 : : }
1596 : :
1597 [ + + ]: 380 : if (do_wait)
1598 : : {
1599 : 192 : i = select_loop(maxFd, &workerset);
1600 [ - + ]: 192 : Assert(i != 0);
1601 : : }
1602 : : else
1603 : : {
1604 [ + + ]: 188 : if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1605 : 184 : return NULL;
1606 : : }
1607 : :
1608 [ - + ]: 196 : if (i < 0)
737 tgl@sss.pgh.pa.us 1609 :UBC 0 : pg_fatal("%s() failed: %m", "select");
1610 : :
4039 andrew@dunslane.net 1611 [ + - ]:CBC 302 : for (i = 0; i < pstate->numWorkers; i++)
1612 : : {
1613 : : char *msg;
1614 : :
1535 tgl@sss.pgh.pa.us 1615 [ + + - + ]: 302 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1535 tgl@sss.pgh.pa.us 1616 :UBC 0 : continue;
4039 andrew@dunslane.net 1617 [ + + ]:CBC 302 : if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1618 : 106 : continue;
1619 : :
1620 : : /*
1621 : : * Read the message if any. If the socket is ready because of EOF,
1622 : : * we'll return NULL instead (and the socket will stay ready, so the
1623 : : * condition will persist).
1624 : : *
1625 : : * Note: because this is a blocking read, we'll wait if only part of
1626 : : * the message is available. Waiting a long time would be bad, but
1627 : : * since worker status messages are short and are always sent in one
1628 : : * operation, it shouldn't be a problem in practice.
1629 : : */
1630 : 196 : msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1631 : 196 : *worker = i;
1632 : 196 : return msg;
1633 : : }
4039 andrew@dunslane.net 1634 :UBC 0 : Assert(false);
1635 : : return NULL;
1636 : : }
1637 : :
1638 : : /*
1639 : : * Send a command message to the specified worker process.
1640 : : *
1641 : : * This function is executed in the leader process.
1642 : : */
1643 : : static void
4039 andrew@dunslane.net 1644 :CBC 196 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1645 : : {
1646 : 196 : int len = strlen(str) + 1;
1647 : :
1648 [ - + ]: 196 : if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1649 : : {
737 tgl@sss.pgh.pa.us 1650 :UBC 0 : pg_fatal("could not write to the communication channel: %m");
1651 : : }
4039 andrew@dunslane.net 1652 :CBC 196 : }
1653 : :
/*
 * Read a single message from the pipe "fd", blocking until it is complete,
 * and return it as a malloc'd string.  Returns NULL if piperead() reports
 * EOF or an error before a full message has been read.
 *
 * A "message" on the channel is just a null-terminated string.
 */
static char *
readMessageFromPipe(int fd)
{
	int			capacity = 64;	/* could be any number */
	int			used = 0;
	char	   *buf = (char *) pg_malloc(capacity);

	/*
	 * Letting piperead() fetch several bytes at once could, in theory, hand
	 * us pieces of more than one message.  (That cannot really happen, since
	 * neither leader nor workers send a second message before getting a
	 * reply, but we prefer not to depend on that here.)  So we simply read
	 * one byte at a time until the terminating '\0' shows up.  That is a bit
	 * inefficient, but only short command and status strings travel this way,
	 * so it shouldn't matter.
	 */
	for (;;)
	{
		int			nread;

		Assert(used < capacity);
		nread = piperead(fd, buf + used, 1);
		if (nread <= 0)
		{
			/* Error, or the other end has closed the connection */
			pg_free(buf);
			return NULL;
		}

		Assert(nread == 1);

		/* The terminator completes the message */
		if (buf[used] == '\0')
			return buf;

		/* Keep the byte; grow the buffer once it is full */
		if (++used == capacity)
		{
			capacity += 16;		/* could be any number */
			buf = (char *) pg_realloc(buf, capacity);
		}
	}
}
1705 : :
#ifdef WIN32

/*
 * Replacement for pipe(2) on Windows that yields handles usable with
 * select().
 *
 * The "pipe" is a pair of connected TCP sockets on the loopback address:
 * a temporary listening socket is bound to a system-chosen port, a second
 * socket connects to it (stored in handles[1]), and the accepted connection
 * is stored in handles[0].  The listening socket is closed before returning.
 *
 * All I/O on the handles must go through piperead()/pipewrite().
 *
 * For consistency with Unix the handles are declared "int".  That is okay
 * even on WIN64 because system handles are not more than 32 bits wide, but
 * it does require some casting.
 *
 * Returns 0 on success.  On failure an error is logged, any sockets created
 * so far are closed, and -1 is returned with both handles set to -1.
 */
static int
pgpipe(int handles[2])
{
	pgsocket	listener,
				sock;
	struct sockaddr_in addr;
	int			addrlen = sizeof(addr);

	/* We have to use the Unix socket invalid file descriptor value here. */
	handles[0] = handles[1] = -1;

	/*
	 * Create the listening socket on the loopback address, any free port
	 */
	listener = socket(AF_INET, SOCK_STREAM, 0);
	if (listener == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create socket: error code %d",
					 WSAGetLastError());
		return -1;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = pg_hton16(0);
	addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
	if (bind(listener, (SOCKADDR *) &addr, addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not bind: error code %d",
					 WSAGetLastError());
		goto fail_listener;
	}
	if (listen(listener, 1) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not listen: error code %d",
					 WSAGetLastError());
		goto fail_listener;
	}
	/* Retrieve the address actually bound, so we know where to connect */
	if (getsockname(listener, (SOCKADDR *) &addr, &addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
					 WSAGetLastError());
		goto fail_listener;
	}

	/*
	 * Create the two connected ends
	 */
	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create second socket: error code %d",
					 WSAGetLastError());
		goto fail_listener;
	}
	handles[1] = (int) sock;

	if (connect(handles[1], (SOCKADDR *) &addr, addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not connect socket: error code %d",
					 WSAGetLastError());
		goto fail_connector;
	}

	sock = accept(listener, (SOCKADDR *) &addr, &addrlen);
	if (sock == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not accept connection: error code %d",
					 WSAGetLastError());
		goto fail_connector;
	}
	handles[0] = (int) sock;

	/* The listening socket is no longer needed */
	closesocket(listener);
	return 0;

fail_connector:
	/* Undo the connecting socket, then fall through to drop the listener */
	closesocket(handles[1]);
	handles[1] = -1;
fail_listener:
	closesocket(listener);
	return -1;
}

#endif							/* WIN32 */
|