LCOV - differential code coverage report
Current view: top level - src/backend/backup - basebackup_copy.c (source / functions) Coverage Total Hit UNC GIC GNC CBC ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 99.2 % 119 118 1 60 26 32 62 1 24
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 14 14 14 12 2
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (60,120] days: 100.0 % 2 2 2
Legend: Lines: hit not hit (240..) days: 99.1 % 117 116 1 60 24 32 62
Function coverage date bins:
(240..) days: 53.8 % 26 14 14 12

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * basebackup_copy.c
                                  4                 :  *    send basebackup archives using COPY OUT
                                  5                 :  *
                                  6                 :  * We send a result set with information about the tablespaces to be included
                                  7                 :  * in the backup before starting COPY OUT. Then, we start a single COPY OUT
                                  8                 :  * operation and transmit all the archives and the manifest if present during
                                  9                 :  * the course of that single COPY OUT. Each CopyData message begins with a
                                 10                 :  * type byte, allowing us to signal the start of a new archive, or the
                                 11                 :  * manifest, by some means other than ending the COPY stream. This also allows
                                 12                 :  * for future protocol extensions, since we can include arbitrary information
                                 13                 :  * in the message stream as long as we're certain that the client will know
                                 14                 :  * what to do with it.
                                 15                 :  *
                                 16                 :  * An older method that sent each archive using a separate COPY OUT
                                 17                 :  * operation is no longer supported.
                                 18                 :  *
                                 19                 :  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
                                 20                 :  *
                                 21                 :  * IDENTIFICATION
                                 22                 :  *    src/backend/backup/basebackup_copy.c
                                 23                 :  *
                                 24                 :  *-------------------------------------------------------------------------
                                 25                 :  */
                                 26                 : #include "postgres.h"
                                 27                 : 
                                 28                 : #include "access/tupdesc.h"
                                 29                 : #include "backup/basebackup.h"
                                 30                 : #include "backup/basebackup_sink.h"
                                 31                 : #include "catalog/pg_type_d.h"
                                 32                 : #include "executor/executor.h"
                                 33                 : #include "libpq/libpq.h"
                                 34                 : #include "libpq/pqformat.h"
                                 35                 : #include "tcop/dest.h"
                                 36                 : #include "utils/builtins.h"
                                 37                 : #include "utils/timestamp.h"
                                 38                 : 
                                 39                 : typedef struct bbsink_copystream
                                 40                 : {
                                 41                 :     /* Common information for all types of sink. */
                                 42                 :     bbsink      base;
                                 43                 : 
                                 44                 :     /* Are we sending the archives to the client, or somewhere else? */
                                 45                 :     bool        send_to_client;
                                 46                 : 
                                 47                 :     /*
                                 48                 :      * Protocol message buffer. We assemble CopyData protocol messages by
                                 49                 :      * setting the first character of this buffer to 'd' (archive or manifest
                                 50                 :      * data) and then making base.bbs_buffer point to the second character so
                                 51                 :      * that the rest of the data gets copied into the message just where we
                                 52                 :      * want it.
                                 53                 :      */
                                 54                 :     char       *msgbuffer;
                                 55                 : 
                                 56                 :     /*
                                 57                 :      * When did we last report progress to the client, and how much progress
                                 58                 :      * did we report?
                                 59                 :      */
                                 60                 :     TimestampTz last_progress_report_time;
                                 61                 :     uint64      bytes_done_at_last_time_check;
                                 62                 : } bbsink_copystream;
                                 63                 : 
                                 64                 : /*
                                 65                 :  * We don't want to send progress messages to the client excessively
                                 66                 :  * frequently. Ideally, we'd like to send a message when the time since the
                                 67                 :  * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
                                 68                 :  * the system time every time we send a tiny bit of data seems too expensive.
                                 69                 :  * So we only check it after the number of bytes since the last check reaches
                                 70                 :  * PROGRESS_REPORT_BYTE_INTERVAL.
                                 71                 :  */
                                 72                 : #define PROGRESS_REPORT_BYTE_INTERVAL               65536
                                 73                 : #define PROGRESS_REPORT_MILLISECOND_THRESHOLD       1000
                                 74                 : 
                                 75                 : static void bbsink_copystream_begin_backup(bbsink *sink);
                                 76                 : static void bbsink_copystream_begin_archive(bbsink *sink,
                                 77                 :                                             const char *archive_name);
                                 78                 : static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
                                 79                 : static void bbsink_copystream_end_archive(bbsink *sink);
                                 80                 : static void bbsink_copystream_begin_manifest(bbsink *sink);
                                 81                 : static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
                                 82                 : static void bbsink_copystream_end_manifest(bbsink *sink);
                                 83                 : static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
                                 84                 :                                          TimeLineID endtli);
                                 85                 : static void bbsink_copystream_cleanup(bbsink *sink);
                                 86                 : 
                                 87                 : static void SendCopyOutResponse(void);
                                 88                 : static void SendCopyDone(void);
                                 89                 : static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
                                 90                 : static void SendTablespaceList(List *tablespaces);
                                 91                 : 
                                 92                 : static const bbsink_ops bbsink_copystream_ops = {
                                 93                 :     .begin_backup = bbsink_copystream_begin_backup,
                                 94                 :     .begin_archive = bbsink_copystream_begin_archive,
                                 95                 :     .archive_contents = bbsink_copystream_archive_contents,
                                 96                 :     .end_archive = bbsink_copystream_end_archive,
                                 97                 :     .begin_manifest = bbsink_copystream_begin_manifest,
                                 98                 :     .manifest_contents = bbsink_copystream_manifest_contents,
                                 99                 :     .end_manifest = bbsink_copystream_end_manifest,
                                100                 :     .end_backup = bbsink_copystream_end_backup,
                                101                 :     .cleanup = bbsink_copystream_cleanup
                                102                 : };
                                103                 : 
                                104                 : /*
                                105                 :  * Create a new 'copystream' bbsink.
                                106                 :  */
                                107                 : bbsink *
  509 rhaas                     108 GIC         129 : bbsink_copystream_new(bool send_to_client)
                                109                 : {
  446                           110             129 :     bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
  446 rhaas                     111 ECB             : 
  446 rhaas                     112 GIC         129 :     *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
  509 rhaas                     113 CBC         129 :     sink->send_to_client = send_to_client;
                                114                 : 
  446 rhaas                     115 ECB             :     /* Set up for periodic progress reporting. */
  446 rhaas                     116 CBC         129 :     sink->last_progress_report_time = GetCurrentTimestamp();
  446 rhaas                     117 GIC         129 :     sink->bytes_done_at_last_time_check = UINT64CONST(0);
                                118                 : 
  446 rhaas                     119 CBC         129 :     return &sink->base;
  446 rhaas                     120 ECB             : }
                                121                 : 
                                122                 : /*
                                123                 :  * Send start-of-backup wire protocol messages.
                                124                 :  */
                                125                 : static void
  446 rhaas                     126 GIC         126 : bbsink_copystream_begin_backup(bbsink *sink)
                                127                 : {
                                128             126 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
  446 rhaas                     129 CBC         126 :     bbsink_state *state = sink->bbs_state;
                                130                 :     char       *buf;
  446 rhaas                     131 ECB             : 
                                132                 :     /*
                                133                 :      * Initialize buffer. We ultimately want to send the archive and manifest
                                134                 :      * data by means of CopyData messages where the payload portion of each
                                135                 :      * message begins with a type byte. However, basebackup.c expects the
                                136                 :      * buffer to be aligned, so we can't just allocate one extra byte for the
                                137                 :      * type byte. Instead, allocate enough extra bytes that the portion of the
                                138                 :      * buffer we reveal to our callers can be aligned, while leaving room to
                                139                 :      * slip the type byte in just beforehand.  That will allow us to ship the
                                140                 :      * data with a single call to pq_putmessage and without needing any extra
                                141                 :      * copying.
                                142                 :      */
  445 rhaas                     143 GIC         126 :     buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
                                144             126 :     mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
                                145             126 :     mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
  446 rhaas                     146 CBC         126 :     mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
  446 rhaas                     147 ECB             : 
                                148                 :     /* Tell client the backup start location. */
  446 rhaas                     149 CBC         126 :     SendXlogRecPtrResult(state->startptr, state->starttli);
                                150                 : 
                                151                 :     /* Send client a list of tablespaces. */
                                152             126 :     SendTablespaceList(state->tablespaces);
                                153                 : 
                                154                 :     /* Send a CommandComplete message */
                                155             126 :     pq_puttextmessage('C', "SELECT");
                                156                 : 
                                157                 :     /* Begin COPY stream. This will be used for all archives + manifest. */
                                158             126 :     SendCopyOutResponse();
  446 rhaas                     159 GIC         126 : }
                                160                 : 
  446 rhaas                     161 ECB             : /*
                                162                 :  * Send a CopyData message announcing the beginning of a new archive.
                                163                 :  */
                                164                 : static void
  446 rhaas                     165 GIC         142 : bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
                                166                 : {
                                167             142 :     bbsink_state *state = sink->bbs_state;
  446 rhaas                     168 ECB             :     tablespaceinfo *ti;
                                169                 :     StringInfoData buf;
                                170                 : 
  446 rhaas                     171 GIC         142 :     ti = list_nth(state->tablespaces, state->tablespace_num);
                                172             142 :     pq_beginmessage(&buf, 'd'); /* CopyData */
                                173             142 :     pq_sendbyte(&buf, 'n');     /* New archive */
  446 rhaas                     174 CBC         142 :     pq_sendstring(&buf, archive_name);
                                175             142 :     pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
                                176             142 :     pq_endmessage(&buf);
                                177             142 : }
  446 rhaas                     178 ECB             : 
                                179                 : /*
                                180                 :  * Send a CopyData message containing a chunk of archive content.
                                181                 :  */
                                182                 : static void
  446 rhaas                     183 GIC      260249 : bbsink_copystream_archive_contents(bbsink *sink, size_t len)
                                184                 : {
                                185          260249 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
  446 rhaas                     186 CBC      260249 :     bbsink_state *state = mysink->base.bbs_state;
                                187                 :     StringInfoData buf;
  446 rhaas                     188 ECB             :     uint64      targetbytes;
                                189                 : 
                                190                 :     /* Send the archive content to the client, if appropriate. */
  509 rhaas                     191 GIC      260249 :     if (mysink->send_to_client)
                                192                 :     {
                                193                 :         /* Add one because we're also sending a leading type byte. */
  509 rhaas                     194 CBC      245165 :         pq_putmessage('d', mysink->msgbuffer, len + 1);
                                195                 :     }
                                196                 : 
  446 rhaas                     197 ECB             :     /* Consider whether to send a progress report to the client. */
  446 rhaas                     198 GIC      260249 :     targetbytes = mysink->bytes_done_at_last_time_check
                                199                 :         + PROGRESS_REPORT_BYTE_INTERVAL;
                                200          260249 :     if (targetbytes <= state->bytes_done)
  446 rhaas                     201 ECB             :     {
  446 rhaas                     202 GIC       42673 :         TimestampTz now = GetCurrentTimestamp();
  446 rhaas                     203 ECB             :         long        ms;
                                204                 : 
                                205                 :         /*
                                206                 :          * OK, we've sent a decent number of bytes, so check the system time
                                207                 :          * to see whether we're due to send a progress report.
                                208                 :          */
  446 rhaas                     209 GIC       42673 :         mysink->bytes_done_at_last_time_check = state->bytes_done;
                                210           42673 :         ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
                                211                 :                                              now);
  446 rhaas                     212 ECB             : 
                                213                 :         /*
                                214                 :          * Send a progress report if enough time has passed. Also send one if
                                215                 :          * the system clock was set backward, so that such occurrences don't
                                216                 :          * have the effect of suppressing further progress messages.
                                217                 :          */
   73 tgl                       218 GNC       42673 :         if (ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD ||
                                219           42651 :             now < mysink->last_progress_report_time)
                                220                 :         {
  446 rhaas                     221 GIC          22 :             mysink->last_progress_report_time = now;
  446 rhaas                     222 ECB             : 
  446 rhaas                     223 CBC          22 :             pq_beginmessage(&buf, 'd'); /* CopyData */
  446 rhaas                     224 GIC          22 :             pq_sendbyte(&buf, 'p'); /* Progress report */
  446 rhaas                     225 CBC          22 :             pq_sendint64(&buf, state->bytes_done);
  446 rhaas                     226 GIC          22 :             pq_endmessage(&buf);
  446 rhaas                     227 CBC          22 :             pq_flush_if_writable();
  446 rhaas                     228 ECB             :         }
                                229                 :     }
  446 rhaas                     230 CBC      260249 : }
  446 rhaas                     231 ECB             : 
                                232                 : /*
                                233                 :  * We don't need to explicitly signal the end of the archive; the client
                                234                 :  * will figure out that we've reached the end when we begin the next one,
                                235                 :  * or begin the manifest, or end the COPY stream. However, this seems like
                                236                 :  * a good time to force out a progress report. One reason for that is that
                                237                 :  * if this is the last archive, and we don't force a progress report now,
                                238                 :  * the client will never be told that we sent all the bytes.
                                239                 :  */
                                240                 : static void
  446 rhaas                     241 GIC         136 : bbsink_copystream_end_archive(bbsink *sink)
                                242                 : {
                                243             136 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
                                244             136 :     bbsink_state *state = mysink->base.bbs_state;
  446 rhaas                     245 ECB             :     StringInfoData buf;
                                246                 : 
  446 rhaas                     247 CBC         136 :     mysink->bytes_done_at_last_time_check = state->bytes_done;
                                248             136 :     mysink->last_progress_report_time = GetCurrentTimestamp();
  446 rhaas                     249 GIC         136 :     pq_beginmessage(&buf, 'd'); /* CopyData */
                                250             136 :     pq_sendbyte(&buf, 'p');     /* Progress report */
  446 rhaas                     251 CBC         136 :     pq_sendint64(&buf, state->bytes_done);
                                252             136 :     pq_endmessage(&buf);
                                253             136 :     pq_flush_if_writable();
                                254             136 : }
  446 rhaas                     255 ECB             : 
                                256                 : /*
                                257                 :  * Send a CopyData message announcing the beginning of the backup manifest.
                                258                 :  */
                                259                 : static void
  446 rhaas                     260 GIC         119 : bbsink_copystream_begin_manifest(bbsink *sink)
                                261                 : {
                                262                 :     StringInfoData buf;
                                263                 : 
  446 rhaas                     264 CBC         119 :     pq_beginmessage(&buf, 'd'); /* CopyData */
  446 rhaas                     265 GIC         119 :     pq_sendbyte(&buf, 'm');     /* Manifest */
                                266             119 :     pq_endmessage(&buf);
                                267             119 : }
  446 rhaas                     268 ECB             : 
                                269                 : /*
                                270                 :  * Each chunk of manifest data is sent using a CopyData message.
                                271                 :  */
                                272                 : static void
  446 rhaas                     273 GIC         607 : bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
                                274                 : {
                                275             607 :     bbsink_copystream *mysink = (bbsink_copystream *) sink;
                                276                 : 
  509 rhaas                     277 CBC         607 :     if (mysink->send_to_client)
                                278                 :     {
  509 rhaas                     279 ECB             :         /* Add one because we're also sending a leading type byte. */
  509 rhaas                     280 GIC         557 :         pq_putmessage('d', mysink->msgbuffer, len + 1);
  509 rhaas                     281 ECB             :     }
  446 rhaas                     282 GIC         607 : }
                                283                 : 
  446 rhaas                     284 ECB             : /*
                                285                 :  * We don't need an explicit terminator for the backup manifest.
                                286                 :  */
                                287                 : static void
  446 rhaas                     288 GIC         119 : bbsink_copystream_end_manifest(bbsink *sink)
                                289                 : {
                                290                 :     /* Do nothing. */
                                291             119 : }
  446 rhaas                     292 ECB             : 
                                293                 : /*
                                294                 :  * Send end-of-backup wire protocol messages.
                                295                 :  */
                                296                 : static void
  446 rhaas                     297 GIC         120 : bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
                                298                 :                              TimeLineID endtli)
                                299                 : {
                                300             120 :     SendCopyDone();
  446 rhaas                     301 CBC         120 :     SendXlogRecPtrResult(endptr, endtli);
  446 rhaas                     302 GIC         120 : }
                                303                 : 
  446 rhaas                     304 ECB             : /*
                                305                 :  * Cleanup.
                                306                 :  */
                                307                 : static void
  446 rhaas                     308 GIC         115 : bbsink_copystream_cleanup(bbsink *sink)
                                309                 : {
                                310                 :     /* Nothing to do. */
                                311             115 : }
  446 rhaas                     312 ECB             : 
                                313                 : /*
                                314                 :  * Send a CopyOutResponse message.
  520                           315                 :  */
                                316                 : static void
  520 rhaas                     317 GIC         126 : SendCopyOutResponse(void)
                                318                 : {
                                319                 :     StringInfoData buf;
                                320                 : 
  520 rhaas                     321 CBC         126 :     pq_beginmessage(&buf, 'H');
  520 rhaas                     322 GIC         126 :     pq_sendbyte(&buf, 0);       /* overall format */
                                323             126 :     pq_sendint16(&buf, 0);      /* natts */
                                324             126 :     pq_endmessage(&buf);
  520 rhaas                     325 CBC         126 : }
  520 rhaas                     326 ECB             : 
                                327                 : /*
                                328                 :  * Send a CopyDone message.
                                329                 :  */
                                330                 : static void
  520 rhaas                     331 GIC         120 : SendCopyDone(void)
                                332                 : {
                                333             120 :     pq_putemptymessage('c');
                                334             120 : }
  520 rhaas                     335 ECB             : 
                                336                 : /*
                                337                 :  * Send a single resultset containing just a single
                                338                 :  * XLogRecPtr record (in text format)
                                339                 :  */
                                340                 : static void
  520 rhaas                     341 GIC         246 : SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
                                342                 : {
                                343                 :     DestReceiver *dest;
                                344                 :     TupOutputState *tstate;
                                345                 :     TupleDesc   tupdesc;
                                346                 :     Datum       values[2];
  277 peter                     347 GNC         246 :     bool        nulls[2] = {0};
                                348                 : 
  279                           349             246 :     dest = CreateDestReceiver(DestRemoteSimple);
                                350                 : 
                                351             246 :     tupdesc = CreateTemplateTupleDesc(2);
                                352             246 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
                                353                 :     /*
  520 rhaas                     354 ECB             :      * int8 may seem like a surprising data type for this, but in theory int4
                                355                 :      * would not be wide enough for this, as TimeLineID is unsigned.
                                356                 :      */
  279 peter                     357 GNC         246 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
                                358                 : 
                                359                 :     /* send RowDescription */
  277                           360             246 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
                                361                 : 
                                362                 :     /* Data row */
                                363             246 :     values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
                                364             246 :     values[1] = Int64GetDatum(tli);
                                365             246 :     do_tup_output(tstate, values, nulls);
                                366                 : 
                                367             246 :     end_tup_output(tstate);
                                368                 : 
  520 rhaas                     369 ECB             :     /* Send a CommandComplete message */
  520 rhaas                     370 GIC         246 :     pq_puttextmessage('C', "SELECT");
  520 rhaas                     371 CBC         246 : }
  520 rhaas                     372 ECB             : 
                                373                 : /*
                                374                 :  * Send a result set via libpq describing the tablespace list.
                                375                 :  */
                                376                 : static void
  520 rhaas                     377 CBC         126 : SendTablespaceList(List *tablespaces)
                                378                 : {
                                379                 :     DestReceiver *dest;
                                380                 :     TupOutputState *tstate;
                                381                 :     TupleDesc   tupdesc;
  520 rhaas                     382 ECB             :     ListCell   *lc;
                                383                 : 
  279 peter                     384 GNC         126 :     dest = CreateDestReceiver(DestRemoteSimple);
                                385                 : 
                                386             126 :     tupdesc = CreateTemplateTupleDesc(3);
                                387             126 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
                                388             126 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
                                389             126 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
                                390                 : 
                                391                 :     /* send RowDescription */
  277                           392             126 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
                                393                 : 
  279 peter                     394 ECB             :     /* Construct and send the directory information */
  520 rhaas                     395 GIC         268 :     foreach(lc, tablespaces)
                                396                 :     {
                                397             142 :         tablespaceinfo *ti = lfirst(lc);
                                398                 :         Datum       values[3];
  277 peter                     399 GNC         142 :         bool        nulls[3] = {0};
                                400                 : 
                                401                 :         /* Send one datarow message */
  520 rhaas                     402 GIC         142 :         if (ti->path == NULL)
                                403                 :         {
  277 peter                     404 GNC         126 :             nulls[0] = true;
                                405             126 :             nulls[1] = true;
                                406                 :         }
                                407                 :         else
                                408                 :         {
                                409              16 :             values[0] = ObjectIdGetDatum(strtoul(ti->oid, NULL, 10));
                                410              16 :             values[1] = CStringGetTextDatum(ti->path);
                                411                 :         }
  520 rhaas                     412 GIC         142 :         if (ti->size >= 0)
  277 peter                     413 GNC         142 :             values[2] = Int64GetDatum(ti->size / 1024);
                                414                 :         else
  277 peter                     415 UNC           0 :             nulls[2] = true;
                                416                 : 
  277 peter                     417 GNC         142 :         do_tup_output(tstate, values, nulls);
                                418                 :     }
                                419                 : 
                                420             126 :     end_tup_output(tstate);
  520 rhaas                     421 GIC         126 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a