LCOV - differential code coverage report
Current view: top level - src/bin/pg_upgrade - parallel.c (source / functions) Coverage Total Hit UNC UIC UBC GBC GIC GNC CBC EUB ECB DUB
Current: Differential Code Coverage HEAD vs 15 Lines: 31.8 % 44 14 4 12 14 2 6 6 13 8 1
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 3 3 2 1 2
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (240..) days: 31.8 % 44 14 4 12 14 2 6 6 13 8
Legend: Lines: hit not hit Function coverage date bins:
(240..) days: 60.0 % 5 3 2 1 2

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*
                                  2                 :  *  parallel.c
                                  3                 :  *
                                  4                 :  *  multi-process support
                                  5                 :  *
                                  6                 :  *  Copyright (c) 2010-2023, PostgreSQL Global Development Group
                                  7                 :  *  src/bin/pg_upgrade/parallel.c
                                  8                 :  */
                                  9                 : 
                                 10                 : #include "postgres_fe.h"
                                 11                 : 
                                 12                 : #include <sys/wait.h>
                                 13                 : #ifdef WIN32
                                 14                 : #include <io.h>
                                 15                 : #endif
                                 16                 : 
                                 17                 : #include "pg_upgrade.h"
                                 18                 : 
                                 19                 : static int  parallel_jobs;
                                 20                 : 
                                 21                 : #ifdef WIN32
                                 22                 : /*
                                 23                 :  *  Array holding all active threads.  There can't be any gaps/zeros so
                                 24                 :  *  it can be passed to WaitForMultipleObjects().  We use two arrays
                                 25                 :  *  so the thread_handles array can be passed to WaitForMultipleObjects().
                                 26                 :  */
                                 27                 : HANDLE     *thread_handles;
                                 28                 : 
                                 29                 : typedef struct
                                 30                 : {
                                 31                 :     char       *log_file;
                                 32                 :     char       *opt_log_file;
                                 33                 :     char       *cmd;
                                 34                 : } exec_thread_arg;
                                 35                 : 
                                 36                 : typedef struct
                                 37                 : {
                                 38                 :     DbInfoArr  *old_db_arr;
                                 39                 :     DbInfoArr  *new_db_arr;
                                 40                 :     char       *old_pgdata;
                                 41                 :     char       *new_pgdata;
                                 42                 :     char       *old_tablespace;
                                 43                 : } transfer_thread_arg;
                                 44                 : 
                                 45                 : exec_thread_arg **exec_thread_args;
                                 46                 : transfer_thread_arg **transfer_thread_args;
                                 47                 : 
                                 48                 : /* track current thread_args struct so reap_child() can be used for all cases */
                                 49                 : void      **cur_thread_args;
                                 50                 : 
                                 51                 : DWORD       win32_exec_prog(exec_thread_arg *args);
                                 52                 : DWORD       win32_transfer_all_new_dbs(transfer_thread_arg *args);
                                 53                 : #endif
                                 54                 : 
                                 55                 : /*
                                 56                 :  *  parallel_exec_prog
                                 57                 :  *
                                 58                 :  *  This has the same API as exec_prog, except it does parallel execution,
                                 59                 :  *  and therefore must throw errors and doesn't return an error status.
                                 60                 :  */
                                 61                 : void
 3756 bruce                      62 CBC          11 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
                                 63                 :                    const char *fmt,...)
                                 64                 : {
                                 65                 :     va_list     args;
                                 66                 :     char        cmd[MAX_STRING];
                                 67                 : 
                                 68                 : #ifndef WIN32
                                 69                 :     pid_t       child;
                                 70                 : #else
                                 71                 :     HANDLE      child;
                                 72                 :     exec_thread_arg *new_arg;
                                 73                 : #endif
                                 74                 : 
                                 75              11 :     va_start(args, fmt);
                                 76              11 :     vsnprintf(cmd, sizeof(cmd), fmt, args);
                                 77              11 :     va_end(args);
                                 78                 : 
                                 79              11 :     if (user_opts.jobs <= 1)
                                 80                 :         /* exit_on_error must be true to allow jobs */
 1917                            81              11 :         exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
                                 82                 :     else
                                 83                 :     {
                                 84                 :         /* parallel */
                                 85                 : #ifdef WIN32
                                 86                 :         if (thread_handles == NULL)
                                 87                 :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
                                 88                 : 
                                 89                 :         if (exec_thread_args == NULL)
                                 90                 :         {
                                 91                 :             int         i;
                                 92                 : 
                                 93                 :             exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
                                 94                 : 
                                 95                 :             /*
                                 96                 :              * For safety and performance, we keep the args allocated during
                                 97                 :              * the entire life of the process, and we don't free the args in a
                                 98                 :              * thread different from the one that allocated it.
                                 99                 :              */
                                100                 :             for (i = 0; i < user_opts.jobs; i++)
                                101                 :                 exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
                                102                 :         }
                                103                 : 
                                104                 :         cur_thread_args = (void **) exec_thread_args;
                                105                 : #endif
                                106                 :         /* harvest any dead children */
 3756 bruce                     107 UBC           0 :         while (reap_child(false) == true)
                                108                 :             ;
                                109                 : 
                                110                 :         /* must we wait for a dead child? */
                                111               0 :         if (parallel_jobs >= user_opts.jobs)
                                112               0 :             reap_child(true);
                                113                 : 
                                114                 :         /* set this before we start the job */
                                115               0 :         parallel_jobs++;
                                116                 : 
                                117                 :         /* Ensure stdio state is quiesced before forking */
                                118               0 :         fflush(NULL);
                                119                 : 
                                120                 : #ifndef WIN32
                                121               0 :         child = fork();
                                122               0 :         if (child == 0)
                                123                 :             /* use _exit to skip atexit() functions */
 1917                           124               0 :             _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
 3756                           125               0 :         else if (child < 0)
                                126                 :             /* fork failed */
  271 tgl                       127 UNC           0 :             pg_fatal("could not create worker process: %s", strerror(errno));
                                128                 : #else
                                129                 :         /* empty array element are always at the end */
                                130                 :         new_arg = exec_thread_args[parallel_jobs - 1];
                                131                 : 
                                132                 :         /* Can only pass one pointer into the function, so use a struct */
                                133                 :         pg_free(new_arg->log_file);
                                134                 :         new_arg->log_file = pg_strdup(log_file);
                                135                 :         pg_free(new_arg->opt_log_file);
                                136                 :         new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
                                137                 :         pg_free(new_arg->cmd);
                                138                 :         new_arg->cmd = pg_strdup(cmd);
                                139                 : 
                                140                 :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
                                141                 :                                         new_arg, 0, NULL);
                                142                 :         if (child == 0)
                                143                 :             pg_fatal("could not create worker thread: %s", strerror(errno));
                                144                 : 
 3602 bruce                     145 ECB             :         thread_handles[parallel_jobs - 1] = child;
                                146                 : #endif
                                147                 :     }
 3756 bruce                     148 GIC          11 : }
                                149                 : 
                                150                 : 
                                151                 : #ifdef WIN32
                                152                 : DWORD
                                153                 : win32_exec_prog(exec_thread_arg *args)
                                154                 : {
                                155                 :     int         ret;
                                156                 : 
                                157                 :     ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd);
                                158                 : 
                                159                 :     /* terminates thread */
                                160                 :     return ret;
                                161                 : }
                                162                 : #endif
                                163                 : 
                                164                 : 
                                165                 : /*
                                166                 :  *  parallel_transfer_all_new_dbs
                                167                 :  *
                                168                 :  *  This has the same API as transfer_all_new_dbs, except it does parallel execution
 2905 andres                    169 ECB             :  *  by transferring multiple tablespaces in parallel
                                170                 :  */
                                171                 : void
 3602 bruce                     172 GIC           1 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
                                173                 :                               char *old_pgdata, char *new_pgdata,
                                174                 :                               char *old_tablespace)
                                175                 : {
                                176                 : #ifndef WIN32
                                177                 :     pid_t       child;
                                178                 : #else
                                179                 :     HANDLE      child;
 3602 bruce                     180 ECB             :     transfer_thread_arg *new_arg;
 3742                           181                 : #endif
                                182                 : 
 3742 bruce                     183 GIC           1 :     if (user_opts.jobs <= 1)
                                184               1 :         transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
                                185                 :     else
                                186                 :     {
                                187                 :         /* parallel */
                                188                 : #ifdef WIN32
                                189                 :         if (thread_handles == NULL)
                                190                 :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
                                191                 : 
                                192                 :         if (transfer_thread_args == NULL)
                                193                 :         {
                                194                 :             int         i;
                                195                 : 
                                196                 :             transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
                                197                 : 
                                198                 :             /*
                                199                 :              * For safety and performance, we keep the args allocated during
                                200                 :              * the entire life of the process, and we don't free the args in a
                                201                 :              * thread different from the one that allocated it.
                                202                 :              */
                                203                 :             for (i = 0; i < user_opts.jobs; i++)
                                204                 :                 transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
                                205                 :         }
                                206                 : 
 3602 bruce                     207 EUB             :         cur_thread_args = (void **) transfer_thread_args;
                                208                 : #endif
                                209                 :         /* harvest any dead children */
 3742 bruce                     210 UIC           0 :         while (reap_child(false) == true)
 3742 bruce                     211 EUB             :             ;
                                212                 : 
                                213                 :         /* must we wait for a dead child? */
 3742 bruce                     214 UIC           0 :         if (parallel_jobs >= user_opts.jobs)
 3742 bruce                     215 UBC           0 :             reap_child(true);
                                216                 : 
                                217                 :         /* set this before we start the job */
                                218               0 :         parallel_jobs++;
                                219                 : 
                                220                 :         /* Ensure stdio state is quiesced before forking */
                                221               0 :         fflush(NULL);
 3742 bruce                     222 EUB             : 
                                223                 : #ifndef WIN32
 3742 bruce                     224 UBC           0 :         child = fork();
 3742 bruce                     225 UIC           0 :         if (child == 0)
                                226                 :         {
                                227               0 :             transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
 3742 bruce                     228 EUB             :                                  old_tablespace);
                                229                 :             /* if we take another exit path, it will be non-zero */
                                230                 :             /* use _exit to skip atexit() functions */
 3742 bruce                     231 UIC           0 :             _exit(0);
 3742 bruce                     232 EUB             :         }
 3742 bruce                     233 UIC           0 :         else if (child < 0)
                                234                 :             /* fork failed */
  271 tgl                       235 UNC           0 :             pg_fatal("could not create worker process: %s", strerror(errno));
                                236                 : #else
                                237                 :         /* empty array element are always at the end */
                                238                 :         new_arg = transfer_thread_args[parallel_jobs - 1];
                                239                 : 
                                240                 :         /* Can only pass one pointer into the function, so use a struct */
                                241                 :         new_arg->old_db_arr = old_db_arr;
                                242                 :         new_arg->new_db_arr = new_db_arr;
                                243                 :         pg_free(new_arg->old_pgdata);
                                244                 :         new_arg->old_pgdata = pg_strdup(old_pgdata);
                                245                 :         pg_free(new_arg->new_pgdata);
                                246                 :         new_arg->new_pgdata = pg_strdup(new_pgdata);
                                247                 :         pg_free(new_arg->old_tablespace);
                                248                 :         new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
                                249                 : 
                                250                 :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
                                251                 :                                         new_arg, 0, NULL);
 3742 bruce                     252 ECB             :         if (child == 0)
                                253                 :             pg_fatal("could not create worker thread: %s", strerror(errno));
                                254                 : 
                                255                 :         thread_handles[parallel_jobs - 1] = child;
                                256                 : #endif
                                257                 :     }
 3742 bruce                     258 GIC           1 : }
                                259                 : 
                                260                 : 
                                261                 : #ifdef WIN32
                                262                 : DWORD
                                263                 : win32_transfer_all_new_dbs(transfer_thread_arg *args)
                                264                 : {
                                265                 :     transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
                                266                 :                          args->new_pgdata, args->old_tablespace);
                                267                 : 
                                268                 :     /* terminates thread */
                                269                 :     return 0;
                                270                 : }
                                271                 : #endif
 3742 bruce                     272 ECB             : 
                                273                 : 
                                274                 : /*
                                275                 :  *  collect status from a completed worker child
                                276                 :  */
                                277                 : bool
 3756 bruce                     278 GIC           2 : reap_child(bool wait_for_child)
                                279                 : {
                                280                 : #ifndef WIN32
                                281                 :     int         work_status;
 1575 tgl                       282 ECB             :     pid_t       child;
 3756 bruce                     283                 : #else
                                284                 :     int         thread_num;
                                285                 :     DWORD       res;
 3756 bruce                     286 EUB             : #endif
                                287                 : 
 3756 bruce                     288 GBC           2 :     if (user_opts.jobs <= 1 || parallel_jobs == 0)
                                289               2 :         return false;
 3756 bruce                     290 EUB             : 
                                291                 : #ifndef WIN32
 1575 tgl                       292 UBC           0 :     child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
 1575 tgl                       293 UIC           0 :     if (child == (pid_t) -1)
  271 tgl                       294 UNC           0 :         pg_fatal("%s() failed: %s", "waitpid", strerror(errno));
 1575 tgl                       295 UIC           0 :     if (child == 0)
                                296               0 :         return false;           /* no children, or no dead children */
                                297               0 :     if (work_status != 0)
  271 tgl                       298 UNC           0 :         pg_fatal("child process exited abnormally: status %d", work_status);
                                299                 : #else
                                300                 :     /* wait for one to finish */
                                301                 :     thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
                                302                 :                                         false, wait_for_child ? INFINITE : 0);
                                303                 : 
                                304                 :     if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
                                305                 :         return false;
                                306                 : 
                                307                 :     /* compute thread index in active_threads */
                                308                 :     thread_num -= WAIT_OBJECT_0;
                                309                 : 
                                310                 :     /* get the result */
                                311                 :     GetExitCodeThread(thread_handles[thread_num], &res);
                                312                 :     if (res != 0)
                                313                 :         pg_fatal("child worker exited abnormally: %s", strerror(errno));
                                314                 : 
                                315                 :     /* dispose of handle to stop leaks */
                                316                 :     CloseHandle(thread_handles[thread_num]);
                                317                 : 
                                318                 :     /* Move last slot into dead child's position */
                                319                 :     if (thread_num != parallel_jobs - 1)
                                320                 :     {
                                321                 :         void       *tmp_args;
                                322                 : 
                                323                 :         thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
                                324                 : 
                                325                 :         /*
                                326                 :          * Move last active thread arg struct into the now-dead slot, and the
                                327                 :          * now-dead slot to the end for reuse by the next thread. Though the
                                328                 :          * thread struct is in use by another thread, we can safely swap the
                                329                 :          * struct pointers within the array.
                                330                 :          */
                                331                 :         tmp_args = cur_thread_args[thread_num];
 3742 bruce                     332 EUB             :         cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
                                333                 :         cur_thread_args[parallel_jobs - 1] = tmp_args;
 3756                           334                 :     }
                                335                 : #endif
                                336                 : 
                                337                 :     /* do this after job has been removed */
 3756 bruce                     338 UIC           0 :     parallel_jobs--;
                                339                 : 
                                340               0 :     return true;
                                341                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a