LCOV - differential code coverage report
Current view: top level - src/bin/pg_rewind - libpq_source.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 84.9 % 225 191 34 191
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 13 13 13
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * libpq_source.c
       4                 :  *    Functions for fetching files from a remote server via libpq.
       5                 :  *
       6                 :  * Copyright (c) 2013-2023, PostgreSQL Global Development Group
       7                 :  *
       8                 :  *-------------------------------------------------------------------------
       9                 :  */
      10                 : #include "postgres_fe.h"
      11                 : 
      12                 : #include "catalog/pg_type_d.h"
      13                 : #include "common/connect.h"
      14                 : #include "datapagemap.h"
      15                 : #include "file_ops.h"
      16                 : #include "filemap.h"
      17                 : #include "lib/stringinfo.h"
      18                 : #include "pg_rewind.h"
      19                 : #include "port/pg_bswap.h"
      20                 : #include "rewind_source.h"
      21                 : 
      22                 : /*
      23                 :  * Files are fetched MAX_CHUNK_SIZE bytes at a time, and with a
      24                 :  * maximum of MAX_CHUNKS_PER_QUERY chunks in a single query.
      25                 :  */
      26                 : #define MAX_CHUNK_SIZE (1024 * 1024)
      27                 : #define MAX_CHUNKS_PER_QUERY 1000
      28                 : 
      29                 : /* represents a request to fetch a piece of a file from the source */
      30                 : typedef struct
      31                 : {
      32                 :     const char *path;           /* path relative to data directory root */
      33                 :     off_t       offset;
      34                 :     size_t      length;
      35                 : } fetch_range_request;
      36                 : 
      37                 : typedef struct
      38                 : {
      39                 :     rewind_source common;       /* common interface functions */
      40                 : 
      41                 :     PGconn     *conn;
      42                 : 
      43                 :     /*
      44                 :      * Queue of chunks that have been requested with the queue_fetch_range()
      45                 :      * function, but have not been fetched from the remote server yet.
      46                 :      */
      47                 :     int         num_requests;
      48                 :     fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY];
      49                 : 
      50                 :     /* temporary space for process_queued_fetch_requests() */
      51                 :     StringInfoData paths;
      52                 :     StringInfoData offsets;
      53                 :     StringInfoData lengths;
      54                 : } libpq_source;
      55                 : 
      56                 : static void init_libpq_conn(PGconn *conn);
      57                 : static char *run_simple_query(PGconn *conn, const char *sql);
      58                 : static void run_simple_command(PGconn *conn, const char *sql);
      59                 : static void appendArrayEscapedString(StringInfo buf, const char *str);
      60                 : 
      61                 : static void process_queued_fetch_requests(libpq_source *src);
      62                 : 
      63                 : /* public interface functions */
      64                 : static void libpq_traverse_files(rewind_source *source,
      65                 :                                  process_file_callback_t callback);
      66                 : static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len);
      67                 : static void libpq_queue_fetch_range(rewind_source *source, const char *path,
      68                 :                                     off_t off, size_t len);
      69                 : static void libpq_finish_fetch(rewind_source *source);
      70                 : static char *libpq_fetch_file(rewind_source *source, const char *path,
      71                 :                               size_t *filesize);
      72                 : static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
      73                 : static void libpq_destroy(rewind_source *source);
      74                 : 
      75                 : /*
      76                 :  * Create a new libpq source.
      77                 :  *
      78                 :  * The caller has already established the connection, but should not try
      79                 :  * to use it while the source is active.
      80                 :  */
      81                 : rewind_source *
      82 CBC           6 : init_libpq_source(PGconn *conn)
      83                 : {
      84                 :     libpq_source *src;
      85                 : 
      86               6 :     init_libpq_conn(conn);
      87                 : 
      88               6 :     src = pg_malloc0(sizeof(libpq_source));
      89                 : 
      90               6 :     src->common.traverse_files = libpq_traverse_files;
      91               6 :     src->common.fetch_file = libpq_fetch_file;
      92               6 :     src->common.queue_fetch_file = libpq_queue_fetch_file;
      93               6 :     src->common.queue_fetch_range = libpq_queue_fetch_range;
      94               6 :     src->common.finish_fetch = libpq_finish_fetch;
      95               6 :     src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn;
      96               6 :     src->common.destroy = libpq_destroy;
      97                 : 
      98               6 :     src->conn = conn;
      99                 : 
     100               6 :     initStringInfo(&src->paths);
     101               6 :     initStringInfo(&src->offsets);
     102               6 :     initStringInfo(&src->lengths);
     103                 : 
     104               6 :     return &src->common;
     105                 : }
     106                 : 
     107                 : /*
     108                 :  * Initialize a libpq connection for use.
     109                 :  */
     110                 : static void
     111               6 : init_libpq_conn(PGconn *conn)
     112                 : {
     113                 :     PGresult   *res;
     114                 :     char       *str;
     115                 : 
     116                 :     /* disable all types of timeouts */
     117               6 :     run_simple_command(conn, "SET statement_timeout = 0");
     118               6 :     run_simple_command(conn, "SET lock_timeout = 0");
     119               6 :     run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
     120                 : 
     121                 :     /*
     122                 :      * we don't intend to do any updates, put the connection in read-only mode
     123                 :      * to keep us honest
     124                 :      */
     125               6 :     run_simple_command(conn, "SET default_transaction_read_only = on");
     126                 : 
     127                 :     /* secure search_path */
     128               6 :     res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     129               6 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     130 UBC           0 :         pg_fatal("could not clear search_path: %s",
     131                 :                  PQresultErrorMessage(res));
     132 CBC           6 :     PQclear(res);
     133                 : 
     134                 :     /*
     135                 :      * Also check that full_page_writes is enabled.  We can get torn pages if
     136                 :      * a page is modified while we read it with pg_read_binary_file(), and we
     137                 :      * rely on full page images to fix them.
     138                 :      */
     139               6 :     str = run_simple_query(conn, "SHOW full_page_writes");
     140               6 :     if (strcmp(str, "on") != 0)
     141 UBC           0 :         pg_fatal("full_page_writes must be enabled in the source server");
     142 CBC           6 :     pg_free(str);
     143                 : 
     144                 :     /* Prepare a statement we'll use to fetch files */
     145               6 :     res = PQprepare(conn, "fetch_chunks_stmt",
     146                 :                     "SELECT path, begin,\n"
     147                 :                     "  pg_read_binary_file(path, begin, len, true) AS chunk\n"
     148                 :                     "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
     149                 :                     3, NULL);
     150                 : 
     151               6 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     152 UBC           0 :         pg_fatal("could not prepare statement to fetch file contents: %s",
     153                 :                  PQresultErrorMessage(res));
     154 CBC           6 :     PQclear(res);
     155               6 : }
     156                 : 
     157                 : /*
     158                 :  * Run a query that returns a single value.
     159                 :  *
     160                 :  * The result should be pg_free'd after use.
     161                 :  */
     162                 : static char *
     163              11 : run_simple_query(PGconn *conn, const char *sql)
     164                 : {
     165                 :     PGresult   *res;
     166                 :     char       *result;
     167                 : 
     168              11 :     res = PQexec(conn, sql);
     169                 : 
     170              11 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     171 UBC           0 :         pg_fatal("error running query (%s) on source server: %s",
     172                 :                  sql, PQresultErrorMessage(res));
     173                 : 
     174                 :     /* sanity check the result set */
     175 CBC          11 :     if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
     176 UBC           0 :         pg_fatal("unexpected result set from query");
     177                 : 
     178 CBC          11 :     result = pg_strdup(PQgetvalue(res, 0, 0));
     179                 : 
     180              11 :     PQclear(res);
     181                 : 
     182              11 :     return result;
     183                 : }
     184                 : 
     185                 : /*
     186                 :  * Run a command.
     187                 :  *
     188                 :  * In the event of a failure, exit immediately.
     189                 :  */
     190                 : static void
     191              24 : run_simple_command(PGconn *conn, const char *sql)
     192                 : {
     193                 :     PGresult   *res;
     194                 : 
     195              24 :     res = PQexec(conn, sql);
     196                 : 
     197              24 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     198 UBC           0 :         pg_fatal("error running query (%s) in source server: %s",
     199                 :                  sql, PQresultErrorMessage(res));
     200                 : 
     201 CBC          24 :     PQclear(res);
     202              24 : }
     203                 : 
     204                 : /*
     205                 :  * Call the pg_current_wal_insert_lsn() function in the remote system.
     206                 :  */
     207                 : static XLogRecPtr
     208               5 : libpq_get_current_wal_insert_lsn(rewind_source *source)
     209                 : {
     210               5 :     PGconn     *conn = ((libpq_source *) source)->conn;
     211                 :     XLogRecPtr  result;
     212                 :     uint32      hi;
     213                 :     uint32      lo;
     214                 :     char       *val;
     215                 : 
     216               5 :     val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
     217                 : 
     218               5 :     if (sscanf(val, "%X/%X", &hi, &lo) != 2)
     219 UBC           0 :         pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
     220                 : 
     221 CBC           5 :     result = ((uint64) hi) << 32 | lo;
     222                 : 
     223               5 :     pg_free(val);
     224                 : 
     225               5 :     return result;
     226                 : }
     227                 : 
     228                 : /*
     229                 :  * Get a list of all files in the data directory.
     230                 :  */
     231                 : static void
     232               6 : libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
     233                 : {
     234               6 :     PGconn     *conn = ((libpq_source *) source)->conn;
     235                 :     PGresult   *res;
     236                 :     const char *sql;
     237                 :     int         i;
     238                 : 
     239                 :     /*
     240                 :      * Create a recursive directory listing of the whole data directory.
     241                 :      *
     242                 :      * The WITH RECURSIVE part does most of the work. The second part gets the
     243                 :      * targets of the symlinks in pg_tblspc directory.
     244                 :      *
     245                 :      * XXX: There is no backend function to get a symbolic link's target in
     246                 :      * general, so if the admin has put any custom symbolic links in the data
     247                 :      * directory, they won't be copied correctly.
     248                 :      */
     249               6 :     sql =
     250                 :         "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
     251                 :         "  SELECT '' AS path, filename, size, isdir FROM\n"
     252                 :         "  (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
     253                 :         "        pg_stat_file(fn.filename, true) AS this\n"
     254                 :         "  UNION ALL\n"
     255                 :         "  SELECT parent.path || parent.filename || '/' AS path,\n"
     256                 :         "         fn, this.size, this.isdir\n"
     257                 :         "  FROM files AS parent,\n"
     258                 :         "       pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
     259                 :         "       pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
     260                 :         "       WHERE parent.isdir = 't'\n"
     261                 :         ")\n"
     262                 :         "SELECT path || filename, size, isdir,\n"
     263                 :         "       pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
     264                 :         "FROM files\n"
     265                 :         "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
     266                 :         "                             AND oid::text = files.filename\n";
     267               6 :     res = PQexec(conn, sql);
     268                 : 
     269               6 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     270 UBC           0 :         pg_fatal("could not fetch file list: %s",
     271                 :                  PQresultErrorMessage(res));
     272                 : 
     273                 :     /* sanity check the result set */
     274 CBC           6 :     if (PQnfields(res) != 4)
     275 UBC           0 :         pg_fatal("unexpected result set while fetching file list");
     276                 : 
     277                 :     /* Read result to local variables */
     278 CBC        6953 :     for (i = 0; i < PQntuples(res); i++)
     279                 :     {
     280                 :         char       *path;
     281                 :         int64       filesize;
     282                 :         bool        isdir;
     283                 :         char       *link_target;
     284                 :         file_type_t type;
     285                 : 
     286            6947 :         if (PQgetisnull(res, i, 1))
     287                 :         {
     288                 :             /*
     289                 :              * The file was removed from the server while the query was
     290                 :              * running. Ignore it.
     291                 :              */
     292 UBC           0 :             continue;
     293                 :         }
     294                 : 
     295 CBC        6947 :         path = PQgetvalue(res, i, 0);
     296            6947 :         filesize = atol(PQgetvalue(res, i, 1));
     297            6947 :         isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
     298            6947 :         link_target = PQgetvalue(res, i, 3);
     299                 : 
     300            6947 :         if (link_target[0])
     301 UBC           0 :             type = FILE_TYPE_SYMLINK;
     302 CBC        6947 :         else if (isdir)
     303             159 :             type = FILE_TYPE_DIRECTORY;
     304                 :         else
     305            6788 :             type = FILE_TYPE_REGULAR;
     306                 : 
     307            6947 :         process_source_file(path, type, filesize, link_target);
     308                 :     }
     309               6 :     PQclear(res);
     310               6 : }
     311                 : 
     312                 : /*
     313                 :  * Queue up a request to fetch a file from remote system.
     314                 :  */
     315                 : static void
     316            1958 : libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
     317                 : {
     318                 :     /*
     319                 :      * Truncate the target file immediately, and queue a request to fetch it
     320                 :      * from the source. If the file is small, smaller than MAX_CHUNK_SIZE,
     321                 :      * request fetching a full-sized chunk anyway, so that if the file has
     322                 :      * become larger in the source system, after we scanned the source
     323                 :      * directory, we still fetch the whole file. This only works for files up
     324                 :      * to MAX_CHUNK_SIZE, but that's good enough for small configuration files
     325                 :      * and such that are changed every now and then, but not WAL-logged. For
     326                 :      * larger files, we fetch up to the original size.
     327                 :      *
     328                 :      * Even with that mechanism, there is an inherent race condition if the
     329                 :      * file is modified at the same instant that we're copying it, so that we
     330                 :      * might copy a torn version of the file with one half from the old
     331                 :      * version and another half from the new. But pg_basebackup has the same
     332                 :      * problem, and it hasn't been a problem in practice.
     333                 :      *
     334                 :      * It might seem more natural to truncate the file later, when we receive
     335                 :      * it from the source server, but then we'd need to track which
     336                 :      * fetch-requests are for a whole file.
     337                 :      */
     338            1958 :     open_target_file(path, true);
     339            1958 :     libpq_queue_fetch_range(source, path, 0, Max(len, MAX_CHUNK_SIZE));
     340            1958 : }
     341                 : 
     342                 : /*
     343                 :  * Queue up a request to fetch a piece of a file from remote system.
     344                 :  */
     345                 : static void
     346            2772 : libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
     347                 :                         size_t len)
     348                 : {
     349            2772 :     libpq_source *src = (libpq_source *) source;
     350                 : 
     351                 :     /*
     352                 :      * Does this request happen to be a continuation of the previous chunk? If
     353                 :      * so, merge it with the previous one.
     354                 :      *
     355                 :      * XXX: We use pointer equality to compare the path. That's good enough
     356                 :      * for our purposes; the caller always passes the same pointer for the
     357                 :      * same filename. If it didn't, we would fail to merge requests, but it
     358                 :      * wouldn't affect correctness.
     359                 :      */
     360            2772 :     if (src->num_requests > 0)
     361                 :     {
     362            2766 :         fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
     363                 : 
     364            2766 :         if (prev->offset + prev->length == off &&
     365             622 :             prev->length < MAX_CHUNK_SIZE &&
     366             622 :             prev->path == path)
     367                 :         {
     368                 :             /*
     369                 :              * Extend the previous request to cover as much of this new
     370                 :              * request as possible, without exceeding MAX_CHUNK_SIZE.
     371                 :              */
     372                 :             size_t      thislen;
     373                 : 
     374             621 :             thislen = Min(len, MAX_CHUNK_SIZE - prev->length);
     375             621 :             prev->length += thislen;
     376                 : 
     377             621 :             off += thislen;
     378             621 :             len -= thislen;
     379                 : 
     380                 :             /*
     381                 :              * Fall through to create new requests for any remaining 'len'
     382                 :              * that didn't fit in the previous chunk.
     383                 :              */
     384                 :         }
     385                 :     }
     386                 : 
     387                 :     /* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
     388            5193 :     while (len > 0)
     389                 :     {
     390                 :         int32       thislen;
     391                 : 
     392                 :         /* if the queue is full, perform all the work queued up so far */
     393            2421 :         if (src->num_requests == MAX_CHUNKS_PER_QUERY)
     394 UBC           0 :             process_queued_fetch_requests(src);
     395                 : 
     396 CBC        2421 :         thislen = Min(len, MAX_CHUNK_SIZE);
     397            2421 :         src->request_queue[src->num_requests].path = path;
     398            2421 :         src->request_queue[src->num_requests].offset = off;
     399            2421 :         src->request_queue[src->num_requests].length = thislen;
     400            2421 :         src->num_requests++;
     401                 : 
     402            2421 :         off += thislen;
     403            2421 :         len -= thislen;
     404                 :     }
     405            2772 : }
     406                 : 
     407                 : /*
     408                 :  * Fetch all the queued chunks and write them to the target data directory.
     409                 :  */
     410                 : static void
     411               6 : libpq_finish_fetch(rewind_source *source)
     412                 : {
     413               6 :     process_queued_fetch_requests((libpq_source *) source);
     414               6 : }
     415                 : 
     416                 : static void
     417               6 : process_queued_fetch_requests(libpq_source *src)
     418                 : {
     419                 :     const char *params[3];
     420                 :     PGresult   *res;
     421                 :     int         chunkno;
     422                 : 
     423               6 :     if (src->num_requests == 0)
     424 UBC           0 :         return;
     425                 : 
     426 CBC           6 :     pg_log_debug("getting %d file chunks", src->num_requests);
     427                 : 
     428                 :     /*
     429                 :      * The prepared statement, 'fetch_chunks_stmt', takes three arrays with
     430                 :      * the same length as parameters: paths, offsets and lengths. Construct
     431                 :      * the string representations of them.
     432                 :      */
     433               6 :     resetStringInfo(&src->paths);
     434               6 :     resetStringInfo(&src->offsets);
     435               6 :     resetStringInfo(&src->lengths);
     436                 : 
     437               6 :     appendStringInfoChar(&src->paths, '{');
     438               6 :     appendStringInfoChar(&src->offsets, '{');
     439               6 :     appendStringInfoChar(&src->lengths, '{');
     440            2427 :     for (int i = 0; i < src->num_requests; i++)
     441                 :     {
     442            2421 :         fetch_range_request *rq = &src->request_queue[i];
     443                 : 
     444            2421 :         if (i > 0)
     445                 :         {
     446            2415 :             appendStringInfoChar(&src->paths, ',');
     447            2415 :             appendStringInfoChar(&src->offsets, ',');
     448            2415 :             appendStringInfoChar(&src->lengths, ',');
     449                 :         }
     450                 : 
     451            2421 :         appendArrayEscapedString(&src->paths, rq->path);
     452            2421 :         appendStringInfo(&src->offsets, INT64_FORMAT, (int64) rq->offset);
     453            2421 :         appendStringInfo(&src->lengths, INT64_FORMAT, (int64) rq->length);
     454                 :     }
     455               6 :     appendStringInfoChar(&src->paths, '}');
     456               6 :     appendStringInfoChar(&src->offsets, '}');
     457               6 :     appendStringInfoChar(&src->lengths, '}');
     458                 : 
     459                 :     /*
     460                 :      * Execute the prepared statement.
     461                 :      */
     462               6 :     params[0] = src->paths.data;
     463               6 :     params[1] = src->offsets.data;
     464               6 :     params[2] = src->lengths.data;
     465                 : 
     466               6 :     if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
     467 UBC           0 :         pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
     468                 : 
     469 CBC           6 :     if (PQsetSingleRowMode(src->conn) != 1)
     470 UBC           0 :         pg_fatal("could not set libpq connection to single row mode");
     471                 : 
     472                 :     /*----
     473                 :      * The result set is of format:
     474                 :      *
     475                 :      * path     text    -- path in the data directory, e.g "base/1/123"
     476                 :      * begin    int8    -- offset within the file
     477                 :      * chunk    bytea   -- file content
     478                 :      *----
     479                 :      */
     480 CBC           6 :     chunkno = 0;
     481            2433 :     while ((res = PQgetResult(src->conn)) != NULL)
     482                 :     {
     483            2427 :         fetch_range_request *rq = &src->request_queue[chunkno];
     484                 :         char       *filename;
     485                 :         int         filenamelen;
     486                 :         int64       chunkoff;
     487                 :         int         chunksize;
     488                 :         char       *chunk;
     489                 : 
     490            2427 :         switch (PQresultStatus(res))
     491                 :         {
     492            2421 :             case PGRES_SINGLE_TUPLE:
     493            2421 :                 break;
     494                 : 
     495               6 :             case PGRES_TUPLES_OK:
     496               6 :                 PQclear(res);
     497               6 :                 continue;       /* final zero-row result */
     498                 : 
     499 UBC           0 :             default:
     500               0 :                 pg_fatal("unexpected result while fetching remote files: %s",
     501                 :                          PQresultErrorMessage(res));
     502                 :         }
     503                 : 
     504 CBC        2421 :         if (chunkno > src->num_requests)
     505 UBC           0 :             pg_fatal("received more data chunks than requested");
     506                 : 
     507                 :         /* sanity check the result set */
     508 CBC        2421 :         if (PQnfields(res) != 3 || PQntuples(res) != 1)
     509 UBC           0 :             pg_fatal("unexpected result set size while fetching remote files");
     510                 : 
     511 CBC        4842 :         if (PQftype(res, 0) != TEXTOID ||
     512            4842 :             PQftype(res, 1) != INT8OID ||
     513            2421 :             PQftype(res, 2) != BYTEAOID)
     514                 :         {
     515 UBC           0 :             pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
     516                 :                      PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
     517                 :         }
     518                 : 
     519 CBC        2421 :         if (PQfformat(res, 0) != 1 &&
     520 UBC           0 :             PQfformat(res, 1) != 1 &&
     521               0 :             PQfformat(res, 2) != 1)
     522                 :         {
     523               0 :             pg_fatal("unexpected result format while fetching remote files");
     524                 :         }
     525                 : 
     526 CBC        4842 :         if (PQgetisnull(res, 0, 0) ||
     527            2421 :             PQgetisnull(res, 0, 1))
     528                 :         {
     529 UBC           0 :             pg_fatal("unexpected null values in result while fetching remote files");
     530                 :         }
     531                 : 
     532 CBC        2421 :         if (PQgetlength(res, 0, 1) != sizeof(int64))
     533 UBC           0 :             pg_fatal("unexpected result length while fetching remote files");
     534                 : 
     535                 :         /* Read result set to local variables */
     536 CBC        2421 :         memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
     537            2421 :         chunkoff = pg_ntoh64(chunkoff);
     538            2421 :         chunksize = PQgetlength(res, 0, 2);
     539                 : 
     540            2421 :         filenamelen = PQgetlength(res, 0, 0);
     541            2421 :         filename = pg_malloc(filenamelen + 1);
     542            2421 :         memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
     543            2421 :         filename[filenamelen] = '\0';
     544                 : 
     545            2421 :         chunk = PQgetvalue(res, 0, 2);
     546                 : 
     547                 :         /*
     548                 :          * If a file has been deleted on the source, remove it on the target
     549                 :          * as well.  Note that multiple unlink() calls may happen on the same
     550                 :          * file if multiple data chunks are associated with it, hence ignore
     551                 :          * unconditionally anything missing.
     552                 :          */
     553            2421 :         if (PQgetisnull(res, 0, 2))
     554                 :         {
     555 UBC           0 :             pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
     556                 :                          filename);
     557               0 :             remove_target_file(filename, true);
     558                 :         }
     559                 :         else
     560                 :         {
     561 CBC        2421 :             pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
     562                 :                          filename, (long long int) chunkoff, chunksize);
     563                 : 
     564            2421 :             if (strcmp(filename, rq->path) != 0)
     565                 :             {
     566 UBC           0 :                 pg_fatal("received data for file \"%s\", when requested for \"%s\"",
     567                 :                          filename, rq->path);
     568                 :             }
     569 CBC        2421 :             if (chunkoff != rq->offset)
     570 UBC           0 :                 pg_fatal("received data at offset %lld of file \"%s\", when requested for offset %lld",
     571                 :                          (long long int) chunkoff, rq->path, (long long int) rq->offset);
     572                 : 
     573                 :             /*
     574                 :              * We should not receive more data than we requested, or
     575                 :              * pg_read_binary_file() messed up.  We could receive less,
     576                 :              * though, if the file was truncated in the source after we
     577                 :              * checked its size. That's OK, there should be a WAL record of
     578                 :              * the truncation, which will get replayed when you start the
     579                 :              * target system for the first time after pg_rewind has completed.
     580                 :              */
     581 CBC        2421 :             if (chunksize > rq->length)
     582 UBC           0 :                 pg_fatal("received more than requested for file \"%s\"", rq->path);
     583                 : 
     584 CBC        2421 :             open_target_file(filename, false);
     585                 : 
     586            2421 :             write_target_range(chunk, chunkoff, chunksize);
     587                 :         }
     588                 : 
     589            2421 :         pg_free(filename);
     590                 : 
     591            2421 :         PQclear(res);
     592            2421 :         chunkno++;
     593                 :     }
     594               6 :     if (chunkno != src->num_requests)
     595 UBC           0 :         pg_fatal("unexpected number of data chunks received");
     596                 : 
     597 CBC           6 :     src->num_requests = 0;
     598                 : }
     599                 : 
     600                 : /*
     601                 :  * Escape a string to be used as element in a text array constant
     602                 :  */
     603                 : static void
     604            2421 : appendArrayEscapedString(StringInfo buf, const char *str)
     605                 : {
     606            2421 :     appendStringInfoCharMacro(buf, '\"');
     607           44222 :     while (*str)
     608                 :     {
     609           41801 :         char        ch = *str;
     610                 : 
     611           41801 :         if (ch == '"' || ch == '\\')
     612 UBC           0 :             appendStringInfoCharMacro(buf, '\\');
     613                 : 
     614 CBC       41801 :         appendStringInfoCharMacro(buf, ch);
     615                 : 
     616           41801 :         str++;
     617                 :     }
     618            2421 :     appendStringInfoCharMacro(buf, '\"');
     619            2421 : }
     620                 : 
     621                 : /*
     622                 :  * Fetch a single file as a malloc'd buffer.
     623                 :  */
     624                 : static char *
     625              17 : libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
     626                 : {
     627              17 :     PGconn     *conn = ((libpq_source *) source)->conn;
     628                 :     PGresult   *res;
     629                 :     char       *result;
     630                 :     int         len;
     631                 :     const char *paramValues[1];
     632                 : 
     633              17 :     paramValues[0] = path;
     634              17 :     res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
     635                 :                        1, NULL, paramValues, NULL, NULL, 1);
     636                 : 
     637              17 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     638 UBC           0 :         pg_fatal("could not fetch remote file \"%s\": %s",
     639                 :                  path, PQresultErrorMessage(res));
     640                 : 
     641                 :     /* sanity check the result set */
     642 CBC          17 :     if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
     643 UBC           0 :         pg_fatal("unexpected result set while fetching remote file \"%s\"",
     644                 :                  path);
     645                 : 
     646                 :     /* Read result to local variables */
     647 CBC          17 :     len = PQgetlength(res, 0, 0);
     648              17 :     result = pg_malloc(len + 1);
     649              17 :     memcpy(result, PQgetvalue(res, 0, 0), len);
     650              17 :     result[len] = '\0';
     651                 : 
     652              17 :     PQclear(res);
     653                 : 
     654              17 :     pg_log_debug("fetched file \"%s\", length %d", path, len);
     655                 : 
     656              17 :     if (filesize)
     657              12 :         *filesize = len;
     658              17 :     return result;
     659                 : }
     660                 : 
     661                 : /*
     662                 :  * Close a libpq source.
     663                 :  */
     664                 : static void
     665               6 : libpq_destroy(rewind_source *source)
     666                 : {
     667               6 :     libpq_source *src = (libpq_source *) source;
     668                 : 
     669               6 :     pfree(src->paths.data);
     670               6 :     pfree(src->offsets.data);
     671               6 :     pfree(src->lengths.data);
     672               6 :     pfree(src);
     673                 : 
     674                 :     /* NOTE: we don't close the connection here, as it was not opened by us. */
     675               6 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a