Age Owner TLA Line data Source code
1 : /*
2 : * parallel.c
3 : *
4 : * multi-process support
5 : *
6 : * Copyright (c) 2010-2023, PostgreSQL Global Development Group
7 : * src/bin/pg_upgrade/parallel.c
8 : */
9 :
10 : #include "postgres_fe.h"
11 :
12 : #include <sys/wait.h>
13 : #ifdef WIN32
14 : #include <io.h>
15 : #endif
16 :
17 : #include "pg_upgrade.h"
18 :
/* Number of worker children (processes or threads) started and not yet reaped */
static int parallel_jobs;

#ifdef WIN32
/* Arguments handed to a win32_exec_prog() worker thread */
typedef struct
{
	char *log_file;
	char *opt_log_file;
	char *cmd;
} exec_thread_arg;

/* Arguments handed to a win32_transfer_all_new_dbs() worker thread */
typedef struct
{
	DbInfoArr *old_db_arr;
	DbInfoArr *new_db_arr;
	char *old_pgdata;
	char *new_pgdata;
	char *old_tablespace;
} transfer_thread_arg;

/*
 * Handles of all active threads.  The live entries are kept contiguous at
 * the front (no gaps/zeros) so the array can be passed straight to
 * WaitForMultipleObjects().  The per-thread argument structs live in
 * separate arrays for the same reason.
 */
HANDLE *thread_handles;

/* Per-slot argument structs, one array for each kind of worker */
exec_thread_arg **exec_thread_args;
transfer_thread_arg **transfer_thread_args;

/* track current thread_args struct so reap_child() can be used for all cases */
void **cur_thread_args;

/* Thread entry points */
DWORD win32_exec_prog(exec_thread_arg *args);
DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args);
#endif
54 :
55 : /*
56 : * parallel_exec_prog
57 : *
58 : * This has the same API as exec_prog, except it does parallel execution,
59 : * and therefore must throw errors and doesn't return an error status.
60 : */
61 : void
3756 bruce 62 CBC 11 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
63 : const char *fmt,...)
64 : {
65 : va_list args;
66 : char cmd[MAX_STRING];
67 :
68 : #ifndef WIN32
69 : pid_t child;
70 : #else
71 : HANDLE child;
72 : exec_thread_arg *new_arg;
73 : #endif
74 :
75 11 : va_start(args, fmt);
76 11 : vsnprintf(cmd, sizeof(cmd), fmt, args);
77 11 : va_end(args);
78 :
79 11 : if (user_opts.jobs <= 1)
80 : /* exit_on_error must be true to allow jobs */
1917 81 11 : exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
82 : else
83 : {
84 : /* parallel */
85 : #ifdef WIN32
86 : if (thread_handles == NULL)
87 : thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
88 :
89 : if (exec_thread_args == NULL)
90 : {
91 : int i;
92 :
93 : exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
94 :
95 : /*
96 : * For safety and performance, we keep the args allocated during
97 : * the entire life of the process, and we don't free the args in a
98 : * thread different from the one that allocated it.
99 : */
100 : for (i = 0; i < user_opts.jobs; i++)
101 : exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
102 : }
103 :
104 : cur_thread_args = (void **) exec_thread_args;
105 : #endif
106 : /* harvest any dead children */
3756 bruce 107 UBC 0 : while (reap_child(false) == true)
108 : ;
109 :
110 : /* must we wait for a dead child? */
111 0 : if (parallel_jobs >= user_opts.jobs)
112 0 : reap_child(true);
113 :
114 : /* set this before we start the job */
115 0 : parallel_jobs++;
116 :
117 : /* Ensure stdio state is quiesced before forking */
118 0 : fflush(NULL);
119 :
120 : #ifndef WIN32
121 0 : child = fork();
122 0 : if (child == 0)
123 : /* use _exit to skip atexit() functions */
1917 124 0 : _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
3756 125 0 : else if (child < 0)
126 : /* fork failed */
271 tgl 127 UNC 0 : pg_fatal("could not create worker process: %s", strerror(errno));
128 : #else
129 : /* empty array element are always at the end */
130 : new_arg = exec_thread_args[parallel_jobs - 1];
131 :
132 : /* Can only pass one pointer into the function, so use a struct */
133 : pg_free(new_arg->log_file);
134 : new_arg->log_file = pg_strdup(log_file);
135 : pg_free(new_arg->opt_log_file);
136 : new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
137 : pg_free(new_arg->cmd);
138 : new_arg->cmd = pg_strdup(cmd);
139 :
140 : child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
141 : new_arg, 0, NULL);
142 : if (child == 0)
143 : pg_fatal("could not create worker thread: %s", strerror(errno));
144 :
3602 bruce 145 ECB : thread_handles[parallel_jobs - 1] = child;
146 : #endif
147 : }
3756 bruce 148 GIC 11 : }
149 :
150 :
#ifdef WIN32
/*
 *	win32_exec_prog
 *
 *	Thread entry point used by parallel_exec_prog().  Runs the command held
 *	in args through exec_prog(); the thread's exit code is 0 when exec_prog()
 *	returns nonzero and 1 otherwise.
 */
DWORD
win32_exec_prog(exec_thread_arg *args)
{
	/* returning from the entry point terminates the thread */
	return !exec_prog(args->log_file, args->opt_log_file, true, true,
					  "%s", args->cmd);
}
#endif
163 :
164 :
165 : /*
166 : * parallel_transfer_all_new_dbs
167 : *
168 : * This has the same API as transfer_all_new_dbs, except it does parallel execution
2905 andres 169 ECB : * by transferring multiple tablespaces in parallel
170 : */
171 : void
3602 bruce 172 GIC 1 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
173 : char *old_pgdata, char *new_pgdata,
174 : char *old_tablespace)
175 : {
176 : #ifndef WIN32
177 : pid_t child;
178 : #else
179 : HANDLE child;
3602 bruce 180 ECB : transfer_thread_arg *new_arg;
3742 181 : #endif
182 :
3742 bruce 183 GIC 1 : if (user_opts.jobs <= 1)
184 1 : transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
185 : else
186 : {
187 : /* parallel */
188 : #ifdef WIN32
189 : if (thread_handles == NULL)
190 : thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
191 :
192 : if (transfer_thread_args == NULL)
193 : {
194 : int i;
195 :
196 : transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
197 :
198 : /*
199 : * For safety and performance, we keep the args allocated during
200 : * the entire life of the process, and we don't free the args in a
201 : * thread different from the one that allocated it.
202 : */
203 : for (i = 0; i < user_opts.jobs; i++)
204 : transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
205 : }
206 :
3602 bruce 207 EUB : cur_thread_args = (void **) transfer_thread_args;
208 : #endif
209 : /* harvest any dead children */
3742 bruce 210 UIC 0 : while (reap_child(false) == true)
3742 bruce 211 EUB : ;
212 :
213 : /* must we wait for a dead child? */
3742 bruce 214 UIC 0 : if (parallel_jobs >= user_opts.jobs)
3742 bruce 215 UBC 0 : reap_child(true);
216 :
217 : /* set this before we start the job */
218 0 : parallel_jobs++;
219 :
220 : /* Ensure stdio state is quiesced before forking */
221 0 : fflush(NULL);
3742 bruce 222 EUB :
223 : #ifndef WIN32
3742 bruce 224 UBC 0 : child = fork();
3742 bruce 225 UIC 0 : if (child == 0)
226 : {
227 0 : transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
3742 bruce 228 EUB : old_tablespace);
229 : /* if we take another exit path, it will be non-zero */
230 : /* use _exit to skip atexit() functions */
3742 bruce 231 UIC 0 : _exit(0);
3742 bruce 232 EUB : }
3742 bruce 233 UIC 0 : else if (child < 0)
234 : /* fork failed */
271 tgl 235 UNC 0 : pg_fatal("could not create worker process: %s", strerror(errno));
236 : #else
237 : /* empty array element are always at the end */
238 : new_arg = transfer_thread_args[parallel_jobs - 1];
239 :
240 : /* Can only pass one pointer into the function, so use a struct */
241 : new_arg->old_db_arr = old_db_arr;
242 : new_arg->new_db_arr = new_db_arr;
243 : pg_free(new_arg->old_pgdata);
244 : new_arg->old_pgdata = pg_strdup(old_pgdata);
245 : pg_free(new_arg->new_pgdata);
246 : new_arg->new_pgdata = pg_strdup(new_pgdata);
247 : pg_free(new_arg->old_tablespace);
248 : new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
249 :
250 : child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
251 : new_arg, 0, NULL);
3742 bruce 252 ECB : if (child == 0)
253 : pg_fatal("could not create worker thread: %s", strerror(errno));
254 :
255 : thread_handles[parallel_jobs - 1] = child;
256 : #endif
257 : }
3742 bruce 258 GIC 1 : }
259 :
260 :
#ifdef WIN32
/*
 *	win32_transfer_all_new_dbs
 *
 *	Thread entry point used by parallel_transfer_all_new_dbs().  Unpacks the
 *	argument struct and calls transfer_all_new_dbs(); the thread's exit code
 *	is always 0 when that call returns.
 */
DWORD
win32_transfer_all_new_dbs(transfer_thread_arg *args)
{
	transfer_all_new_dbs(args->old_db_arr,
						 args->new_db_arr,
						 args->old_pgdata,
						 args->new_pgdata,
						 args->old_tablespace);

	/* returning from the entry point terminates the thread */
	return 0;
}
#endif
3742 bruce 272 ECB :
273 :
274 : /*
275 : * collect status from a completed worker child
276 : */
277 : bool
3756 bruce 278 GIC 2 : reap_child(bool wait_for_child)
279 : {
280 : #ifndef WIN32
281 : int work_status;
1575 tgl 282 ECB : pid_t child;
3756 bruce 283 : #else
284 : int thread_num;
285 : DWORD res;
3756 bruce 286 EUB : #endif
287 :
3756 bruce 288 GBC 2 : if (user_opts.jobs <= 1 || parallel_jobs == 0)
289 2 : return false;
3756 bruce 290 EUB :
291 : #ifndef WIN32
1575 tgl 292 UBC 0 : child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
1575 tgl 293 UIC 0 : if (child == (pid_t) -1)
271 tgl 294 UNC 0 : pg_fatal("%s() failed: %s", "waitpid", strerror(errno));
1575 tgl 295 UIC 0 : if (child == 0)
296 0 : return false; /* no children, or no dead children */
297 0 : if (work_status != 0)
271 tgl 298 UNC 0 : pg_fatal("child process exited abnormally: status %d", work_status);
299 : #else
300 : /* wait for one to finish */
301 : thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
302 : false, wait_for_child ? INFINITE : 0);
303 :
304 : if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
305 : return false;
306 :
307 : /* compute thread index in active_threads */
308 : thread_num -= WAIT_OBJECT_0;
309 :
310 : /* get the result */
311 : GetExitCodeThread(thread_handles[thread_num], &res);
312 : if (res != 0)
313 : pg_fatal("child worker exited abnormally: %s", strerror(errno));
314 :
315 : /* dispose of handle to stop leaks */
316 : CloseHandle(thread_handles[thread_num]);
317 :
318 : /* Move last slot into dead child's position */
319 : if (thread_num != parallel_jobs - 1)
320 : {
321 : void *tmp_args;
322 :
323 : thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
324 :
325 : /*
326 : * Move last active thread arg struct into the now-dead slot, and the
327 : * now-dead slot to the end for reuse by the next thread. Though the
328 : * thread struct is in use by another thread, we can safely swap the
329 : * struct pointers within the array.
330 : */
331 : tmp_args = cur_thread_args[thread_num];
3742 bruce 332 EUB : cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
333 : cur_thread_args[parallel_jobs - 1] = tmp_args;
3756 334 : }
335 : #endif
336 :
337 : /* do this after job has been removed */
3756 bruce 338 UIC 0 : parallel_jobs--;
339 :
340 0 : return true;
341 : }
|