Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_custom.c
4 : *
5 : * Implements the custom output format.
6 : *
7 : * The comments with the routines in this code are a good place to
8 : * understand how to write a new format.
9 : *
10 : * See the headers to pg_restore for more details.
11 : *
12 : * Copyright (c) 2000, Philip Warner
13 : * Rights are granted to use this software in any way so long
14 : * as this notice is not removed.
15 : *
16 : * The author is not responsible for loss or damages that may
17 : * and any liability will be limited to the time taken to fix any
18 : * related bug.
19 : *
20 : *
21 : * IDENTIFICATION
22 : * src/bin/pg_dump/pg_backup_custom.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 : #include "postgres_fe.h"
27 :
28 : #include "common/file_utils.h"
29 : #include "compress_io.h"
30 : #include "parallel.h"
31 : #include "pg_backup_utils.h"
32 :
33 : /*--------
34 : * Routines in the format interface
35 : *--------
36 : */
37 :
38 : static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
39 : static void _StartData(ArchiveHandle *AH, TocEntry *te);
40 : static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
41 : static void _EndData(ArchiveHandle *AH, TocEntry *te);
42 : static int _WriteByte(ArchiveHandle *AH, const int i);
43 : static int _ReadByte(ArchiveHandle *AH);
44 : static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
45 : static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
46 : static void _CloseArchive(ArchiveHandle *AH);
47 : static void _ReopenArchive(ArchiveHandle *AH);
48 : static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
49 : static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
50 : static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
51 : static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);
52 :
53 : static void _PrintData(ArchiveHandle *AH);
54 : static void _skipData(ArchiveHandle *AH);
55 : static void _skipLOs(ArchiveHandle *AH);
56 :
57 : static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
58 : static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
59 : static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
60 : static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
61 : static void _LoadLOs(ArchiveHandle *AH, bool drop);
62 :
63 : static void _PrepParallelRestore(ArchiveHandle *AH);
64 : static void _Clone(ArchiveHandle *AH);
65 : static void _DeClone(ArchiveHandle *AH);
66 :
67 : static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
68 :
69 : typedef struct
70 : {
71 : CompressorState *cs;
72 : int hasSeek;
73 : /* lastFilePos is used only when reading, and may be invalid if !hasSeek */
74 : pgoff_t lastFilePos; /* position after last data block we've read */
75 : } lclContext;
76 :
77 : typedef struct
78 : {
79 : int dataState;
80 : pgoff_t dataPos; /* valid only if dataState=K_OFFSET_POS_SET */
81 : } lclTocEntry;
82 :
83 :
84 : /*------
85 : * Static declarations
86 : *------
87 : */
88 : static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
89 : static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);
90 :
91 : static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
92 : static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
93 :
94 :
95 : /*
96 : * Init routine required by ALL formats. This is a global routine
97 : * and should be declared in pg_backup_archiver.h
98 : *
99 : * It's task is to create any extra archive context (using AH->formatData),
100 : * and to initialize the supported function pointers.
101 : *
102 : * It should also prepare whatever its input source is for reading/writing,
103 : * and in the case of a read mode connection, it should load the Header & TOC.
104 : */
105 : void
8053 bruce 106 CBC 28 : InitArchiveFmt_Custom(ArchiveHandle *AH)
107 : {
108 : lclContext *ctx;
109 :
110 : /* Assuming static functions, this can be copied for each format. */
111 28 : AH->ArchiveEntryPtr = _ArchiveEntry;
112 28 : AH->StartDataPtr = _StartData;
113 28 : AH->WriteDataPtr = _WriteData;
114 28 : AH->EndDataPtr = _EndData;
115 28 : AH->WriteBytePtr = _WriteByte;
116 28 : AH->ReadBytePtr = _ReadByte;
117 28 : AH->WriteBufPtr = _WriteBuf;
118 28 : AH->ReadBufPtr = _ReadBuf;
119 28 : AH->ClosePtr = _CloseArchive;
5179 andrew 120 28 : AH->ReopenPtr = _ReopenArchive;
8053 bruce 121 28 : AH->PrintTocDataPtr = _PrintTocData;
122 28 : AH->ReadExtraTocPtr = _ReadExtraToc;
123 28 : AH->WriteExtraTocPtr = _WriteExtraToc;
124 28 : AH->PrintExtraTocPtr = _PrintExtraToc;
125 :
125 peter 126 GNC 28 : AH->StartLOsPtr = _StartLOs;
127 28 : AH->StartLOPtr = _StartLO;
128 28 : AH->EndLOPtr = _EndLO;
129 28 : AH->EndLOsPtr = _EndLOs;
130 :
1668 tgl 131 CBC 28 : AH->PrepParallelRestorePtr = _PrepParallelRestore;
5050 bruce 132 28 : AH->ClonePtr = _Clone;
133 28 : AH->DeClonePtr = _DeClone;
134 :
135 : /* no parallel dump in the custom archive, only parallel restore */
3668 andrew 136 28 : AH->WorkerJobDumpPtr = NULL;
137 28 : AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
138 :
139 : /* Set up a private area. */
3841 tgl 140 28 : ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
8053 bruce 141 28 : AH->formatData = (void *) ctx;
142 :
143 : /* Initialize LO buffering */
7655 144 28 : AH->lo_buf_size = LOBBUFSIZE;
4153 145 28 : AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
146 :
147 : /*
148 : * Now open the file
149 : */
8053 150 28 : if (AH->mode == archModeWrite)
151 : {
152 12 : if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
153 : {
8297 pjw 154 12 : AH->FH = fopen(AH->fSpec, PG_BINARY_W);
5642 tgl 155 12 : if (!AH->FH)
366 tgl 156 UBC 0 : pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
157 : }
158 : else
159 : {
8297 pjw 160 0 : AH->FH = stdout;
5642 tgl 161 0 : if (!AH->FH)
366 162 0 : pg_fatal("could not open output file: %m");
163 : }
164 :
7471 bruce 165 CBC 12 : ctx->hasSeek = checkSeek(AH->FH);
166 : }
167 : else
168 : {
8053 169 16 : if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
170 : {
8297 pjw 171 16 : AH->FH = fopen(AH->fSpec, PG_BINARY_R);
5642 tgl 172 16 : if (!AH->FH)
366 tgl 173 UBC 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
174 : }
175 : else
176 : {
8297 pjw 177 0 : AH->FH = stdin;
5642 tgl 178 0 : if (!AH->FH)
366 179 0 : pg_fatal("could not open input file: %m");
180 : }
181 :
7471 bruce 182 CBC 16 : ctx->hasSeek = checkSeek(AH->FH);
183 :
8297 pjw 184 16 : ReadHead(AH);
185 16 : ReadToc(AH);
186 :
187 : /*
188 : * Remember location of first data block (i.e., the point after TOC)
189 : * in case we have to search for desired data blocks.
190 : */
996 tgl 191 16 : ctx->lastFilePos = _getFilePos(AH, ctx);
192 : }
8297 pjw 193 28 : }
194 :
195 : /*
196 : * Called by the Archiver when the dumper creates a new TOC entry.
197 : *
198 : * Optional.
199 : *
200 : * Set up extract format-related TOC data.
201 : */
202 : static void
8053 bruce 203 3169 : _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
204 : {
205 : lclTocEntry *ctx;
206 :
3841 tgl 207 3169 : ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
8053 bruce 208 3169 : if (te->dataDumper)
7474 209 105 : ctx->dataState = K_OFFSET_POS_NOT_SET;
210 : else
211 3064 : ctx->dataState = K_OFFSET_NO_DATA;
212 :
213 3169 : te->formatData = (void *) ctx;
8297 pjw 214 3169 : }
215 :
216 : /*
217 : * Called by the Archiver to save any extra format-related TOC entry
218 : * data.
219 : *
220 : * Optional.
221 : *
222 : * Use the Archiver routines to write data - they are non-endian, and
223 : * maintain other important file information.
224 : */
225 : static void
8053 bruce 226 6334 : _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
227 : {
228 6334 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
229 :
7474 230 6334 : WriteOffset(AH, ctx->dataPos, ctx->dataState);
8297 pjw 231 6334 : }
232 :
233 : /*
234 : * Called by the Archiver to read any extra format-related TOC data.
235 : *
236 : * Optional.
237 : *
238 : * Needs to match the order defined in _WriteExtraToc, and should also
239 : * use the Archiver input routines.
240 : */
241 : static void
8053 bruce 242 4211 : _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
243 : {
244 4211 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
245 :
246 4211 : if (ctx == NULL)
247 : {
3841 tgl 248 4211 : ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
8053 bruce 249 4211 : te->formatData = (void *) ctx;
250 : }
251 :
7188 252 4211 : ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
253 :
254 : /*
255 : * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
256 : * dump it at all.
257 : */
7474 258 4211 : if (AH->version < K_VERS_1_7)
4381 peter_e 259 UBC 0 : ReadInt(AH);
8297 pjw 260 CBC 4211 : }
261 :
262 : /*
263 : * Called by the Archiver when restoring an archive to output a comment
264 : * that includes useful information about the TOC entry.
265 : *
266 : * Optional.
267 : */
268 : static void
8053 bruce 269 1266 : _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
270 : {
271 1266 : lclTocEntry *ctx = (lclTocEntry *) te->formatData;
272 :
6976 tgl 273 1266 : if (AH->public.verbose)
274 226 : ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
275 226 : (int64) ctx->dataPos);
8297 pjw 276 1266 : }
277 :
278 : /*
279 : * Called by the archiver when saving TABLE DATA (not schema). This routine
280 : * should save whatever format-specific information is needed to read
281 : * the archive back.
282 : *
283 : * It is called just prior to the dumper's 'DataDumper' routine being called.
284 : *
285 : * Optional, but strongly recommended.
286 : *
287 : */
288 : static void
8053 bruce 289 99 : _StartData(ArchiveHandle *AH, TocEntry *te)
290 : {
291 99 : lclContext *ctx = (lclContext *) AH->formatData;
292 99 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
293 :
294 99 : tctx->dataPos = _getFilePos(AH, ctx);
996 tgl 295 99 : if (tctx->dataPos >= 0)
296 99 : tctx->dataState = K_OFFSET_POS_SET;
297 :
8053 bruce 298 99 : _WriteByte(AH, BLK_DATA); /* Block type */
7064 tgl 299 99 : WriteInt(AH, te->dumpId); /* For sanity check */
300 :
45 tomas.vondra 301 GNC 99 : ctx->cs = AllocateCompressor(AH->compression_spec,
302 : NULL,
303 : _CustomWriteFunc);
8297 pjw 304 GIC 99 : }
305 :
8297 pjw 306 ECB : /*
307 : * Called by archiver when dumper calls WriteData. This routine is
308 : * called for both LO and table data; it is the responsibility of
309 : * the format to manage each kind of data using StartLO/StartData.
310 : *
311 : * It should only be called from within a DataDumper routine.
312 : *
313 : * Mandatory.
314 : */
315 : static void
7537 peter_e 316 GIC 227 : _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
317 : {
8053 bruce 318 CBC 227 : lclContext *ctx = (lclContext *) AH->formatData;
4382 bruce 319 GIC 227 : CompressorState *cs = ctx->cs;
8297 pjw 320 ECB :
3261 bruce 321 CBC 227 : if (dLen > 0)
322 : /* writeData() internally throws write errors */
45 tomas.vondra 323 GNC 219 : cs->writeData(AH, cs, data, dLen);
8297 pjw 324 GIC 227 : }
8297 pjw 325 ECB :
326 : /*
327 : * Called by the archiver when a dumper's 'DataDumper' routine has
328 : * finished.
329 : *
330 : * Mandatory.
331 : */
332 : static void
8053 bruce 333 GIC 99 : _EndData(ArchiveHandle *AH, TocEntry *te)
334 : {
4511 heikki.linnakangas 335 CBC 99 : lclContext *ctx = (lclContext *) AH->formatData;
336 :
337 99 : EndCompressor(AH, ctx->cs);
45 tomas.vondra 338 GNC 99 : ctx->cs = NULL;
339 :
340 : /* Send the end marker */
4511 heikki.linnakangas 341 CBC 99 : WriteInt(AH, 0);
8297 pjw 342 99 : }
343 :
344 : /*
8053 bruce 345 ECB : * Called by the archiver when starting to save all BLOB DATA (not schema).
8297 pjw 346 : * This routine should save whatever format-specific information is needed
347 : * to read the LOs back into memory.
348 : *
349 : * It is called just prior to the dumper's DataDumper routine.
350 : *
351 : * Optional, but strongly recommended.
352 : */
353 : static void
125 peter 354 GNC 4 : _StartLOs(ArchiveHandle *AH, TocEntry *te)
355 : {
8053 bruce 356 GIC 4 : lclContext *ctx = (lclContext *) AH->formatData;
357 4 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
8297 pjw 358 ECB :
8053 bruce 359 GIC 4 : tctx->dataPos = _getFilePos(AH, ctx);
996 tgl 360 CBC 4 : if (tctx->dataPos >= 0)
361 4 : tctx->dataState = K_OFFSET_POS_SET;
362 :
8053 bruce 363 4 : _WriteByte(AH, BLK_BLOBS); /* Block type */
7064 tgl 364 4 : WriteInt(AH, te->dumpId); /* For sanity check */
8297 pjw 365 4 : }
366 :
8297 pjw 367 ECB : /*
368 : * Called by the archiver when the dumper calls StartLO.
369 : *
370 : * Mandatory.
371 : *
372 : * Must save the passed OID for retrieval at restore-time.
373 : */
374 : static void
125 peter 375 GNC 8 : _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
376 : {
4511 heikki.linnakangas 377 GIC 8 : lclContext *ctx = (lclContext *) AH->formatData;
378 :
8053 bruce 379 CBC 8 : if (oid == 0)
366 tgl 380 UIC 0 : pg_fatal("invalid OID for large object");
8297 pjw 381 ECB :
8053 bruce 382 GIC 8 : WriteInt(AH, oid);
4511 heikki.linnakangas 383 ECB :
45 tomas.vondra 384 GNC 8 : ctx->cs = AllocateCompressor(AH->compression_spec,
385 : NULL,
386 : _CustomWriteFunc);
8297 pjw 387 GIC 8 : }
8297 pjw 388 ECB :
389 : /*
390 : * Called by the archiver when the dumper calls EndLO.
391 : *
392 : * Optional.
393 : */
394 : static void
125 peter 395 GNC 8 : _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
396 : {
4511 heikki.linnakangas 397 GIC 8 : lclContext *ctx = (lclContext *) AH->formatData;
398 :
399 8 : EndCompressor(AH, ctx->cs);
400 : /* Send the end marker */
4511 heikki.linnakangas 401 CBC 8 : WriteInt(AH, 0);
8297 pjw 402 GIC 8 : }
8297 pjw 403 ECB :
404 : /*
8053 bruce 405 : * Called by the archiver when finishing saving all BLOB DATA.
406 : *
8297 pjw 407 : * Optional.
408 : */
409 : static void
125 peter 410 GNC 4 : _EndLOs(ArchiveHandle *AH, TocEntry *te)
411 : {
412 : /* Write out a fake zero OID to mark end-of-LOs. */
8053 bruce 413 GIC 4 : WriteInt(AH, 0);
8297 pjw 414 4 : }
415 :
8297 pjw 416 ECB : /*
417 : * Print data for a given TOC entry
418 : */
8053 bruce 419 : static void
2643 tgl 420 CBC 103 : _PrintTocData(ArchiveHandle *AH, TocEntry *te)
421 : {
8053 bruce 422 GIC 103 : lclContext *ctx = (lclContext *) AH->formatData;
423 103 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
424 : int blkType;
425 : int id;
8297 pjw 426 ECB :
7474 bruce 427 GIC 103 : if (tctx->dataState == K_OFFSET_NO_DATA)
8297 pjw 428 LBC 0 : return;
8297 pjw 429 ECB :
7474 bruce 430 GIC 103 : if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
431 : {
432 : /*
4660 bruce 433 ECB : * We cannot seek directly to the desired block. Instead, skip over
996 tgl 434 EUB : * block headers until we find the one we want. Remember the
435 : * positions of skipped-over blocks, so that if we later decide we
996 tgl 436 ECB : * need to read one, we'll be able to seek to it.
437 : *
438 : * When our input file is seekable, we can do the search starting from
439 : * the point after the last data block we scanned in previous
440 : * iterations of this function.
441 : */
996 tgl 442 UIC 0 : if (ctx->hasSeek)
443 : {
444 0 : if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
366 445 0 : pg_fatal("error during file seek: %m");
446 : }
447 :
996 tgl 448 EUB : for (;;)
8053 bruce 449 UIC 0 : {
996 tgl 450 UBC 0 : pgoff_t thisBlkPos = _getFilePos(AH, ctx);
996 tgl 451 EUB :
996 tgl 452 UIC 0 : _readBlockHeader(AH, &blkType, &id);
453 :
454 0 : if (blkType == EOF || id == te->dumpId)
996 tgl 455 EUB : break;
456 :
457 : /* Remember the block position, if we got one */
996 tgl 458 UBC 0 : if (thisBlkPos >= 0)
459 : {
460 0 : TocEntry *otherte = getTocEntryByDumpId(AH, id);
461 :
996 tgl 462 UIC 0 : if (otherte && otherte->formatData)
463 : {
996 tgl 464 UBC 0 : lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;
465 :
996 tgl 466 EUB : /*
467 : * Note: on Windows, multiple threads might access/update
468 : * the same lclTocEntry concurrently, but that should be
469 : * safe as long as we update dataPos before dataState.
470 : * Ideally, we'd use pg_write_barrier() to enforce that,
471 : * but the needed infrastructure doesn't exist in frontend
472 : * code. But Windows only runs on machines with strong
473 : * store ordering, so it should be okay for now.
474 : */
996 tgl 475 UIC 0 : if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
476 : {
477 0 : othertctx->dataPos = thisBlkPos;
478 0 : othertctx->dataState = K_OFFSET_POS_SET;
479 : }
480 0 : else if (othertctx->dataPos != thisBlkPos ||
996 tgl 481 UBC 0 : othertctx->dataState != K_OFFSET_POS_SET)
482 : {
996 tgl 483 EUB : /* sanity check */
996 tgl 484 UBC 0 : pg_log_warning("data block %d has wrong seek position",
485 : id);
996 tgl 486 EUB : }
487 : }
488 : }
489 :
8053 bruce 490 UBC 0 : switch (blkType)
491 : {
8053 bruce 492 UIC 0 : case BLK_DATA:
493 0 : _skipData(AH);
494 0 : break;
495 :
8053 bruce 496 UBC 0 : case BLK_BLOBS:
125 peter 497 UNC 0 : _skipLOs(AH);
8053 bruce 498 UBC 0 : break;
8297 pjw 499 EUB :
8053 bruce 500 UBC 0 : default: /* Always have a default */
366 tgl 501 UIC 0 : pg_fatal("unrecognized data block type (%d) while searching archive",
366 tgl 502 EUB : blkType);
8053 bruce 503 : break;
504 : }
505 : }
506 : }
507 : else
508 : {
509 : /* We can just seek to the place we need to be. */
7537 peter_e 510 GIC 103 : if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
366 tgl 511 UIC 0 : pg_fatal("error during file seek: %m");
512 :
8297 pjw 513 GIC 103 : _readBlockHeader(AH, &blkType, &id);
514 : }
515 :
996 tgl 516 ECB : /*
996 tgl 517 EUB : * If we reached EOF without finding the block we want, then either it
518 : * doesn't exist, or it does but we lack the ability to seek back to it.
996 tgl 519 ECB : */
4669 tgl 520 GIC 103 : if (blkType == EOF)
521 : {
996 tgl 522 UIC 0 : if (!ctx->hasSeek)
366 523 0 : pg_fatal("could not find block ID %d in archive -- "
524 : "possibly due to out-of-order restore request, "
525 : "which cannot be handled due to non-seekable input file",
366 tgl 526 ECB : te->dumpId);
527 : else
366 tgl 528 UBC 0 : pg_fatal("could not find block ID %d in archive -- "
366 tgl 529 EUB : "possibly corrupt archive",
530 : te->dumpId);
531 : }
532 :
533 : /* Are we sane? */
7064 tgl 534 GBC 103 : if (id != te->dumpId)
366 tgl 535 UIC 0 : pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
536 : id, te->dumpId);
537 :
8053 bruce 538 GIC 103 : switch (blkType)
539 : {
8297 pjw 540 CBC 99 : case BLK_DATA:
8297 pjw 541 GBC 99 : _PrintData(AH);
8297 pjw 542 GIC 99 : break;
543 :
8297 pjw 544 CBC 4 : case BLK_BLOBS:
125 peter 545 GNC 4 : _LoadLOs(AH, AH->public.ropt->dropSchema);
8297 pjw 546 CBC 4 : break;
8297 pjw 547 ECB :
8053 bruce 548 LBC 0 : default: /* Always have a default */
366 tgl 549 UIC 0 : pg_fatal("unrecognized data block type %d while restoring archive",
366 tgl 550 ECB : blkType);
8297 pjw 551 : break;
8053 bruce 552 : }
553 :
996 tgl 554 EUB : /*
555 : * If our input file is seekable but lacks data offsets, update our
556 : * knowledge of where to start future searches from. (Note that we did
557 : * not update the current TE's dataState/dataPos. We could have, but
558 : * there is no point since it will not be visited again.)
559 : */
996 tgl 560 GIC 103 : if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
561 : {
996 tgl 562 UIC 0 : pgoff_t curPos = _getFilePos(AH, ctx);
563 :
564 0 : if (curPos > ctx->lastFilePos)
565 0 : ctx->lastFilePos = curPos;
996 tgl 566 ECB : }
567 : }
8297 pjw 568 EUB :
569 : /*
570 : * Print data from current file position.
571 : */
572 : static void
8053 bruce 573 GIC 107 : _PrintData(ArchiveHandle *AH)
574 : {
575 : CompressorState *cs;
576 :
45 tomas.vondra 577 GNC 107 : cs = AllocateCompressor(AH->compression_spec,
578 : _CustomReadFunc, NULL);
579 107 : cs->readData(AH, cs);
580 107 : EndCompressor(AH, cs);
8297 pjw 581 GIC 107 : }
582 :
583 : static void
125 peter 584 GNC 4 : _LoadLOs(ArchiveHandle *AH, bool drop)
585 : {
586 : Oid oid;
587 :
588 4 : StartRestoreLOs(AH);
589 :
8053 bruce 590 CBC 4 : oid = ReadInt(AH);
591 12 : while (oid != 0)
8053 bruce 592 ECB : {
125 peter 593 GNC 8 : StartRestoreLO(AH, oid, drop);
8297 pjw 594 GIC 8 : _PrintData(AH);
125 peter 595 GNC 8 : EndRestoreLO(AH, oid);
8297 pjw 596 GIC 8 : oid = ReadInt(AH);
597 : }
598 :
125 peter 599 GNC 4 : EndRestoreLOs(AH);
8297 pjw 600 GIC 4 : }
8297 pjw 601 ECB :
602 : /*
603 : * Skip the LOs from the current file position.
604 : * LOs are written sequentially as data blocks (see below).
605 : * Each LO is preceded by its original OID.
606 : * A zero OID indicates the end of the LOs.
607 : */
608 : static void
125 peter 609 UNC 0 : _skipLOs(ArchiveHandle *AH)
8297 pjw 610 ECB : {
7620 tgl 611 : Oid oid;
612 :
8053 bruce 613 UIC 0 : oid = ReadInt(AH);
614 0 : while (oid != 0)
615 : {
8195 pjw 616 0 : _skipData(AH);
617 0 : oid = ReadInt(AH);
618 : }
8297 619 0 : }
8297 pjw 620 EUB :
621 : /*
622 : * Skip data from current file position.
623 : * Data blocks are formatted as an integer length, followed by data.
1031 michael 624 : * A zero length indicates the end of the block.
8297 pjw 625 : */
626 : static void
8053 bruce 627 UBC 0 : _skipData(ArchiveHandle *AH)
8297 pjw 628 EUB : {
996 tgl 629 UIC 0 : lclContext *ctx = (lclContext *) AH->formatData;
7537 peter_e 630 EUB : size_t blkLen;
4511 heikki.linnakangas 631 UIC 0 : char *buf = NULL;
632 0 : int buflen = 0;
633 :
8297 pjw 634 0 : blkLen = ReadInt(AH);
8053 bruce 635 0 : while (blkLen != 0)
636 : {
996 tgl 637 0 : if (ctx->hasSeek)
8053 bruce 638 EUB : {
996 tgl 639 UIC 0 : if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
366 tgl 640 UBC 0 : pg_fatal("error during file seek: %m");
641 : }
996 tgl 642 EUB : else
6166 peter_e 643 : {
996 tgl 644 UIC 0 : if (blkLen > buflen)
996 tgl 645 EUB : {
297 peter 646 UNC 0 : free(buf);
996 tgl 647 UBC 0 : buf = (char *) pg_malloc(blkLen);
996 tgl 648 UIC 0 : buflen = blkLen;
996 tgl 649 EUB : }
946 tgl 650 UBC 0 : if (fread(buf, 1, blkLen, AH->FH) != blkLen)
651 : {
996 tgl 652 UIC 0 : if (feof(AH->FH))
366 653 0 : pg_fatal("could not read from input file: end of file");
996 tgl 654 EUB : else
366 tgl 655 UIC 0 : pg_fatal("could not read from input file: %m");
996 tgl 656 EUB : }
6166 peter_e 657 : }
8053 bruce 658 :
8053 bruce 659 UIC 0 : blkLen = ReadInt(AH);
8053 bruce 660 EUB : }
661 :
297 peter 662 UNC 0 : free(buf);
8297 pjw 663 UIC 0 : }
8297 pjw 664 EUB :
665 : /*
666 : * Write a byte of data to the archive.
667 : *
668 : * Mandatory.
669 : *
670 : * Called by the archiver to do integer & byte output to the archive.
671 : */
8053 bruce 672 : static int
8053 bruce 673 GIC 611608 : _WriteByte(ArchiveHandle *AH, const int i)
674 : {
946 tgl 675 611608 : if (fputc(i, AH->FH) == EOF)
3261 bruce 676 UIC 0 : WRITE_ERROR_EXIT;
677 :
3261 bruce 678 GIC 611608 : return 1;
679 : }
680 :
681 : /*
8297 pjw 682 ECB : * Read a byte of data from the archive.
683 : *
684 : * Mandatory
8297 pjw 685 EUB : *
686 : * Called by the archiver to read bytes & integers from the archive.
5725 tgl 687 ECB : * EOF should be treated as a fatal error.
688 : */
689 : static int
8053 bruce 690 GIC 408466 : _ReadByte(ArchiveHandle *AH)
691 : {
692 : int res;
693 :
5725 tgl 694 408466 : res = getc(AH->FH);
695 408466 : if (res == EOF)
3261 bruce 696 UIC 0 : READ_ERROR_EXIT(AH->FH);
8053 bruce 697 GIC 408466 : return res;
698 : }
8297 pjw 699 ECB :
700 : /*
701 : * Write a buffer of data to the archive.
702 : *
703 : * Mandatory.
704 : *
8297 pjw 705 EUB : * Called by the archiver to write a block of bytes to the archive.
8297 pjw 706 ECB : */
707 : static void
7537 peter_e 708 GIC 67077 : _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
709 : {
3261 bruce 710 67077 : if (fwrite(buf, 1, len, AH->FH) != len)
3261 bruce 711 UIC 0 : WRITE_ERROR_EXIT;
8297 pjw 712 GIC 67077 : }
713 :
714 : /*
715 : * Read a block of bytes from the archive.
716 : *
8297 pjw 717 ECB : * Mandatory.
718 : *
719 : * Called by the archiver to read a block of bytes from the archive
8297 pjw 720 EUB : */
3261 bruce 721 ECB : static void
7537 peter_e 722 GIC 44281 : _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
723 : {
3261 bruce 724 44281 : if (fread(buf, 1, len, AH->FH) != len)
3261 bruce 725 UIC 0 : READ_ERROR_EXIT(AH->FH);
8297 pjw 726 GIC 44281 : }
727 :
728 : /*
729 : * Close the archive.
730 : *
8297 pjw 731 ECB : * Mandatory.
732 : *
8053 bruce 733 : * When writing the archive, this is the routine that actually starts
8053 bruce 734 EUB : * the process of saving it to files. No data should be written prior
8297 pjw 735 ECB : * to this point, since the user could sort the TOC after creating it.
736 : *
737 : * If an archive is to be written, this routine must call:
738 : * WriteHead to save the archive header
739 : * WriteToc to save the TOC entries
740 : * WriteDataChunks to save all data & LOs.
741 : *
742 : */
743 : static void
2643 tgl 744 GIC 28 : _CloseArchive(ArchiveHandle *AH)
745 : {
8053 bruce 746 28 : lclContext *ctx = (lclContext *) AH->formatData;
747 : pgoff_t tpos;
748 :
749 28 : if (AH->mode == archModeWrite)
750 : {
8297 pjw 751 12 : WriteHead(AH);
752 : /* Remember TOC's seek position for use below */
7537 peter_e 753 CBC 12 : tpos = ftello(AH->FH);
3261 tgl 754 GIC 12 : if (tpos < 0 && ctx->hasSeek)
366 tgl 755 LBC 0 : pg_fatal("could not determine seek position in archive file: %m");
8297 pjw 756 GIC 12 : WriteToc(AH);
2643 tgl 757 12 : WriteDataChunks(AH, NULL);
8053 bruce 758 ECB :
759 : /*
4668 tgl 760 : * If possible, re-write the TOC in order to update the data offset
761 : * information. This is not essential, as pg_restore can cope in most
4660 bruce 762 : * cases without it; but it can make pg_restore significantly faster
763 : * in some situations (especially parallel restore).
8053 bruce 764 EUB : */
5179 andrew 765 CBC 24 : if (ctx->hasSeek &&
766 12 : fseeko(AH->FH, tpos, SEEK_SET) == 0)
8297 pjw 767 GIC 12 : WriteToc(AH);
768 : }
769 :
8053 bruce 770 28 : if (fclose(AH->FH) != 0)
366 tgl 771 UIC 0 : pg_fatal("could not close archive file: %m");
772 :
773 : /* Sync the output file if one is defined */
2209 andrew 774 CBC 28 : if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
1469 peter 775 10 : (void) fsync_fname(AH->fSpec, false);
2209 andrew 776 ECB :
8053 bruce 777 GIC 28 : AH->FH = NULL;
8297 pjw 778 28 : }
8297 pjw 779 ECB :
5179 andrew 780 EUB : /*
781 : * Reopen the archive's file handle.
782 : *
5179 andrew 783 ECB : * We close the original file handle, except on Windows. (The difference
784 : * is because on Windows, this is used within a multithreading context,
785 : * and we don't want a thread closing the parent file handle.)
786 : */
5050 bruce 787 : static void
5179 andrew 788 UIC 0 : _ReopenArchive(ArchiveHandle *AH)
789 : {
790 0 : lclContext *ctx = (lclContext *) AH->formatData;
791 : pgoff_t tpos;
792 :
793 0 : if (AH->mode == archModeWrite)
366 tgl 794 0 : pg_fatal("can only reopen input archives");
795 :
796 : /*
4242 tgl 797 EUB : * These two cases are user-facing errors since they represent unsupported
798 : * (but not invalid) use-cases. Word the error messages appropriately.
799 : */
5179 andrew 800 UIC 0 : if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
366 tgl 801 0 : pg_fatal("parallel restore from standard input is not supported");
5179 andrew 802 UBC 0 : if (!ctx->hasSeek)
366 tgl 803 0 : pg_fatal("parallel restore from non-seekable file is not supported");
804 :
5179 andrew 805 UIC 0 : tpos = ftello(AH->FH);
3346 sfrost 806 0 : if (tpos < 0)
366 tgl 807 0 : pg_fatal("could not determine seek position in archive file: %m");
808 :
5179 andrew 809 EUB : #ifndef WIN32
5179 andrew 810 UBC 0 : if (fclose(AH->FH) != 0)
366 tgl 811 0 : pg_fatal("could not close archive file: %m");
5179 andrew 812 EUB : #endif
813 :
5179 andrew 814 UBC 0 : AH->FH = fopen(AH->fSpec, PG_BINARY_R);
815 0 : if (!AH->FH)
366 tgl 816 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
817 :
5179 andrew 818 UIC 0 : if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
366 tgl 819 UBC 0 : pg_fatal("could not set seek position in archive file: %m");
5179 andrew 820 0 : }
821 :
822 : /*
1668 tgl 823 EUB : * Prepare for parallel restore.
824 : *
825 : * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
826 : * TOC entries' dataLength fields with appropriate values to guide the
827 : * ordering of restore jobs. The source of said data is format-dependent,
828 : * as is the exact meaning of the values.
829 : *
830 : * A format module might also choose to do other setup here.
831 : */
832 : static void
1668 tgl 833 UIC 0 : _PrepParallelRestore(ArchiveHandle *AH)
834 : {
835 0 : lclContext *ctx = (lclContext *) AH->formatData;
836 0 : TocEntry *prev_te = NULL;
837 0 : lclTocEntry *prev_tctx = NULL;
838 : TocEntry *te;
839 :
840 : /*
841 : * Knowing that the data items were dumped out in TOC order, we can
1668 tgl 842 EUB : * reconstruct the length of each item as the delta to the start offset of
843 : * the next data item.
844 : */
1668 tgl 845 UBC 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1668 tgl 846 EUB : {
1668 tgl 847 UIC 0 : lclTocEntry *tctx = (lclTocEntry *) te->formatData;
848 :
849 : /*
850 : * Ignore entries without a known data offset; if we were unable to
851 : * seek to rewrite the TOC when creating the archive, this'll be all
852 : * of them, and we'll end up with no size estimates.
853 : */
1668 tgl 854 UBC 0 : if (tctx->dataState != K_OFFSET_POS_SET)
1668 tgl 855 UIC 0 : continue;
1668 tgl 856 EUB :
857 : /* Compute previous data item's length */
1668 tgl 858 UIC 0 : if (prev_te)
859 : {
860 0 : if (tctx->dataPos > prev_tctx->dataPos)
861 0 : prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
862 : }
1668 tgl 863 EUB :
1668 tgl 864 UBC 0 : prev_te = te;
1668 tgl 865 UIC 0 : prev_tctx = tctx;
866 : }
1668 tgl 867 EUB :
868 : /* If OK to seek, we can determine the length of the last item */
1668 tgl 869 UBC 0 : if (prev_te && ctx->hasSeek)
1668 tgl 870 EUB : {
871 : pgoff_t endpos;
872 :
1668 tgl 873 UBC 0 : if (fseeko(AH->FH, 0, SEEK_END) != 0)
366 874 0 : pg_fatal("error during file seek: %m");
1668 tgl 875 UIC 0 : endpos = ftello(AH->FH);
876 0 : if (endpos > prev_tctx->dataPos)
877 0 : prev_te->dataLength = endpos - prev_tctx->dataPos;
1668 tgl 878 EUB : }
1668 tgl 879 UIC 0 : }
880 :
881 : /*
4510 heikki.linnakangas 882 EUB : * Clone format-specific fields during parallel restoration.
883 : */
884 : static void
4510 heikki.linnakangas 885 UBC 0 : _Clone(ArchiveHandle *AH)
4510 heikki.linnakangas 886 EUB : {
4510 heikki.linnakangas 887 UIC 0 : lclContext *ctx = (lclContext *) AH->formatData;
4510 heikki.linnakangas 888 EUB :
889 : /*
890 : * Each thread must have private lclContext working state.
891 : */
4153 bruce 892 UIC 0 : AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
4510 heikki.linnakangas 893 0 : memcpy(AH->formatData, ctx, sizeof(lclContext));
4510 heikki.linnakangas 894 UBC 0 : ctx = (lclContext *) AH->formatData;
895 :
4510 heikki.linnakangas 896 EUB : /* sanity check, shouldn't happen */
4510 heikki.linnakangas 897 UIC 0 : if (ctx->cs != NULL)
366 tgl 898 0 : pg_fatal("compressor active");
899 :
900 : /*
996 tgl 901 EUB : * We intentionally do not clone TOC-entry-local state: it's useful to
902 : * share knowledge about where the data blocks are across threads.
903 : * _PrintTocData has to be careful about the order of operations on that
904 : * state, though.
905 : *
4510 heikki.linnakangas 906 : * Note: we do not make a local lo_buf because we expect at most one BLOBS
996 tgl 907 : * entry per archive, so no parallelism is possible.
908 : */
4510 heikki.linnakangas 909 UIC 0 : }
910 :
911 : static void
912 0 : _DeClone(ArchiveHandle *AH)
913 : {
914 0 : lclContext *ctx = (lclContext *) AH->formatData;
915 :
916 0 : free(ctx);
917 0 : }
4510 heikki.linnakangas 918 EUB :
919 : /*
920 : * This function is executed in the child of a parallel restore from a
2385 tgl 921 : * custom-format archive and restores the actual data for one TOC entry.
922 : */
3668 andrew 923 : static int
2385 tgl 924 UIC 0 : _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
3668 andrew 925 EUB : {
2385 tgl 926 UBC 0 : return parallel_restore(AH, te);
927 : }
928 :
929 : /*--------------------------------------------------
930 : * END OF FORMAT CALLBACKS
931 : *--------------------------------------------------
932 : */
8297 pjw 933 EUB :
934 : /*
935 : * Get the current position in the archive file.
936 : *
937 : * With a non-seekable archive file, we may not be able to obtain the
938 : * file position. If so, just return -1. It's not too important in
939 : * that case because we won't be able to rewrite the TOC to fill in
940 : * data block offsets anyway.
941 : */
942 : static pgoff_t
8053 bruce 943 GIC 119 : _getFilePos(ArchiveHandle *AH, lclContext *ctx)
944 : {
945 : pgoff_t pos;
946 :
996 tgl 947 119 : pos = ftello(AH->FH);
948 119 : if (pos < 0)
949 : {
950 : /* Not expected if we found we can seek. */
996 tgl 951 UIC 0 : if (ctx->hasSeek)
366 tgl 952 LBC 0 : pg_fatal("could not determine seek position in archive file: %m");
953 : }
8053 bruce 954 GIC 119 : return pos;
955 : }
8297 pjw 956 ECB :
957 : /*
958 : * Read a data block header. The format changed in V1.3, so we
959 : * centralize the code here for simplicity. Returns *type = EOF
4669 tgl 960 EUB : * if at EOF.
8297 pjw 961 : */
962 : static void
8053 bruce 963 CBC 103 : _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
964 : {
965 : int byt;
966 :
967 : /*
968 : * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
969 : * inside ReadInt rather than returning EOF. It doesn't seem worth
970 : * jumping through hoops to deal with that case better, because no such
971 : * files are likely to exist in the wild: only some 7.1 development
332 tgl 972 ECB : * versions of pg_dump ever generated such files.
973 : */
8053 bruce 974 GIC 103 : if (AH->version < K_VERS_1_3)
8297 pjw 975 UIC 0 : *type = BLK_DATA;
976 : else
977 : {
4669 tgl 978 GIC 103 : byt = getc(AH->FH);
979 103 : *type = byt;
980 103 : if (byt == EOF)
981 : {
4669 tgl 982 UIC 0 : *id = 0; /* don't return an uninitialized value */
4669 tgl 983 LBC 0 : return;
4669 tgl 984 EUB : }
985 : }
986 :
8053 bruce 987 CBC 103 : *id = ReadInt(AH);
8297 pjw 988 ECB : }
989 :
990 : /*
991 : * Callback function for writeData. Writes one block of (compressed)
4511 heikki.linnakangas 992 EUB : * data to the archive.
993 : */
994 : static void
4511 heikki.linnakangas 995 GIC 187 : _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
8297 pjw 996 ECB : {
997 : /* never write 0-byte blocks (this should not happen) */
3261 bruce 998 GIC 187 : if (len > 0)
999 : {
1000 133 : WriteInt(AH, len);
1001 133 : _WriteBuf(AH, buf, len);
1002 : }
8297 pjw 1003 187 : }
8297 pjw 1004 ECB :
1005 : /*
1006 : * Callback function for readData. To keep things simple, we
4511 heikki.linnakangas 1007 : * always read one compressed block at a time.
1008 : */
1009 : static size_t
4511 heikki.linnakangas 1010 CBC 240 : _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
1011 : {
4511 heikki.linnakangas 1012 ECB : size_t blkLen;
1013 :
1014 : /* Read length */
4511 heikki.linnakangas 1015 GIC 240 : blkLen = ReadInt(AH);
1016 240 : if (blkLen == 0)
1017 107 : return 0;
1018 :
4511 heikki.linnakangas 1019 ECB : /* If the caller's buffer is not large enough, allocate a bigger one */
4511 heikki.linnakangas 1020 GIC 133 : if (blkLen > *buflen)
1021 : {
4511 heikki.linnakangas 1022 UIC 0 : free(*buf);
4153 bruce 1023 0 : *buf = (char *) pg_malloc(blkLen);
4511 heikki.linnakangas 1024 LBC 0 : *buflen = blkLen;
8053 bruce 1025 ECB : }
8297 pjw 1026 :
1027 : /* exits app on read errors */
3261 bruce 1028 GIC 133 : _ReadBuf(AH, *buf, blkLen);
3261 bruce 1029 ECB :
3261 bruce 1030 GIC 133 : return blkLen;
8297 pjw 1031 EUB : }
|