LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - streamutil.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 71.9 % 327 235 1 21 49 21 18 110 2 105 51 108 2 1
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 15 15 14 1 14
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (180,240] days: 100.0 % 1 1 1
Legend: Lines: hit not hit (240..) days: 71.8 % 326 234 1 21 49 21 18 109 2 105 51 108
Function coverage date bins:
(240..) days: 51.7 % 29 15 14 1 14

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
                                  4                 :  *                  pg_recvlogical
                                  5                 :  *
                                  6                 :  * Author: Magnus Hagander <magnus@hagander.net>
                                  7                 :  *
                                  8                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                  9                 :  *
                                 10                 :  * IDENTIFICATION
                                 11                 :  *        src/bin/pg_basebackup/streamutil.c
                                 12                 :  *-------------------------------------------------------------------------
                                 13                 :  */
                                 14                 : 
                                 15                 : #include "postgres_fe.h"
                                 16                 : 
                                 17                 : #include <sys/time.h>
                                 18                 : #include <unistd.h>
                                 19                 : 
                                 20                 : #include "access/xlog_internal.h"
                                 21                 : #include "common/connect.h"
                                 22                 : #include "common/fe_memutils.h"
                                 23                 : #include "common/file_perm.h"
                                 24                 : #include "common/logging.h"
                                 25                 : #include "common/string.h"
                                 26                 : #include "datatype/timestamp.h"
                                 27                 : #include "port/pg_bswap.h"
                                 28                 : #include "pqexpbuffer.h"
                                 29                 : #include "receivelog.h"
                                 30                 : #include "streamutil.h"
                                 31                 : 
                                 32                 : #define ERRCODE_DUPLICATE_OBJECT  "42710"
                                 33                 : 
                                 34                 : int         WalSegSz;
                                 35                 : 
                                 36                 : static bool RetrieveDataDirCreatePerm(PGconn *conn);
                                 37                 : 
                                 38                 : /* SHOW command for replication connection was introduced in version 10 */
                                 39                 : #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
                                 40                 : 
                                 41                 : /*
                                 42                 :  * Group access is supported from version 11.
                                 43                 :  */
                                 44                 : #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
                                 45                 : 
                                 46                 : const char *progname;
                                 47                 : char       *connection_string = NULL;
                                 48                 : char       *dbhost = NULL;
                                 49                 : char       *dbuser = NULL;
                                 50                 : char       *dbport = NULL;
                                 51                 : char       *dbname = NULL;
                                 52                 : int         dbgetpassword = 0;  /* 0=auto, -1=never, 1=always */
                                 53                 : static char *password = NULL;
                                 54                 : PGconn     *conn = NULL;
                                 55                 : 
                                 56                 : /*
                                 57                 :  * Connect to the server. Returns a valid PGconn pointer if connected,
                                 58                 :  * or NULL on non-permanent error. On permanent error, the function will
                                 59                 :  * call exit(1) directly.
                                 60                 :  */
                                 61                 : PGconn *
 4183 magnus                     62 CBC         299 : GetConnection(void)
                                 63                 : {
                                 64                 :     PGconn     *tmpconn;
 3695 heikki.linnakangas         65             299 :     int         argcount = 7;   /* dbname, replication, fallback_app_name,
                                 66                 :                                  * host, user, port, password */
                                 67                 :     int         i;
                                 68                 :     const char **keywords;
                                 69                 :     const char **values;
                                 70                 :     const char *tmpparam;
                                 71                 :     bool        need_password;
                                 72             299 :     PQconninfoOption *conn_opts = NULL;
                                 73                 :     PQconninfoOption *conn_opt;
                                 74             299 :     char       *err_msg = NULL;
                                 75                 : 
                                 76                 :     /* pg_recvlogical uses dbname only; others use connection_string only. */
 2435 noah                       77             299 :     Assert(dbname == NULL || connection_string == NULL);
                                 78                 : 
                                 79                 :     /*
                                 80                 :      * Merge the connection info inputs given in form of connection string,
                                 81                 :      * options and default values (dbname=replication, replication=true, etc.)
                                 82                 :      * Explicitly discard any dbname value in the connection string;
                                 83                 :      * otherwise, PQconnectdbParams() would interpret that value as being
                                 84                 :      * itself a connection string.
                                 85                 :      */
 3695 heikki.linnakangas         86             299 :     i = 0;
                                 87             299 :     if (connection_string)
                                 88                 :     {
                                 89               2 :         conn_opts = PQconninfoParse(connection_string, &err_msg);
                                 90               2 :         if (conn_opts == NULL)
  366 tgl                        91 UBC           0 :             pg_fatal("%s", err_msg);
                                 92                 : 
 3695 heikki.linnakangas         93 CBC          80 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
                                 94                 :         {
 2435 noah                       95              78 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
                                 96               6 :                 strcmp(conn_opt->keyword, "dbname") != 0)
 3695 heikki.linnakangas         97               4 :                 argcount++;
                                 98                 :         }
                                 99                 : 
                                100               2 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
                                101               2 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
                                102                 : 
                                103              80 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
                                104                 :         {
 2435 noah                      105              78 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
                                106               6 :                 strcmp(conn_opt->keyword, "dbname") != 0)
                                107                 :             {
 3695 heikki.linnakangas        108               4 :                 keywords[i] = conn_opt->keyword;
                                109               4 :                 values[i] = conn_opt->val;
                                110               4 :                 i++;
                                111                 :             }
                                112                 :         }
                                113                 :     }
                                114                 :     else
                                115                 :     {
                                116             297 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
                                117             297 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
                                118                 :     }
                                119                 : 
                                120             299 :     keywords[i] = "dbname";
 3309 rhaas                     121             299 :     values[i] = dbname == NULL ? "replication" : dbname;
 3695 heikki.linnakangas        122             299 :     i++;
                                123             299 :     keywords[i] = "replication";
 3309 rhaas                     124             299 :     values[i] = dbname == NULL ? "true" : "database";
 3695 heikki.linnakangas        125             299 :     i++;
                                126             299 :     keywords[i] = "fallback_application_name";
                                127             299 :     values[i] = progname;
                                128             299 :     i++;
                                129                 : 
 4183 magnus                    130             299 :     if (dbhost)
                                131                 :     {
                                132             107 :         keywords[i] = "host";
                                133             107 :         values[i] = dbhost;
                                134             107 :         i++;
                                135                 :     }
                                136             299 :     if (dbuser)
                                137                 :     {
                                138               7 :         keywords[i] = "user";
                                139               7 :         values[i] = dbuser;
                                140               7 :         i++;
                                141                 :     }
                                142             299 :     if (dbport)
                                143                 :     {
                                144             107 :         keywords[i] = "port";
                                145             107 :         values[i] = dbport;
                                146             107 :         i++;
                                147                 :     }
                                148                 : 
                                149                 :     /* If -W was given, force prompt for password, but only the first time */
  948 tgl                       150             299 :     need_password = (dbgetpassword == 1 && !password);
                                151                 : 
                                152                 :     do
                                153                 :     {
                                154                 :         /* Get a new password if appropriate */
 3432                           155             299 :         if (need_password)
                                156                 :         {
  297 peter                     157 UNC           0 :             free(password);
  948 tgl                       158 UBC           0 :             password = simple_prompt("Password: ", false);
 3432 tgl                       159 UIC           0 :             need_password = false;
                                160                 :         }
                                161                 : 
 3432 tgl                       162 ECB             :         /* Use (or reuse, on a subsequent connection) password if we have it */
  948 tgl                       163 GIC         299 :         if (password)
 4183 magnus                    164 EUB             :         {
 3695 heikki.linnakangas        165 UBC           0 :             keywords[i] = "password";
 2413 tgl                       166 UIC           0 :             values[i] = password;
                                167                 :         }
                                168                 :         else
 4183 magnus                    169 ECB             :         {
 3432 tgl                       170 CBC         299 :             keywords[i] = NULL;
 3432 tgl                       171 GIC         299 :             values[i] = NULL;
                                172                 :         }
 4183 magnus                    173 ECB             : 
 4183 magnus                    174 GIC         299 :         tmpconn = PQconnectdbParams(keywords, values, true);
                                175                 : 
                                176                 :         /*
                                177                 :          * If there is too little memory even to allocate the PGconn object
                                178                 :          * and PQconnectdbParams returns NULL, we call exit(1) directly.
 3923 magnus                    179 ECB             :          */
 3923 magnus                    180 GBC         299 :         if (!tmpconn)
  366 tgl                       181 UIC           0 :             pg_fatal("could not connect to server");
                                182                 : 
 3432 tgl                       183 ECB             :         /* If we need a password and -w wasn't given, loop back and get one */
 4183 magnus                    184 CBC         301 :         if (PQstatus(tmpconn) == CONNECTION_BAD &&
 4183 magnus                    185 GBC           2 :             PQconnectionNeedsPassword(tmpconn) &&
 4183 magnus                    186 UIC           0 :             dbgetpassword != -1)
 4183 magnus                    187 EUB             :         {
 3923 magnus                    188 UBC           0 :             PQfinish(tmpconn);
 3432 tgl                       189 UIC           0 :             need_password = true;
                                190                 :         }
 3432 tgl                       191 ECB             :     }
 3426 peter_e                   192 GIC         299 :     while (need_password);
 4183 magnus                    193 ECB             : 
 3432 tgl                       194 GIC         299 :     if (PQstatus(tmpconn) != CONNECTION_OK)
 3432 tgl                       195 ECB             :     {
  883 peter                     196 CBC           2 :         pg_log_error("%s", PQerrorMessage(tmpconn));
 3432 tgl                       197               2 :         PQfinish(tmpconn);
 4183 magnus                    198               2 :         free(values);
                                199               2 :         free(keywords);
  280 peter                     200 GNC           2 :         PQconninfoFree(conn_opts);
 3432 tgl                       201 GIC           2 :         return NULL;
                                202                 :     }
 4183 magnus                    203 ECB             : 
 3432 tgl                       204                 :     /* Connection ok! */
 3432 tgl                       205 CBC         297 :     free(values);
 3432 tgl                       206 GIC         297 :     free(keywords);
  280 peter                     207 GNC         297 :     PQconninfoFree(conn_opts);
                                208                 : 
                                209                 :     /*
                                210                 :      * Set always-secure search path, so malicious users can't get control.
                                211                 :      * The capacity to run normal SQL queries was added in PostgreSQL 10, so
 1809 tgl                       212 ECB             :      * the search path cannot be changed (by us or attackers) on earlier
                                213                 :      * versions.
                                214                 :      */
 1810 noah                      215 GIC         297 :     if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
 1868 noah                      216 ECB             :     {
                                217                 :         PGresult   *res;
                                218                 : 
 1868 noah                      219 GBC          47 :         res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
 1868 noah                      220 GIC          47 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
 1868 noah                      221 EUB             :         {
 1469 peter                     222 UBC           0 :             pg_log_error("could not clear search_path: %s",
 1469 peter                     223 EUB             :                          PQerrorMessage(tmpconn));
 1868 noah                      224 UIC           0 :             PQclear(res);
 1868 noah                      225 LBC           0 :             PQfinish(tmpconn);
 1868 noah                      226 UIC           0 :             exit(1);
                                227                 :         }
 1868 noah                      228 GIC          47 :         PQclear(res);
                                229                 :     }
                                230                 : 
                                231                 :     /*
 2236 tgl                       232 ECB             :      * Ensure we have the same value of integer_datetimes (now always "on") as
                                233                 :      * the server we are connecting to.
                                234                 :      */
 3432 tgl                       235 GBC         297 :     tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
                                236             297 :     if (!tmpparam)
 3432 tgl                       237 EUB             :     {
 1469 peter                     238 UIC           0 :         pg_log_error("could not determine server setting for integer_datetimes");
 3432 tgl                       239               0 :         PQfinish(tmpconn);
 3432 tgl                       240 LBC           0 :         exit(1);
                                241                 :     }
 3974 rhaas                     242 EUB             : 
 3432 tgl                       243 GBC         297 :     if (strcmp(tmpparam, "on") != 0)
 3432 tgl                       244 EUB             :     {
 1469 peter                     245 UIC           0 :         pg_log_error("integer_datetimes compile flag does not match server");
 3432 tgl                       246               0 :         PQfinish(tmpconn);
                                247               0 :         exit(1);
                                248                 :     }
                                249                 : 
                                250                 :     /*
 1828 sfrost                    251 ECB             :      * Retrieve the source data directory mode and use it to construct a umask
                                252                 :      * for creating directories and files.
 1828 sfrost                    253 EUB             :      */
 1828 sfrost                    254 GBC         297 :     if (!RetrieveDataDirCreatePerm(tmpconn))
                                255                 :     {
 1828 sfrost                    256 UIC           0 :         PQfinish(tmpconn);
 1828 sfrost                    257 LBC           0 :         exit(1);
                                258                 :     }
                                259                 : 
 3432 tgl                       260 GIC         297 :     return tmpconn;
                                261                 : }
                                262                 : 
                                263                 : /*
                                264                 :  * From version 10, explicitly set wal segment size using SHOW wal_segment_size
 2028 andres                    265 ECB             :  * since ControlFile is not accessible here.
                                266                 :  */
                                267                 : bool
 2028 andres                    268 GIC         149 : RetrieveWalSegSize(PGconn *conn)
                                269                 : {
 2028 andres                    270 ECB             :     PGresult   *res;
                                271                 :     char        xlog_unit[3];
                                272                 :     int         xlog_val,
 2028 andres                    273 CBC         149 :                 multiplier = 1;
                                274                 : 
                                275                 :     /* check connection existence */
                                276             149 :     Assert(conn != NULL);
                                277                 : 
 2028 andres                    278 EUB             :     /* for previous versions set the default xlog seg size */
 2028 andres                    279 GBC         149 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
                                280                 :     {
 2028 andres                    281 UIC           0 :         WalSegSz = DEFAULT_XLOG_SEG_SIZE;
 2028 andres                    282 LBC           0 :         return true;
 2028 andres                    283 ECB             :     }
                                284                 : 
 2028 andres                    285 GBC         149 :     res = PQexec(conn, "SHOW wal_segment_size");
 2028 andres                    286 GIC         149 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
                                287                 :     {
 1469 peter                     288 UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
 1469 peter                     289 EUB             :                      "SHOW wal_segment_size", PQerrorMessage(conn));
                                290                 : 
 2028 andres                    291 LBC           0 :         PQclear(res);
 2028 andres                    292 UIC           0 :         return false;
 2028 andres                    293 EUB             :     }
 2028 andres                    294 GIC         149 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
                                295                 :     {
 1469 peter                     296 UBC           0 :         pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
 1469 peter                     297 EUB             :                      PQntuples(res), PQnfields(res), 1, 1);
                                298                 : 
 2028 andres                    299 UIC           0 :         PQclear(res);
                                300               0 :         return false;
 2028 andres                    301 ECB             :     }
                                302                 : 
 2028 andres                    303 EUB             :     /* fetch xlog value and unit from the result */
  537 dgustafsson               304 GBC         149 :     if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
 2028 andres                    305 EUB             :     {
 1469 peter                     306 UIC           0 :         pg_log_error("WAL segment size could not be parsed");
 1087 michael                   307               0 :         PQclear(res);
 2028 andres                    308 LBC           0 :         return false;
                                309                 :     }
                                310                 : 
 1087 michael                   311 CBC         149 :     PQclear(res);
 1087 michael                   312 ECB             : 
 2028 andres                    313 EUB             :     /* set the multiplier based on unit to convert xlog_val to bytes */
 2028 andres                    314 GBC         149 :     if (strcmp(xlog_unit, "MB") == 0)
 2028 andres                    315 GIC         149 :         multiplier = 1024 * 1024;
 2028 andres                    316 UIC           0 :     else if (strcmp(xlog_unit, "GB") == 0)
 2028 andres                    317 LBC           0 :         multiplier = 1024 * 1024 * 1024;
                                318                 : 
 2028 andres                    319 ECB             :     /* convert and set WalSegSz */
 2028 andres                    320 GIC         149 :     WalSegSz = xlog_val * multiplier;
 2028 andres                    321 EUB             : 
 2028 andres                    322 GIC         149 :     if (!IsValidWalSegSize(WalSegSz))
                                323                 :     {
 1469 peter                     324 UIC           0 :         pg_log_error(ngettext("WAL segment size must be a power of two between 1 MB and 1 GB, but the remote server reported a value of %d byte",
 1469 peter                     325 EUB             :                               "WAL segment size must be a power of two between 1 MB and 1 GB, but the remote server reported a value of %d bytes",
                                326                 :                               WalSegSz),
                                327                 :                      WalSegSz);
 2028 andres                    328 LBC           0 :         return false;
                                329                 :     }
                                330                 : 
 2028 andres                    331 GIC         149 :     return true;
                                332                 : }
                                333                 : 
                                334                 : /*
                                335                 :  * RetrieveDataDirCreatePerm
                                336                 :  *
                                337                 :  * This function is used to determine the privileges on the server's PG data
                                338                 :  * directory and, based on that, set what the permissions will be for
                                339                 :  * directories and files we create.
                                340                 :  *
                                341                 :  * PG11 added support for (optionally) group read/execute rights to be set on
                                342                 :  * the data directory.  Prior to PG11, only the owner was allowed to have rights
 1828 sfrost                    343 ECB             :  * on the data directory.
                                344                 :  */
                                345                 : static bool
 1828 sfrost                    346 GIC         297 : RetrieveDataDirCreatePerm(PGconn *conn)
                                347                 : {
                                348                 :     PGresult   *res;
 1828 sfrost                    349 ECB             :     int         data_directory_mode;
                                350                 : 
                                351                 :     /* check connection existence */
 1828 sfrost                    352 CBC         297 :     Assert(conn != NULL);
 1828 sfrost                    353 EUB             : 
                                354                 :     /* for previous versions leave the default group access */
 1828 sfrost                    355 CBC         297 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS)
 1828 sfrost                    356 LBC           0 :         return true;
                                357                 : 
 1828 sfrost                    358 GBC         297 :     res = PQexec(conn, "SHOW data_directory_mode");
 1828 sfrost                    359 GIC         297 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
                                360                 :     {
 1469 peter                     361 UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
 1469 peter                     362 EUB             :                      "SHOW data_directory_mode", PQerrorMessage(conn));
                                363                 : 
 1828 sfrost                    364 LBC           0 :         PQclear(res);
 1828 sfrost                    365 UIC           0 :         return false;
 1828 sfrost                    366 EUB             :     }
 1828 sfrost                    367 GIC         297 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
                                368                 :     {
 1469 peter                     369 UBC           0 :         pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
 1469 peter                     370 EUB             :                      PQntuples(res), PQnfields(res), 1, 1);
                                371                 : 
 1828 sfrost                    372 UIC           0 :         PQclear(res);
 1828 sfrost                    373 LBC           0 :         return false;
                                374                 :     }
 1828 sfrost                    375 EUB             : 
 1828 sfrost                    376 GIC         297 :     if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1)
                                377                 :     {
 1469 peter                     378 UBC           0 :         pg_log_error("group access flag could not be parsed: %s",
 1469 peter                     379 EUB             :                      PQgetvalue(res, 0, 0));
                                380                 : 
 1828 sfrost                    381 UIC           0 :         PQclear(res);
 1828 sfrost                    382 LBC           0 :         return false;
                                383                 :     }
 1828 sfrost                    384 ECB             : 
 1828 sfrost                    385 CBC         297 :     SetDataDirectoryCreatePerm(data_directory_mode);
                                386                 : 
 1828 sfrost                    387 GIC         297 :     PQclear(res);
                                388             297 :     return true;
                                389                 : }
                                390                 : 
                                391                 : /*
                                392                 :  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
                                393                 :  * some result information if requested:
                                394                 :  * - System identifier
                                395                 :  * - Current timeline ID
                                396                 :  * - Start LSN position
 2651 alvherre                  397 ECB             :  * - Database name (NULL in servers prior to 9.4)
                                398                 :  */
                                399                 : bool
 3112 andres                    400 GIC         305 : RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
                                401                 :                   XLogRecPtr *startpos, char **db_name)
                                402                 : {
                                403                 :     PGresult   *res;
                                404                 :     uint32      hi,
 2878 bruce                     405 ECB             :                 lo;
                                406                 : 
 3112 andres                    407                 :     /* Check connection existence */
 3112 andres                    408 CBC         305 :     Assert(conn != NULL);
                                409                 : 
 3112 andres                    410 GBC         305 :     res = PQexec(conn, "IDENTIFY_SYSTEM");
 3112 andres                    411 GIC         305 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
                                412                 :     {
 1469 peter                     413 UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
 1469 peter                     414 EUB             :                      "IDENTIFY_SYSTEM", PQerrorMessage(conn));
                                415                 : 
 3107 sfrost                    416 LBC           0 :         PQclear(res);
 3112 andres                    417 UIC           0 :         return false;
 3112 andres                    418 EUB             :     }
 3112 andres                    419 GIC         305 :     if (PQntuples(res) != 1 || PQnfields(res) < 3)
                                420                 :     {
 1469 peter                     421 UBC           0 :         pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
 1469 peter                     422 EUB             :                      PQntuples(res), PQnfields(res), 1, 3);
                                423                 : 
 3107 sfrost                    424 UIC           0 :         PQclear(res);
 3112 andres                    425               0 :         return false;
 3112 andres                    426 ECB             :     }
                                427                 : 
                                428                 :     /* Get system identifier */
 3112 andres                    429 GIC         305 :     if (sysid != NULL)
 3112 andres                    430 CBC         249 :         *sysid = pg_strdup(PQgetvalue(res, 0, 0));
 3112 andres                    431 ECB             : 
                                432                 :     /* Get timeline ID to start streaming from */
 3112 andres                    433 GIC         305 :     if (starttli != NULL)
 3112 andres                    434 CBC         249 :         *starttli = atoi(PQgetvalue(res, 0, 1));
                                435                 : 
 3112 andres                    436 ECB             :     /* Get LSN start position if necessary */
 3112 andres                    437 GIC         305 :     if (startpos != NULL)
 3112 andres                    438 EUB             :     {
 3112 andres                    439 GIC           7 :         if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
                                440                 :         {
 1469 peter                     441 UBC           0 :             pg_log_error("could not parse write-ahead log location \"%s\"",
 1469 peter                     442 EUB             :                          PQgetvalue(res, 0, 2));
                                443                 : 
 3107 sfrost                    444 LBC           0 :             PQclear(res);
 3112 andres                    445 UIC           0 :             return false;
                                446                 :         }
 3112 andres                    447 GIC           7 :         *startpos = ((uint64) hi) << 32 | lo;
 3112 andres                    448 ECB             :     }
                                449                 : 
                                450                 :     /* Get database name, only available in 9.4 and newer versions */
 2878 bruce                     451 CBC         305 :     if (db_name != NULL)
                                452                 :     {
 2651 alvherre                  453              56 :         *db_name = NULL;
 2651 alvherre                  454 GIC          56 :         if (PQserverVersion(conn) >= 90400)
 2651 alvherre                  455 EUB             :         {
 2651 alvherre                  456 GIC          56 :             if (PQnfields(res) < 4)
                                457                 :             {
 1469 peter                     458 UBC           0 :                 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
 1469 peter                     459 EUB             :                              PQntuples(res), PQnfields(res), 1, 4);
                                460                 : 
 2651 alvherre                  461 LBC           0 :                 PQclear(res);
                                462               0 :                 return false;
                                463                 :             }
 2651 alvherre                  464 GIC          56 :             if (!PQgetisnull(res, 0, 3))
                                465              47 :                 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
 2651 alvherre                  466 ECB             :         }
 3112 andres                    467                 :     }
                                468                 : 
 3112 andres                    469 GIC         305 :     PQclear(res);
                                470             305 :     return true;
                                471                 : }
                                472                 : 
                                473                 : /*
                                474                 :  * Run READ_REPLICATION_SLOT through a given connection and give back to
                                475                 :  * caller some result information if requested for this slot:
                                476                 :  * - Start LSN position, InvalidXLogRecPtr if unknown.
                                477                 :  * - Current timeline ID, 0 if unknown.
  530 michael                   478 ECB             :  * Returns false on failure, and true otherwise.
                                479                 :  */
                                480                 : bool
  530 michael                   481 GIC           3 : GetSlotInformation(PGconn *conn, const char *slot_name,
                                482                 :                    XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
  530 michael                   483 ECB             : {
                                484                 :     PGresult   *res;
                                485                 :     PQExpBuffer query;
  530 michael                   486 CBC           3 :     XLogRecPtr  lsn_loc = InvalidXLogRecPtr;
                                487               3 :     TimeLineID  tli_loc = 0;
  530 michael                   488 ECB             : 
  530 michael                   489 CBC           3 :     if (restart_lsn)
  530 michael                   490 GIC           3 :         *restart_lsn = lsn_loc;
  530 michael                   491 CBC           3 :     if (restart_tli)
                                492               3 :         *restart_tli = tli_loc;
  530 michael                   493 ECB             : 
  530 michael                   494 CBC           3 :     query = createPQExpBuffer();
  530 michael                   495 GIC           3 :     appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
  530 michael                   496 CBC           3 :     res = PQexec(conn, query->data);
  530 michael                   497 GIC           3 :     destroyPQExpBuffer(query);
  530 michael                   498 EUB             : 
  530 michael                   499 GIC           3 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
  530 michael                   500 EUB             :     {
  530 michael                   501 UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
                                502                 :                      "READ_REPLICATION_SLOT", PQerrorMessage(conn));
  530 michael                   503 UIC           0 :         PQclear(res);
                                504               0 :         return false;
  530 michael                   505 ECB             :     }
                                506                 : 
  530 michael                   507 EUB             :     /* The command should always return precisely one tuple and three fields */
  530 michael                   508 GIC           3 :     if (PQntuples(res) != 1 || PQnfields(res) != 3)
  530 michael                   509 EUB             :     {
  530 michael                   510 UBC           0 :         pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
                                511                 :                      slot_name, PQntuples(res), PQnfields(res), 1, 3);
  530 michael                   512 UIC           0 :         PQclear(res);
                                513               0 :         return false;
                                514                 :     }
                                515                 : 
                                516                 :     /*
  530 michael                   517 ECB             :      * When the slot doesn't exist, the command returns a tuple with NULL
                                518                 :      * values.  This checks only the slot type field.
                                519                 :      */
  530 michael                   520 CBC           3 :     if (PQgetisnull(res, 0, 0))
  530 michael                   521 ECB             :     {
  197 peter                     522 GIC           1 :         pg_log_error("replication slot \"%s\" does not exist", slot_name);
  530 michael                   523               1 :         PQclear(res);
                                524               1 :         return false;
                                525                 :     }
                                526                 : 
                                527                 :     /*
  530 michael                   528 ECB             :      * Note that this cannot happen as READ_REPLICATION_SLOT supports only
                                529                 :      * physical slots, but play it safe.
  530 michael                   530 EUB             :      */
  530 michael                   531 GIC           2 :     if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
  530 michael                   532 EUB             :     {
  530 michael                   533 UBC           0 :         pg_log_error("expected a physical replication slot, got type \"%s\" instead",
                                534                 :                      PQgetvalue(res, 0, 0));
  530 michael                   535 UIC           0 :         PQclear(res);
                                536               0 :         return false;
  530 michael                   537 ECB             :     }
                                538                 : 
                                539                 :     /* restart LSN */
  530 michael                   540 GIC           2 :     if (!PQgetisnull(res, 0, 1))
                                541                 :     {
  530 michael                   542 ECB             :         uint32      hi,
                                543                 :                     lo;
  530 michael                   544 EUB             : 
  530 michael                   545 GIC           2 :         if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
  530 michael                   546 EUB             :         {
  530 michael                   547 UBC           0 :             pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
                                548                 :                          PQgetvalue(res, 0, 1), slot_name);
  530 michael                   549 LBC           0 :             PQclear(res);
  530 michael                   550 UIC           0 :             return false;
                                551                 :         }
  530 michael                   552 GIC           2 :         lsn_loc = ((uint64) hi) << 32 | lo;
  530 michael                   553 ECB             :     }
                                554                 : 
                                555                 :     /* current TLI */
  530 michael                   556 CBC           2 :     if (!PQgetisnull(res, 0, 2))
  530 michael                   557 GIC           2 :         tli_loc = (TimeLineID) atol(PQgetvalue(res, 0, 2));
                                558                 : 
  530 michael                   559 CBC           2 :     PQclear(res);
  530 michael                   560 ECB             : 
                                561                 :     /* Assign results if requested */
  530 michael                   562 CBC           2 :     if (restart_lsn)
  530 michael                   563 GIC           2 :         *restart_lsn = lsn_loc;
  530 michael                   564 CBC           2 :     if (restart_tli)
  530 michael                   565 GIC           2 :         *restart_tli = tli_loc;
                                566                 : 
                                567               2 :     return true;
                                568                 : }
                                569                 : 
                                570                 : /*
                                571                 :  * Create a replication slot for the given connection. This function
 2212 magnus                    572 ECB             :  * returns true in case of success.
                                573                 :  */
                                574                 : bool
 3112 andres                    575 GIC         118 : CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
                                576                 :                       bool is_temporary, bool is_physical, bool reserve_wal,
                                577                 :                       bool slot_exists_ok, bool two_phase)
 3112 andres                    578 ECB             : {
                                579                 :     PQExpBuffer query;
                                580                 :     PGresult   *res;
  551 rhaas                     581 GIC         118 :     bool        use_new_option_syntax = (PQserverVersion(conn) >= 150000);
 3112 andres                    582 ECB             : 
 3112 andres                    583 GIC         118 :     query = createPQExpBuffer();
 3112 andres                    584 ECB             : 
 3112 andres                    585 CBC         118 :     Assert((is_physical && plugin == NULL) ||
                                586                 :            (!is_physical && plugin != NULL));
  648 akapila                   587 GIC         118 :     Assert(!(two_phase && is_physical));
 3112 andres                    588 CBC         118 :     Assert(slot_name != NULL);
 3112 andres                    589 ECB             : 
  551 rhaas                     590                 :     /* Build base portion of query */
 2021 peter_e                   591 CBC         118 :     appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
                                592             118 :     if (is_temporary)
 1375 drowley                   593 GIC          92 :         appendPQExpBufferStr(query, " TEMPORARY");
 3112 andres                    594 CBC         118 :     if (is_physical)
 1375 drowley                   595 GIC          95 :         appendPQExpBufferStr(query, " PHYSICAL");
                                596                 :     else
  551 rhaas                     597 CBC          23 :         appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
  551 rhaas                     598 ECB             : 
                                599                 :     /* Add any requested options */
  551 rhaas                     600 GIC         118 :     if (use_new_option_syntax)
  551 rhaas                     601 CBC         118 :         appendPQExpBufferStr(query, " (");
                                602             118 :     if (is_physical)
                                603                 :     {
 2021 peter_e                   604 GIC          95 :         if (reserve_wal)
  551 rhaas                     605              94 :             AppendPlainCommandOption(query, use_new_option_syntax,
                                606                 :                                      "RESERVE_WAL");
 2021 peter_e                   607 ECB             :     }
 3112 andres                    608                 :     else
                                609                 :     {
  648 akapila                   610 GIC          23 :         if (two_phase && PQserverVersion(conn) >= 150000)
  551 rhaas                     611 CBC           1 :             AppendPlainCommandOption(query, use_new_option_syntax,
                                612                 :                                      "TWO_PHASE");
                                613                 : 
 2217 peter_e                   614              23 :         if (PQserverVersion(conn) >= 100000)
  551 rhaas                     615 ECB             :         {
                                616                 :             /* pg_recvlogical doesn't use an exported snapshot, so suppress */
  551 rhaas                     617 GIC          23 :             if (use_new_option_syntax)
  551 rhaas                     618 GBC          23 :                 AppendStringCommandOption(query, use_new_option_syntax,
                                619                 :                                           "SNAPSHOT", "nothing");
                                620                 :             else
  551 rhaas                     621 UIC           0 :                 AppendPlainCommandOption(query, use_new_option_syntax,
  551 rhaas                     622 ECB             :                                          "NOEXPORT_SNAPSHOT");
                                623                 :         }
                                624                 :     }
  551 rhaas                     625 CBC         118 :     if (use_new_option_syntax)
                                626                 :     {
  551 rhaas                     627 ECB             :         /* Suppress option list if it would be empty, otherwise terminate */
  551 rhaas                     628 CBC         118 :         if (query->data[query->len - 1] == '(')
                                629                 :         {
  551 rhaas                     630 GIC           1 :             query->len -= 2;
  551 rhaas                     631 CBC           1 :             query->data[query->len] = '\0';
                                632                 :         }
                                633                 :         else
  551 rhaas                     634 GIC         117 :             appendPQExpBufferChar(query, ')');
 2217 peter_e                   635 ECB             :     }
 3112 andres                    636                 : 
                                637                 :     /* Now run the query */
 3112 andres                    638 CBC         118 :     res = PQexec(conn, query->data);
 3112 andres                    639 GIC         118 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
 3112 andres                    640 ECB             :     {
 2828 andres                    641 GBC           1 :         const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
 3107 sfrost                    642 EUB             : 
 2797 andres                    643 GIC           1 :         if (slot_exists_ok &&
 2797 andres                    644 UBC           0 :             sqlstate &&
                                645               0 :             strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
 2828 andres                    646 EUB             :         {
 2828 andres                    647 UIC           0 :             destroyPQExpBuffer(query);
                                648               0 :             PQclear(res);
                                649               0 :             return true;
 2828 andres                    650 ECB             :         }
                                651                 :         else
                                652                 :         {
 1469 peter                     653 CBC           1 :             pg_log_error("could not send replication command \"%s\": %s",
 1469 peter                     654 ECB             :                          query->data, PQerrorMessage(conn));
 2828 andres                    655                 : 
 2828 andres                    656 GIC           1 :             destroyPQExpBuffer(query);
                                657               1 :             PQclear(res);
                                658               1 :             return false;
 2828 andres                    659 ECB             :         }
                                660                 :     }
 3112 andres                    661 EUB             : 
 3112 andres                    662 GIC         117 :     if (PQntuples(res) != 1 || PQnfields(res) != 4)
                                663                 :     {
 1469 peter                     664 UIC           0 :         pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
 1469 peter                     665 EUB             :                      slot_name,
                                666                 :                      PQntuples(res), PQnfields(res), 1, 4);
 3107 sfrost                    667                 : 
 3107 sfrost                    668 UIC           0 :         destroyPQExpBuffer(query);
                                669               0 :         PQclear(res);
 3112 andres                    670 LBC           0 :         return false;
 3112 andres                    671 ECB             :     }
                                672                 : 
 3107 sfrost                    673 GIC         117 :     destroyPQExpBuffer(query);
 3112 andres                    674             117 :     PQclear(res);
                                675             117 :     return true;
                                676                 : }
                                677                 : 
                                678                 : /*
                                679                 :  * Drop a replication slot for the given connection. This function
 3112 andres                    680 ECB             :  * returns true in case of success.
                                681                 :  */
                                682                 : bool
 3112 andres                    683 GIC           2 : DropReplicationSlot(PGconn *conn, const char *slot_name)
                                684                 : {
 3112 andres                    685 ECB             :     PQExpBuffer query;
                                686                 :     PGresult   *res;
                                687                 : 
 3112 andres                    688 GIC           2 :     Assert(slot_name != NULL);
                                689                 : 
 3112 andres                    690 CBC           2 :     query = createPQExpBuffer();
                                691                 : 
 3112 andres                    692 ECB             :     /* Build query */
 3112 andres                    693 CBC           2 :     appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
                                694                 :                       slot_name);
 3112 andres                    695 GBC           2 :     res = PQexec(conn, query->data);
 3112 andres                    696 GIC           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                697                 :     {
 1469 peter                     698 UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
 1469 peter                     699 EUB             :                      query->data, PQerrorMessage(conn));
 3107 sfrost                    700                 : 
 3107 sfrost                    701 UIC           0 :         destroyPQExpBuffer(query);
                                702               0 :         PQclear(res);
 3112 andres                    703 LBC           0 :         return false;
                                704                 :     }
 3112 andres                    705 EUB             : 
 3112 andres                    706 GIC           2 :     if (PQntuples(res) != 0 || PQnfields(res) != 0)
                                707                 :     {
 1469 peter                     708 UIC           0 :         pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
 1469 peter                     709 EUB             :                      slot_name,
                                710                 :                      PQntuples(res), PQnfields(res), 0, 0);
 3107 sfrost                    711                 : 
 3107 sfrost                    712 UIC           0 :         destroyPQExpBuffer(query);
                                713               0 :         PQclear(res);
 3112 andres                    714 LBC           0 :         return false;
 3112 andres                    715 ECB             :     }
                                716                 : 
 2828 tgl                       717 GIC           2 :     destroyPQExpBuffer(query);
 3112 andres                    718               2 :     PQclear(res);
                                719               2 :     return true;
                                720                 : }
                                721                 : 
                                722                 : /*
                                723                 :  * Append a "plain" option - one with no value - to a server command that
                                724                 :  * is being constructed.
                                725                 :  *
                                726                 :  * In the old syntax, all options were parser keywords, so you could just
                                727                 :  * write things like SOME_COMMAND OPTION1 OPTION2 'opt2value' OPTION3 42. The
                                728                 :  * new syntax uses a comma-separated list surrounded by parentheses, so the
  551 rhaas                     729 ECB             :  * equivalent is SOME_COMMAND (OPTION1, OPTION2 'optvalue', OPTION3 42).
                                730                 :  */
                                731                 : void
  551 rhaas                     732 CBC        1066 : AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
                                733                 :                          char *option_name)
  551 rhaas                     734 ECB             : {
  551 rhaas                     735 CBC        1066 :     if (buf->len > 0 && buf->data[buf->len - 1] != '(')
                                736                 :     {
  551 rhaas                     737 GBC         807 :         if (use_new_option_syntax)
  551 rhaas                     738 GIC         807 :             appendPQExpBufferStr(buf, ", ");
                                739                 :         else
  551 rhaas                     740 LBC           0 :             appendPQExpBufferChar(buf, ' ');
  551 rhaas                     741 ECB             :     }
                                742                 : 
  551 rhaas                     743 GIC        1066 :     appendPQExpBuffer(buf, " %s", option_name);
                                744            1066 : }
                                745                 : 
                                746                 : /*
                                747                 :  * Append an option with an associated string value to a server command that
                                748                 :  * is being constructed.
                                749                 :  *
  551 rhaas                     750 ECB             :  * See comments for AppendPlainCommandOption, above.
                                751                 :  */
                                752                 : void
  551 rhaas                     753 CBC         639 : AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
                                754                 :                           char *option_name, char *option_value)
  551 rhaas                     755 ECB             : {
  551 rhaas                     756 GIC         639 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
  551 rhaas                     757 ECB             : 
  551 rhaas                     758 CBC         639 :     if (option_value != NULL)
                                759                 :     {
                                760             639 :         size_t      length = strlen(option_value);
                                761             639 :         char       *escaped_value = palloc(1 + 2 * length);
  551 rhaas                     762 ECB             : 
  551 rhaas                     763 GIC         639 :         PQescapeStringConn(conn, escaped_value, option_value, length, NULL);
  551 rhaas                     764 CBC         639 :         appendPQExpBuffer(buf, " '%s'", escaped_value);
  551 rhaas                     765 GIC         639 :         pfree(escaped_value);
                                766                 :     }
                                767             639 : }
                                768                 : 
                                769                 : /*
                                770                 :  * Append an option with an associated integer value to a server command
                                771                 :  * is being constructed.
                                772                 :  *
  551 rhaas                     773 ECB             :  * See comments for AppendPlainCommandOption, above.
                                774                 :  */
                                775                 : void
  551 rhaas                     776 CBC         137 : AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
                                777                 :                            char *option_name, int32 option_value)
  551 rhaas                     778 ECB             : {
  551 rhaas                     779 CBC         137 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
                                780                 : 
  551 rhaas                     781 GIC         137 :     appendPQExpBuffer(buf, " %d", option_value);
                                782             137 : }
                                783                 : 
                                784                 : /*
                                785                 :  * Frontend version of GetCurrentTimestamp(), since we are not linked with
 2236 tgl                       786 ECB             :  * backend code.
                                787                 :  */
                                788                 : TimestampTz
 3309 rhaas                     789 GIC        4278 : feGetCurrentTimestamp(void)
                                790                 : {
 2236 tgl                       791 ECB             :     TimestampTz result;
                                792                 :     struct timeval tp;
 3309 rhaas                     793                 : 
 3309 rhaas                     794 GIC        4278 :     gettimeofday(&tp, NULL);
 3309 rhaas                     795 ECB             : 
 2236 tgl                       796 GIC        4278 :     result = (TimestampTz) tp.tv_sec -
 3309 rhaas                     797 ECB             :         ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
 3309 rhaas                     798 GIC        4278 :     result = (result * USECS_PER_SEC) + tp.tv_usec;
                                799                 : 
                                800            4278 :     return result;
                                801                 : }
                                802                 : 
                                803                 : /*
                                804                 :  * Frontend version of TimestampDifference(), since we are not linked with
 3309 rhaas                     805 ECB             :  * backend code.
                                806                 :  */
                                807                 : void
 2236 tgl                       808 CBC        3793 : feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
                                809                 :                       long *secs, int *microsecs)
 3309 rhaas                     810 ECB             : {
 2236 tgl                       811 GIC        3793 :     TimestampTz diff = stop_time - start_time;
 3309 rhaas                     812 EUB             : 
 3309 rhaas                     813 GBC        3793 :     if (diff <= 0)
                                814                 :     {
 3309 rhaas                     815 UIC           0 :         *secs = 0;
                                816               0 :         *microsecs = 0;
 3309 rhaas                     817 ECB             :     }
                                818                 :     else
                                819                 :     {
 3309 rhaas                     820 CBC        3793 :         *secs = (long) (diff / USECS_PER_SEC);
 3309 rhaas                     821 GIC        3793 :         *microsecs = (int) (diff % USECS_PER_SEC);
                                822                 :     }
                                823            3793 : }
                                824                 : 
                                825                 : /*
                                826                 :  * Frontend version of TimestampDifferenceExceeds(), since we are not
 3309 rhaas                     827 ECB             :  * linked with backend code.
                                828                 :  */
                                829                 : bool
 2236 tgl                       830 GIC        4809 : feTimestampDifferenceExceeds(TimestampTz start_time,
 2236 tgl                       831 ECB             :                              TimestampTz stop_time,
                                832                 :                              int msec)
 3309 rhaas                     833                 : {
 2236 tgl                       834 GIC        4809 :     TimestampTz diff = stop_time - start_time;
                                835                 : 
 3309 rhaas                     836            4809 :     return (diff >= msec * INT64CONST(1000));
                                837                 : }
                                838                 : 
                                839                 : /*
 3309 rhaas                     840 ECB             :  * Converts an int64 to network byte order.
                                841                 :  */
                                842                 : void
 3309 rhaas                     843 GIC         496 : fe_sendint64(int64 i, char *buf)
 3309 rhaas                     844 ECB             : {
 2016 andres                    845 CBC         496 :     uint64      n64 = pg_hton64(i);
                                846                 : 
 2016 andres                    847 GIC         496 :     memcpy(buf, &n64, sizeof(n64));
 3309 rhaas                     848             496 : }
                                849                 : 
                                850                 : /*
 3309 rhaas                     851 ECB             :  * Converts an int64 from network byte order to native format.
                                852                 :  */
                                853                 : int64
 3309 rhaas                     854 GIC        7049 : fe_recvint64(char *buf)
 3309 rhaas                     855 ECB             : {
                                856                 :     uint64      n64;
                                857                 : 
 2016 andres                    858 GIC        7049 :     memcpy(&n64, buf, sizeof(n64));
                                859                 : 
                                860            7049 :     return pg_ntoh64(n64);
                                861                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a