LCOV - differential code coverage report
Current view: top level - src/backend/backup - basebackup_copy.c (source / functions) Coverage Total Hit UNC GIC GNC CBC ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 99.2 % 119 118 1 60 26 32 62 1 24
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 14 14 14 12 2
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * basebackup_copy.c
       4                 :  *    send basebackup archives using COPY OUT
       5                 :  *
       6                 :  * We send a result set with information about the tabelspaces to be included
       7                 :  * in the backup before starting COPY OUT. Then, we start a single COPY OUT
       8                 :  * operation and transmits all the archives and the manifest if present during
       9                 :  * the course of that single COPY OUT. Each CopyData message begins with a
      10                 :  * type byte, allowing us to signal the start of a new archive, or the
      11                 :  * manifest, by some means other than ending the COPY stream. This also allows
      12                 :  * for future protocol extensions, since we can include arbitrary information
      13                 :  * in the message stream as long as we're certain that the client will know
      14                 :  * what to do with it.
      15                 :  *
      16                 :  * An older method that sent each archive using a separate COPY OUT
      17                 :  * operation is no longer supported.
      18                 :  *
      19                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
      20                 :  *
      21                 :  * IDENTIFICATION
      22                 :  *    src/backend/backup/basebackup_copy.c
      23                 :  *
      24                 :  *-------------------------------------------------------------------------
      25                 :  */
      26                 : #include "postgres.h"
      27                 : 
      28                 : #include "access/tupdesc.h"
      29                 : #include "backup/basebackup.h"
      30                 : #include "backup/basebackup_sink.h"
      31                 : #include "catalog/pg_type_d.h"
      32                 : #include "executor/executor.h"
      33                 : #include "libpq/libpq.h"
      34                 : #include "libpq/pqformat.h"
      35                 : #include "tcop/dest.h"
      36                 : #include "utils/builtins.h"
      37                 : #include "utils/timestamp.h"
      38                 : 
      39                 : typedef struct bbsink_copystream
      40                 : {
      41                 :     /* Common information for all types of sink. */
      42                 :     bbsink      base;
      43                 : 
      44                 :     /* Are we sending the archives to the client, or somewhere else? */
      45                 :     bool        send_to_client;
      46                 : 
      47                 :     /*
      48                 :      * Protocol message buffer. We assemble CopyData protocol messages by
      49                 :      * setting the first character of this buffer to 'd' (archive or manifest
      50                 :      * data) and then making base.bbs_buffer point to the second character so
      51                 :      * that the rest of the data gets copied into the message just where we
      52                 :      * want it.
      53                 :      */
      54                 :     char       *msgbuffer;
      55                 : 
      56                 :     /*
      57                 :      * When did we last report progress to the client, and how much progress
      58                 :      * did we report?
      59                 :      */
      60                 :     TimestampTz last_progress_report_time;
      61                 :     uint64      bytes_done_at_last_time_check;
      62                 : } bbsink_copystream;
      63                 : 
      64                 : /*
      65                 :  * We don't want to send progress messages to the client excessively
      66                 :  * frequently. Ideally, we'd like to send a message when the time since the
      67                 :  * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
      68                 :  * the system time every time we send a tiny bit of data seems too expensive.
      69                 :  * So we only check it after the number of bytes sine the last check reaches
      70                 :  * PROGRESS_REPORT_BYTE_INTERVAL.
      71                 :  */
      72                 : #define PROGRESS_REPORT_BYTE_INTERVAL               65536
      73                 : #define PROGRESS_REPORT_MILLISECOND_THRESHOLD       1000
      74                 : 
      75                 : static void bbsink_copystream_begin_backup(bbsink *sink);
      76                 : static void bbsink_copystream_begin_archive(bbsink *sink,
      77                 :                                             const char *archive_name);
      78                 : static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
      79                 : static void bbsink_copystream_end_archive(bbsink *sink);
      80                 : static void bbsink_copystream_begin_manifest(bbsink *sink);
      81                 : static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
      82                 : static void bbsink_copystream_end_manifest(bbsink *sink);
      83                 : static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
      84                 :                                          TimeLineID endtli);
      85                 : static void bbsink_copystream_cleanup(bbsink *sink);
      86                 : 
      87                 : static void SendCopyOutResponse(void);
      88                 : static void SendCopyDone(void);
      89                 : static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
      90                 : static void SendTablespaceList(List *tablespaces);
      91                 : 
      92                 : static const bbsink_ops bbsink_copystream_ops = {
      93                 :     .begin_backup = bbsink_copystream_begin_backup,
      94                 :     .begin_archive = bbsink_copystream_begin_archive,
      95                 :     .archive_contents = bbsink_copystream_archive_contents,
      96                 :     .end_archive = bbsink_copystream_end_archive,
      97                 :     .begin_manifest = bbsink_copystream_begin_manifest,
      98                 :     .manifest_contents = bbsink_copystream_manifest_contents,
      99                 :     .end_manifest = bbsink_copystream_end_manifest,
     100                 :     .end_backup = bbsink_copystream_end_backup,
     101                 :     .cleanup = bbsink_copystream_cleanup
     102                 : };
     103                 : 
     104                 : /*
     105                 :  * Create a new 'copystream' bbsink.
     106                 :  */
     107                 : bbsink *
     108 GIC         129 : bbsink_copystream_new(bool send_to_client)
     109                 : {
     110             129 :     bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
     111 ECB             : 
     112 GIC         129 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
     113 CBC         129 :     sink->send_to_client = send_to_client;
     114                 : 
     115 ECB             :     /* Set up for periodic progress reporting. */
     116 CBC         129 :     sink->last_progress_report_time = GetCurrentTimestamp();
     117 GIC         129 :     sink->bytes_done_at_last_time_check = UINT64CONST(0);
     118                 : 
     119 CBC         129 :     return &sink->base;
     120 ECB             : }
     121                 : 
     122                 : /*
     123                 :  * Send start-of-backup wire protocol messages.
     124                 :  */
     125                 : static void
     126 GIC         126 : bbsink_copystream_begin_backup(bbsink *sink)
     127                 : {
     128             126 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     129 CBC         126 :     bbsink_state *state = sink->bbs_state;
     130                 :     char       *buf;
     131 ECB             : 
     132                 :     /*
     133                 :      * Initialize buffer. We ultimately want to send the archive and manifest
     134                 :      * data by means of CopyData messages where the payload portion of each
     135                 :      * message begins with a type byte. However, basebackup.c expects the
     136                 :      * buffer to be aligned, so we can't just allocate one extra byte for the
     137                 :      * type byte. Instead, allocate enough extra bytes that the portion of the
     138                 :      * buffer we reveal to our callers can be aligned, while leaving room to
     139                 :      * slip the type byte in just beforehand.  That will allow us to ship the
     140                 :      * data with a single call to pq_putmessage and without needing any extra
     141                 :      * copying.
     142                 :      */
     143 GIC         126 :     buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
     144             126 :     mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
     145             126 :     mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
     146 CBC         126 :     mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
     147 ECB             : 
     148                 :     /* Tell client the backup start location. */
     149 CBC         126 :     SendXlogRecPtrResult(state->startptr, state->starttli);
     150                 : 
     151                 :     /* Send client a list of tablespaces. */
     152             126 :     SendTablespaceList(state->tablespaces);
     153                 : 
     154                 :     /* Send a CommandComplete message */
     155             126 :     pq_puttextmessage('C', "SELECT");
     156                 : 
     157                 :     /* Begin COPY stream. This will be used for all archives + manifest. */
     158             126 :     SendCopyOutResponse();
     159 GIC         126 : }
     160                 : 
     161 ECB             : /*
     162                 :  * Send a CopyData message announcing the beginning of a new archive.
     163                 :  */
     164                 : static void
     165 GIC         142 : bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
     166                 : {
     167             142 :     bbsink_state *state = sink->bbs_state;
     168 ECB             :     tablespaceinfo *ti;
     169                 :     StringInfoData buf;
     170                 : 
     171 GIC         142 :     ti = list_nth(state->tablespaces, state->tablespace_num);
     172             142 :     pq_beginmessage(&buf, 'd'); /* CopyData */
     173             142 :     pq_sendbyte(&buf, 'n');     /* New archive */
     174 CBC         142 :     pq_sendstring(&buf, archive_name);
     175             142 :     pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
     176             142 :     pq_endmessage(&buf);
     177             142 : }
     178 ECB             : 
     179                 : /*
     180                 :  * Send a CopyData message containing a chunk of archive content.
     181                 :  */
     182                 : static void
     183 GIC      260249 : bbsink_copystream_archive_contents(bbsink *sink, size_t len)
     184                 : {
     185          260249 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     186 CBC      260249 :     bbsink_state *state = mysink->base.bbs_state;
     187                 :     StringInfoData buf;
     188 ECB             :     uint64      targetbytes;
     189                 : 
     190                 :     /* Send the archive content to the client, if appropriate. */
     191 GIC      260249 :     if (mysink->send_to_client)
     192                 :     {
     193                 :         /* Add one because we're also sending a leading type byte. */
     194 CBC      245165 :         pq_putmessage('d', mysink->msgbuffer, len + 1);
     195                 :     }
     196                 : 
     197 ECB             :     /* Consider whether to send a progress report to the client. */
     198 GIC      260249 :     targetbytes = mysink->bytes_done_at_last_time_check
     199                 :         + PROGRESS_REPORT_BYTE_INTERVAL;
     200          260249 :     if (targetbytes <= state->bytes_done)
     201 ECB             :     {
     202 GIC       42673 :         TimestampTz now = GetCurrentTimestamp();
     203 ECB             :         long        ms;
     204                 : 
     205                 :         /*
     206                 :          * OK, we've sent a decent number of bytes, so check the system time
     207                 :          * to see whether we're due to send a progress report.
     208                 :          */
     209 GIC       42673 :         mysink->bytes_done_at_last_time_check = state->bytes_done;
     210           42673 :         ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
     211                 :                                              now);
     212 ECB             : 
     213                 :         /*
     214                 :          * Send a progress report if enough time has passed. Also send one if
     215                 :          * the system clock was set backward, so that such occurrences don't
     216                 :          * have the effect of suppressing further progress messages.
     217                 :          */
     218 GNC       42673 :         if (ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD ||
     219           42651 :             now < mysink->last_progress_report_time)
     220                 :         {
     221 GIC          22 :             mysink->last_progress_report_time = now;
     222 ECB             : 
     223 CBC          22 :             pq_beginmessage(&buf, 'd'); /* CopyData */
     224 GIC          22 :             pq_sendbyte(&buf, 'p'); /* Progress report */
     225 CBC          22 :             pq_sendint64(&buf, state->bytes_done);
     226 GIC          22 :             pq_endmessage(&buf);
     227 CBC          22 :             pq_flush_if_writable();
     228 ECB             :         }
     229                 :     }
     230 CBC      260249 : }
     231 ECB             : 
     232                 : /*
     233                 :  * We don't need to explicitly signal the end of the archive; the client
     234                 :  * will figure out that we've reached the end when we begin the next one,
     235                 :  * or begin the manifest, or end the COPY stream. However, this seems like
     236                 :  * a good time to force out a progress report. One reason for that is that
     237                 :  * if this is the last archive, and we don't force a progress report now,
     238                 :  * the client will never be told that we sent all the bytes.
     239                 :  */
     240                 : static void
     241 GIC         136 : bbsink_copystream_end_archive(bbsink *sink)
     242                 : {
     243             136 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     244             136 :     bbsink_state *state = mysink->base.bbs_state;
     245 ECB             :     StringInfoData buf;
     246                 : 
     247 CBC         136 :     mysink->bytes_done_at_last_time_check = state->bytes_done;
     248             136 :     mysink->last_progress_report_time = GetCurrentTimestamp();
     249 GIC         136 :     pq_beginmessage(&buf, 'd'); /* CopyData */
     250             136 :     pq_sendbyte(&buf, 'p');     /* Progress report */
     251 CBC         136 :     pq_sendint64(&buf, state->bytes_done);
     252             136 :     pq_endmessage(&buf);
     253             136 :     pq_flush_if_writable();
     254             136 : }
     255 ECB             : 
     256                 : /*
     257                 :  * Send a CopyData message announcing the beginning of the backup manifest.
     258                 :  */
     259                 : static void
     260 GIC         119 : bbsink_copystream_begin_manifest(bbsink *sink)
     261                 : {
     262                 :     StringInfoData buf;
     263                 : 
     264 CBC         119 :     pq_beginmessage(&buf, 'd'); /* CopyData */
     265 GIC         119 :     pq_sendbyte(&buf, 'm');     /* Manifest */
     266             119 :     pq_endmessage(&buf);
     267             119 : }
     268 ECB             : 
     269                 : /*
     270                 :  * Each chunk of manifest data is sent using a CopyData message.
     271                 :  */
     272                 : static void
     273 GIC         607 : bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
     274                 : {
     275             607 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
     276                 : 
     277 CBC         607 :     if (mysink->send_to_client)
     278                 :     {
     279 ECB             :         /* Add one because we're also sending a leading type byte. */
     280 GIC         557 :         pq_putmessage('d', mysink->msgbuffer, len + 1);
     281 ECB             :     }
     282 GIC         607 : }
     283                 : 
     284 ECB             : /*
     285                 :  * We don't need an explicit terminator for the backup manifest.
     286                 :  */
     287                 : static void
     288 GIC         119 : bbsink_copystream_end_manifest(bbsink *sink)
     289                 : {
     290                 :     /* Do nothing. */
     291             119 : }
     292 ECB             : 
     293                 : /*
     294                 :  * Send end-of-backup wire protocol messages.
     295                 :  */
     296                 : static void
     297 GIC         120 : bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
     298                 :                              TimeLineID endtli)
     299                 : {
     300             120 :     SendCopyDone();
     301 CBC         120 :     SendXlogRecPtrResult(endptr, endtli);
     302 GIC         120 : }
     303                 : 
     304 ECB             : /*
     305                 :  * Cleanup.
     306                 :  */
     307                 : static void
     308 GIC         115 : bbsink_copystream_cleanup(bbsink *sink)
     309                 : {
     310                 :     /* Nothing to do. */
     311             115 : }
     312 ECB             : 
     313                 : /*
     314                 :  * Send a CopyOutResponse message.
     315                 :  */
     316                 : static void
     317 GIC         126 : SendCopyOutResponse(void)
     318                 : {
     319                 :     StringInfoData buf;
     320                 : 
     321 CBC         126 :     pq_beginmessage(&buf, 'H');
     322 GIC         126 :     pq_sendbyte(&buf, 0);       /* overall format */
     323             126 :     pq_sendint16(&buf, 0);      /* natts */
     324             126 :     pq_endmessage(&buf);
     325 CBC         126 : }
     326 ECB             : 
     327                 : /*
     328                 :  * Send a CopyDone message.
     329                 :  */
     330                 : static void
     331 GIC         120 : SendCopyDone(void)
     332                 : {
     333             120 :     pq_putemptymessage('c');
     334             120 : }
     335 ECB             : 
     336                 : /*
     337                 :  * Send a single resultset containing just a single
     338                 :  * XLogRecPtr record (in text format)
     339                 :  */
     340                 : static void
     341 GIC         246 : SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
     342                 : {
     343                 :     DestReceiver *dest;
     344                 :     TupOutputState *tstate;
     345                 :     TupleDesc   tupdesc;
     346                 :     Datum       values[2];
     347 GNC         246 :     bool        nulls[2] = {0};
     348                 : 
     349             246 :     dest = CreateDestReceiver(DestRemoteSimple);
     350                 : 
     351             246 :     tupdesc = CreateTemplateTupleDesc(2);
     352             246 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
     353                 :     /*
     354 ECB             :      * int8 may seem like a surprising data type for this, but in theory int4
     355                 :      * would not be wide enough for this, as TimeLineID is unsigned.
     356                 :      */
     357 GNC         246 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
     358                 : 
     359                 :     /* send RowDescription */
     360             246 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     361                 : 
     362                 :     /* Data row */
     363             246 :     values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
     364             246 :     values[1] = Int64GetDatum(tli);
     365             246 :     do_tup_output(tstate, values, nulls);
     366                 : 
     367             246 :     end_tup_output(tstate);
     368                 : 
     369 ECB             :     /* Send a CommandComplete message */
     370 GIC         246 :     pq_puttextmessage('C', "SELECT");
     371 CBC         246 : }
     372 ECB             : 
     373                 : /*
     374                 :  * Send a result set via libpq describing the tablespace list.
     375                 :  */
     376                 : static void
     377 CBC         126 : SendTablespaceList(List *tablespaces)
     378                 : {
     379                 :     DestReceiver *dest;
     380                 :     TupOutputState *tstate;
     381                 :     TupleDesc   tupdesc;
     382 ECB             :     ListCell   *lc;
     383                 : 
     384 GNC         126 :     dest = CreateDestReceiver(DestRemoteSimple);
     385                 : 
     386             126 :     tupdesc = CreateTemplateTupleDesc(3);
     387             126 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
     388             126 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
     389             126 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
     390                 : 
     391                 :     /* send RowDescription */
     392             126 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     393                 : 
     394 ECB             :     /* Construct and send the directory information */
     395 GIC         268 :     foreach(lc, tablespaces)
     396                 :     {
     397             142 :         tablespaceinfo *ti = lfirst(lc);
     398                 :         Datum       values[3];
     399 GNC         142 :         bool        nulls[3] = {0};
     400                 : 
     401                 :         /* Send one datarow message */
     402 GIC         142 :         if (ti->path == NULL)
     403                 :         {
     404 GNC         126 :             nulls[0] = true;
     405             126 :             nulls[1] = true;
     406                 :         }
     407                 :         else
     408                 :         {
     409              16 :             values[0] = ObjectIdGetDatum(strtoul(ti->oid, NULL, 10));
     410              16 :             values[1] = CStringGetTextDatum(ti->path);
     411                 :         }
     412 GIC         142 :         if (ti->size >= 0)
     413 GNC         142 :             values[2] = Int64GetDatum(ti->size / 1024);
     414                 :         else
     415 UNC           0 :             nulls[2] = true;
     416                 : 
     417 GNC         142 :         do_tup_output(tstate, values, nulls);
     418                 :     }
     419                 : 
     420             126 :     end_tup_output(tstate);
     421 GIC         126 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a