LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - streamutil.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 71.9 % 327 235 1 21 49 21 18 110 2 105 51 108 2 1
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 15 15 14 1 14
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
       4                 :  *                  pg_recvlogical
       5                 :  *
       6                 :  * Author: Magnus Hagander <magnus@hagander.net>
       7                 :  *
       8                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       9                 :  *
      10                 :  * IDENTIFICATION
      11                 :  *        src/bin/pg_basebackup/streamutil.c
      12                 :  *-------------------------------------------------------------------------
      13                 :  */
      14                 : 
      15                 : #include "postgres_fe.h"
      16                 : 
      17                 : #include <sys/time.h>
      18                 : #include <unistd.h>
      19                 : 
      20                 : #include "access/xlog_internal.h"
      21                 : #include "common/connect.h"
      22                 : #include "common/fe_memutils.h"
      23                 : #include "common/file_perm.h"
      24                 : #include "common/logging.h"
      25                 : #include "common/string.h"
      26                 : #include "datatype/timestamp.h"
      27                 : #include "port/pg_bswap.h"
      28                 : #include "pqexpbuffer.h"
      29                 : #include "receivelog.h"
      30                 : #include "streamutil.h"
      31                 : 
      32                 : #define ERRCODE_DUPLICATE_OBJECT  "42710"   /* presumably the SQLSTATE text for a duplicate-object error; no use is visible in this part of the file -- TODO confirm */
      33                 : 
      34                 : int         WalSegSz;       /* WAL segment size in bytes; assigned by RetrieveWalSegSize() */
      35                 : 
      36                 : static bool RetrieveDataDirCreatePerm(PGconn *conn);    /* forward declaration: GetConnection() calls it before its definition */
      37                 : 
      38                 : /* SHOW command for replication connection was introduced in version 10 */
      39                 : #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
      40                 : 
      41                 : /*
      42                 :  * Group access is supported from version 11.
      43                 :  */
      44                 : #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
      45                 : 
      46                 : const char *progname;       /* passed as fallback_application_name by GetConnection() */
      47                 : char       *connection_string = NULL;   /* if set, parsed with PQconninfoParse() in GetConnection() */
      48                 : char       *dbhost = NULL;  /* if set, sent as the host connection keyword */
      49                 : char       *dbuser = NULL;  /* if set, sent as the user connection keyword */
      50                 : char       *dbport = NULL;  /* if set, sent as the port connection keyword */
      51                 : char       *dbname = NULL;  /* if set, sent as dbname and replication becomes database instead of true */
      52                 : int         dbgetpassword = 0;  /* 0=auto, -1=never, 1=always */
      53                 : static char *password = NULL;   /* last prompted password; kept so later GetConnection() calls can reuse it */
      54                 : PGconn     *conn = NULL;    /* starts out NULL; nothing in the visible part of this file assigns it */
      55                 : 
      56                 : /*
      57                 :  * Connect to the server. Returns a valid PGconn pointer if connected,
      58                 :  * or NULL on non-permanent error. On permanent error, the function will
      59                 :  * call exit(1) directly.
      60                 :  */
      61                 : PGconn *
      62 CBC         299 : GetConnection(void)
      63                 : {
      64                 :     PGconn     *tmpconn;
      65             299 :     int         argcount = 7;   /* dbname, replication, fallback_app_name,
      66                 :                                  * host, user, port, password */
      67                 :     int         i;
      68                 :     const char **keywords;
      69                 :     const char **values;
      70                 :     const char *tmpparam;
      71                 :     bool        need_password;
      72             299 :     PQconninfoOption *conn_opts = NULL;
      73                 :     PQconninfoOption *conn_opt;
      74             299 :     char       *err_msg = NULL;
      75                 : 
      76                 :     /* pg_recvlogical uses dbname only; others use connection_string only. */
      77             299 :     Assert(dbname == NULL || connection_string == NULL);
      78                 : 
      79                 :     /*
      80                 :      * Merge the connection info inputs given in form of connection string,
      81                 :      * options and default values (dbname=replication, replication=true, etc.)
      82                 :      * Explicitly discard any dbname value in the connection string;
      83                 :      * otherwise, PQconnectdbParams() would interpret that value as being
      84                 :      * itself a connection string.
      85                 :      */
      86             299 :     i = 0;
      87             299 :     if (connection_string)
      88                 :     {
      89               2 :         conn_opts = PQconninfoParse(connection_string, &err_msg);
      90               2 :         if (conn_opts == NULL)
      91 UBC           0 :             pg_fatal("%s", err_msg);
      92                 : 
      93 CBC          80 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
      94                 :         {
      95              78 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
      96               6 :                 strcmp(conn_opt->keyword, "dbname") != 0)
      97               4 :                 argcount++;
      98                 :         }
      99                 : 
     100               2 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
     101               2 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
     102                 : 
     103              80 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     104                 :         {
     105              78 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
     106               6 :                 strcmp(conn_opt->keyword, "dbname") != 0)
     107                 :             {
     108               4 :                 keywords[i] = conn_opt->keyword;
     109               4 :                 values[i] = conn_opt->val;
     110               4 :                 i++;
     111                 :             }
     112                 :         }
     113                 :     }
     114                 :     else
     115                 :     {
     116             297 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
     117             297 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
     118                 :     }
     119                 : 
     120             299 :     keywords[i] = "dbname";
     121             299 :     values[i] = dbname == NULL ? "replication" : dbname;
     122             299 :     i++;
     123             299 :     keywords[i] = "replication";
     124             299 :     values[i] = dbname == NULL ? "true" : "database";
     125             299 :     i++;
     126             299 :     keywords[i] = "fallback_application_name";
     127             299 :     values[i] = progname;
     128             299 :     i++;
     129                 : 
     130             299 :     if (dbhost)
     131                 :     {
     132             107 :         keywords[i] = "host";
     133             107 :         values[i] = dbhost;
     134             107 :         i++;
     135                 :     }
     136             299 :     if (dbuser)
     137                 :     {
     138               7 :         keywords[i] = "user";
     139               7 :         values[i] = dbuser;
     140               7 :         i++;
     141                 :     }
     142             299 :     if (dbport)
     143                 :     {
     144             107 :         keywords[i] = "port";
     145             107 :         values[i] = dbport;
     146             107 :         i++;
     147                 :     }
     148                 : 
     149                 :     /* If -W was given, force prompt for password, but only the first time */
     150             299 :     need_password = (dbgetpassword == 1 && !password);
     151                 : 
     152                 :     do
     153                 :     {
     154                 :         /* Get a new password if appropriate */
     155             299 :         if (need_password)
     156                 :         {
     157 UNC           0 :             free(password);
     158 UBC           0 :             password = simple_prompt("Password: ", false);
     159 UIC           0 :             need_password = false;
     160                 :         }
     161                 : 
     162 ECB             :         /* Use (or reuse, on a subsequent connection) password if we have it */
     163 GIC         299 :         if (password)
     164 EUB             :         {
     165 UBC           0 :             keywords[i] = "password";
     166 UIC           0 :             values[i] = password;
     167                 :         }
     168                 :         else
     169 ECB             :         {
     170 CBC         299 :             keywords[i] = NULL;
     171 GIC         299 :             values[i] = NULL;
     172                 :         }
     173 ECB             : 
     174 GIC         299 :         tmpconn = PQconnectdbParams(keywords, values, true);
     175                 : 
     176                 :         /*
     177                 :          * If there is too little memory even to allocate the PGconn object
     178                 :          * and PQconnectdbParams returns NULL, we call exit(1) directly.
     179 ECB             :          */
     180 GBC         299 :         if (!tmpconn)
     181 UIC           0 :             pg_fatal("could not connect to server");
     182                 : 
     183 ECB             :         /* If we need a password and -w wasn't given, loop back and get one */
     184 CBC         301 :         if (PQstatus(tmpconn) == CONNECTION_BAD &&
     185 GBC           2 :             PQconnectionNeedsPassword(tmpconn) &&
     186 UIC           0 :             dbgetpassword != -1)
     187 EUB             :         {
     188 UBC           0 :             PQfinish(tmpconn);
     189 UIC           0 :             need_password = true;
     190                 :         }
     191 ECB             :     }
     192 GIC         299 :     while (need_password);
     193 ECB             : 
     194 GIC         299 :     if (PQstatus(tmpconn) != CONNECTION_OK)
     195 ECB             :     {
     196 CBC           2 :         pg_log_error("%s", PQerrorMessage(tmpconn));
     197               2 :         PQfinish(tmpconn);
     198               2 :         free(values);
     199               2 :         free(keywords);
     200 GNC           2 :         PQconninfoFree(conn_opts);
     201 GIC           2 :         return NULL;
     202                 :     }
     203 ECB             : 
     204                 :     /* Connection ok! */
     205 CBC         297 :     free(values);
     206 GIC         297 :     free(keywords);
     207 GNC         297 :     PQconninfoFree(conn_opts);
     208                 : 
     209                 :     /*
     210                 :      * Set always-secure search path, so malicious users can't get control.
     211                 :      * The capacity to run normal SQL queries was added in PostgreSQL 10, so
     212 ECB             :      * the search path cannot be changed (by us or attackers) on earlier
     213                 :      * versions.
     214                 :      */
     215 GIC         297 :     if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
     216 ECB             :     {
     217                 :         PGresult   *res;
     218                 : 
     219 GBC          47 :         res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     220 GIC          47 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     221 EUB             :         {
     222 UBC           0 :             pg_log_error("could not clear search_path: %s",
     223 EUB             :                          PQerrorMessage(tmpconn));
     224 UIC           0 :             PQclear(res);
     225 LBC           0 :             PQfinish(tmpconn);
     226 UIC           0 :             exit(1);
     227                 :         }
     228 GIC          47 :         PQclear(res);
     229                 :     }
     230                 : 
     231                 :     /*
     232 ECB             :      * Ensure we have the same value of integer_datetimes (now always "on") as
     233                 :      * the server we are connecting to.
     234                 :      */
     235 GBC         297 :     tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
     236             297 :     if (!tmpparam)
     237 EUB             :     {
     238 UIC           0 :         pg_log_error("could not determine server setting for integer_datetimes");
     239               0 :         PQfinish(tmpconn);
     240 LBC           0 :         exit(1);
     241                 :     }
     242 EUB             : 
     243 GBC         297 :     if (strcmp(tmpparam, "on") != 0)
     244 EUB             :     {
     245 UIC           0 :         pg_log_error("integer_datetimes compile flag does not match server");
     246               0 :         PQfinish(tmpconn);
     247               0 :         exit(1);
     248                 :     }
     249                 : 
     250                 :     /*
     251 ECB             :      * Retrieve the source data directory mode and use it to construct a umask
     252                 :      * for creating directories and files.
     253 EUB             :      */
     254 GBC         297 :     if (!RetrieveDataDirCreatePerm(tmpconn))
     255                 :     {
     256 UIC           0 :         PQfinish(tmpconn);
     257 LBC           0 :         exit(1);
     258                 :     }
     259                 : 
     260 GIC         297 :     return tmpconn;
     261                 : }
     262                 : 
     263                 : /*
     264                 :  * From version 10, explicitly set wal segment size using SHOW wal_segment_size
     265 ECB             :  * since ControlFile is not accessible here.
     266                 :  */
     267                 : bool
     268 GIC         149 : RetrieveWalSegSize(PGconn *conn)
     269                 : {
     270 ECB             :     PGresult   *res;
     271                 :     char        xlog_unit[3];
     272                 :     int         xlog_val,
     273 CBC         149 :                 multiplier = 1;
     274                 : 
     275                 :     /* check connection existence */
     276             149 :     Assert(conn != NULL);
     277                 : 
     278 EUB             :     /* for previous versions set the default xlog seg size */
     279 GBC         149 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
     280                 :     {
     281 UIC           0 :         WalSegSz = DEFAULT_XLOG_SEG_SIZE;
     282 LBC           0 :         return true;
     283 ECB             :     }
     284                 : 
     285 GBC         149 :     res = PQexec(conn, "SHOW wal_segment_size");
     286 GIC         149 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     287                 :     {
     288 UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
     289 EUB             :                      "SHOW wal_segment_size", PQerrorMessage(conn));
     290                 : 
     291 LBC           0 :         PQclear(res);
     292 UIC           0 :         return false;
     293 EUB             :     }
     294 GIC         149 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     295                 :     {
     296 UBC           0 :         pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
     297 EUB             :                      PQntuples(res), PQnfields(res), 1, 1);
     298                 : 
     299 UIC           0 :         PQclear(res);
     300               0 :         return false;
     301 ECB             :     }
     302                 : 
     303 EUB             :     /* fetch xlog value and unit from the result */
     304 GBC         149 :     if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
     305 EUB             :     {
     306 UIC           0 :         pg_log_error("WAL segment size could not be parsed");
     307               0 :         PQclear(res);
     308 LBC           0 :         return false;
     309                 :     }
     310                 : 
     311 CBC         149 :     PQclear(res);
     312 ECB             : 
     313 EUB             :     /* set the multiplier based on unit to convert xlog_val to bytes */
     314 GBC         149 :     if (strcmp(xlog_unit, "MB") == 0)
     315 GIC         149 :         multiplier = 1024 * 1024;
     316 UIC           0 :     else if (strcmp(xlog_unit, "GB") == 0)
     317 LBC           0 :         multiplier = 1024 * 1024 * 1024;
     318                 : 
     319 ECB             :     /* convert and set WalSegSz */
     320 GIC         149 :     WalSegSz = xlog_val * multiplier;
     321 EUB             : 
     322 GIC         149 :     if (!IsValidWalSegSize(WalSegSz))
     323                 :     {
     324 UIC           0 :         pg_log_error(ngettext("WAL segment size must be a power of two between 1 MB and 1 GB, but the remote server reported a value of %d byte",
     325 EUB             :                               "WAL segment size must be a power of two between 1 MB and 1 GB, but the remote server reported a value of %d bytes",
     326                 :                               WalSegSz),
     327                 :                      WalSegSz);
     328 LBC           0 :         return false;
     329                 :     }
     330                 : 
     331 GIC         149 :     return true;
     332                 : }
     333                 : 
     334                 : /*
     335                 :  * RetrieveDataDirCreatePerm
     336                 :  *
     337                 :  * This function is used to determine the privileges on the server's PG data
     338                 :  * directory and, based on that, set what the permissions will be for
     339                 :  * directories and files we create.
     340                 :  *
     341                 :  * PG11 added support for (optionally) group read/execute rights to be set on
     342                 :  * the data directory.  Prior to PG11, only the owner was allowed to have rights
     343 ECB             :  * on the data directory.
     344                 :  */
     345                 : static bool
     346 GIC         297 : RetrieveDataDirCreatePerm(PGconn *conn)
     347                 : {
     348                 :     PGresult   *res;
     349 ECB             :     int         data_directory_mode;
     350                 : 
     351                 :     /* check connection existence */
     352 CBC         297 :     Assert(conn != NULL);
     353 EUB             : 
     354                 :     /* for previous versions leave the default group access */
     355 CBC         297 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS)
     356 LBC           0 :         return true;
     357                 : 
     358 GBC         297 :     res = PQexec(conn, "SHOW data_directory_mode");
     359 GIC         297 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     360                 :     {
     361 UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
     362 EUB             :                      "SHOW data_directory_mode", PQerrorMessage(conn));
     363                 : 
     364 LBC           0 :         PQclear(res);
     365 UIC           0 :         return false;
     366 EUB             :     }
     367 GIC         297 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     368                 :     {
     369 UBC           0 :         pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
     370 EUB             :                      PQntuples(res), PQnfields(res), 1, 1);
     371                 : 
     372 UIC           0 :         PQclear(res);
     373 LBC           0 :         return false;
     374                 :     }
     375 EUB             : 
     376 GIC         297 :     if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1)
     377                 :     {
     378 UBC           0 :         pg_log_error("group access flag could not be parsed: %s",
     379 EUB             :                      PQgetvalue(res, 0, 0));
     380                 : 
     381 UIC           0 :         PQclear(res);
     382 LBC           0 :         return false;
     383                 :     }
     384 ECB             : 
     385 CBC         297 :     SetDataDirectoryCreatePerm(data_directory_mode);
     386                 : 
     387 GIC         297 :     PQclear(res);
     388             297 :     return true;
     389                 : }
     390                 : 
     391                 : /*
     392                 :  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
     393                 :  * some result information if requested:
     394                 :  * - System identifier
     395                 :  * - Current timeline ID
     396                 :  * - Start LSN position
     397 ECB             :  * - Database name (NULL in servers prior to 9.4)
     398                 :  */
     399                 : bool
     400 GIC         305 : RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
     401                 :                   XLogRecPtr *startpos, char **db_name)
     402                 : {
     403                 :     PGresult   *res;
     404                 :     uint32      hi,
     405 ECB             :                 lo;
     406                 : 
     407                 :     /* Check connection existence */
     408 CBC         305 :     Assert(conn != NULL);
     409                 : 
     410 GBC         305 :     res = PQexec(conn, "IDENTIFY_SYSTEM");
     411 GIC         305 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     412                 :     {
     413 UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
     414 EUB             :                      "IDENTIFY_SYSTEM", PQerrorMessage(conn));
     415                 : 
     416 LBC           0 :         PQclear(res);
     417 UIC           0 :         return false;
     418 EUB             :     }
     419 GIC         305 :     if (PQntuples(res) != 1 || PQnfields(res) < 3)
     420                 :     {
     421 UBC           0 :         pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     422 EUB             :                      PQntuples(res), PQnfields(res), 1, 3);
     423                 : 
     424 UIC           0 :         PQclear(res);
     425               0 :         return false;
     426 ECB             :     }
     427                 : 
     428                 :     /* Get system identifier */
     429 GIC         305 :     if (sysid != NULL)
     430 CBC         249 :         *sysid = pg_strdup(PQgetvalue(res, 0, 0));
     431 ECB             : 
     432                 :     /* Get timeline ID to start streaming from */
     433 GIC         305 :     if (starttli != NULL)
     434 CBC         249 :         *starttli = atoi(PQgetvalue(res, 0, 1));
     435                 : 
     436 ECB             :     /* Get LSN start position if necessary */
     437 GIC         305 :     if (startpos != NULL)
     438 EUB             :     {
     439 GIC           7 :         if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
     440                 :         {
     441 UBC           0 :             pg_log_error("could not parse write-ahead log location \"%s\"",
     442 EUB             :                          PQgetvalue(res, 0, 2));
     443                 : 
     444 LBC           0 :             PQclear(res);
     445 UIC           0 :             return false;
     446                 :         }
     447 GIC           7 :         *startpos = ((uint64) hi) << 32 | lo;
     448 ECB             :     }
     449                 : 
     450                 :     /* Get database name, only available in 9.4 and newer versions */
     451 CBC         305 :     if (db_name != NULL)
     452                 :     {
     453              56 :         *db_name = NULL;
     454 GIC          56 :         if (PQserverVersion(conn) >= 90400)
     455 EUB             :         {
     456 GIC          56 :             if (PQnfields(res) < 4)
     457                 :             {
     458 UBC           0 :                 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     459 EUB             :                              PQntuples(res), PQnfields(res), 1, 4);
     460                 : 
     461 LBC           0 :                 PQclear(res);
     462               0 :                 return false;
     463                 :             }
     464 GIC          56 :             if (!PQgetisnull(res, 0, 3))
     465              47 :                 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
     466 ECB             :         }
     467                 :     }
     468                 : 
     469 GIC         305 :     PQclear(res);
     470             305 :     return true;
     471                 : }
     472                 : 
     473                 : /*
     474                 :  * Run READ_REPLICATION_SLOT through a given connection and give back to
     475                 :  * caller some result information if requested for this slot:
     476                 :  * - Start LSN position, InvalidXLogRecPtr if unknown.
     477                 :  * - Current timeline ID, 0 if unknown.
     478 ECB             :  * Returns false on failure, and true otherwise.
     479                 :  */
     480                 : bool
     481 GIC           3 : GetSlotInformation(PGconn *conn, const char *slot_name,
     482                 :                    XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
     483 ECB             : {
     484                 :     PGresult   *res;
     485                 :     PQExpBuffer query;
     486 CBC           3 :     XLogRecPtr  lsn_loc = InvalidXLogRecPtr;
     487               3 :     TimeLineID  tli_loc = 0;
     488 ECB             : 
     489 CBC           3 :     if (restart_lsn)
     490 GIC           3 :         *restart_lsn = lsn_loc;
     491 CBC           3 :     if (restart_tli)
     492               3 :         *restart_tli = tli_loc;
     493 ECB             : 
     494 CBC           3 :     query = createPQExpBuffer();
     495 GIC           3 :     appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
     496 CBC           3 :     res = PQexec(conn, query->data);
     497 GIC           3 :     destroyPQExpBuffer(query);
     498 EUB             : 
     499 GIC           3 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     500 EUB             :     {
     501 UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
     502                 :                      "READ_REPLICATION_SLOT", PQerrorMessage(conn));
     503 UIC           0 :         PQclear(res);
     504               0 :         return false;
     505 ECB             :     }
     506                 : 
     507 EUB             :     /* The command should always return precisely one tuple and three fields */
     508 GIC           3 :     if (PQntuples(res) != 1 || PQnfields(res) != 3)
     509 EUB             :     {
     510 UBC           0 :         pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     511                 :                      slot_name, PQntuples(res), PQnfields(res), 1, 3);
     512 UIC           0 :         PQclear(res);
     513               0 :         return false;
     514                 :     }
     515                 : 
     516                 :     /*
     517 ECB             :      * When the slot doesn't exist, the command returns a tuple with NULL
     518                 :      * values.  This checks only the slot type field.
     519                 :      */
     520 CBC           3 :     if (PQgetisnull(res, 0, 0))
     521 ECB             :     {
     522 GIC           1 :         pg_log_error("replication slot \"%s\" does not exist", slot_name);
     523               1 :         PQclear(res);
     524               1 :         return false;
     525                 :     }
     526                 : 
     527                 :     /*
     528 ECB             :      * Note that this cannot happen as READ_REPLICATION_SLOT supports only
     529                 :      * physical slots, but play it safe.
     530 EUB             :      */
     531 GIC           2 :     if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
     532 EUB             :     {
     533 UBC           0 :         pg_log_error("expected a physical replication slot, got type \"%s\" instead",
     534                 :                      PQgetvalue(res, 0, 0));
     535 UIC           0 :         PQclear(res);
     536               0 :         return false;
     537 ECB             :     }
     538                 : 
     539                 :     /* restart LSN */
     540 GIC           2 :     if (!PQgetisnull(res, 0, 1))
     541                 :     {
     542 ECB             :         uint32      hi,
     543                 :                     lo;
     544 EUB             : 
     545 GIC           2 :         if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
     546 EUB             :         {
     547 UBC           0 :             pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
     548                 :                          PQgetvalue(res, 0, 1), slot_name);
     549 LBC           0 :             PQclear(res);
     550 UIC           0 :             return false;
     551                 :         }
     552 GIC           2 :         lsn_loc = ((uint64) hi) << 32 | lo;
     553 ECB             :     }
     554                 : 
     555                 :     /* current TLI */
     556 CBC           2 :     if (!PQgetisnull(res, 0, 2))
     557 GIC           2 :         tli_loc = (TimeLineID) atol(PQgetvalue(res, 0, 2));
     558                 : 
     559 CBC           2 :     PQclear(res);
     560 ECB             : 
     561                 :     /* Assign results if requested */
     562 CBC           2 :     if (restart_lsn)
     563 GIC           2 :         *restart_lsn = lsn_loc;
     564 CBC           2 :     if (restart_tli)
     565 GIC           2 :         *restart_tli = tli_loc;
     566                 : 
     567               2 :     return true;
     568                 : }
     569                 : 
     570                 : /*
     571                 :  * Create a replication slot for the given connection. This function
     572 ECB             :  * returns true in case of success.
     573                 :  */
     574                 : bool
     575 GIC         118 : CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
     576                 :                       bool is_temporary, bool is_physical, bool reserve_wal,
     577                 :                       bool slot_exists_ok, bool two_phase)
     578 ECB             : {
     579                 :     PQExpBuffer query;
     580                 :     PGresult   *res;
     581 GIC         118 :     bool        use_new_option_syntax = (PQserverVersion(conn) >= 150000);
     582 ECB             : 
     583 GIC         118 :     query = createPQExpBuffer();
     584 ECB             : 
     585 CBC         118 :     Assert((is_physical && plugin == NULL) ||
     586                 :            (!is_physical && plugin != NULL));
     587 GIC         118 :     Assert(!(two_phase && is_physical));
     588 CBC         118 :     Assert(slot_name != NULL);
     589 ECB             : 
     590                 :     /* Build base portion of query */
     591 CBC         118 :     appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
     592             118 :     if (is_temporary)
     593 GIC          92 :         appendPQExpBufferStr(query, " TEMPORARY");
     594 CBC         118 :     if (is_physical)
     595 GIC          95 :         appendPQExpBufferStr(query, " PHYSICAL");
     596                 :     else
     597 CBC          23 :         appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
     598 ECB             : 
     599                 :     /* Add any requested options */
     600 GIC         118 :     if (use_new_option_syntax)
     601 CBC         118 :         appendPQExpBufferStr(query, " (");
     602             118 :     if (is_physical)
     603                 :     {
     604 GIC          95 :         if (reserve_wal)
     605              94 :             AppendPlainCommandOption(query, use_new_option_syntax,
     606                 :                                      "RESERVE_WAL");
     607 ECB             :     }
     608                 :     else
     609                 :     {
     610 GIC          23 :         if (two_phase && PQserverVersion(conn) >= 150000)
     611 CBC           1 :             AppendPlainCommandOption(query, use_new_option_syntax,
     612                 :                                      "TWO_PHASE");
     613                 : 
     614              23 :         if (PQserverVersion(conn) >= 100000)
     615 ECB             :         {
     616                 :             /* pg_recvlogical doesn't use an exported snapshot, so suppress */
     617 GIC          23 :             if (use_new_option_syntax)
     618 GBC          23 :                 AppendStringCommandOption(query, use_new_option_syntax,
     619                 :                                           "SNAPSHOT", "nothing");
     620                 :             else
     621 UIC           0 :                 AppendPlainCommandOption(query, use_new_option_syntax,
     622 ECB             :                                          "NOEXPORT_SNAPSHOT");
     623                 :         }
     624                 :     }
     625 CBC         118 :     if (use_new_option_syntax)
     626                 :     {
     627 ECB             :         /* Suppress option list if it would be empty, otherwise terminate */
     628 CBC         118 :         if (query->data[query->len - 1] == '(')
     629                 :         {
     630 GIC           1 :             query->len -= 2;
     631 CBC           1 :             query->data[query->len] = '\0';
     632                 :         }
     633                 :         else
     634 GIC         117 :             appendPQExpBufferChar(query, ')');
     635 ECB             :     }
     636                 : 
     637                 :     /* Now run the query */
     638 CBC         118 :     res = PQexec(conn, query->data);
     639 GIC         118 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     640 ECB             :     {
     641 GBC           1 :         const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     642 EUB             : 
     643 GIC           1 :         if (slot_exists_ok &&
     644 UBC           0 :             sqlstate &&
     645               0 :             strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
     646 EUB             :         {
     647 UIC           0 :             destroyPQExpBuffer(query);
     648               0 :             PQclear(res);
     649               0 :             return true;
     650 ECB             :         }
     651                 :         else
     652                 :         {
     653 CBC           1 :             pg_log_error("could not send replication command \"%s\": %s",
     654 ECB             :                          query->data, PQerrorMessage(conn));
     655                 : 
     656 GIC           1 :             destroyPQExpBuffer(query);
     657               1 :             PQclear(res);
     658               1 :             return false;
     659 ECB             :         }
     660                 :     }
     661 EUB             : 
     662 GIC         117 :     if (PQntuples(res) != 1 || PQnfields(res) != 4)
     663                 :     {
     664 UIC           0 :         pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     665 EUB             :                      slot_name,
     666                 :                      PQntuples(res), PQnfields(res), 1, 4);
     667                 : 
     668 UIC           0 :         destroyPQExpBuffer(query);
     669               0 :         PQclear(res);
     670 LBC           0 :         return false;
     671 ECB             :     }
     672                 : 
     673 GIC         117 :     destroyPQExpBuffer(query);
     674             117 :     PQclear(res);
     675             117 :     return true;
     676                 : }
     677                 : 
     678                 : /*
     679                 :  * Drop a replication slot for the given connection. This function
     680 ECB             :  * returns true in case of success.
     681                 :  */
     682                 : bool
     683 GIC           2 : DropReplicationSlot(PGconn *conn, const char *slot_name)
     684                 : {
     685 ECB             :     PQExpBuffer query;
     686                 :     PGresult   *res;
     687                 : 
     688 GIC           2 :     Assert(slot_name != NULL);
     689                 : 
     690 CBC           2 :     query = createPQExpBuffer();
     691                 : 
     692 ECB             :     /* Build query */
     693 CBC           2 :     appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
     694                 :                       slot_name);
     695 GBC           2 :     res = PQexec(conn, query->data);
     696 GIC           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     697                 :     {
     698 UBC           0 :         pg_log_error("could not send replication command \"%s\": %s",
     699 EUB             :                      query->data, PQerrorMessage(conn));
     700                 : 
     701 UIC           0 :         destroyPQExpBuffer(query);
     702               0 :         PQclear(res);
     703 LBC           0 :         return false;
     704                 :     }
     705 EUB             : 
     706 GIC           2 :     if (PQntuples(res) != 0 || PQnfields(res) != 0)
     707                 :     {
     708 UIC           0 :         pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     709 EUB             :                      slot_name,
     710                 :                      PQntuples(res), PQnfields(res), 0, 0);
     711                 : 
     712 UIC           0 :         destroyPQExpBuffer(query);
     713               0 :         PQclear(res);
     714 LBC           0 :         return false;
     715 ECB             :     }
     716                 : 
     717 GIC           2 :     destroyPQExpBuffer(query);
     718               2 :     PQclear(res);
     719               2 :     return true;
     720                 : }
     721                 : 
     722                 : /*
     723                 :  * Append a "plain" option - one with no value - to a server command that
     724                 :  * is being constructed.
     725                 :  *
     726                 :  * In the old syntax, all options were parser keywords, so you could just
     727                 :  * write things like SOME_COMMAND OPTION1 OPTION2 'opt2value' OPTION3 42. The
     728                 :  * new syntax uses a comma-separated list surrounded by parentheses, so the
     729 ECB             :  * equivalent is SOME_COMMAND (OPTION1, OPTION2 'optvalue', OPTION3 42).
     730                 :  */
     731                 : void
     732 CBC        1066 : AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     733                 :                          char *option_name)
     734 ECB             : {
     735 CBC        1066 :     if (buf->len > 0 && buf->data[buf->len - 1] != '(')
     736                 :     {
     737 GBC         807 :         if (use_new_option_syntax)
     738 GIC         807 :             appendPQExpBufferStr(buf, ", ");
     739                 :         else
     740 LBC           0 :             appendPQExpBufferChar(buf, ' ');
     741 ECB             :     }
     742                 : 
     743 GIC        1066 :     appendPQExpBuffer(buf, " %s", option_name);
     744            1066 : }
     745                 : 
     746                 : /*
     747                 :  * Append an option with an associated string value to a server command that
     748                 :  * is being constructed.
     749                 :  *
     750 ECB             :  * See comments for AppendPlainCommandOption, above.
     751                 :  */
     752                 : void
     753 CBC         639 : AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     754                 :                           char *option_name, char *option_value)
     755 ECB             : {
     756 GIC         639 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     757 ECB             : 
     758 CBC         639 :     if (option_value != NULL)
     759                 :     {
     760             639 :         size_t      length = strlen(option_value);
     761             639 :         char       *escaped_value = palloc(1 + 2 * length);
     762 ECB             : 
     763 GIC         639 :         PQescapeStringConn(conn, escaped_value, option_value, length, NULL);
     764 CBC         639 :         appendPQExpBuffer(buf, " '%s'", escaped_value);
     765 GIC         639 :         pfree(escaped_value);
     766                 :     }
     767             639 : }
     768                 : 
     769                 : /*
     770                 :  * Append an option with an associated integer value to a server command
     771                 :  * is being constructed.
     772                 :  *
     773 ECB             :  * See comments for AppendPlainCommandOption, above.
     774                 :  */
     775                 : void
     776 CBC         137 : AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     777                 :                            char *option_name, int32 option_value)
     778 ECB             : {
     779 CBC         137 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     780                 : 
     781 GIC         137 :     appendPQExpBuffer(buf, " %d", option_value);
     782             137 : }
     783                 : 
     784                 : /*
     785                 :  * Frontend version of GetCurrentTimestamp(), since we are not linked with
     786 ECB             :  * backend code.
     787                 :  */
     788                 : TimestampTz
     789 GIC        4278 : feGetCurrentTimestamp(void)
     790                 : {
     791 ECB             :     TimestampTz result;
     792                 :     struct timeval tp;
     793                 : 
     794 GIC        4278 :     gettimeofday(&tp, NULL);
     795 ECB             : 
     796 GIC        4278 :     result = (TimestampTz) tp.tv_sec -
     797 ECB             :         ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
     798 GIC        4278 :     result = (result * USECS_PER_SEC) + tp.tv_usec;
     799                 : 
     800            4278 :     return result;
     801                 : }
     802                 : 
     803                 : /*
     804                 :  * Frontend version of TimestampDifference(), since we are not linked with
     805 ECB             :  * backend code.
     806                 :  */
     807                 : void
     808 CBC        3793 : feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
     809                 :                       long *secs, int *microsecs)
     810 ECB             : {
     811 GIC        3793 :     TimestampTz diff = stop_time - start_time;
     812 EUB             : 
     813 GBC        3793 :     if (diff <= 0)
     814                 :     {
     815 UIC           0 :         *secs = 0;
     816               0 :         *microsecs = 0;
     817 ECB             :     }
     818                 :     else
     819                 :     {
     820 CBC        3793 :         *secs = (long) (diff / USECS_PER_SEC);
     821 GIC        3793 :         *microsecs = (int) (diff % USECS_PER_SEC);
     822                 :     }
     823            3793 : }
     824                 : 
     825                 : /*
     826                 :  * Frontend version of TimestampDifferenceExceeds(), since we are not
     827 ECB             :  * linked with backend code.
     828                 :  */
     829                 : bool
     830 GIC        4809 : feTimestampDifferenceExceeds(TimestampTz start_time,
     831 ECB             :                              TimestampTz stop_time,
     832                 :                              int msec)
     833                 : {
     834 GIC        4809 :     TimestampTz diff = stop_time - start_time;
     835                 : 
     836            4809 :     return (diff >= msec * INT64CONST(1000));
     837                 : }
     838                 : 
     839                 : /*
     840 ECB             :  * Converts an int64 to network byte order.
     841                 :  */
     842                 : void
     843 GIC         496 : fe_sendint64(int64 i, char *buf)
     844 ECB             : {
     845 CBC         496 :     uint64      n64 = pg_hton64(i);
     846                 : 
     847 GIC         496 :     memcpy(buf, &n64, sizeof(n64));
     848             496 : }
     849                 : 
     850                 : /*
     851 ECB             :  * Converts an int64 from network byte order to native format.
     852                 :  */
     853                 : int64
     854 GIC        7049 : fe_recvint64(char *buf)
     855 ECB             : {
     856                 :     uint64      n64;
     857                 : 
     858 GIC        7049 :     memcpy(&n64, buf, sizeof(n64));
     859                 : 
     860            7049 :     return pg_ntoh64(n64);
     861                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a