Age Owner Branch data TLA Line data Source code
1 : : /*
2 : : * parallel.c
3 : : *
4 : : * multi-process support
5 : : *
6 : : * Copyright (c) 2010-2024, PostgreSQL Global Development Group
7 : : * src/bin/pg_upgrade/parallel.c
8 : : */
9 : :
10 : : #include "postgres_fe.h"
11 : :
12 : : #include <sys/wait.h>
13 : : #ifdef WIN32
14 : : #include <io.h>
15 : : #endif
16 : :
17 : : #include "pg_upgrade.h"
18 : :
19 : : static int parallel_jobs;
20 : :
21 : : #ifdef WIN32
22 : : /*
23 : : * Array holding all active threads. There can't be any gaps/zeros so
24 : : * it can be passed to WaitForMultipleObjects(). We use two arrays
25 : : * so the thread_handles array can be passed to WaitForMultipleObjects().
26 : : */
27 : : HANDLE *thread_handles;
28 : :
29 : : typedef struct
30 : : {
31 : : char *log_file;
32 : : char *opt_log_file;
33 : : char *cmd;
34 : : } exec_thread_arg;
35 : :
36 : : typedef struct
37 : : {
38 : : DbInfoArr *old_db_arr;
39 : : DbInfoArr *new_db_arr;
40 : : char *old_pgdata;
41 : : char *new_pgdata;
42 : : char *old_tablespace;
43 : : } transfer_thread_arg;
44 : :
45 : : exec_thread_arg **exec_thread_args;
46 : : transfer_thread_arg **transfer_thread_args;
47 : :
48 : : /* track current thread_args struct so reap_child() can be used for all cases */
49 : : void **cur_thread_args;
50 : :
51 : : DWORD win32_exec_prog(exec_thread_arg *args);
52 : : DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args);
53 : : #endif
54 : :
55 : : /*
56 : : * parallel_exec_prog
57 : : *
58 : : * This has the same API as exec_prog, except it does parallel execution,
59 : : * and therefore must throw errors and doesn't return an error status.
60 : : */
61 : : void
4127 bruce@momjian.us 62 :CBC 19 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
63 : : const char *fmt,...)
64 : : {
65 : : va_list args;
66 : : char cmd[MAX_STRING];
67 : :
68 : : #ifndef WIN32
69 : : pid_t child;
70 : : #else
71 : : HANDLE child;
72 : : exec_thread_arg *new_arg;
73 : : #endif
74 : :
75 : 19 : va_start(args, fmt);
76 : 19 : vsnprintf(cmd, sizeof(cmd), fmt, args);
77 : 19 : va_end(args);
78 : :
79 [ + - ]: 19 : if (user_opts.jobs <= 1)
80 : : /* exit_on_error must be true to allow jobs */
2288 81 : 19 : exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
82 : : else
83 : : {
84 : : /* parallel */
85 : : #ifdef WIN32
86 : : if (thread_handles == NULL)
87 : : thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
88 : :
89 : : if (exec_thread_args == NULL)
90 : : {
91 : : int i;
92 : :
93 : : exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
94 : :
95 : : /*
96 : : * For safety and performance, we keep the args allocated during
97 : : * the entire life of the process, and we don't free the args in a
98 : : * thread different from the one that allocated it.
99 : : */
100 : : for (i = 0; i < user_opts.jobs; i++)
101 : : exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
102 : : }
103 : :
104 : : cur_thread_args = (void **) exec_thread_args;
105 : : #endif
106 : : /* harvest any dead children */
4127 bruce@momjian.us 107 [ # # ]:UBC 0 : while (reap_child(false) == true)
108 : : ;
109 : :
110 : : /* must we wait for a dead child? */
111 [ # # ]: 0 : if (parallel_jobs >= user_opts.jobs)
112 : 0 : reap_child(true);
113 : :
114 : : /* set this before we start the job */
115 : 0 : parallel_jobs++;
116 : :
117 : : /* Ensure stdio state is quiesced before forking */
118 : 0 : fflush(NULL);
119 : :
120 : : #ifndef WIN32
121 : 0 : child = fork();
122 [ # # ]: 0 : if (child == 0)
123 : : /* use _exit to skip atexit() functions */
2288 124 : 0 : _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
4127 125 [ # # ]: 0 : else if (child < 0)
126 : : /* fork failed */
33 michael@paquier.xyz 127 :UNC 0 : pg_fatal("could not create worker process: %m");
128 : : #else
129 : : /* empty array element are always at the end */
130 : : new_arg = exec_thread_args[parallel_jobs - 1];
131 : :
132 : : /* Can only pass one pointer into the function, so use a struct */
133 : : pg_free(new_arg->log_file);
134 : : new_arg->log_file = pg_strdup(log_file);
135 : : pg_free(new_arg->opt_log_file);
136 : : new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
137 : : pg_free(new_arg->cmd);
138 : : new_arg->cmd = pg_strdup(cmd);
139 : :
140 : : child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
141 : : new_arg, 0, NULL);
142 : : if (child == 0)
143 : : pg_fatal("could not create worker thread: %m");
144 : :
145 : : thread_handles[parallel_jobs - 1] = child;
146 : : #endif
147 : : }
4127 bruce@momjian.us 148 :CBC 19 : }
149 : :
150 : :
151 : : #ifdef WIN32
152 : : DWORD
153 : : win32_exec_prog(exec_thread_arg *args)
154 : : {
155 : : int ret;
156 : :
157 : : ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd);
158 : :
159 : : /* terminates thread */
160 : : return ret;
161 : : }
162 : : #endif
163 : :
164 : :
165 : : /*
166 : : * parallel_transfer_all_new_dbs
167 : : *
168 : : * This has the same API as transfer_all_new_dbs, except it does parallel execution
169 : : * by transferring multiple tablespaces in parallel
170 : : */
171 : : void
3973 172 : 3 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
173 : : char *old_pgdata, char *new_pgdata,
174 : : char *old_tablespace)
175 : : {
176 : : #ifndef WIN32
177 : : pid_t child;
178 : : #else
179 : : HANDLE child;
180 : : transfer_thread_arg *new_arg;
181 : : #endif
182 : :
4113 183 [ + - ]: 3 : if (user_opts.jobs <= 1)
184 : 3 : transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
185 : : else
186 : : {
187 : : /* parallel */
188 : : #ifdef WIN32
189 : : if (thread_handles == NULL)
190 : : thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
191 : :
192 : : if (transfer_thread_args == NULL)
193 : : {
194 : : int i;
195 : :
196 : : transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
197 : :
198 : : /*
199 : : * For safety and performance, we keep the args allocated during
200 : : * the entire life of the process, and we don't free the args in a
201 : : * thread different from the one that allocated it.
202 : : */
203 : : for (i = 0; i < user_opts.jobs; i++)
204 : : transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
205 : : }
206 : :
207 : : cur_thread_args = (void **) transfer_thread_args;
208 : : #endif
209 : : /* harvest any dead children */
4113 bruce@momjian.us 210 [ # # ]:UBC 0 : while (reap_child(false) == true)
211 : : ;
212 : :
213 : : /* must we wait for a dead child? */
214 [ # # ]: 0 : if (parallel_jobs >= user_opts.jobs)
215 : 0 : reap_child(true);
216 : :
217 : : /* set this before we start the job */
218 : 0 : parallel_jobs++;
219 : :
220 : : /* Ensure stdio state is quiesced before forking */
221 : 0 : fflush(NULL);
222 : :
223 : : #ifndef WIN32
224 : 0 : child = fork();
225 [ # # ]: 0 : if (child == 0)
226 : : {
227 : 0 : transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
228 : : old_tablespace);
229 : : /* if we take another exit path, it will be non-zero */
230 : : /* use _exit to skip atexit() functions */
231 : 0 : _exit(0);
232 : : }
233 [ # # ]: 0 : else if (child < 0)
234 : : /* fork failed */
33 michael@paquier.xyz 235 :UNC 0 : pg_fatal("could not create worker process: %m");
236 : : #else
237 : : /* empty array element are always at the end */
238 : : new_arg = transfer_thread_args[parallel_jobs - 1];
239 : :
240 : : /* Can only pass one pointer into the function, so use a struct */
241 : : new_arg->old_db_arr = old_db_arr;
242 : : new_arg->new_db_arr = new_db_arr;
243 : : pg_free(new_arg->old_pgdata);
244 : : new_arg->old_pgdata = pg_strdup(old_pgdata);
245 : : pg_free(new_arg->new_pgdata);
246 : : new_arg->new_pgdata = pg_strdup(new_pgdata);
247 : : pg_free(new_arg->old_tablespace);
248 : : new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
249 : :
250 : : child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
251 : : new_arg, 0, NULL);
252 : : if (child == 0)
253 : : pg_fatal("could not create worker thread: %m");
254 : :
255 : : thread_handles[parallel_jobs - 1] = child;
256 : : #endif
257 : : }
4113 bruce@momjian.us 258 :CBC 3 : }
259 : :
260 : :
261 : : #ifdef WIN32
262 : : DWORD
263 : : win32_transfer_all_new_dbs(transfer_thread_arg *args)
264 : : {
265 : : transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
266 : : args->new_pgdata, args->old_tablespace);
267 : :
268 : : /* terminates thread */
269 : : return 0;
270 : : }
271 : : #endif
272 : :
273 : :
274 : : /*
275 : : * collect status from a completed worker child
276 : : */
277 : : bool
4127 278 : 7 : reap_child(bool wait_for_child)
279 : : {
280 : : #ifndef WIN32
281 : : int work_status;
282 : : pid_t child;
283 : : #else
284 : : int thread_num;
285 : : DWORD res;
286 : : #endif
287 : :
288 [ - + - - ]: 7 : if (user_opts.jobs <= 1 || parallel_jobs == 0)
289 : 7 : return false;
290 : :
291 : : #ifndef WIN32
1946 tgl@sss.pgh.pa.us 292 :UBC 0 : child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
293 [ # # ]: 0 : if (child == (pid_t) -1)
33 michael@paquier.xyz 294 :UNC 0 : pg_fatal("%s() failed: %m", "waitpid");
1946 tgl@sss.pgh.pa.us 295 [ # # ]:UBC 0 : if (child == 0)
296 : 0 : return false; /* no children, or no dead children */
297 [ # # ]: 0 : if (work_status != 0)
642 298 : 0 : pg_fatal("child process exited abnormally: status %d", work_status);
299 : : #else
300 : : /* wait for one to finish */
301 : : thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
302 : : false, wait_for_child ? INFINITE : 0);
303 : :
304 : : if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
305 : : return false;
306 : :
307 : : /* compute thread index in active_threads */
308 : : thread_num -= WAIT_OBJECT_0;
309 : :
310 : : /* get the result */
311 : : GetExitCodeThread(thread_handles[thread_num], &res);
312 : : if (res != 0)
313 : : pg_fatal("child worker exited abnormally: %m");
314 : :
315 : : /* dispose of handle to stop leaks */
316 : : CloseHandle(thread_handles[thread_num]);
317 : :
318 : : /* Move last slot into dead child's position */
319 : : if (thread_num != parallel_jobs - 1)
320 : : {
321 : : void *tmp_args;
322 : :
323 : : thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
324 : :
325 : : /*
326 : : * Move last active thread arg struct into the now-dead slot, and the
327 : : * now-dead slot to the end for reuse by the next thread. Though the
328 : : * thread struct is in use by another thread, we can safely swap the
329 : : * struct pointers within the array.
330 : : */
331 : : tmp_args = cur_thread_args[thread_num];
332 : : cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
333 : : cur_thread_args[parallel_jobs - 1] = tmp_args;
334 : : }
335 : : #endif
336 : :
337 : : /* do this after job has been removed */
4127 bruce@momjian.us 338 : 0 : parallel_jobs--;
339 : :
340 : 0 : return true;
341 : : }
|