LCOV - differential code coverage report
Current view: top level - src/bin/pg_dump - pg_backup_custom.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 58.1 % 334 194 4 6 88 42 2 74 21 97 91 76 5 15
Current Date: 2023-04-08 15:15:32 Functions: 77.4 % 31 24 1 6 13 7 4 7 16 2
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * pg_backup_custom.c
       4                 :  *
       5                 :  *  Implements the custom output format.
       6                 :  *
       7                 :  *  The comments with the routines in this code are a good place to
       8                 :  *  understand how to write a new format.
       9                 :  *
      10                 :  *  See the headers to pg_restore for more details.
      11                 :  *
      12                 :  * Copyright (c) 2000, Philip Warner
      13                 :  *      Rights are granted to use this software in any way so long
      14                 :  *      as this notice is not removed.
      15                 :  *
      16                 :  *  The author is not responsible for loss or damages that may
      17                 :  *  and any liability will be limited to the time taken to fix any
      18                 :  *  related bug.
      19                 :  *
      20                 :  *
      21                 :  * IDENTIFICATION
      22                 :  *      src/bin/pg_dump/pg_backup_custom.c
      23                 :  *
      24                 :  *-------------------------------------------------------------------------
      25                 :  */
      26                 : #include "postgres_fe.h"
      27                 : 
      28                 : #include "common/file_utils.h"
      29                 : #include "compress_io.h"
      30                 : #include "parallel.h"
      31                 : #include "pg_backup_utils.h"
      32                 : 
/*--------
 * Routines in the format interface
 *--------
 */

/*
 * Generic archive callbacks; InitArchiveFmt_Custom() stores each of these in
 * the corresponding ArchiveHandle function pointer.
 */
static void _ArchiveEntry(ArchiveHandle *AH, TocEntry *te);
static void _StartData(ArchiveHandle *AH, TocEntry *te);
static void _WriteData(ArchiveHandle *AH, const void *data, size_t dLen);
static void _EndData(ArchiveHandle *AH, TocEntry *te);
static int	_WriteByte(ArchiveHandle *AH, const int i);
static int	_ReadByte(ArchiveHandle *AH);
static void _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static void _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _PrintExtraToc(ArchiveHandle *AH, TocEntry *te);

/*
 * Restore-side helpers: _PrintData emits one compressed data stream;
 * _skipData/_skipLOs step over a data or LO block while searching the file.
 */
static void _PrintData(ArchiveHandle *AH);
static void _skipData(ArchiveHandle *AH);
static void _skipLOs(ArchiveHandle *AH);

/*
 * Large-object (BLK_BLOBS) callbacks, plus _LoadLOs which restores the LOs
 * of one BLK_BLOBS block.
 */
static void _StartLOs(ArchiveHandle *AH, TocEntry *te);
static void _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndLOs(ArchiveHandle *AH, TocEntry *te);
static void _LoadLOs(ArchiveHandle *AH, bool drop);

/* Parallel-restore hooks (PrepParallelRestorePtr, ClonePtr, DeClonePtr). */
static void _PrepParallelRestore(ArchiveHandle *AH);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);

/* Worker job for parallel restore; there is no parallel dump for this format. */
static int	_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te);
      68                 : 
/*
 * Per-archive private state of the custom format, hung off AH->formatData
 * by InitArchiveFmt_Custom().
 */
typedef struct
{
	CompressorState *cs;		/* compressor for the data block or LO being
								 * written; set by _StartData/_StartLO */
	int			hasSeek;		/* result of checkSeek() on AH->FH; tested as
								 * a boolean */
	/* lastFilePos is used only when reading, and may be invalid if !hasSeek */
	pgoff_t		lastFilePos;	/* position after last data block we've read */
} lclContext;

/*
 * Per-TOC-entry private state of the custom format, hung off te->formatData.
 * dataState is one of K_OFFSET_NO_DATA, K_OFFSET_POS_NOT_SET or
 * K_OFFSET_POS_SET.
 */
typedef struct
{
	int			dataState;
	pgoff_t		dataPos;		/* valid only if dataState=K_OFFSET_POS_SET */
} lclTocEntry;
      82                 : 
      83                 : 
/*------
 * Static declarations
 *------
 */
/* Read the next block header: block type (EOF at end of file) and dump ID. */
static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id);
/* Current offset in AH->FH; callers treat a negative result as "unknown". */
static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx);

/* Write/read callbacks handed to AllocateCompressor(). */
static void _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len);
static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen);
      93                 : 
      94                 : 
      95                 : /*
      96                 :  *  Init routine required by ALL formats. This is a global routine
      97                 :  *  and should be declared in pg_backup_archiver.h
      98                 :  *
      99                 :  *  It's task is to create any extra archive context (using AH->formatData),
     100                 :  *  and to initialize the supported function pointers.
     101                 :  *
     102                 :  *  It should also prepare whatever its input source is for reading/writing,
     103                 :  *  and in the case of a read mode connection, it should load the Header & TOC.
     104                 :  */
     105                 : void
     106 CBC          28 : InitArchiveFmt_Custom(ArchiveHandle *AH)
     107                 : {
     108                 :     lclContext *ctx;
     109                 : 
     110                 :     /* Assuming static functions, this can be copied for each format. */
     111              28 :     AH->ArchiveEntryPtr = _ArchiveEntry;
     112              28 :     AH->StartDataPtr = _StartData;
     113              28 :     AH->WriteDataPtr = _WriteData;
     114              28 :     AH->EndDataPtr = _EndData;
     115              28 :     AH->WriteBytePtr = _WriteByte;
     116              28 :     AH->ReadBytePtr = _ReadByte;
     117              28 :     AH->WriteBufPtr = _WriteBuf;
     118              28 :     AH->ReadBufPtr = _ReadBuf;
     119              28 :     AH->ClosePtr = _CloseArchive;
     120              28 :     AH->ReopenPtr = _ReopenArchive;
     121              28 :     AH->PrintTocDataPtr = _PrintTocData;
     122              28 :     AH->ReadExtraTocPtr = _ReadExtraToc;
     123              28 :     AH->WriteExtraTocPtr = _WriteExtraToc;
     124              28 :     AH->PrintExtraTocPtr = _PrintExtraToc;
     125                 : 
     126 GNC          28 :     AH->StartLOsPtr = _StartLOs;
     127              28 :     AH->StartLOPtr = _StartLO;
     128              28 :     AH->EndLOPtr = _EndLO;
     129              28 :     AH->EndLOsPtr = _EndLOs;
     130                 : 
     131 CBC          28 :     AH->PrepParallelRestorePtr = _PrepParallelRestore;
     132              28 :     AH->ClonePtr = _Clone;
     133              28 :     AH->DeClonePtr = _DeClone;
     134                 : 
     135                 :     /* no parallel dump in the custom archive, only parallel restore */
     136              28 :     AH->WorkerJobDumpPtr = NULL;
     137              28 :     AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom;
     138                 : 
     139                 :     /* Set up a private area. */
     140              28 :     ctx = (lclContext *) pg_malloc0(sizeof(lclContext));
     141              28 :     AH->formatData = (void *) ctx;
     142                 : 
     143                 :     /* Initialize LO buffering */
     144              28 :     AH->lo_buf_size = LOBBUFSIZE;
     145              28 :     AH->lo_buf = (void *) pg_malloc(LOBBUFSIZE);
     146                 : 
     147                 :     /*
     148                 :      * Now open the file
     149                 :      */
     150              28 :     if (AH->mode == archModeWrite)
     151                 :     {
     152              12 :         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
     153                 :         {
     154              12 :             AH->FH = fopen(AH->fSpec, PG_BINARY_W);
     155              12 :             if (!AH->FH)
     156 UBC           0 :                 pg_fatal("could not open output file \"%s\": %m", AH->fSpec);
     157                 :         }
     158                 :         else
     159                 :         {
     160               0 :             AH->FH = stdout;
     161               0 :             if (!AH->FH)
     162               0 :                 pg_fatal("could not open output file: %m");
     163                 :         }
     164                 : 
     165 CBC          12 :         ctx->hasSeek = checkSeek(AH->FH);
     166                 :     }
     167                 :     else
     168                 :     {
     169              16 :         if (AH->fSpec && strcmp(AH->fSpec, "") != 0)
     170                 :         {
     171              16 :             AH->FH = fopen(AH->fSpec, PG_BINARY_R);
     172              16 :             if (!AH->FH)
     173 UBC           0 :                 pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
     174                 :         }
     175                 :         else
     176                 :         {
     177               0 :             AH->FH = stdin;
     178               0 :             if (!AH->FH)
     179               0 :                 pg_fatal("could not open input file: %m");
     180                 :         }
     181                 : 
     182 CBC          16 :         ctx->hasSeek = checkSeek(AH->FH);
     183                 : 
     184              16 :         ReadHead(AH);
     185              16 :         ReadToc(AH);
     186                 : 
     187                 :         /*
     188                 :          * Remember location of first data block (i.e., the point after TOC)
     189                 :          * in case we have to search for desired data blocks.
     190                 :          */
     191              16 :         ctx->lastFilePos = _getFilePos(AH, ctx);
     192                 :     }
     193              28 : }
     194                 : 
     195                 : /*
     196                 :  * Called by the Archiver when the dumper creates a new TOC entry.
     197                 :  *
     198                 :  * Optional.
     199                 :  *
     200                 :  * Set up extract format-related TOC data.
     201                 : */
     202                 : static void
     203            3169 : _ArchiveEntry(ArchiveHandle *AH, TocEntry *te)
     204                 : {
     205                 :     lclTocEntry *ctx;
     206                 : 
     207            3169 :     ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     208            3169 :     if (te->dataDumper)
     209             105 :         ctx->dataState = K_OFFSET_POS_NOT_SET;
     210                 :     else
     211            3064 :         ctx->dataState = K_OFFSET_NO_DATA;
     212                 : 
     213            3169 :     te->formatData = (void *) ctx;
     214            3169 : }
     215                 : 
     216                 : /*
     217                 :  * Called by the Archiver to save any extra format-related TOC entry
     218                 :  * data.
     219                 :  *
     220                 :  * Optional.
     221                 :  *
     222                 :  * Use the Archiver routines to write data - they are non-endian, and
     223                 :  * maintain other important file information.
     224                 :  */
     225                 : static void
     226            6334 : _WriteExtraToc(ArchiveHandle *AH, TocEntry *te)
     227                 : {
     228            6334 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     229                 : 
     230            6334 :     WriteOffset(AH, ctx->dataPos, ctx->dataState);
     231            6334 : }
     232                 : 
     233                 : /*
     234                 :  * Called by the Archiver to read any extra format-related TOC data.
     235                 :  *
     236                 :  * Optional.
     237                 :  *
     238                 :  * Needs to match the order defined in _WriteExtraToc, and should also
     239                 :  * use the Archiver input routines.
     240                 :  */
     241                 : static void
     242            4211 : _ReadExtraToc(ArchiveHandle *AH, TocEntry *te)
     243                 : {
     244            4211 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     245                 : 
     246            4211 :     if (ctx == NULL)
     247                 :     {
     248            4211 :         ctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry));
     249            4211 :         te->formatData = (void *) ctx;
     250                 :     }
     251                 : 
     252            4211 :     ctx->dataState = ReadOffset(AH, &(ctx->dataPos));
     253                 : 
     254                 :     /*
     255                 :      * Prior to V1.7 (pg7.3), we dumped the data size as an int now we don't
     256                 :      * dump it at all.
     257                 :      */
     258            4211 :     if (AH->version < K_VERS_1_7)
     259 UBC           0 :         ReadInt(AH);
     260 CBC        4211 : }
     261                 : 
     262                 : /*
     263                 :  * Called by the Archiver when restoring an archive to output a comment
     264                 :  * that includes useful information about the TOC entry.
     265                 :  *
     266                 :  * Optional.
     267                 :  */
     268                 : static void
     269            1266 : _PrintExtraToc(ArchiveHandle *AH, TocEntry *te)
     270                 : {
     271            1266 :     lclTocEntry *ctx = (lclTocEntry *) te->formatData;
     272                 : 
     273            1266 :     if (AH->public.verbose)
     274             226 :         ahprintf(AH, "-- Data Pos: " INT64_FORMAT "\n",
     275             226 :                  (int64) ctx->dataPos);
     276            1266 : }
     277                 : 
     278                 : /*
     279                 :  * Called by the archiver when saving TABLE DATA (not schema). This routine
     280                 :  * should save whatever format-specific information is needed to read
     281                 :  * the archive back.
     282                 :  *
     283                 :  * It is called just prior to the dumper's 'DataDumper' routine being called.
     284                 :  *
     285                 :  * Optional, but strongly recommended.
     286                 :  *
     287                 :  */
     288                 : static void
     289              99 : _StartData(ArchiveHandle *AH, TocEntry *te)
     290                 : {
     291              99 :     lclContext *ctx = (lclContext *) AH->formatData;
     292              99 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     293                 : 
     294              99 :     tctx->dataPos = _getFilePos(AH, ctx);
     295              99 :     if (tctx->dataPos >= 0)
     296              99 :         tctx->dataState = K_OFFSET_POS_SET;
     297                 : 
     298              99 :     _WriteByte(AH, BLK_DATA);   /* Block type */
     299              99 :     WriteInt(AH, te->dumpId);    /* For sanity check */
     300                 : 
     301 GNC          99 :     ctx->cs = AllocateCompressor(AH->compression_spec,
     302                 :                                  NULL,
     303                 :                                  _CustomWriteFunc);
     304 GIC          99 : }
     305                 : 
     306 ECB             : /*
     307                 :  * Called by archiver when dumper calls WriteData. This routine is
     308                 :  * called for both LO and table data; it is the responsibility of
     309                 :  * the format to manage each kind of data using StartLO/StartData.
     310                 :  *
     311                 :  * It should only be called from within a DataDumper routine.
     312                 :  *
     313                 :  * Mandatory.
     314                 :  */
     315                 : static void
     316 GIC         227 : _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
     317                 : {
     318 CBC         227 :     lclContext *ctx = (lclContext *) AH->formatData;
     319 GIC         227 :     CompressorState *cs = ctx->cs;
     320 ECB             : 
     321 CBC         227 :     if (dLen > 0)
     322                 :         /* writeData() internally throws write errors */
     323 GNC         219 :         cs->writeData(AH, cs, data, dLen);
     324 GIC         227 : }
     325 ECB             : 
     326                 : /*
     327                 :  * Called by the archiver when a dumper's 'DataDumper' routine has
     328                 :  * finished.
     329                 :  *
     330                 :  * Mandatory.
     331                 :  */
     332                 : static void
     333 GIC          99 : _EndData(ArchiveHandle *AH, TocEntry *te)
     334                 : {
     335 CBC          99 :     lclContext *ctx = (lclContext *) AH->formatData;
     336                 : 
     337              99 :     EndCompressor(AH, ctx->cs);
     338 GNC          99 :     ctx->cs = NULL;
     339                 : 
     340                 :     /* Send the end marker */
     341 CBC          99 :     WriteInt(AH, 0);
     342              99 : }
     343                 : 
     344                 : /*
     345 ECB             :  * Called by the archiver when starting to save all BLOB DATA (not schema).
     346                 :  * This routine should save whatever format-specific information is needed
     347                 :  * to read the LOs back into memory.
     348                 :  *
     349                 :  * It is called just prior to the dumper's DataDumper routine.
     350                 :  *
     351                 :  * Optional, but strongly recommended.
     352                 :  */
     353                 : static void
     354 GNC           4 : _StartLOs(ArchiveHandle *AH, TocEntry *te)
     355                 : {
     356 GIC           4 :     lclContext *ctx = (lclContext *) AH->formatData;
     357               4 :     lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     358 ECB             : 
     359 GIC           4 :     tctx->dataPos = _getFilePos(AH, ctx);
     360 CBC           4 :     if (tctx->dataPos >= 0)
     361               4 :         tctx->dataState = K_OFFSET_POS_SET;
     362                 : 
     363               4 :     _WriteByte(AH, BLK_BLOBS);  /* Block type */
     364               4 :     WriteInt(AH, te->dumpId);    /* For sanity check */
     365               4 : }
     366                 : 
     367 ECB             : /*
     368                 :  * Called by the archiver when the dumper calls StartLO.
     369                 :  *
     370                 :  * Mandatory.
     371                 :  *
     372                 :  * Must save the passed OID for retrieval at restore-time.
     373                 :  */
     374                 : static void
     375 GNC           8 : _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
     376                 : {
     377 GIC           8 :     lclContext *ctx = (lclContext *) AH->formatData;
     378                 : 
     379 CBC           8 :     if (oid == 0)
     380 UIC           0 :         pg_fatal("invalid OID for large object");
     381 ECB             : 
     382 GIC           8 :     WriteInt(AH, oid);
     383 ECB             : 
     384 GNC           8 :     ctx->cs = AllocateCompressor(AH->compression_spec,
     385                 :                                  NULL,
     386                 :                                  _CustomWriteFunc);
     387 GIC           8 : }
     388 ECB             : 
     389                 : /*
     390                 :  * Called by the archiver when the dumper calls EndLO.
     391                 :  *
     392                 :  * Optional.
     393                 :  */
     394                 : static void
     395 GNC           8 : _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
     396                 : {
     397 GIC           8 :     lclContext *ctx = (lclContext *) AH->formatData;
     398                 : 
     399               8 :     EndCompressor(AH, ctx->cs);
     400                 :     /* Send the end marker */
     401 CBC           8 :     WriteInt(AH, 0);
     402 GIC           8 : }
     403 ECB             : 
     404                 : /*
     405                 :  * Called by the archiver when finishing saving all BLOB DATA.
     406                 :  *
     407                 :  * Optional.
     408                 :  */
     409                 : static void
     410 GNC           4 : _EndLOs(ArchiveHandle *AH, TocEntry *te)
     411                 : {
     412                 :     /* Write out a fake zero OID to mark end-of-LOs. */
     413 GIC           4 :     WriteInt(AH, 0);
     414               4 : }
     415                 : 
/*
 * Print data for a given TOC entry
 *
 * Restore-time callback.  Finds the data block belonging to "te" in the
 * archive file and emits it: BLK_DATA through _PrintData(), BLK_BLOBS through
 * _LoadLOs().
 *
 * Two ways of locating the block:
 *  - input is seekable and the entry's offset is known (K_OFFSET_POS_SET):
 *    fseeko() straight to it;
 *  - otherwise: read block headers sequentially, skipping foreign blocks and
 *    recording their offsets in their own TOC entries for later use.
 *
 * Fails via pg_fatal() on seek errors, when the block cannot be found, when
 * the header's dump ID does not match te->dumpId, or on an unknown block type.
 */
static void
_PrintTocData(ArchiveHandle *AH, TocEntry *te)
{
	lclContext *ctx = (lclContext *) AH->formatData;
	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
	int			blkType;
	int			id;

	/* Nothing to print for an entry that has no data block. */
	if (tctx->dataState == K_OFFSET_NO_DATA)
		return;

	if (!ctx->hasSeek || tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		/*
		 * We cannot seek directly to the desired block.  Instead, skip over
		 * block headers until we find the one we want.  Remember the
		 * positions of skipped-over blocks, so that if we later decide we
		 * need to read one, we'll be able to seek to it.
		 *
		 * When our input file is seekable, we can do the search starting from
		 * the point after the last data block we scanned in previous
		 * iterations of this function.
		 */
		if (ctx->hasSeek)
		{
			if (fseeko(AH->FH, ctx->lastFilePos, SEEK_SET) != 0)
				pg_fatal("error during file seek: %m");
		}

		for (;;)
		{
			/* Position of the header we are about to read (negative if unknown). */
			pgoff_t		thisBlkPos = _getFilePos(AH, ctx);

			_readBlockHeader(AH, &blkType, &id);

			/* Stop at end of file, or at the block we are looking for. */
			if (blkType == EOF || id == te->dumpId)
				break;

			/* Remember the block position, if we got one */
			if (thisBlkPos >= 0)
			{
				TocEntry   *otherte = getTocEntryByDumpId(AH, id);

				if (otherte && otherte->formatData)
				{
					lclTocEntry *othertctx = (lclTocEntry *) otherte->formatData;

					/*
					 * Note: on Windows, multiple threads might access/update
					 * the same lclTocEntry concurrently, but that should be
					 * safe as long as we update dataPos before dataState.
					 * Ideally, we'd use pg_write_barrier() to enforce that,
					 * but the needed infrastructure doesn't exist in frontend
					 * code.  But Windows only runs on machines with strong
					 * store ordering, so it should be okay for now.
					 */
					if (othertctx->dataState == K_OFFSET_POS_NOT_SET)
					{
						othertctx->dataPos = thisBlkPos;
						othertctx->dataState = K_OFFSET_POS_SET;
					}
					else if (othertctx->dataPos != thisBlkPos ||
							 othertctx->dataState != K_OFFSET_POS_SET)
					{
						/* sanity check */
						pg_log_warning("data block %d has wrong seek position",
									   id);
					}
				}
			}

			/* Not the block we want: step over its contents to the next header. */
			switch (blkType)
			{
				case BLK_DATA:
					_skipData(AH);
					break;

				case BLK_BLOBS:
					_skipLOs(AH);
					break;

				default:		/* Always have a default */
					pg_fatal("unrecognized data block type (%d) while searching archive",
							 blkType);
					break;
			}
		}
	}
	else
	{
		/* We can just seek to the place we need to be. */
		if (fseeko(AH->FH, tctx->dataPos, SEEK_SET) != 0)
			pg_fatal("error during file seek: %m");

		_readBlockHeader(AH, &blkType, &id);
	}

	/*
	 * If we reached EOF without finding the block we want, then either it
	 * doesn't exist, or it does but we lack the ability to seek back to it.
	 */
	if (blkType == EOF)
	{
		if (!ctx->hasSeek)
			pg_fatal("could not find block ID %d in archive -- "
					 "possibly due to out-of-order restore request, "
					 "which cannot be handled due to non-seekable input file",
					 te->dumpId);
		else
			pg_fatal("could not find block ID %d in archive -- "
					 "possibly corrupt archive",
					 te->dumpId);
	}

	/* Are we sane?  The header's dump ID must match the entry being restored. */
	if (id != te->dumpId)
		pg_fatal("found unexpected block ID (%d) when reading data -- expected %d",
				 id, te->dumpId);

	/*
	 * Emit the block's contents.  For LO blocks, the restore option
	 * dropSchema is passed to _LoadLOs as its "drop" flag.
	 */
	switch (blkType)
	{
		case BLK_DATA:
			_PrintData(AH);
			break;

		case BLK_BLOBS:
			_LoadLOs(AH, AH->public.ropt->dropSchema);
			break;

		default:				/* Always have a default */
			pg_fatal("unrecognized data block type %d while restoring archive",
					 blkType);
			break;
	}

	/*
	 * If our input file is seekable but lacks data offsets, update our
	 * knowledge of where to start future searches from.  (Note that we did
	 * not update the current TE's dataState/dataPos.  We could have, but
	 * there is no point since it will not be visited again.)
	 */
	if (ctx->hasSeek && tctx->dataState == K_OFFSET_POS_NOT_SET)
	{
		pgoff_t		curPos = _getFilePos(AH, ctx);

		/* Only ever move the search start forward. */
		if (curPos > ctx->lastFilePos)
			ctx->lastFilePos = curPos;
	}
}
     568 EUB             : 
     569                 : /*
     570                 :  * Print data from current file position.
     571                 : */
     572                 : static void
     573 GIC         107 : _PrintData(ArchiveHandle *AH)
     574                 : {
     575                 :     CompressorState *cs;
     576                 : 
     577 GNC         107 :     cs = AllocateCompressor(AH->compression_spec,
     578                 :                             _CustomReadFunc, NULL);
     579             107 :     cs->readData(AH, cs);
     580             107 :     EndCompressor(AH, cs);
     581 GIC         107 : }
     582                 : 
     583                 : static void
     584 GNC           4 : _LoadLOs(ArchiveHandle *AH, bool drop)
     585                 : {
     586                 :     Oid         oid;
     587                 : 
     588               4 :     StartRestoreLOs(AH);
     589                 : 
     590 CBC           4 :     oid = ReadInt(AH);
     591              12 :     while (oid != 0)
     592 ECB             :     {
     593 GNC           8 :         StartRestoreLO(AH, oid, drop);
     594 GIC           8 :         _PrintData(AH);
     595 GNC           8 :         EndRestoreLO(AH, oid);
     596 GIC           8 :         oid = ReadInt(AH);
     597                 :     }
     598                 : 
     599 GNC           4 :     EndRestoreLOs(AH);
     600 GIC           4 : }
     601 ECB             : 
     602                 : /*
     603                 :  * Skip the LOs from the current file position.
     604                 :  * LOs are written sequentially as data blocks (see below).
     605                 :  * Each LO is preceded by its original OID.
     606                 :  * A zero OID indicates the end of the LOs.
     607                 :  */
     608                 : static void
     609 UNC           0 : _skipLOs(ArchiveHandle *AH)
     610 ECB             : {
     611                 :     Oid         oid;
     612                 : 
     613 UIC           0 :     oid = ReadInt(AH);
     614               0 :     while (oid != 0)
     615                 :     {
     616               0 :         _skipData(AH);
     617               0 :         oid = ReadInt(AH);
     618                 :     }
     619               0 : }
     620 EUB             : 
     621                 : /*
     622                 :  * Skip data from current file position.
     623                 :  * Data blocks are formatted as an integer length, followed by data.
     624                 :  * A zero length indicates the end of the block.
     625                 : */
     626                 : static void
     627 UBC           0 : _skipData(ArchiveHandle *AH)
     628 EUB             : {
     629 UIC           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     630 EUB             :     size_t      blkLen;
     631 UIC           0 :     char       *buf = NULL;
     632               0 :     int         buflen = 0;
     633                 : 
     634               0 :     blkLen = ReadInt(AH);
     635               0 :     while (blkLen != 0)
     636                 :     {
     637               0 :         if (ctx->hasSeek)
     638 EUB             :         {
     639 UIC           0 :             if (fseeko(AH->FH, blkLen, SEEK_CUR) != 0)
     640 UBC           0 :                 pg_fatal("error during file seek: %m");
     641                 :         }
     642 EUB             :         else
     643                 :         {
     644 UIC           0 :             if (blkLen > buflen)
     645 EUB             :             {
     646 UNC           0 :                 free(buf);
     647 UBC           0 :                 buf = (char *) pg_malloc(blkLen);
     648 UIC           0 :                 buflen = blkLen;
     649 EUB             :             }
     650 UBC           0 :             if (fread(buf, 1, blkLen, AH->FH) != blkLen)
     651                 :             {
     652 UIC           0 :                 if (feof(AH->FH))
     653               0 :                     pg_fatal("could not read from input file: end of file");
     654 EUB             :                 else
     655 UIC           0 :                     pg_fatal("could not read from input file: %m");
     656 EUB             :             }
     657                 :         }
     658                 : 
     659 UIC           0 :         blkLen = ReadInt(AH);
     660 EUB             :     }
     661                 : 
     662 UNC           0 :     free(buf);
     663 UIC           0 : }
     664 EUB             : 
     665                 : /*
     666                 :  * Write a byte of data to the archive.
     667                 :  *
     668                 :  * Mandatory.
     669                 :  *
     670                 :  * Called by the archiver to do integer & byte output to the archive.
     671                 :  */
     672                 : static int
     673 GIC      611608 : _WriteByte(ArchiveHandle *AH, const int i)
     674                 : {
     675          611608 :     if (fputc(i, AH->FH) == EOF)
     676 UIC           0 :         WRITE_ERROR_EXIT;
     677                 : 
     678 GIC      611608 :     return 1;
     679                 : }
     680                 : 
     681                 : /*
     682 ECB             :  * Read a byte of data from the archive.
     683                 :  *
     684                 :  * Mandatory
     685 EUB             :  *
     686                 :  * Called by the archiver to read bytes & integers from the archive.
     687 ECB             :  * EOF should be treated as a fatal error.
     688                 :  */
     689                 : static int
     690 GIC      408466 : _ReadByte(ArchiveHandle *AH)
     691                 : {
     692                 :     int         res;
     693                 : 
     694          408466 :     res = getc(AH->FH);
     695          408466 :     if (res == EOF)
     696 UIC           0 :         READ_ERROR_EXIT(AH->FH);
     697 GIC      408466 :     return res;
     698                 : }
     699 ECB             : 
     700                 : /*
     701                 :  * Write a buffer of data to the archive.
     702                 :  *
     703                 :  * Mandatory.
     704                 :  *
     705 EUB             :  * Called by the archiver to write a block of bytes to the archive.
     706 ECB             :  */
     707                 : static void
     708 GIC       67077 : _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
     709                 : {
     710           67077 :     if (fwrite(buf, 1, len, AH->FH) != len)
     711 UIC           0 :         WRITE_ERROR_EXIT;
     712 GIC       67077 : }
     713                 : 
     714                 : /*
     715                 :  * Read a block of bytes from the archive.
     716                 :  *
     717 ECB             :  * Mandatory.
     718                 :  *
     719                 :  * Called by the archiver to read a block of bytes from the archive
     720 EUB             :  */
     721 ECB             : static void
     722 GIC       44281 : _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
     723                 : {
     724           44281 :     if (fread(buf, 1, len, AH->FH) != len)
     725 UIC           0 :         READ_ERROR_EXIT(AH->FH);
     726 GIC       44281 : }
     727                 : 
     728                 : /*
     729                 :  * Close the archive.
     730                 :  *
     731 ECB             :  * Mandatory.
     732                 :  *
     733                 :  * When writing the archive, this is the routine that actually starts
     734 EUB             :  * the process of saving it to files. No data should be written prior
     735 ECB             :  * to this point, since the user could sort the TOC after creating it.
     736                 :  *
     737                 :  * If an archive is to be written, this routine must call:
     738                 :  *      WriteHead           to save the archive header
     739                 :  *      WriteToc            to save the TOC entries
     740                 :  *      WriteDataChunks     to save all data & LOs.
     741                 :  *
     742                 :  */
     743                 : static void
     744 GIC          28 : _CloseArchive(ArchiveHandle *AH)
     745                 : {
     746              28 :     lclContext *ctx = (lclContext *) AH->formatData;
     747                 :     pgoff_t     tpos;
     748                 : 
     749              28 :     if (AH->mode == archModeWrite)
     750                 :     {
     751              12 :         WriteHead(AH);
     752                 :         /* Remember TOC's seek position for use below */
     753 CBC          12 :         tpos = ftello(AH->FH);
     754 GIC          12 :         if (tpos < 0 && ctx->hasSeek)
     755 LBC           0 :             pg_fatal("could not determine seek position in archive file: %m");
     756 GIC          12 :         WriteToc(AH);
     757              12 :         WriteDataChunks(AH, NULL);
     758 ECB             : 
     759                 :         /*
     760                 :          * If possible, re-write the TOC in order to update the data offset
     761                 :          * information.  This is not essential, as pg_restore can cope in most
     762                 :          * cases without it; but it can make pg_restore significantly faster
     763                 :          * in some situations (especially parallel restore).
     764 EUB             :          */
     765 CBC          24 :         if (ctx->hasSeek &&
     766              12 :             fseeko(AH->FH, tpos, SEEK_SET) == 0)
     767 GIC          12 :             WriteToc(AH);
     768                 :     }
     769                 : 
     770              28 :     if (fclose(AH->FH) != 0)
     771 UIC           0 :         pg_fatal("could not close archive file: %m");
     772                 : 
     773                 :     /* Sync the output file if one is defined */
     774 CBC          28 :     if (AH->dosync && AH->mode == archModeWrite && AH->fSpec)
     775              10 :         (void) fsync_fname(AH->fSpec, false);
     776 ECB             : 
     777 GIC          28 :     AH->FH = NULL;
     778              28 : }
     779 ECB             : 
     780 EUB             : /*
     781                 :  * Reopen the archive's file handle.
     782                 :  *
     783 ECB             :  * We close the original file handle, except on Windows.  (The difference
     784                 :  * is because on Windows, this is used within a multithreading context,
     785                 :  * and we don't want a thread closing the parent file handle.)
     786                 :  */
     787                 : static void
     788 UIC           0 : _ReopenArchive(ArchiveHandle *AH)
     789                 : {
     790               0 :     lclContext *ctx = (lclContext *) AH->formatData;
     791                 :     pgoff_t     tpos;
     792                 : 
     793               0 :     if (AH->mode == archModeWrite)
     794               0 :         pg_fatal("can only reopen input archives");
     795                 : 
     796                 :     /*
     797 EUB             :      * These two cases are user-facing errors since they represent unsupported
     798                 :      * (but not invalid) use-cases.  Word the error messages appropriately.
     799                 :      */
     800 UIC           0 :     if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
     801               0 :         pg_fatal("parallel restore from standard input is not supported");
     802 UBC           0 :     if (!ctx->hasSeek)
     803               0 :         pg_fatal("parallel restore from non-seekable file is not supported");
     804                 : 
     805 UIC           0 :     tpos = ftello(AH->FH);
     806               0 :     if (tpos < 0)
     807               0 :         pg_fatal("could not determine seek position in archive file: %m");
     808                 : 
     809 EUB             : #ifndef WIN32
     810 UBC           0 :     if (fclose(AH->FH) != 0)
     811               0 :         pg_fatal("could not close archive file: %m");
     812 EUB             : #endif
     813                 : 
     814 UBC           0 :     AH->FH = fopen(AH->fSpec, PG_BINARY_R);
     815               0 :     if (!AH->FH)
     816               0 :         pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
     817                 : 
     818 UIC           0 :     if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
     819 UBC           0 :         pg_fatal("could not set seek position in archive file: %m");
     820               0 : }
     821                 : 
     822                 : /*
     823 EUB             :  * Prepare for parallel restore.
     824                 :  *
     825                 :  * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS
     826                 :  * TOC entries' dataLength fields with appropriate values to guide the
     827                 :  * ordering of restore jobs.  The source of said data is format-dependent,
     828                 :  * as is the exact meaning of the values.
     829                 :  *
     830                 :  * A format module might also choose to do other setup here.
     831                 :  */
     832                 : static void
     833 UIC           0 : _PrepParallelRestore(ArchiveHandle *AH)
     834                 : {
     835               0 :     lclContext *ctx = (lclContext *) AH->formatData;
     836               0 :     TocEntry   *prev_te = NULL;
     837               0 :     lclTocEntry *prev_tctx = NULL;
     838                 :     TocEntry   *te;
     839                 : 
     840                 :     /*
     841                 :      * Knowing that the data items were dumped out in TOC order, we can
     842 EUB             :      * reconstruct the length of each item as the delta to the start offset of
     843                 :      * the next data item.
     844                 :      */
     845 UBC           0 :     for (te = AH->toc->next; te != AH->toc; te = te->next)
     846 EUB             :     {
     847 UIC           0 :         lclTocEntry *tctx = (lclTocEntry *) te->formatData;
     848                 : 
     849                 :         /*
     850                 :          * Ignore entries without a known data offset; if we were unable to
     851                 :          * seek to rewrite the TOC when creating the archive, this'll be all
     852                 :          * of them, and we'll end up with no size estimates.
     853                 :          */
     854 UBC           0 :         if (tctx->dataState != K_OFFSET_POS_SET)
     855 UIC           0 :             continue;
     856 EUB             : 
     857                 :         /* Compute previous data item's length */
     858 UIC           0 :         if (prev_te)
     859                 :         {
     860               0 :             if (tctx->dataPos > prev_tctx->dataPos)
     861               0 :                 prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos;
     862                 :         }
     863 EUB             : 
     864 UBC           0 :         prev_te = te;
     865 UIC           0 :         prev_tctx = tctx;
     866                 :     }
     867 EUB             : 
     868                 :     /* If OK to seek, we can determine the length of the last item */
     869 UBC           0 :     if (prev_te && ctx->hasSeek)
     870 EUB             :     {
     871                 :         pgoff_t     endpos;
     872                 : 
     873 UBC           0 :         if (fseeko(AH->FH, 0, SEEK_END) != 0)
     874               0 :             pg_fatal("error during file seek: %m");
     875 UIC           0 :         endpos = ftello(AH->FH);
     876               0 :         if (endpos > prev_tctx->dataPos)
     877               0 :             prev_te->dataLength = endpos - prev_tctx->dataPos;
     878 EUB             :     }
     879 UIC           0 : }
     880                 : 
     881                 : /*
     882 EUB             :  * Clone format-specific fields during parallel restoration.
     883                 :  */
     884                 : static void
     885 UBC           0 : _Clone(ArchiveHandle *AH)
     886 EUB             : {
     887 UIC           0 :     lclContext *ctx = (lclContext *) AH->formatData;
     888 EUB             : 
     889                 :     /*
     890                 :      * Each thread must have private lclContext working state.
     891                 :      */
     892 UIC           0 :     AH->formatData = (lclContext *) pg_malloc(sizeof(lclContext));
     893               0 :     memcpy(AH->formatData, ctx, sizeof(lclContext));
     894 UBC           0 :     ctx = (lclContext *) AH->formatData;
     895                 : 
     896 EUB             :     /* sanity check, shouldn't happen */
     897 UIC           0 :     if (ctx->cs != NULL)
     898               0 :         pg_fatal("compressor active");
     899                 : 
     900                 :     /*
     901 EUB             :      * We intentionally do not clone TOC-entry-local state: it's useful to
     902                 :      * share knowledge about where the data blocks are across threads.
     903                 :      * _PrintTocData has to be careful about the order of operations on that
     904                 :      * state, though.
     905                 :      *
     906                 :      * Note: we do not make a local lo_buf because we expect at most one BLOBS
     907                 :      * entry per archive, so no parallelism is possible.
     908                 :      */
     909 UIC           0 : }
     910                 : 
     911                 : static void
     912               0 : _DeClone(ArchiveHandle *AH)
     913                 : {
     914               0 :     lclContext *ctx = (lclContext *) AH->formatData;
     915                 : 
     916               0 :     free(ctx);
     917               0 : }
     918 EUB             : 
     919                 : /*
     920                 :  * This function is executed in the child of a parallel restore from a
     921                 :  * custom-format archive and restores the actual data for one TOC entry.
     922                 :  */
     923                 : static int
     924 UIC           0 : _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te)
     925 EUB             : {
     926 UBC           0 :     return parallel_restore(AH, te);
     927                 : }
     928                 : 
     929                 : /*--------------------------------------------------
     930                 :  * END OF FORMAT CALLBACKS
     931                 :  *--------------------------------------------------
     932                 :  */
     933 EUB             : 
     934                 : /*
     935                 :  * Get the current position in the archive file.
     936                 :  *
     937                 :  * With a non-seekable archive file, we may not be able to obtain the
     938                 :  * file position.  If so, just return -1.  It's not too important in
     939                 :  * that case because we won't be able to rewrite the TOC to fill in
     940                 :  * data block offsets anyway.
     941                 :  */
     942                 : static pgoff_t
     943 GIC         119 : _getFilePos(ArchiveHandle *AH, lclContext *ctx)
     944                 : {
     945                 :     pgoff_t     pos;
     946                 : 
     947             119 :     pos = ftello(AH->FH);
     948             119 :     if (pos < 0)
     949                 :     {
     950                 :         /* Not expected if we found we can seek. */
     951 UIC           0 :         if (ctx->hasSeek)
     952 LBC           0 :             pg_fatal("could not determine seek position in archive file: %m");
     953                 :     }
     954 GIC         119 :     return pos;
     955                 : }
     956 ECB             : 
     957                 : /*
     958                 :  * Read a data block header. The format changed in V1.3, so we
     959                 :  * centralize the code here for simplicity.  Returns *type = EOF
     960 EUB             :  * if at EOF.
     961                 :  */
     962                 : static void
     963 CBC         103 : _readBlockHeader(ArchiveHandle *AH, int *type, int *id)
     964                 : {
     965                 :     int         byt;
     966                 : 
     967                 :     /*
     968                 :      * Note: if we are at EOF with a pre-1.3 input file, we'll pg_fatal()
     969                 :      * inside ReadInt rather than returning EOF.  It doesn't seem worth
     970                 :      * jumping through hoops to deal with that case better, because no such
     971                 :      * files are likely to exist in the wild: only some 7.1 development
     972 ECB             :      * versions of pg_dump ever generated such files.
     973                 :      */
     974 GIC         103 :     if (AH->version < K_VERS_1_3)
     975 UIC           0 :         *type = BLK_DATA;
     976                 :     else
     977                 :     {
     978 GIC         103 :         byt = getc(AH->FH);
     979             103 :         *type = byt;
     980             103 :         if (byt == EOF)
     981                 :         {
     982 UIC           0 :             *id = 0;            /* don't return an uninitialized value */
     983 LBC           0 :             return;
     984 EUB             :         }
     985                 :     }
     986                 : 
     987 CBC         103 :     *id = ReadInt(AH);
     988 ECB             : }
     989                 : 
     990                 : /*
     991                 :  * Callback function for writeData. Writes one block of (compressed)
     992 EUB             :  * data to the archive.
     993                 :  */
     994                 : static void
     995 GIC         187 : _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len)
     996 ECB             : {
     997                 :     /* never write 0-byte blocks (this should not happen) */
     998 GIC         187 :     if (len > 0)
     999                 :     {
    1000             133 :         WriteInt(AH, len);
    1001             133 :         _WriteBuf(AH, buf, len);
    1002                 :     }
    1003             187 : }
    1004 ECB             : 
    1005                 : /*
    1006                 :  * Callback function for readData. To keep things simple, we
    1007                 :  * always read one compressed block at a time.
    1008                 :  */
    1009                 : static size_t
    1010 CBC         240 : _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen)
    1011                 : {
    1012 ECB             :     size_t      blkLen;
    1013                 : 
    1014                 :     /* Read length */
    1015 GIC         240 :     blkLen = ReadInt(AH);
    1016             240 :     if (blkLen == 0)
    1017             107 :         return 0;
    1018                 : 
    1019 ECB             :     /* If the caller's buffer is not large enough, allocate a bigger one */
    1020 GIC         133 :     if (blkLen > *buflen)
    1021                 :     {
    1022 UIC           0 :         free(*buf);
    1023               0 :         *buf = (char *) pg_malloc(blkLen);
    1024 LBC           0 :         *buflen = blkLen;
    1025 ECB             :     }
    1026                 : 
    1027                 :     /* exits app on read errors */
    1028 GIC         133 :     _ReadBuf(AH, *buf, blkLen);
    1029 ECB             : 
    1030 GIC         133 :     return blkLen;
    1031 EUB             : }
        

Generated by: LCOV version v1.16-55-g56c0a2a