Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * basebackup_copy.c
4 : * send basebackup archives using COPY OUT
5 : *
 * We send a result set with information about the tablespaces to be included
 * in the backup before starting COPY OUT. Then, we start a single COPY OUT
 * operation and transmit all the archives and the manifest if present during
9 : * the course of that single COPY OUT. Each CopyData message begins with a
10 : * type byte, allowing us to signal the start of a new archive, or the
11 : * manifest, by some means other than ending the COPY stream. This also allows
12 : * for future protocol extensions, since we can include arbitrary information
13 : * in the message stream as long as we're certain that the client will know
14 : * what to do with it.
15 : *
16 : * An older method that sent each archive using a separate COPY OUT
17 : * operation is no longer supported.
18 : *
19 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
20 : *
21 : * IDENTIFICATION
22 : * src/backend/backup/basebackup_copy.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 : #include "postgres.h"
27 :
28 : #include "access/tupdesc.h"
29 : #include "backup/basebackup.h"
30 : #include "backup/basebackup_sink.h"
31 : #include "catalog/pg_type_d.h"
32 : #include "executor/executor.h"
33 : #include "libpq/libpq.h"
34 : #include "libpq/pqformat.h"
35 : #include "tcop/dest.h"
36 : #include "utils/builtins.h"
37 : #include "utils/timestamp.h"
38 :
/*
 * State for a 'copystream' bbsink, which ships basebackup archives and the
 * backup manifest to the client inside a single COPY OUT stream.
 */
typedef struct bbsink_copystream
{
	/* Common information for all types of sink. */
	bbsink		base;

	/*
	 * Are we sending the archives to the client, or somewhere else?
	 *
	 * When false, the archive and manifest data chunks are not written to
	 * the COPY stream. The control messages (new-archive, manifest start,
	 * progress reports) are still sent.
	 */
	bool		send_to_client;

	/*
	 * Protocol message buffer. We assemble CopyData protocol messages by
	 * setting the first character of this buffer to 'd' (archive or manifest
	 * data) and then making base.bbs_buffer point to the second character so
	 * that the rest of the data gets copied into the message just where we
	 * want it.
	 */
	char	   *msgbuffer;

	/*
	 * Progress-report throttling state.
	 *
	 * last_progress_report_time is when we last sent a progress report (or
	 * when the sink was created / the last archive ended).
	 *
	 * bytes_done_at_last_time_check is the value of bbs_state->bytes_done
	 * the last time we consulted the system clock. It is updated on every
	 * clock check, whether or not a report was actually sent, so that clock
	 * checks happen at most once per PROGRESS_REPORT_BYTE_INTERVAL bytes.
	 */
	TimestampTz last_progress_report_time;
	uint64		bytes_done_at_last_time_check;
} bbsink_copystream;
63 :
/*
 * We don't want to send progress messages to the client excessively
 * frequently. Ideally, we'd like to send a message when the time since the
 * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
 * the system time every time we send a tiny bit of data seems too expensive.
 * So we only check it after the number of bytes since the last check reaches
 * PROGRESS_REPORT_BYTE_INTERVAL.
 */
#define PROGRESS_REPORT_BYTE_INTERVAL 65536	/* bytes between clock checks */
#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000	/* ms before a new report */
74 :
75 : static void bbsink_copystream_begin_backup(bbsink *sink);
76 : static void bbsink_copystream_begin_archive(bbsink *sink,
77 : const char *archive_name);
78 : static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
79 : static void bbsink_copystream_end_archive(bbsink *sink);
80 : static void bbsink_copystream_begin_manifest(bbsink *sink);
81 : static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
82 : static void bbsink_copystream_end_manifest(bbsink *sink);
83 : static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
84 : TimeLineID endtli);
85 : static void bbsink_copystream_cleanup(bbsink *sink);
86 :
87 : static void SendCopyOutResponse(void);
88 : static void SendCopyDone(void);
89 : static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
90 : static void SendTablespaceList(List *tablespaces);
91 :
92 : static const bbsink_ops bbsink_copystream_ops = {
93 : .begin_backup = bbsink_copystream_begin_backup,
94 : .begin_archive = bbsink_copystream_begin_archive,
95 : .archive_contents = bbsink_copystream_archive_contents,
96 : .end_archive = bbsink_copystream_end_archive,
97 : .begin_manifest = bbsink_copystream_begin_manifest,
98 : .manifest_contents = bbsink_copystream_manifest_contents,
99 : .end_manifest = bbsink_copystream_end_manifest,
100 : .end_backup = bbsink_copystream_end_backup,
101 : .cleanup = bbsink_copystream_cleanup
102 : };
103 :
104 : /*
105 : * Create a new 'copystream' bbsink.
106 : */
107 : bbsink *
509 rhaas 108 GIC 129 : bbsink_copystream_new(bool send_to_client)
109 : {
446 110 129 : bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
446 rhaas 111 ECB :
446 rhaas 112 GIC 129 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
509 rhaas 113 CBC 129 : sink->send_to_client = send_to_client;
114 :
446 rhaas 115 ECB : /* Set up for periodic progress reporting. */
446 rhaas 116 CBC 129 : sink->last_progress_report_time = GetCurrentTimestamp();
446 rhaas 117 GIC 129 : sink->bytes_done_at_last_time_check = UINT64CONST(0);
118 :
446 rhaas 119 CBC 129 : return &sink->base;
446 rhaas 120 ECB : }
121 :
122 : /*
123 : * Send start-of-backup wire protocol messages.
124 : */
125 : static void
446 rhaas 126 GIC 126 : bbsink_copystream_begin_backup(bbsink *sink)
127 : {
128 126 : bbsink_copystream *mysink = (bbsink_copystream *) sink;
446 rhaas 129 CBC 126 : bbsink_state *state = sink->bbs_state;
130 : char *buf;
446 rhaas 131 ECB :
132 : /*
133 : * Initialize buffer. We ultimately want to send the archive and manifest
134 : * data by means of CopyData messages where the payload portion of each
135 : * message begins with a type byte. However, basebackup.c expects the
136 : * buffer to be aligned, so we can't just allocate one extra byte for the
137 : * type byte. Instead, allocate enough extra bytes that the portion of the
138 : * buffer we reveal to our callers can be aligned, while leaving room to
139 : * slip the type byte in just beforehand. That will allow us to ship the
140 : * data with a single call to pq_putmessage and without needing any extra
141 : * copying.
142 : */
445 rhaas 143 GIC 126 : buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
144 126 : mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
145 126 : mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
446 rhaas 146 CBC 126 : mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
446 rhaas 147 ECB :
148 : /* Tell client the backup start location. */
446 rhaas 149 CBC 126 : SendXlogRecPtrResult(state->startptr, state->starttli);
150 :
151 : /* Send client a list of tablespaces. */
152 126 : SendTablespaceList(state->tablespaces);
153 :
154 : /* Send a CommandComplete message */
155 126 : pq_puttextmessage('C', "SELECT");
156 :
157 : /* Begin COPY stream. This will be used for all archives + manifest. */
158 126 : SendCopyOutResponse();
446 rhaas 159 GIC 126 : }
160 :
446 rhaas 161 ECB : /*
162 : * Send a CopyData message announcing the beginning of a new archive.
163 : */
164 : static void
446 rhaas 165 GIC 142 : bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
166 : {
167 142 : bbsink_state *state = sink->bbs_state;
446 rhaas 168 ECB : tablespaceinfo *ti;
169 : StringInfoData buf;
170 :
446 rhaas 171 GIC 142 : ti = list_nth(state->tablespaces, state->tablespace_num);
172 142 : pq_beginmessage(&buf, 'd'); /* CopyData */
173 142 : pq_sendbyte(&buf, 'n'); /* New archive */
446 rhaas 174 CBC 142 : pq_sendstring(&buf, archive_name);
175 142 : pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
176 142 : pq_endmessage(&buf);
177 142 : }
446 rhaas 178 ECB :
179 : /*
180 : * Send a CopyData message containing a chunk of archive content.
181 : */
182 : static void
446 rhaas 183 GIC 260249 : bbsink_copystream_archive_contents(bbsink *sink, size_t len)
184 : {
185 260249 : bbsink_copystream *mysink = (bbsink_copystream *) sink;
446 rhaas 186 CBC 260249 : bbsink_state *state = mysink->base.bbs_state;
187 : StringInfoData buf;
446 rhaas 188 ECB : uint64 targetbytes;
189 :
190 : /* Send the archive content to the client, if appropriate. */
509 rhaas 191 GIC 260249 : if (mysink->send_to_client)
192 : {
193 : /* Add one because we're also sending a leading type byte. */
509 rhaas 194 CBC 245165 : pq_putmessage('d', mysink->msgbuffer, len + 1);
195 : }
196 :
446 rhaas 197 ECB : /* Consider whether to send a progress report to the client. */
446 rhaas 198 GIC 260249 : targetbytes = mysink->bytes_done_at_last_time_check
199 : + PROGRESS_REPORT_BYTE_INTERVAL;
200 260249 : if (targetbytes <= state->bytes_done)
446 rhaas 201 ECB : {
446 rhaas 202 GIC 42673 : TimestampTz now = GetCurrentTimestamp();
446 rhaas 203 ECB : long ms;
204 :
205 : /*
206 : * OK, we've sent a decent number of bytes, so check the system time
207 : * to see whether we're due to send a progress report.
208 : */
446 rhaas 209 GIC 42673 : mysink->bytes_done_at_last_time_check = state->bytes_done;
210 42673 : ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
211 : now);
446 rhaas 212 ECB :
213 : /*
214 : * Send a progress report if enough time has passed. Also send one if
215 : * the system clock was set backward, so that such occurrences don't
216 : * have the effect of suppressing further progress messages.
217 : */
73 tgl 218 GNC 42673 : if (ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD ||
219 42651 : now < mysink->last_progress_report_time)
220 : {
446 rhaas 221 GIC 22 : mysink->last_progress_report_time = now;
446 rhaas 222 ECB :
446 rhaas 223 CBC 22 : pq_beginmessage(&buf, 'd'); /* CopyData */
446 rhaas 224 GIC 22 : pq_sendbyte(&buf, 'p'); /* Progress report */
446 rhaas 225 CBC 22 : pq_sendint64(&buf, state->bytes_done);
446 rhaas 226 GIC 22 : pq_endmessage(&buf);
446 rhaas 227 CBC 22 : pq_flush_if_writable();
446 rhaas 228 ECB : }
229 : }
446 rhaas 230 CBC 260249 : }
446 rhaas 231 ECB :
232 : /*
233 : * We don't need to explicitly signal the end of the archive; the client
234 : * will figure out that we've reached the end when we begin the next one,
235 : * or begin the manifest, or end the COPY stream. However, this seems like
236 : * a good time to force out a progress report. One reason for that is that
237 : * if this is the last archive, and we don't force a progress report now,
238 : * the client will never be told that we sent all the bytes.
239 : */
240 : static void
446 rhaas 241 GIC 136 : bbsink_copystream_end_archive(bbsink *sink)
242 : {
243 136 : bbsink_copystream *mysink = (bbsink_copystream *) sink;
244 136 : bbsink_state *state = mysink->base.bbs_state;
446 rhaas 245 ECB : StringInfoData buf;
246 :
446 rhaas 247 CBC 136 : mysink->bytes_done_at_last_time_check = state->bytes_done;
248 136 : mysink->last_progress_report_time = GetCurrentTimestamp();
446 rhaas 249 GIC 136 : pq_beginmessage(&buf, 'd'); /* CopyData */
250 136 : pq_sendbyte(&buf, 'p'); /* Progress report */
446 rhaas 251 CBC 136 : pq_sendint64(&buf, state->bytes_done);
252 136 : pq_endmessage(&buf);
253 136 : pq_flush_if_writable();
254 136 : }
446 rhaas 255 ECB :
256 : /*
257 : * Send a CopyData message announcing the beginning of the backup manifest.
258 : */
259 : static void
446 rhaas 260 GIC 119 : bbsink_copystream_begin_manifest(bbsink *sink)
261 : {
262 : StringInfoData buf;
263 :
446 rhaas 264 CBC 119 : pq_beginmessage(&buf, 'd'); /* CopyData */
446 rhaas 265 GIC 119 : pq_sendbyte(&buf, 'm'); /* Manifest */
266 119 : pq_endmessage(&buf);
267 119 : }
446 rhaas 268 ECB :
269 : /*
270 : * Each chunk of manifest data is sent using a CopyData message.
271 : */
272 : static void
446 rhaas 273 GIC 607 : bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
274 : {
275 607 : bbsink_copystream *mysink = (bbsink_copystream *) sink;
276 :
509 rhaas 277 CBC 607 : if (mysink->send_to_client)
278 : {
509 rhaas 279 ECB : /* Add one because we're also sending a leading type byte. */
509 rhaas 280 GIC 557 : pq_putmessage('d', mysink->msgbuffer, len + 1);
509 rhaas 281 ECB : }
446 rhaas 282 GIC 607 : }
283 :
446 rhaas 284 ECB : /*
285 : * We don't need an explicit terminator for the backup manifest.
286 : */
287 : static void
446 rhaas 288 GIC 119 : bbsink_copystream_end_manifest(bbsink *sink)
289 : {
290 : /* Do nothing. */
291 119 : }
446 rhaas 292 ECB :
293 : /*
294 : * Send end-of-backup wire protocol messages.
295 : */
296 : static void
446 rhaas 297 GIC 120 : bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
298 : TimeLineID endtli)
299 : {
300 120 : SendCopyDone();
446 rhaas 301 CBC 120 : SendXlogRecPtrResult(endptr, endtli);
446 rhaas 302 GIC 120 : }
303 :
446 rhaas 304 ECB : /*
305 : * Cleanup.
306 : */
307 : static void
446 rhaas 308 GIC 115 : bbsink_copystream_cleanup(bbsink *sink)
309 : {
310 : /* Nothing to do. */
311 115 : }
446 rhaas 312 ECB :
313 : /*
314 : * Send a CopyOutResponse message.
520 315 : */
316 : static void
520 rhaas 317 GIC 126 : SendCopyOutResponse(void)
318 : {
319 : StringInfoData buf;
320 :
520 rhaas 321 CBC 126 : pq_beginmessage(&buf, 'H');
520 rhaas 322 GIC 126 : pq_sendbyte(&buf, 0); /* overall format */
323 126 : pq_sendint16(&buf, 0); /* natts */
324 126 : pq_endmessage(&buf);
520 rhaas 325 CBC 126 : }
520 rhaas 326 ECB :
/*
 * Send a CopyDone ('c') message.
 *
 * This is a message type with no body.
 */
static void
SendCopyDone(void)
{
	const char	copy_done_tag = 'c';

	pq_putemptymessage(copy_done_tag);
}
520 rhaas 335 ECB :
336 : /*
337 : * Send a single resultset containing just a single
338 : * XLogRecPtr record (in text format)
339 : */
340 : static void
520 rhaas 341 GIC 246 : SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
342 : {
343 : DestReceiver *dest;
344 : TupOutputState *tstate;
345 : TupleDesc tupdesc;
346 : Datum values[2];
277 peter 347 GNC 246 : bool nulls[2] = {0};
348 :
279 349 246 : dest = CreateDestReceiver(DestRemoteSimple);
350 :
351 246 : tupdesc = CreateTemplateTupleDesc(2);
352 246 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
353 : /*
520 rhaas 354 ECB : * int8 may seem like a surprising data type for this, but in theory int4
355 : * would not be wide enough for this, as TimeLineID is unsigned.
356 : */
279 peter 357 GNC 246 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
358 :
359 : /* send RowDescription */
277 360 246 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
361 :
362 : /* Data row */
363 246 : values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
364 246 : values[1] = Int64GetDatum(tli);
365 246 : do_tup_output(tstate, values, nulls);
366 :
367 246 : end_tup_output(tstate);
368 :
520 rhaas 369 ECB : /* Send a CommandComplete message */
520 rhaas 370 GIC 246 : pq_puttextmessage('C', "SELECT");
520 rhaas 371 CBC 246 : }
520 rhaas 372 ECB :
373 : /*
374 : * Send a result set via libpq describing the tablespace list.
375 : */
376 : static void
520 rhaas 377 CBC 126 : SendTablespaceList(List *tablespaces)
378 : {
379 : DestReceiver *dest;
380 : TupOutputState *tstate;
381 : TupleDesc tupdesc;
520 rhaas 382 ECB : ListCell *lc;
383 :
279 peter 384 GNC 126 : dest = CreateDestReceiver(DestRemoteSimple);
385 :
386 126 : tupdesc = CreateTemplateTupleDesc(3);
387 126 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
388 126 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
389 126 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
390 :
391 : /* send RowDescription */
277 392 126 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
393 :
279 peter 394 ECB : /* Construct and send the directory information */
520 rhaas 395 GIC 268 : foreach(lc, tablespaces)
396 : {
397 142 : tablespaceinfo *ti = lfirst(lc);
398 : Datum values[3];
277 peter 399 GNC 142 : bool nulls[3] = {0};
400 :
401 : /* Send one datarow message */
520 rhaas 402 GIC 142 : if (ti->path == NULL)
403 : {
277 peter 404 GNC 126 : nulls[0] = true;
405 126 : nulls[1] = true;
406 : }
407 : else
408 : {
409 16 : values[0] = ObjectIdGetDatum(strtoul(ti->oid, NULL, 10));
410 16 : values[1] = CStringGetTextDatum(ti->path);
411 : }
520 rhaas 412 GIC 142 : if (ti->size >= 0)
277 peter 413 GNC 142 : values[2] = Int64GetDatum(ti->size / 1024);
414 : else
277 peter 415 UNC 0 : nulls[2] = true;
416 :
277 peter 417 GNC 142 : do_tup_output(tstate, values, nulls);
418 : }
419 :
420 126 : end_tup_output(tstate);
520 rhaas 421 GIC 126 : }
|