LCOV - differential code coverage report
Current view: top level - src/bin/pg_upgrade - parallel.c (source / functions) Coverage Total Hit UNC UBC GNC CBC DUB
Current: Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128 Lines: 31.8 % 44 14 3 27 14 3
Current Date: 2024-04-14 14:21:10 Functions: 100.0 % 3 3 3
Baseline: 16@8cea358b128 Branches: 10.0 % 30 3 27 3
Baseline Date: 2024-04-14 14:21:09 Line coverage date bins:
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed [..60] days: 0.0 % 3 0 3
(240..) days: 34.1 % 41 14 27 14
Function coverage date bins:
(240..) days: 100.0 % 3 3 3
Branch coverage date bins:
(240..) days: 10.0 % 30 3 27 3

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*
                                  2                 :                :  *  parallel.c
                                  3                 :                :  *
                                  4                 :                :  *  multi-process support
                                  5                 :                :  *
                                  6                 :                :  *  Copyright (c) 2010-2024, PostgreSQL Global Development Group
                                  7                 :                :  *  src/bin/pg_upgrade/parallel.c
                                  8                 :                :  */
                                  9                 :                : 
                                 10                 :                : #include "postgres_fe.h"
                                 11                 :                : 
                                 12                 :                : #include <sys/wait.h>
                                 13                 :                : #ifdef WIN32
                                 14                 :                : #include <io.h>
                                 15                 :                : #endif
                                 16                 :                : 
                                 17                 :                : #include "pg_upgrade.h"
                                 18                 :                : 
                                 19                 :                : static int  parallel_jobs;
                                 20                 :                : 
                                 21                 :                : #ifdef WIN32
                                 22                 :                : /*
                                 23                 :                :  *  Array holding all active threads.  There can't be any gaps/zeros so
                                 24                 :                :  *  it can be passed to WaitForMultipleObjects().  We use two arrays
                                 25                 :                :  *  so the thread_handles array can be passed to WaitForMultipleObjects().
                                 26                 :                :  */
                                 27                 :                : HANDLE     *thread_handles;
                                 28                 :                : 
                                 29                 :                : typedef struct
                                 30                 :                : {
                                 31                 :                :     char       *log_file;
                                 32                 :                :     char       *opt_log_file;
                                 33                 :                :     char       *cmd;
                                 34                 :                : } exec_thread_arg;
                                 35                 :                : 
                                 36                 :                : typedef struct
                                 37                 :                : {
                                 38                 :                :     DbInfoArr  *old_db_arr;
                                 39                 :                :     DbInfoArr  *new_db_arr;
                                 40                 :                :     char       *old_pgdata;
                                 41                 :                :     char       *new_pgdata;
                                 42                 :                :     char       *old_tablespace;
                                 43                 :                : } transfer_thread_arg;
                                 44                 :                : 
                                 45                 :                : exec_thread_arg **exec_thread_args;
                                 46                 :                : transfer_thread_arg **transfer_thread_args;
                                 47                 :                : 
                                 48                 :                : /* track current thread_args struct so reap_child() can be used for all cases */
                                 49                 :                : void      **cur_thread_args;
                                 50                 :                : 
                                 51                 :                : DWORD       win32_exec_prog(exec_thread_arg *args);
                                 52                 :                : DWORD       win32_transfer_all_new_dbs(transfer_thread_arg *args);
                                 53                 :                : #endif
                                 54                 :                : 
                                 55                 :                : /*
                                 56                 :                :  *  parallel_exec_prog
                                 57                 :                :  *
                                 58                 :                :  *  This has the same API as exec_prog, except it does parallel execution,
                                 59                 :                :  *  and therefore must throw errors and doesn't return an error status.
                                 60                 :                :  */
                                 61                 :                : void
 4127 bruce@momjian.us           62                 :CBC          19 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
                                 63                 :                :                    const char *fmt,...)
                                 64                 :                : {
                                 65                 :                :     va_list     args;
                                 66                 :                :     char        cmd[MAX_STRING];
                                 67                 :                : 
                                 68                 :                : #ifndef WIN32
                                 69                 :                :     pid_t       child;
                                 70                 :                : #else
                                 71                 :                :     HANDLE      child;
                                 72                 :                :     exec_thread_arg *new_arg;
                                 73                 :                : #endif
                                 74                 :                : 
                                 75                 :             19 :     va_start(args, fmt);
                                 76                 :             19 :     vsnprintf(cmd, sizeof(cmd), fmt, args);
                                 77                 :             19 :     va_end(args);
                                 78                 :                : 
                                 79         [ +  - ]:             19 :     if (user_opts.jobs <= 1)
                                 80                 :                :         /* exit_on_error must be true to allow jobs */
 2288                            81                 :             19 :         exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
                                 82                 :                :     else
                                 83                 :                :     {
                                 84                 :                :         /* parallel */
                                 85                 :                : #ifdef WIN32
                                 86                 :                :         if (thread_handles == NULL)
                                 87                 :                :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
                                 88                 :                : 
                                 89                 :                :         if (exec_thread_args == NULL)
                                 90                 :                :         {
                                 91                 :                :             int         i;
                                 92                 :                : 
                                 93                 :                :             exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
                                 94                 :                : 
                                 95                 :                :             /*
                                 96                 :                :              * For safety and performance, we keep the args allocated during
                                 97                 :                :              * the entire life of the process, and we don't free the args in a
                                 98                 :                :              * thread different from the one that allocated it.
                                 99                 :                :              */
                                100                 :                :             for (i = 0; i < user_opts.jobs; i++)
                                101                 :                :                 exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
                                102                 :                :         }
                                103                 :                : 
                                104                 :                :         cur_thread_args = (void **) exec_thread_args;
                                105                 :                : #endif
                                106                 :                :         /* harvest any dead children */
 4127 bruce@momjian.us          107         [ #  # ]:UBC           0 :         while (reap_child(false) == true)
                                108                 :                :             ;
                                109                 :                : 
                                110                 :                :         /* must we wait for a dead child? */
                                111         [ #  # ]:              0 :         if (parallel_jobs >= user_opts.jobs)
                                112                 :              0 :             reap_child(true);
                                113                 :                : 
                                114                 :                :         /* set this before we start the job */
                                115                 :              0 :         parallel_jobs++;
                                116                 :                : 
                                117                 :                :         /* Ensure stdio state is quiesced before forking */
                                118                 :              0 :         fflush(NULL);
                                119                 :                : 
                                120                 :                : #ifndef WIN32
                                121                 :              0 :         child = fork();
                                122         [ #  # ]:              0 :         if (child == 0)
                                123                 :                :             /* use _exit to skip atexit() functions */
 2288                           124                 :              0 :             _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
 4127                           125         [ #  # ]:              0 :         else if (child < 0)
                                126                 :                :             /* fork failed */
   33 michael@paquier.xyz       127                 :UNC           0 :             pg_fatal("could not create worker process: %m");
                                128                 :                : #else
                                129                 :                :         /* empty array element are always at the end */
                                130                 :                :         new_arg = exec_thread_args[parallel_jobs - 1];
                                131                 :                : 
                                132                 :                :         /* Can only pass one pointer into the function, so use a struct */
                                133                 :                :         pg_free(new_arg->log_file);
                                134                 :                :         new_arg->log_file = pg_strdup(log_file);
                                135                 :                :         pg_free(new_arg->opt_log_file);
                                136                 :                :         new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
                                137                 :                :         pg_free(new_arg->cmd);
                                138                 :                :         new_arg->cmd = pg_strdup(cmd);
                                139                 :                : 
                                140                 :                :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
                                141                 :                :                                         new_arg, 0, NULL);
                                142                 :                :         if (child == 0)
                                143                 :                :             pg_fatal("could not create worker thread: %m");
                                144                 :                : 
                                145                 :                :         thread_handles[parallel_jobs - 1] = child;
                                146                 :                : #endif
                                147                 :                :     }
 4127 bruce@momjian.us          148                 :CBC          19 : }
                                149                 :                : 
                                150                 :                : 
                                151                 :                : #ifdef WIN32
                                152                 :                : DWORD
                                153                 :                : win32_exec_prog(exec_thread_arg *args)
                                154                 :                : {
                                155                 :                :     int         ret;
                                156                 :                : 
                                157                 :                :     ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd);
                                158                 :                : 
                                159                 :                :     /* terminates thread */
                                160                 :                :     return ret;
                                161                 :                : }
                                162                 :                : #endif
                                163                 :                : 
                                164                 :                : 
                                165                 :                : /*
                                166                 :                :  *  parallel_transfer_all_new_dbs
                                167                 :                :  *
                                168                 :                :  *  This has the same API as transfer_all_new_dbs, except it does parallel execution
                                169                 :                :  *  by transferring multiple tablespaces in parallel
                                170                 :                :  */
                                171                 :                : void
 3973                           172                 :              3 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
                                173                 :                :                               char *old_pgdata, char *new_pgdata,
                                174                 :                :                               char *old_tablespace)
                                175                 :                : {
                                176                 :                : #ifndef WIN32
                                177                 :                :     pid_t       child;
                                178                 :                : #else
                                179                 :                :     HANDLE      child;
                                180                 :                :     transfer_thread_arg *new_arg;
                                181                 :                : #endif
                                182                 :                : 
 4113                           183         [ +  - ]:              3 :     if (user_opts.jobs <= 1)
                                184                 :              3 :         transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
                                185                 :                :     else
                                186                 :                :     {
                                187                 :                :         /* parallel */
                                188                 :                : #ifdef WIN32
                                189                 :                :         if (thread_handles == NULL)
                                190                 :                :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
                                191                 :                : 
                                192                 :                :         if (transfer_thread_args == NULL)
                                193                 :                :         {
                                194                 :                :             int         i;
                                195                 :                : 
                                196                 :                :             transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
                                197                 :                : 
                                198                 :                :             /*
                                199                 :                :              * For safety and performance, we keep the args allocated during
                                200                 :                :              * the entire life of the process, and we don't free the args in a
                                201                 :                :              * thread different from the one that allocated it.
                                202                 :                :              */
                                203                 :                :             for (i = 0; i < user_opts.jobs; i++)
                                204                 :                :                 transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
                                205                 :                :         }
                                206                 :                : 
                                207                 :                :         cur_thread_args = (void **) transfer_thread_args;
                                208                 :                : #endif
                                209                 :                :         /* harvest any dead children */
 4113 bruce@momjian.us          210         [ #  # ]:UBC           0 :         while (reap_child(false) == true)
                                211                 :                :             ;
                                212                 :                : 
                                213                 :                :         /* must we wait for a dead child? */
                                214         [ #  # ]:              0 :         if (parallel_jobs >= user_opts.jobs)
                                215                 :              0 :             reap_child(true);
                                216                 :                : 
                                217                 :                :         /* set this before we start the job */
                                218                 :              0 :         parallel_jobs++;
                                219                 :                : 
                                220                 :                :         /* Ensure stdio state is quiesced before forking */
                                221                 :              0 :         fflush(NULL);
                                222                 :                : 
                                223                 :                : #ifndef WIN32
                                224                 :              0 :         child = fork();
                                225         [ #  # ]:              0 :         if (child == 0)
                                226                 :                :         {
                                227                 :              0 :             transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
                                228                 :                :                                  old_tablespace);
                                229                 :                :             /* if we take another exit path, it will be non-zero */
                                230                 :                :             /* use _exit to skip atexit() functions */
                                231                 :              0 :             _exit(0);
                                232                 :                :         }
                                233         [ #  # ]:              0 :         else if (child < 0)
                                234                 :                :             /* fork failed */
   33 michael@paquier.xyz       235                 :UNC           0 :             pg_fatal("could not create worker process: %m");
                                236                 :                : #else
                                237                 :                :         /* empty array element are always at the end */
                                238                 :                :         new_arg = transfer_thread_args[parallel_jobs - 1];
                                239                 :                : 
                                240                 :                :         /* Can only pass one pointer into the function, so use a struct */
                                241                 :                :         new_arg->old_db_arr = old_db_arr;
                                242                 :                :         new_arg->new_db_arr = new_db_arr;
                                243                 :                :         pg_free(new_arg->old_pgdata);
                                244                 :                :         new_arg->old_pgdata = pg_strdup(old_pgdata);
                                245                 :                :         pg_free(new_arg->new_pgdata);
                                246                 :                :         new_arg->new_pgdata = pg_strdup(new_pgdata);
                                247                 :                :         pg_free(new_arg->old_tablespace);
                                248                 :                :         new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
                                249                 :                : 
                                250                 :                :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
                                251                 :                :                                         new_arg, 0, NULL);
                                252                 :                :         if (child == 0)
                                253                 :                :             pg_fatal("could not create worker thread: %m");
                                254                 :                : 
                                255                 :                :         thread_handles[parallel_jobs - 1] = child;
                                256                 :                : #endif
                                257                 :                :     }
 4113 bruce@momjian.us          258                 :CBC           3 : }
                                259                 :                : 
                                260                 :                : 
                                261                 :                : #ifdef WIN32
                                262                 :                : DWORD
                                263                 :                : win32_transfer_all_new_dbs(transfer_thread_arg *args)
                                264                 :                : {
                                265                 :                :     transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
                                266                 :                :                          args->new_pgdata, args->old_tablespace);
                                267                 :                : 
                                268                 :                :     /* terminates thread */
                                269                 :                :     return 0;
                                270                 :                : }
                                271                 :                : #endif
                                272                 :                : 
                                273                 :                : 
                                274                 :                : /*
                                275                 :                :  *  collect status from a completed worker child
                                276                 :                :  */
                                277                 :                : bool
 4127                           278                 :              7 : reap_child(bool wait_for_child)
                                279                 :                : {
                                280                 :                : #ifndef WIN32
                                281                 :                :     int         work_status;
                                282                 :                :     pid_t       child;
                                283                 :                : #else
                                284                 :                :     int         thread_num;
                                285                 :                :     DWORD       res;
                                286                 :                : #endif
                                287                 :                : 
                                288   [ -  +  -  - ]:              7 :     if (user_opts.jobs <= 1 || parallel_jobs == 0)
                                289                 :              7 :         return false;
                                290                 :                : 
                                291                 :                : #ifndef WIN32
 1946 tgl@sss.pgh.pa.us         292                 :UBC           0 :     child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
                                293         [ #  # ]:              0 :     if (child == (pid_t) -1)
   33 michael@paquier.xyz       294                 :UNC           0 :         pg_fatal("%s() failed: %m", "waitpid");
 1946 tgl@sss.pgh.pa.us         295         [ #  # ]:UBC           0 :     if (child == 0)
                                296                 :              0 :         return false;           /* no children, or no dead children */
                                297         [ #  # ]:              0 :     if (work_status != 0)
  642                           298                 :              0 :         pg_fatal("child process exited abnormally: status %d", work_status);
                                299                 :                : #else
                                300                 :                :     /* wait for one to finish */
                                301                 :                :     thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
                                302                 :                :                                         false, wait_for_child ? INFINITE : 0);
                                303                 :                : 
                                304                 :                :     if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
                                305                 :                :         return false;
                                306                 :                : 
                                307                 :                :     /* compute thread index in active_threads */
                                308                 :                :     thread_num -= WAIT_OBJECT_0;
                                309                 :                : 
                                310                 :                :     /* get the result */
                                311                 :                :     GetExitCodeThread(thread_handles[thread_num], &res);
                                312                 :                :     if (res != 0)
                                313                 :                :         pg_fatal("child worker exited abnormally: %m");
                                314                 :                : 
                                315                 :                :     /* dispose of handle to stop leaks */
                                316                 :                :     CloseHandle(thread_handles[thread_num]);
                                317                 :                : 
                                318                 :                :     /* Move last slot into dead child's position */
                                319                 :                :     if (thread_num != parallel_jobs - 1)
                                320                 :                :     {
                                321                 :                :         void       *tmp_args;
                                322                 :                : 
                                323                 :                :         thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
                                324                 :                : 
                                325                 :                :         /*
                                326                 :                :          * Move last active thread arg struct into the now-dead slot, and the
                                327                 :                :          * now-dead slot to the end for reuse by the next thread. Though the
                                328                 :                :          * thread struct is in use by another thread, we can safely swap the
                                329                 :                :          * struct pointers within the array.
                                330                 :                :          */
                                331                 :                :         tmp_args = cur_thread_args[thread_num];
                                332                 :                :         cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
                                333                 :                :         cur_thread_args[parallel_jobs - 1] = tmp_args;
                                334                 :                :     }
                                335                 :                : #endif
                                336                 :                : 
                                337                 :                :     /* do this after job has been removed */
 4127 bruce@momjian.us          338                 :              0 :     parallel_jobs--;
                                339                 :                : 
                                340                 :              0 :     return true;
                                341                 :                : }
        

Generated by: LCOV version 2.1-beta2-3-g6141622