LCOV - differential code coverage report
Current view: top level - src/bin/pg_upgrade - parallel.c (source / functions) Coverage Total Hit UNC UIC UBC GBC GIC GNC CBC EUB ECB DUB
Current: Differential Code Coverage HEAD vs 15 Lines: 31.8 % 44 14 4 12 14 2 6 6 13 8 1
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 3 3 2 1 2
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*
       2                 :  *  parallel.c
       3                 :  *
       4                 :  *  multi-process support
       5                 :  *
       6                 :  *  Copyright (c) 2010-2023, PostgreSQL Global Development Group
       7                 :  *  src/bin/pg_upgrade/parallel.c
       8                 :  */
       9                 : 
      10                 : #include "postgres_fe.h"
      11                 : 
      12                 : #include <sys/wait.h>
      13                 : #ifdef WIN32
      14                 : #include <io.h>
      15                 : #endif
      16                 : 
      17                 : #include "pg_upgrade.h"
      18                 : 
       19                 : static int  parallel_jobs;   /* workers started but not yet reaped; ++ before a start, -- in reap_child() */
       20                 : 
       21                 : #ifdef WIN32
       22                 : /*
       23                 :  *  Array holding the handles of all active threads.  It must not contain
       24                 :  *  gaps/zeros because it is passed directly to WaitForMultipleObjects().
       25                 :  *  The per-thread argument structs are kept in separate arrays (below).
       26                 :  */
       27                 : HANDLE     *thread_handles;
       28                 : 
       29                 : typedef struct
       30                 : {
       31                 :     char       *log_file;
       32                 :     char       *opt_log_file;
       33                 :     char       *cmd;
       34                 : } exec_thread_arg;              /* argument bundle for win32_exec_prog() */
       35                 : 
       36                 : typedef struct
       37                 : {
       38                 :     DbInfoArr  *old_db_arr;
       39                 :     DbInfoArr  *new_db_arr;
       40                 :     char       *old_pgdata;
       41                 :     char       *new_pgdata;
       42                 :     char       *old_tablespace;
       43                 : } transfer_thread_arg;          /* argument bundle for win32_transfer_all_new_dbs() */
       44                 : 
       45                 : exec_thread_arg **exec_thread_args;     /* user_opts.jobs entries, allocated on first parallel use */
       46                 : transfer_thread_arg **transfer_thread_args; /* user_opts.jobs entries, allocated on first parallel use */
       47                 : 
       48                 : /* points at whichever args array was used last, so reap_child() can swap its slots for either kind of job */
       49                 : void      **cur_thread_args;
       50                 : 
       51                 : DWORD       win32_exec_prog(exec_thread_arg *args);
       52                 : DWORD       win32_transfer_all_new_dbs(transfer_thread_arg *args);
       53                 : #endif
      54                 : 
       55                 : /*
       56                 :  *  parallel_exec_prog
       57                 :  *
       58                 :  *  Formats a command from fmt/... and runs it through exec_prog().  When
       59                 :  *  user_opts.jobs <= 1 the command runs synchronously in this process;
                          :  *  otherwise it is handed to a forked child (a thread on WIN32), after
                          :  *  first waiting for a free job slot if user_opts.jobs workers are
                          :  *  already outstanding.
                          :  *
                          :  *  log_file and opt_log_file are passed through to exec_prog() as-is.
                          :  *
                          :  *  No status is returned: a failed fork/thread creation calls pg_fatal()
                          :  *  here, and a worker's failure is only noticed later by reap_child().
                          :  *
                          :  *  NOTE(review): the vsnprintf() return value is ignored, so a command
                          :  *  that does not fit in cmd[MAX_STRING] is silently truncated.
       60                 :  */
       61                 : void
       62 CBC          11 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
       63                 :                    const char *fmt,...)
       64                 : {
       65                 :     va_list     args;
       66                 :     char        cmd[MAX_STRING];
       67                 : 
       68                 : #ifndef WIN32
       69                 :     pid_t       child;
       70                 : #else
       71                 :     HANDLE      child;
       72                 :     exec_thread_arg *new_arg;
       73                 : #endif
       74                 : 
       75              11 :     va_start(args, fmt);
       76              11 :     vsnprintf(cmd, sizeof(cmd), fmt, args);
       77              11 :     va_end(args);
       78                 : 
       79              11 :     if (user_opts.jobs <= 1)
       80                 :         /* exit_on_error must be true to allow jobs */
       81              11 :         exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
       82                 :     else
       83                 :     {
       84                 :         /* parallel: run the command in a worker */
       85                 : #ifdef WIN32
       86                 :         if (thread_handles == NULL)
       87                 :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
       88                 : 
       89                 :         if (exec_thread_args == NULL)
       90                 :         {
       91                 :             int         i;
       92                 : 
       93                 :             exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
       94                 : 
       95                 :             /*
       96                 :              * For safety and performance, we keep the args allocated during
       97                 :              * the entire life of the process, and we don't free the args in a
       98                 :              * thread different from the one that allocated it.
       99                 :              */
      100                 :             for (i = 0; i < user_opts.jobs; i++)
      101                 :                 exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
      102                 :         }
      103                 : 
      104                 :         cur_thread_args = (void **) exec_thread_args;
      105                 : #endif
      106                 :         /* harvest any already-finished children without blocking */
      107 UBC           0 :         while (reap_child(false) == true)
      108                 :             ;
      109                 : 
      110                 :         /* all job slots busy?  then block until one child finishes */
      111               0 :         if (parallel_jobs >= user_opts.jobs)
      112               0 :             reap_child(true);
      113                 : 
      114                 :         /* set this before we start the job */
      115               0 :         parallel_jobs++;
      116                 : 
      117                 :         /* Ensure stdio state is quiesced before forking */
      118               0 :         fflush(NULL);
      119                 : 
      120                 : #ifndef WIN32
      121               0 :         child = fork();
      122               0 :         if (child == 0)
      123                 :             /* use _exit to skip atexit() functions; status is 0 when exec_prog() returns true */
      124               0 :             _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
      125               0 :         else if (child < 0)
      126                 :             /* fork failed */
      127 UNC           0 :             pg_fatal("could not create worker process: %s", strerror(errno));
      128                 : #else
      129                 :         /* unused array elements are always at the end */
      130                 :         new_arg = exec_thread_args[parallel_jobs - 1];
      131                 : 
      132                 :         /*
                          :          * Can only pass one pointer into the function, so use a struct.
                          :          * Each pg_free() releases the string a previous job left in this
                          :          * reused slot (NULL the first time, since slots are pg_malloc0'd).
                          :          */
      133                 :         pg_free(new_arg->log_file);
      134                 :         new_arg->log_file = pg_strdup(log_file);
      135                 :         pg_free(new_arg->opt_log_file);
      136                 :         new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
      137                 :         pg_free(new_arg->cmd);
      138                 :         new_arg->cmd = pg_strdup(cmd);
      139                 : 
      140                 :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
      141                 :                                         new_arg, 0, NULL);
      142                 :         if (child == 0)
      143                 :             pg_fatal("could not create worker thread: %s", strerror(errno));
      144                 : 
      145 ECB             :         thread_handles[parallel_jobs - 1] = child;
      146                 : #endif
      147                 :     }
      148 GIC          11 : }
     149                 : 
     150                 : 
      151                 : #ifdef WIN32
      152                 : DWORD
      153                 : win32_exec_prog(exec_thread_arg *args)   /* thread entry point started by parallel_exec_prog() */
      154                 : {
      155                 :     int         ret;            /* 0 when exec_prog() returns true (non-zero), otherwise 1 */
      156                 : 
      157                 :     ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd);
      158                 : 
      159                 :     /* returning terminates the thread; ret is the exit code that reap_child() inspects */
      160                 :     return ret;
      161                 : }
      162                 : #endif
     163                 : 
     164                 : 
      165                 : /*
      166                 :  *  parallel_transfer_all_new_dbs
      167                 :  *
      168                 :  *  This has the same API as transfer_all_new_dbs, except it does parallel execution
      169 ECB             :  *  by transferring multiple tablespaces in parallel
                          :  *
                          :  *  With user_opts.jobs <= 1 it calls transfer_all_new_dbs() directly, passing
                          :  *  NULL in place of old_tablespace (presumably meaning "all tablespaces" --
                          :  *  TODO confirm against transfer_all_new_dbs()).  Otherwise the transfer for
                          :  *  old_tablespace runs in a forked child (a thread on WIN32), after waiting
                          :  *  for a free job slot if user_opts.jobs workers are already outstanding.
                          :  *
                          :  *  Nothing is returned; failure to start a worker calls pg_fatal().
      170                 :  */
      171                 : void
      172 GIC           1 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
      173                 :                               char *old_pgdata, char *new_pgdata,
      174                 :                               char *old_tablespace)
      175                 : {
      176                 : #ifndef WIN32
      177                 :     pid_t       child;
      178                 : #else
      179                 :     HANDLE      child;
      180 ECB             :     transfer_thread_arg *new_arg;
      181                 : #endif
      182                 : 
      183 GIC           1 :     if (user_opts.jobs <= 1)
      184               1 :         transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
      185                 :     else
      186                 :     {
      187                 :         /* parallel: run this tablespace's transfer in a worker */
      188                 : #ifdef WIN32
      189                 :         if (thread_handles == NULL)
      190                 :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
      191                 : 
      192                 :         if (transfer_thread_args == NULL)
      193                 :         {
      194                 :             int         i;
      195                 : 
      196                 :             transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
      197                 : 
      198                 :             /*
      199                 :              * For safety and performance, we keep the args allocated during
      200                 :              * the entire life of the process, and we don't free the args in a
      201                 :              * thread different from the one that allocated it.
      202                 :              */
      203                 :             for (i = 0; i < user_opts.jobs; i++)
      204                 :                 transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
      205                 :         }
      206                 : 
      207 EUB             :         cur_thread_args = (void **) transfer_thread_args;
      208                 : #endif
      209                 :         /* harvest any already-finished children without blocking */
      210 UIC           0 :         while (reap_child(false) == true)
      211 EUB             :             ;
      212                 : 
      213                 :         /* all job slots busy?  then block until one child finishes */
      214 UIC           0 :         if (parallel_jobs >= user_opts.jobs)
      215 UBC           0 :             reap_child(true);
      216                 : 
      217                 :         /* set this before we start the job */
      218               0 :         parallel_jobs++;
      219                 : 
      220                 :         /* Ensure stdio state is quiesced before forking */
      221               0 :         fflush(NULL);
      222 EUB             : 
      223                 : #ifndef WIN32
      224 UBC           0 :         child = fork();
      225 UIC           0 :         if (child == 0)
      226                 :         {
      227               0 :             transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
      228 EUB             :                                  old_tablespace);
      229                 :             /* if we take another exit path, it will be non-zero */
      230                 :             /* use _exit to skip atexit() functions */
      231 UIC           0 :             _exit(0);
      232 EUB             :         }
      233 UIC           0 :         else if (child < 0)
      234                 :             /* fork failed */
      235 UNC           0 :             pg_fatal("could not create worker process: %s", strerror(errno));
      236                 : #else
      237                 :         /* unused array elements are always at the end */
      238                 :         new_arg = transfer_thread_args[parallel_jobs - 1];
      239                 : 
      240                 :         /*
                          :          * Can only pass one pointer into the function, so use a struct.
                          :          * The two DbInfoArr pointers are stored as-is; the strings are
                          :          * duplicated, after freeing whatever a previous job left in the slot.
                          :          */
      241                 :         new_arg->old_db_arr = old_db_arr;
      242                 :         new_arg->new_db_arr = new_db_arr;
      243                 :         pg_free(new_arg->old_pgdata);
      244                 :         new_arg->old_pgdata = pg_strdup(old_pgdata);
      245                 :         pg_free(new_arg->new_pgdata);
      246                 :         new_arg->new_pgdata = pg_strdup(new_pgdata);
      247                 :         pg_free(new_arg->old_tablespace);
      248                 :         new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
      249                 : 
      250                 :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
      251                 :                                         new_arg, 0, NULL);
      252 ECB             :         if (child == 0)
      253                 :             pg_fatal("could not create worker thread: %s", strerror(errno));
      254                 : 
      255                 :         thread_handles[parallel_jobs - 1] = child;
      256                 : #endif
      257                 :     }
      258 GIC           1 : }
     259                 : 
     260                 : 
      261                 : #ifdef WIN32
      262                 : DWORD
      263                 : win32_transfer_all_new_dbs(transfer_thread_arg *args)   /* thread entry point started by parallel_transfer_all_new_dbs() */
      264                 : {
      265                 :     transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
      266                 :                          args->new_pgdata, args->old_tablespace);
      267                 : 
      268                 :     /* terminates thread; the exit code seen by reap_child() is always 0 here */
      269                 :     return 0;
      270                 : }
      271                 : #endif
     272 ECB             : 
     273                 : 
      274                 : /*
      275                 :  *  collect status from a completed worker child
                          :  *
                          :  *  wait_for_child: if true, block until some worker finishes; if false,
                          :  *  only poll.
                          :  *
                          :  *  Returns true after one finished worker has been collected and
                          :  *  parallel_jobs decremented.  Returns false when user_opts.jobs <= 1,
                          :  *  when no workers are outstanding, or when none could be collected.
                          :  *  A worker that ended with a non-zero status is reported via pg_fatal().
      276                 :  */
      277                 : bool
      278 GIC           2 : reap_child(bool wait_for_child)
      279                 : {
      280                 : #ifndef WIN32
      281                 :     int         work_status;
      282 ECB             :     pid_t       child;
      283                 : #else
      284                 :     int         thread_num;
      285                 :     DWORD       res;
      286 EUB             : #endif
      287                 : 
      288 GBC           2 :     if (user_opts.jobs <= 1 || parallel_jobs == 0)
      289               2 :         return false;
      290 EUB             : 
      291                 : #ifndef WIN32
      292 UBC           0 :     child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
      293 UIC           0 :     if (child == (pid_t) -1)
      294 UNC           0 :         pg_fatal("%s() failed: %s", "waitpid", strerror(errno));
      295 UIC           0 :     if (child == 0)
      296               0 :         return false;           /* only with WNOHANG: no child has exited yet */
      297               0 :     if (work_status != 0)       /* work_status is the raw waitpid() status word, not a decoded exit code */
      298 UNC           0 :         pg_fatal("child process exited abnormally: status %d", work_status);
      299                 : #else
      300                 :     /* wait for one to finish (a timeout of 0 just polls) */
      301                 :     thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
      302                 :                                         false, wait_for_child ? INFINITE : 0);
      303                 : 
      304                 :     if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
      305                 :         return false;           /* NOTE(review): WAIT_FAILED is treated like "nothing finished" -- confirm this is intended */
      306                 : 
      307                 :     /* compute thread index in thread_handles */
      308                 :     thread_num -= WAIT_OBJECT_0;
      309                 : 
      310                 :     /* get the thread's exit code (the value its entry function returned).
                          :      * NOTE(review): the GetExitCodeThread() result itself is not checked,
                          :      * and nothing on this path assigns errno, so the strerror(errno) text
                          :      * below is probably unrelated to the worker's failure -- confirm. */
      311                 :     GetExitCodeThread(thread_handles[thread_num], &res);
      312                 :     if (res != 0)
      313                 :         pg_fatal("child worker exited abnormally: %s", strerror(errno));
      314                 : 
      315                 :     /* dispose of handle to stop leaks */
      316                 :     CloseHandle(thread_handles[thread_num]);
      317                 : 
      318                 :     /* Move last slot into dead child's position */
      319                 :     if (thread_num != parallel_jobs - 1)
      320                 :     {
      321                 :         void       *tmp_args;
      322                 : 
      323                 :         thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
      324                 : 
      325                 :         /*
      326                 :          * Move last active thread arg struct into the now-dead slot, and the
      327                 :          * now-dead slot to the end for reuse by the next thread. Though the
      328                 :          * thread struct is in use by another thread, we can safely swap the
      329                 :          * struct pointers within the array.
      330                 :          */
      331                 :         tmp_args = cur_thread_args[thread_num];
      332 EUB             :         cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
      333                 :         cur_thread_args[parallel_jobs - 1] = tmp_args;
      334                 :     }
      335                 : #endif
      336                 : 
      337                 :     /* do this after job has been removed */
      338 UIC           0 :     parallel_jobs--;
      339                 : 
      340               0 :     return true;
      341                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a