Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * basebackup_copy.c
4 : : * send basebackup archives using COPY OUT
5 : : *
6 : : * We send a result set with information about the tablespaces to be included
7 : : * in the backup before starting COPY OUT. Then, we start a single COPY OUT
8 : : * operation and transmits all the archives and the manifest if present during
9 : : * the course of that single COPY OUT. Each CopyData message begins with a
10 : : * type byte, allowing us to signal the start of a new archive, or the
11 : : * manifest, by some means other than ending the COPY stream. This also allows
12 : : * for future protocol extensions, since we can include arbitrary information
13 : : * in the message stream as long as we're certain that the client will know
14 : : * what to do with it.
15 : : *
16 : : * An older method that sent each archive using a separate COPY OUT
17 : : * operation is no longer supported.
18 : : *
19 : : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
20 : : *
21 : : * IDENTIFICATION
22 : : * src/backend/backup/basebackup_copy.c
23 : : *
24 : : *-------------------------------------------------------------------------
25 : : */
26 : : #include "postgres.h"
27 : :
28 : : #include "access/tupdesc.h"
29 : : #include "backup/basebackup.h"
30 : : #include "backup/basebackup_sink.h"
31 : : #include "catalog/pg_type_d.h"
32 : : #include "executor/executor.h"
33 : : #include "libpq/libpq.h"
34 : : #include "libpq/pqformat.h"
35 : : #include "tcop/dest.h"
36 : : #include "utils/builtins.h"
37 : : #include "utils/timestamp.h"
38 : :
39 : : typedef struct bbsink_copystream
40 : : {
41 : : /* Common information for all types of sink. */
42 : : bbsink base;
43 : :
44 : : /* Are we sending the archives to the client, or somewhere else? */
45 : : bool send_to_client;
46 : :
47 : : /*
48 : : * Protocol message buffer. We assemble CopyData protocol messages by
49 : : * setting the first character of this buffer to 'd' (archive or manifest
50 : : * data) and then making base.bbs_buffer point to the second character so
51 : : * that the rest of the data gets copied into the message just where we
52 : : * want it.
53 : : */
54 : : char *msgbuffer;
55 : :
56 : : /*
57 : : * When did we last report progress to the client, and how much progress
58 : : * did we report?
59 : : */
60 : : TimestampTz last_progress_report_time;
61 : : uint64 bytes_done_at_last_time_check;
62 : : } bbsink_copystream;
63 : :
64 : : /*
65 : : * We don't want to send progress messages to the client excessively
66 : : * frequently. Ideally, we'd like to send a message when the time since the
67 : : * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
68 : : * the system time every time we send a tiny bit of data seems too expensive.
69 : : * So we only check it after the number of bytes sine the last check reaches
70 : : * PROGRESS_REPORT_BYTE_INTERVAL.
71 : : */
72 : : #define PROGRESS_REPORT_BYTE_INTERVAL 65536
73 : : #define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000
74 : :
75 : : static void bbsink_copystream_begin_backup(bbsink *sink);
76 : : static void bbsink_copystream_begin_archive(bbsink *sink,
77 : : const char *archive_name);
78 : : static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
79 : : static void bbsink_copystream_end_archive(bbsink *sink);
80 : : static void bbsink_copystream_begin_manifest(bbsink *sink);
81 : : static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
82 : : static void bbsink_copystream_end_manifest(bbsink *sink);
83 : : static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
84 : : TimeLineID endtli);
85 : : static void bbsink_copystream_cleanup(bbsink *sink);
86 : :
87 : : static void SendCopyOutResponse(void);
88 : : static void SendCopyDone(void);
89 : : static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
90 : : static void SendTablespaceList(List *tablespaces);
91 : :
92 : : static const bbsink_ops bbsink_copystream_ops = {
93 : : .begin_backup = bbsink_copystream_begin_backup,
94 : : .begin_archive = bbsink_copystream_begin_archive,
95 : : .archive_contents = bbsink_copystream_archive_contents,
96 : : .end_archive = bbsink_copystream_end_archive,
97 : : .begin_manifest = bbsink_copystream_begin_manifest,
98 : : .manifest_contents = bbsink_copystream_manifest_contents,
99 : : .end_manifest = bbsink_copystream_end_manifest,
100 : : .end_backup = bbsink_copystream_end_backup,
101 : : .cleanup = bbsink_copystream_cleanup
102 : : };
103 : :
104 : : /*
105 : : * Create a new 'copystream' bbsink.
106 : : */
107 : : bbsink *
880 rhaas@postgresql.org 108 :CBC 152 : bbsink_copystream_new(bool send_to_client)
109 : : {
817 110 : 152 : bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
111 : :
112 : 152 : *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
880 113 : 152 : sink->send_to_client = send_to_client;
114 : :
115 : : /* Set up for periodic progress reporting. */
817 116 : 152 : sink->last_progress_report_time = GetCurrentTimestamp();
117 : 152 : sink->bytes_done_at_last_time_check = UINT64CONST(0);
118 : :
119 : 152 : return &sink->base;
120 : : }
121 : :
122 : : /*
123 : : * Send start-of-backup wire protocol messages.
124 : : */
125 : : static void
126 : 149 : bbsink_copystream_begin_backup(bbsink *sink)
127 : : {
128 : 149 : bbsink_copystream *mysink = (bbsink_copystream *) sink;
129 : 149 : bbsink_state *state = sink->bbs_state;
130 : : char *buf;
131 : :
132 : : /*
133 : : * Initialize buffer. We ultimately want to send the archive and manifest
134 : : * data by means of CopyData messages where the payload portion of each
135 : : * message begins with a type byte. However, basebackup.c expects the
136 : : * buffer to be aligned, so we can't just allocate one extra byte for the
137 : : * type byte. Instead, allocate enough extra bytes that the portion of the
138 : : * buffer we reveal to our callers can be aligned, while leaving room to
139 : : * slip the type byte in just beforehand. That will allow us to ship the
140 : : * data with a single call to pq_putmessage and without needing any extra
141 : : * copying.
142 : : */
816 143 : 149 : buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
144 : 149 : mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
145 : 149 : mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
817 146 : 149 : mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
147 : :
148 : : /* Tell client the backup start location. */
149 : 149 : SendXlogRecPtrResult(state->startptr, state->starttli);
150 : :
151 : : /* Send client a list of tablespaces. */
152 : 149 : SendTablespaceList(state->tablespaces);
153 : :
154 : : /* Send a CommandComplete message */
236 nathan@postgresql.or 155 :GNC 149 : pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
156 : :
157 : : /* Begin COPY stream. This will be used for all archives + manifest. */
817 rhaas@postgresql.org 158 :CBC 149 : SendCopyOutResponse();
159 : 149 : }
160 : :
161 : : /*
162 : : * Send a CopyData message announcing the beginning of a new archive.
163 : : */
164 : : static void
165 : 180 : bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
166 : : {
167 : 180 : bbsink_state *state = sink->bbs_state;
168 : : tablespaceinfo *ti;
169 : : StringInfoData buf;
170 : :
171 : 180 : ti = list_nth(state->tablespaces, state->tablespace_num);
236 nathan@postgresql.or 172 :GNC 180 : pq_beginmessage(&buf, PqMsg_CopyData);
817 rhaas@postgresql.org 173 :CBC 180 : pq_sendbyte(&buf, 'n'); /* New archive */
174 : 180 : pq_sendstring(&buf, archive_name);
175 [ + + ]: 180 : pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
176 : 180 : pq_endmessage(&buf);
177 : 180 : }
178 : :
179 : : /*
180 : : * Send a CopyData message containing a chunk of archive content.
181 : : */
182 : : static void
183 : 317589 : bbsink_copystream_archive_contents(bbsink *sink, size_t len)
184 : : {
185 : 317589 : bbsink_copystream *mysink = (bbsink_copystream *) sink;
186 : 317589 : bbsink_state *state = mysink->base.bbs_state;
187 : : StringInfoData buf;
188 : : uint64 targetbytes;
189 : :
190 : : /* Send the archive content to the client, if appropriate. */
880 191 [ + + ]: 317589 : if (mysink->send_to_client)
192 : : {
193 : : /* Add one because we're also sending a leading type byte. */
194 : 302554 : pq_putmessage('d', mysink->msgbuffer, len + 1);
195 : : }
196 : :
197 : : /* Consider whether to send a progress report to the client. */
817 198 : 317589 : targetbytes = mysink->bytes_done_at_last_time_check
199 : : + PROGRESS_REPORT_BYTE_INTERVAL;
200 [ + + ]: 317589 : if (targetbytes <= state->bytes_done)
201 : : {
202 : 49681 : TimestampTz now = GetCurrentTimestamp();
203 : : long ms;
204 : :
205 : : /*
206 : : * OK, we've sent a decent number of bytes, so check the system time
207 : : * to see whether we're due to send a progress report.
208 : : */
209 : 49681 : mysink->bytes_done_at_last_time_check = state->bytes_done;
210 : 49681 : ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
211 : : now);
212 : :
213 : : /*
214 : : * Send a progress report if enough time has passed. Also send one if
215 : : * the system clock was set backward, so that such occurrences don't
216 : : * have the effect of suppressing further progress messages.
217 : : */
444 tgl@sss.pgh.pa.us 218 [ + + ]: 49681 : if (ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD ||
219 [ - + ]: 49622 : now < mysink->last_progress_report_time)
220 : : {
817 rhaas@postgresql.org 221 : 59 : mysink->last_progress_report_time = now;
222 : :
236 nathan@postgresql.or 223 :GNC 59 : pq_beginmessage(&buf, PqMsg_CopyData);
817 rhaas@postgresql.org 224 :CBC 59 : pq_sendbyte(&buf, 'p'); /* Progress report */
225 : 59 : pq_sendint64(&buf, state->bytes_done);
226 : 59 : pq_endmessage(&buf);
227 : 59 : pq_flush_if_writable();
228 : : }
229 : : }
230 : 317589 : }
231 : :
232 : : /*
233 : : * We don't need to explicitly signal the end of the archive; the client
234 : : * will figure out that we've reached the end when we begin the next one,
235 : : * or begin the manifest, or end the COPY stream. However, this seems like
236 : : * a good time to force out a progress report. One reason for that is that
237 : : * if this is the last archive, and we don't force a progress report now,
238 : : * the client will never be told that we sent all the bytes.
239 : : */
240 : : static void
241 : 174 : bbsink_copystream_end_archive(bbsink *sink)
242 : : {
243 : 174 : bbsink_copystream *mysink = (bbsink_copystream *) sink;
244 : 174 : bbsink_state *state = mysink->base.bbs_state;
245 : : StringInfoData buf;
246 : :
247 : 174 : mysink->bytes_done_at_last_time_check = state->bytes_done;
248 : 174 : mysink->last_progress_report_time = GetCurrentTimestamp();
236 nathan@postgresql.or 249 :GNC 174 : pq_beginmessage(&buf, PqMsg_CopyData);
817 rhaas@postgresql.org 250 :CBC 174 : pq_sendbyte(&buf, 'p'); /* Progress report */
251 : 174 : pq_sendint64(&buf, state->bytes_done);
252 : 174 : pq_endmessage(&buf);
253 : 174 : pq_flush_if_writable();
254 : 174 : }
255 : :
256 : : /*
257 : : * Send a CopyData message announcing the beginning of the backup manifest.
258 : : */
259 : : static void
260 : 142 : bbsink_copystream_begin_manifest(bbsink *sink)
261 : : {
262 : : StringInfoData buf;
263 : :
236 nathan@postgresql.or 264 :GNC 142 : pq_beginmessage(&buf, PqMsg_CopyData);
817 rhaas@postgresql.org 265 :CBC 142 : pq_sendbyte(&buf, 'm'); /* Manifest */
266 : 142 : pq_endmessage(&buf);
267 : 142 : }
268 : :
269 : : /*
270 : : * Each chunk of manifest data is sent using a CopyData message.
271 : : */
272 : : static void
273 : 731 : bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
274 : : {
275 : 731 : bbsink_copystream *mysink = (bbsink_copystream *) sink;
276 : :
880 277 [ + + ]: 731 : if (mysink->send_to_client)
278 : : {
279 : : /* Add one because we're also sending a leading type byte. */
280 : 681 : pq_putmessage('d', mysink->msgbuffer, len + 1);
281 : : }
817 282 : 731 : }
283 : :
284 : : /*
285 : : * We don't need an explicit terminator for the backup manifest.
286 : : */
287 : : static void
288 : 142 : bbsink_copystream_end_manifest(bbsink *sink)
289 : : {
290 : : /* Do nothing. */
291 : 142 : }
292 : :
293 : : /*
294 : : * Send end-of-backup wire protocol messages.
295 : : */
296 : : static void
297 : 143 : bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
298 : : TimeLineID endtli)
299 : : {
300 : 143 : SendCopyDone();
301 : 143 : SendXlogRecPtrResult(endptr, endtli);
302 : 143 : }
303 : :
304 : : /*
305 : : * Cleanup.
306 : : */
307 : : static void
308 : 138 : bbsink_copystream_cleanup(bbsink *sink)
309 : : {
310 : : /* Nothing to do. */
311 : 138 : }
312 : :
313 : : /*
314 : : * Send a CopyOutResponse message.
315 : : */
316 : : static void
891 317 : 149 : SendCopyOutResponse(void)
318 : : {
319 : : StringInfoData buf;
320 : :
236 nathan@postgresql.or 321 :GNC 149 : pq_beginmessage(&buf, PqMsg_CopyOutResponse);
891 rhaas@postgresql.org 322 :CBC 149 : pq_sendbyte(&buf, 0); /* overall format */
323 : 149 : pq_sendint16(&buf, 0); /* natts */
324 : 149 : pq_endmessage(&buf);
325 : 149 : }
326 : :
327 : : /*
328 : : * Send a CopyDone message.
329 : : */
330 : : static void
331 : 143 : SendCopyDone(void)
332 : : {
236 nathan@postgresql.or 333 :GNC 143 : pq_putemptymessage(PqMsg_CopyDone);
891 rhaas@postgresql.org 334 :CBC 143 : }
335 : :
336 : : /*
337 : : * Send a single resultset containing just a single
338 : : * XLogRecPtr record (in text format)
339 : : */
340 : : static void
341 : 292 : SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
342 : : {
343 : : DestReceiver *dest;
344 : : TupOutputState *tstate;
345 : : TupleDesc tupdesc;
346 : : Datum values[2];
648 peter@eisentraut.org 347 : 292 : bool nulls[2] = {0};
348 : :
650 349 : 292 : dest = CreateDestReceiver(DestRemoteSimple);
350 : :
351 : 292 : tupdesc = CreateTemplateTupleDesc(2);
352 : 292 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
353 : :
354 : : /*
355 : : * int8 may seem like a surprising data type for this, but in theory int4
356 : : * would not be wide enough for this, as TimeLineID is unsigned.
357 : : */
358 : 292 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
359 : :
360 : : /* send RowDescription */
648 361 : 292 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
362 : :
363 : : /* Data row */
331 tgl@sss.pgh.pa.us 364 : 292 : values[0] = CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
648 peter@eisentraut.org 365 : 292 : values[1] = Int64GetDatum(tli);
366 : 292 : do_tup_output(tstate, values, nulls);
367 : :
368 : 292 : end_tup_output(tstate);
369 : :
370 : : /* Send a CommandComplete message */
236 nathan@postgresql.or 371 :GNC 292 : pq_puttextmessage(PqMsg_CommandComplete, "SELECT");
891 rhaas@postgresql.org 372 :CBC 292 : }
373 : :
374 : : /*
375 : : * Send a result set via libpq describing the tablespace list.
376 : : */
377 : : static void
378 : 149 : SendTablespaceList(List *tablespaces)
379 : : {
380 : : DestReceiver *dest;
381 : : TupOutputState *tstate;
382 : : TupleDesc tupdesc;
383 : : ListCell *lc;
384 : :
650 peter@eisentraut.org 385 : 149 : dest = CreateDestReceiver(DestRemoteSimple);
386 : :
387 : 149 : tupdesc = CreateTemplateTupleDesc(3);
388 : 149 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
389 : 149 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
390 : 149 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
391 : :
392 : : /* send RowDescription */
648 393 : 149 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
394 : :
395 : : /* Construct and send the directory information */
891 rhaas@postgresql.org 396 [ + - + + : 329 : foreach(lc, tablespaces)
+ + ]
397 : : {
398 : 180 : tablespaceinfo *ti = lfirst(lc);
399 : : Datum values[3];
648 peter@eisentraut.org 400 : 180 : bool nulls[3] = {0};
401 : :
402 : : /* Send one datarow message */
891 rhaas@postgresql.org 403 [ + + ]: 180 : if (ti->path == NULL)
404 : : {
648 peter@eisentraut.org 405 : 149 : nulls[0] = true;
406 : 149 : nulls[1] = true;
407 : : }
408 : : else
409 : : {
174 rhaas@postgresql.org 410 :GNC 31 : values[0] = ObjectIdGetDatum(ti->oid);
648 peter@eisentraut.org 411 :CBC 31 : values[1] = CStringGetTextDatum(ti->path);
412 : : }
891 rhaas@postgresql.org 413 [ + - ]: 180 : if (ti->size >= 0)
648 peter@eisentraut.org 414 : 180 : values[2] = Int64GetDatum(ti->size / 1024);
415 : : else
648 peter@eisentraut.org 416 :UBC 0 : nulls[2] = true;
417 : :
648 peter@eisentraut.org 418 :CBC 180 : do_tup_output(tstate, values, nulls);
419 : : }
420 : :
421 : 149 : end_tup_output(tstate);
891 rhaas@postgresql.org 422 : 149 : }
|