TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_backup_archiver.c
4 : *
5 : * Private implementation of the archiver routines.
6 : *
7 : * See the headers to pg_restore for more details.
8 : *
9 : * Copyright (c) 2000, Philip Warner
10 : * Rights are granted to use this software in any way so long
11 : * as this notice is not removed.
12 : *
13 : * The author is not responsible for loss or damages that may
14 : * result from its use.
15 : *
16 : *
17 : * IDENTIFICATION
18 : * src/bin/pg_dump/pg_backup_archiver.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres_fe.h"
23 :
24 : #include <ctype.h>
25 : #include <fcntl.h>
26 : #include <unistd.h>
27 : #include <sys/stat.h>
28 : #include <sys/wait.h>
29 : #ifdef WIN32
30 : #include <io.h>
31 : #endif
32 :
33 : #include "common/string.h"
34 : #include "compress_io.h"
35 : #include "dumputils.h"
36 : #include "fe_utils/string_utils.h"
37 : #include "lib/stringinfo.h"
38 : #include "libpq/libpq-fs.h"
39 : #include "parallel.h"
40 : #include "pg_backup_archiver.h"
41 : #include "pg_backup_db.h"
42 : #include "pg_backup_utils.h"
43 :
/*
 * Comment banners that open a plain-text dump: TEXT_DUMP_HEADER for a
 * single-database dump, TEXT_DUMPALL_HEADER for a whole-cluster dump.
 * RestoreArchive() below starts its script output with the same text as
 * TEXT_DUMP_HEADER.
 */
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
46 :
/*
 * State for tracking TocEntrys that are ready to process during a parallel
 * restore.  (This used to be a list, and we still call it that, though now
 * it's really an array so that we can apply qsort to it.)
 *
 * tes[] is sized large enough that we can't overrun it.
 * The valid entries are indexed first_te .. last_te inclusive.
 * We periodically sort the array to bring larger-by-dataLength entries to
 * the front; "sorted" is true if the valid entries are known sorted.
 *
 * See the ready_list_init/free/insert/remove/sort functions declared below.
 */
typedef struct _parallelReadyList
{
	TocEntry  **tes;			/* Ready-to-dump TocEntrys */
	int			first_te;		/* index of first valid entry in tes[] */
	int			last_te;		/* index of last valid entry in tes[] */
	bool		sorted;			/* are valid entries currently sorted? */
} ParallelReadyList;
64 :
65 :
/*
 * Forward declarations of file-local (static) functions.
 */

/* Archive handle allocation and TOC-entry output/session-state helpers */
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
							   const pg_compress_specification compression_spec,
							   bool dosync, ArchiveMode mode,
							   SetupWorkerPtrType setupWorkerPtr);
static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData);
static char *sanitize_line(const char *str, bool want_hyphen);
static void _doSetFixedOutputState(ArchiveHandle *AH);
static void _doSetSessionAuth(ArchiveHandle *AH, const char *user);
static void _reconnectToDB(ArchiveHandle *AH, const char *dbname);
static void _becomeUser(ArchiveHandle *AH, const char *user);
static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
static int	_tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH);
static RestorePass _tocEntryRestorePass(TocEntry *te);
static bool _tocEntryIsACL(TocEntry *te);
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te);
static bool is_load_via_partition_root(TocEntry *te);
static void buildTocEntryArrays(ArchiveHandle *AH);
static void _moveBefore(TocEntry *pos, TocEntry *te);
static int	_discoverArchiveFormat(ArchiveHandle *AH);

/*
 * Output handling.  RestoreArchive() pairs SaveOutput()/SetOutput() with a
 * later RestoreOutput() when it redirects output.
 */
static int	RestoringToDB(ArchiveHandle *AH);
static void dump_lo_buf(ArchiveHandle *AH);
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
static void SetOutput(ArchiveHandle *AH, const char *filename,
					  const pg_compress_specification compression_spec);
static CompressFileHandle *SaveOutput(ArchiveHandle *AH);
static void RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput);

/*
 * Restore drivers.  restore_toc_entry's is_parallel is true when running in
 * a worker child process.  The pending list is a TocEntry used as a list
 * header (see pending_list_header_init); the ready list is a
 * ParallelReadyList (defined above).
 */
static int	restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
static void restore_toc_entries_prefork(ArchiveHandle *AH,
										TocEntry *pending_list);
static void restore_toc_entries_parallel(ArchiveHandle *AH,
										 ParallelState *pstate,
										 TocEntry *pending_list);
static void restore_toc_entries_postfork(ArchiveHandle *AH,
										 TocEntry *pending_list);
static void pending_list_header_init(TocEntry *l);
static void pending_list_append(TocEntry *l, TocEntry *te);
static void pending_list_remove(TocEntry *te);
static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
static void ready_list_free(ParallelReadyList *ready_list);
static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
static void ready_list_remove(ParallelReadyList *ready_list, int i);
static void ready_list_sort(ParallelReadyList *ready_list);
static int	TocEntrySizeCompare(const void *p1, const void *p2);
static void move_to_ready_list(TocEntry *pending_list,
							   ParallelReadyList *ready_list,
							   RestorePass pass);
static TocEntry *pop_next_work_item(ParallelReadyList *ready_list,
									ParallelState *pstate);
static void mark_dump_job_done(ArchiveHandle *AH,
							   TocEntry *te,
							   int status,
							   void *callback_data);
static void mark_restore_job_done(ArchiveHandle *AH,
								  TocEntry *te,
								  int status,
								  void *callback_data);
static void fix_dependencies(ArchiveHandle *AH);
static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
static void repoint_table_dependencies(ArchiveHandle *AH);
static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
								ParallelReadyList *ready_list);
static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);

/* Run by ProcessArchiveRestoreOptions() when ropt->strict_names is set */
static void StrictNamesCheck(RestoreOptions *ropt);
142 :
143 :
144 ECB : /*
145 : * Allocate a new DumpOptions block containing all default values.
146 : */
147 : DumpOptions *
148 CBC 33 : NewDumpOptions(void)
149 ECB : {
150 GIC 33 : DumpOptions *opts = (DumpOptions *) pg_malloc(sizeof(DumpOptions));
151 :
152 33 : InitDumpOptions(opts);
153 33 : return opts;
154 : }
155 :
156 ECB : /*
157 : * Initialize a DumpOptions struct to all default values
158 : */
159 : void
160 CBC 183 : InitDumpOptions(DumpOptions *opts)
161 ECB : {
162 CBC 183 : memset(opts, 0, sizeof(DumpOptions));
163 ECB : /* set any fields that shouldn't default to zeroes */
164 GIC 183 : opts->include_everything = true;
165 183 : opts->cparams.promptPassword = TRI_DEFAULT;
166 183 : opts->dumpSections = DUMP_UNSECTIONED;
167 183 : }
168 :
169 : /*
170 ECB : * Create a freshly allocated DumpOptions with options equivalent to those
171 : * found in the given RestoreOptions.
172 : */
173 : DumpOptions *
174 GIC 33 : dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
175 ECB : {
176 CBC 33 : DumpOptions *dopt = NewDumpOptions();
177 ECB :
178 : /* this is the inverse of what's at the end of pg_dump.c's main() */
179 CBC 33 : dopt->cparams.dbname = ropt->cparams.dbname ? pg_strdup(ropt->cparams.dbname) : NULL;
180 33 : dopt->cparams.pgport = ropt->cparams.pgport ? pg_strdup(ropt->cparams.pgport) : NULL;
181 33 : dopt->cparams.pghost = ropt->cparams.pghost ? pg_strdup(ropt->cparams.pghost) : NULL;
182 33 : dopt->cparams.username = ropt->cparams.username ? pg_strdup(ropt->cparams.username) : NULL;
183 33 : dopt->cparams.promptPassword = ropt->cparams.promptPassword;
184 33 : dopt->outputClean = ropt->dropSchema;
185 33 : dopt->dataOnly = ropt->dataOnly;
186 33 : dopt->schemaOnly = ropt->schemaOnly;
187 33 : dopt->if_exists = ropt->if_exists;
188 33 : dopt->column_inserts = ropt->column_inserts;
189 33 : dopt->dumpSections = ropt->dumpSections;
190 33 : dopt->aclsSkip = ropt->aclsSkip;
191 33 : dopt->outputSuperuser = ropt->superuser;
192 33 : dopt->outputCreateDB = ropt->createDB;
193 33 : dopt->outputNoOwner = ropt->noOwner;
194 33 : dopt->outputNoTableAm = ropt->noTableAm;
195 33 : dopt->outputNoTablespaces = ropt->noTablespace;
196 33 : dopt->disable_triggers = ropt->disable_triggers;
197 33 : dopt->use_setsessauth = ropt->use_setsessauth;
198 33 : dopt->disable_dollar_quoting = ropt->disable_dollar_quoting;
199 33 : dopt->dump_inserts = ropt->dump_inserts;
200 33 : dopt->no_comments = ropt->no_comments;
201 33 : dopt->no_publications = ropt->no_publications;
202 33 : dopt->no_security_labels = ropt->no_security_labels;
203 33 : dopt->no_subscriptions = ropt->no_subscriptions;
204 GIC 33 : dopt->lockWaitTimeout = ropt->lockWaitTimeout;
205 CBC 33 : dopt->include_everything = ropt->include_everything;
206 GIC 33 : dopt->enable_row_security = ropt->enable_row_security;
207 33 : dopt->sequence_data = ropt->sequence_data;
208 :
209 33 : return dopt;
210 : }
211 :
212 :
213 : /*
214 : * Wrapper functions.
215 : *
216 : * The objective is to make writing new formats and dumpers as simple
217 : * as possible, if necessary at the expense of extra function calls etc.
218 : *
219 : */
220 :
221 : /*
222 : * The dump worker setup needs lots of knowledge of the internals of pg_dump,
223 ECB : * so it's defined in pg_dump.c and passed into OpenArchive. The restore worker
224 : * setup doesn't need to know anything much, so it's defined here.
225 : */
226 : static void
227 CBC 10 : setupRestoreWorker(Archive *AHX)
228 ECB : {
229 GIC 10 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
230 :
231 10 : AH->ReopenPtr(AH);
232 10 : }
233 :
234 ECB :
235 : /* Create a new archive */
236 : /* Public */
237 : Archive *
238 GIC 133 : CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
239 : const pg_compress_specification compression_spec,
240 : bool dosync, ArchiveMode mode,
241 ECB : SetupWorkerPtrType setupDumpWorker)
242 :
243 : {
244 GNC 133 : ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
245 : dosync, mode, setupDumpWorker);
246 :
247 GIC 132 : return (Archive *) AH;
248 : }
249 :
250 ECB : /* Open an existing archive */
251 : /* Public */
252 : Archive *
253 CBC 31 : OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
254 : {
255 : ArchiveHandle *AH;
256 GNC 31 : pg_compress_specification compression_spec = {0};
257 :
258 31 : compression_spec.algorithm = PG_COMPRESSION_NONE;
259 31 : AH = _allocAH(FileSpec, fmt, compression_spec, true,
260 : archModeRead, setupRestoreWorker);
261 ECB :
262 GIC 31 : return (Archive *) AH;
263 : }
264 ECB :
265 : /* Public */
266 : void
267 GIC 148 : CloseArchive(Archive *AHX)
268 : {
269 148 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
270 ECB :
271 GIC 148 : AH->ClosePtr(AH);
272 ECB :
273 : /* Close the output */
274 GNC 148 : errno = 0;
275 148 : if (!EndCompressFileHandle(AH->OF))
276 UIC 0 : pg_fatal("could not close output file: %m");
277 CBC 148 : }
278 :
279 : /* Public */
280 ECB : void
281 CBC 285 : SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
282 : {
283 : /* Caller can omit dump options, in which case we synthesize them */
284 285 : if (dopt == NULL && ropt != NULL)
285 33 : dopt = dumpOptionsFromRestoreOptions(ropt);
286 ECB :
287 : /* Save options for later access */
288 GIC 285 : AH->dopt = dopt;
289 285 : AH->ropt = ropt;
290 CBC 285 : }
291 :
292 ECB : /* Public */
293 : void
294 GIC 144 : ProcessArchiveRestoreOptions(Archive *AHX)
295 : {
296 144 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
297 144 : RestoreOptions *ropt = AH->public.ropt;
298 ECB : TocEntry *te;
299 : teSection curSection;
300 :
301 : /* Decide which TOC entries will be dumped/restored, and mark them */
302 GIC 144 : curSection = SECTION_PRE_DATA;
303 26403 : for (te = AH->toc->next; te != AH->toc; te = te->next)
304 : {
305 : /*
306 : * When writing an archive, we also take this opportunity to check
307 ECB : * that we have generated the entries in a sane order that respects
308 : * the section divisions. When reading, don't complain, since buggy
309 : * old versions of pg_dump might generate out-of-order archives.
310 : */
311 CBC 26259 : if (AH->mode != archModeRead)
312 : {
313 21324 : switch (te->section)
314 ECB : {
315 CBC 2713 : case SECTION_NONE:
316 EUB : /* ok to be anywhere */
317 CBC 2713 : break;
318 10513 : case SECTION_PRE_DATA:
319 10513 : if (curSection != SECTION_PRE_DATA)
320 UBC 0 : pg_log_warning("archive items not in correct section order");
321 CBC 10513 : break;
322 3475 : case SECTION_DATA:
323 GIC 3475 : if (curSection == SECTION_POST_DATA)
324 LBC 0 : pg_log_warning("archive items not in correct section order");
325 GBC 3475 : break;
326 4623 : case SECTION_POST_DATA:
327 : /* ok no matter which section we were in */
328 GIC 4623 : break;
329 UIC 0 : default:
330 0 : pg_fatal("unexpected section code %d",
331 : (int) te->section);
332 ECB : break;
333 : }
334 : }
335 :
336 GIC 26259 : if (te->section != SECTION_NONE)
337 22694 : curSection = te->section;
338 :
339 CBC 26259 : te->reqs = _tocEntryRequired(te, curSection, AH);
340 EUB : }
341 ECB :
342 : /* Enforce strict names checking */
343 GIC 144 : if (ropt->strict_names)
344 UIC 0 : StrictNamesCheck(ropt);
345 CBC 144 : }
346 :
347 ECB : /* Public */
348 : void
349 GIC 121 : RestoreArchive(Archive *AHX)
350 : {
351 121 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
352 121 : RestoreOptions *ropt = AH->public.ropt;
353 ECB : bool parallel_mode;
354 : TocEntry *te;
355 : CompressFileHandle *sav;
356 :
357 GIC 121 : AH->stage = STAGE_INITIALIZING;
358 ECB :
359 : /*
360 : * If we're going to do parallel restore, there are some restrictions.
361 : */
362 CBC 121 : parallel_mode = (AH->public.numWorkers > 1 && ropt->useDB);
363 GBC 121 : if (parallel_mode)
364 : {
365 : /* We haven't got round to making this work for all archive formats */
366 CBC 4 : if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL)
367 UBC 0 : pg_fatal("parallel restore is not supported with this archive file format");
368 :
369 : /* Doesn't work if the archive represents dependencies as OIDs */
370 GIC 4 : if (AH->version < K_VERS_1_8)
371 UIC 0 : pg_fatal("parallel restore is not supported with archives made by pre-8.0 pg_dump");
372 :
373 ECB : /*
374 : * It's also not gonna work if we can't reopen the input file, so
375 : * let's try that immediately.
376 : */
377 GIC 4 : AH->ReopenPtr(AH);
378 : }
379 ECB :
380 : /*
381 : * Make sure we won't need (de)compression we haven't got
382 : */
383 GNC 121 : if (AH->PrintTocDataPtr != NULL)
384 ECB : {
385 CBC 14252 : for (te = AH->toc->next; te != AH->toc; te = te->next)
386 EUB : {
387 GIC 14202 : if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
388 : {
389 GNC 71 : char *errmsg = supports_compression(AH->compression_spec);
390 71 : if (errmsg)
391 UNC 0 : pg_fatal("cannot restore from compressed archive (%s)",
392 : errmsg);
393 : else
394 GNC 71 : break;
395 : }
396 ECB : }
397 : }
398 :
399 : /*
400 : * Prepare index arrays, so we can assume we have them throughout restore.
401 : * It's possible we already did this, though.
402 : */
403 GIC 121 : if (AH->tocsByDumpId == NULL)
404 CBC 119 : buildTocEntryArrays(AH);
405 ECB :
406 : /*
407 : * If we're using a DB connection, then connect it.
408 : */
409 GIC 121 : if (ropt->useDB)
410 ECB : {
411 GIC 10 : pg_log_info("connecting to database for restore");
412 CBC 10 : if (AH->version < K_VERS_1_3)
413 LBC 0 : pg_fatal("direct database connections are not supported in pre-1.3 archives");
414 EUB :
415 : /*
416 : * We don't want to guess at whether the dump will successfully
417 : * restore; allow the attempt regardless of the version of the restore
418 : * target.
419 : */
420 GIC 10 : AHX->minRemoteVersion = 0;
421 CBC 10 : AHX->maxRemoteVersion = 9999999;
422 ECB :
423 GIC 10 : ConnectDatabase(AHX, &ropt->cparams, false);
424 ECB :
425 : /*
426 : * If we're talking to the DB directly, don't send comments since they
427 : * obscure SQL when displaying errors
428 : */
429 GIC 10 : AH->noTocComments = 1;
430 ECB : }
431 :
432 : /*
433 : * Work out if we have an implied data-only restore. This can happen if
434 : * the dump was data only or if the user has used a toc list to exclude
435 : * all of the schema data. All we do is look for schema entries - if none
436 : * are found then we set the dataOnly flag.
437 : *
438 : * We could scan for wanted TABLE entries, but that is not the same as
439 : * dataOnly. At this stage, it seems unnecessary (6-Mar-2001).
440 : */
441 GIC 121 : if (!ropt->dataOnly)
442 ECB : {
443 GIC 115 : int impliedDataOnly = 1;
444 ECB :
445 GIC 969 : for (te = AH->toc->next; te != AH->toc; te = te->next)
446 ECB : {
447 GIC 952 : if ((te->reqs & REQ_SCHEMA) != 0)
448 ECB : { /* It's schema, and it's wanted */
449 GIC 98 : impliedDataOnly = 0;
450 CBC 98 : break;
451 ECB : }
452 : }
453 GIC 115 : if (impliedDataOnly)
454 ECB : {
455 GIC 17 : ropt->dataOnly = impliedDataOnly;
456 CBC 17 : pg_log_info("implied data-only restore");
457 ECB : }
458 : }
459 :
460 : /*
461 : * Setup the output file if necessary.
462 : */
463 GIC 121 : sav = SaveOutput(AH);
464 GNC 121 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
465 101 : SetOutput(AH, ropt->filename, ropt->compression_spec);
466 ECB :
467 GIC 121 : ahprintf(AH, "--\n-- PostgreSQL database dump\n--\n\n");
468 ECB :
469 GIC 121 : if (AH->archiveRemoteVersion)
470 CBC 121 : ahprintf(AH, "-- Dumped from database version %s\n",
471 ECB : AH->archiveRemoteVersion);
472 GIC 121 : if (AH->archiveDumpVersion)
473 CBC 121 : ahprintf(AH, "-- Dumped by pg_dump version %s\n",
474 ECB : AH->archiveDumpVersion);
475 :
476 GIC 121 : ahprintf(AH, "\n");
477 ECB :
478 GIC 121 : if (AH->public.verbose)
479 CBC 17 : dumpTimestamp(AH, "Started on", AH->createDate);
480 ECB :
481 GIC 121 : if (ropt->single_txn)
482 ECB : {
483 UIC 0 : if (AH->connection)
484 UBC 0 : StartTransaction(AHX);
485 EUB : else
486 UIC 0 : ahprintf(AH, "BEGIN;\n\n");
487 EUB : }
488 :
489 : /*
490 : * Establish important parameter values right away.
491 : */
492 GIC 121 : _doSetFixedOutputState(AH);
493 ECB :
494 GIC 121 : AH->stage = STAGE_PROCESSING;
495 ECB :
496 : /*
497 : * Drop the items at the start, in reverse order
498 : */
499 GIC 121 : if (ropt->dropSchema)
500 ECB : {
501 GIC 867 : for (te = AH->toc->prev; te != AH->toc; te = te->prev)
502 ECB : {
503 GIC 858 : AH->currentTE = te;
504 ECB :
505 : /*
506 : * In createDB mode, issue a DROP *only* for the database as a
507 : * whole. Issuing drops against anything else would be wrong,
508 : * because at this point we're connected to the wrong database.
509 : * (The DATABASE PROPERTIES entry, if any, should be treated like
510 : * the DATABASE entry.)
511 : */
512 GIC 858 : if (ropt->createDB)
513 ECB : {
514 GIC 299 : if (strcmp(te->desc, "DATABASE") != 0 &&
515 CBC 295 : strcmp(te->desc, "DATABASE PROPERTIES") != 0)
516 293 : continue;
517 ECB : }
518 :
519 : /* Otherwise, drop anything that's selected and has a dropStmt */
520 GIC 565 : if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
521 ECB : {
522 GIC 260 : pg_log_info("dropping %s %s", te->desc, te->tag);
523 ECB : /* Select owner and schema as necessary */
524 GIC 260 : _becomeOwner(AH, te);
525 CBC 260 : _selectOutputSchema(AH, te->namespace);
526 ECB :
527 : /*
528 : * Now emit the DROP command, if the object has one. Note we
529 : * don't necessarily emit it verbatim; at this point we add an
530 : * appropriate IF EXISTS clause, if the user requested it.
531 : */
532 GIC 260 : if (*te->dropStmt != '\0')
533 ECB : {
534 GIC 251 : if (!ropt->if_exists ||
535 CBC 123 : strncmp(te->dropStmt, "--", 2) == 0)
536 ECB : {
537 : /*
538 : * Without --if-exists, or if it's just a comment (as
539 : * happens for the public schema), print the dropStmt
540 : * as-is.
541 : */
542 GIC 129 : ahprintf(AH, "%s", te->dropStmt);
543 ECB : }
544 : else
545 : {
546 : /*
547 : * Inject an appropriate spelling of "if exists". For
548 : * large objects, we have a separate routine that
549 : * knows how to do it, without depending on
550 : * te->dropStmt; use that. For other objects we need
551 : * to parse the command.
552 : */
553 GIC 122 : if (strncmp(te->desc, "BLOB", 4) == 0)
554 ECB : {
555 GNC 2 : DropLOIfExists(AH, te->catalogId.oid);
556 ECB : }
557 : else
558 : {
559 GIC 120 : char *dropStmt = pg_strdup(te->dropStmt);
560 CBC 120 : char *dropStmtOrig = dropStmt;
561 120 : PQExpBuffer ftStmt = createPQExpBuffer();
562 ECB :
563 : /*
564 : * Need to inject IF EXISTS clause after ALTER
565 : * TABLE part in ALTER TABLE .. DROP statement
566 : */
567 GIC 120 : if (strncmp(dropStmt, "ALTER TABLE", 11) == 0)
568 ECB : {
569 GIC 15 : appendPQExpBufferStr(ftStmt,
570 ECB : "ALTER TABLE IF EXISTS");
571 GIC 15 : dropStmt = dropStmt + 11;
572 ECB : }
573 :
574 : /*
575 : * ALTER TABLE..ALTER COLUMN..DROP DEFAULT does
576 : * not support the IF EXISTS clause, and therefore
577 : * we simply emit the original command for DEFAULT
578 : * objects (modulo the adjustment made above).
579 : *
580 : * Likewise, don't mess with DATABASE PROPERTIES.
581 : *
582 : * If we used CREATE OR REPLACE VIEW as a means of
583 : * quasi-dropping an ON SELECT rule, that should
584 : * be emitted unchanged as well.
585 : *
586 : * For other object types, we need to extract the
587 : * first part of the DROP which includes the
588 : * object type. Most of the time this matches
589 : * te->desc, so search for that; however for the
590 : * different kinds of CONSTRAINTs, we know to
591 : * search for hardcoded "DROP CONSTRAINT" instead.
592 : */
593 GIC 120 : if (strcmp(te->desc, "DEFAULT") == 0 ||
594 CBC 117 : strcmp(te->desc, "DATABASE PROPERTIES") == 0 ||
595 117 : strncmp(dropStmt, "CREATE OR REPLACE VIEW", 22) == 0)
596 3 : appendPQExpBufferStr(ftStmt, dropStmt);
597 ECB : else
598 : {
599 : char buffer[40];
600 : char *mark;
601 :
602 GIC 117 : if (strcmp(te->desc, "CONSTRAINT") == 0 ||
603 CBC 107 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
604 GNC 107 : strcmp(te->desc, "NOT NULL CONSTRAINT") == 0 ||
605 CBC 107 : strcmp(te->desc, "FK CONSTRAINT") == 0)
606 12 : strcpy(buffer, "DROP CONSTRAINT");
607 ECB : else
608 CBC 105 : snprintf(buffer, sizeof(buffer), "DROP %s",
609 : te->desc);
610 ECB :
611 GIC 117 : mark = strstr(dropStmt, buffer);
612 :
613 CBC 117 : if (mark)
614 : {
615 117 : *mark = '\0';
616 GIC 117 : appendPQExpBuffer(ftStmt, "%s%s IF EXISTS%s",
617 ECB : dropStmt, buffer,
618 CBC 117 : mark + strlen(buffer));
619 : }
620 ECB : else
621 : {
622 : /* complain and emit unmodified command */
623 UIC 0 : pg_log_warning("could not find where to insert IF EXISTS in statement \"%s\"",
624 : dropStmtOrig);
625 UBC 0 : appendPQExpBufferStr(ftStmt, dropStmt);
626 : }
627 EUB : }
628 :
629 GIC 120 : ahprintf(AH, "%s", ftStmt->data);
630 :
631 CBC 120 : destroyPQExpBuffer(ftStmt);
632 GIC 120 : pg_free(dropStmtOrig);
633 ECB : }
634 : }
635 : }
636 : }
637 : }
638 :
639 : /*
640 : * _selectOutputSchema may have set currSchema to reflect the effect
641 : * of a "SET search_path" command it emitted. However, by now we may
642 : * have dropped that schema; or it might not have existed in the first
643 : * place. In either case the effective value of search_path will not
644 : * be what we think. Forcibly reset currSchema so that we will
645 : * re-establish the search_path setting when needed (after creating
646 : * the schema).
647 : *
648 : * If we treated users as pg_dump'able objects then we'd need to reset
649 : * currUser here too.
650 : */
651 GNC 9 : free(AH->currSchema);
652 CBC 9 : AH->currSchema = NULL;
653 ECB : }
654 :
655 GIC 121 : if (parallel_mode)
656 ECB : {
657 : /*
658 : * In parallel mode, turn control over to the parallel-restore logic.
659 : */
660 : ParallelState *pstate;
661 : TocEntry pending_list;
662 :
663 : /* The archive format module may need some setup for this */
664 GIC 4 : if (AH->PrepParallelRestorePtr)
665 CBC 4 : AH->PrepParallelRestorePtr(AH);
666 ECB :
667 GIC 4 : pending_list_header_init(&pending_list);
668 ECB :
669 : /* This runs PRE_DATA items and then disconnects from the database */
670 GIC 4 : restore_toc_entries_prefork(AH, &pending_list);
671 CBC 4 : Assert(AH->connection == NULL);
672 ECB :
673 : /* ParallelBackupStart() will actually fork the processes */
674 GIC 4 : pstate = ParallelBackupStart(AH);
675 CBC 4 : restore_toc_entries_parallel(AH, pstate, &pending_list);
676 4 : ParallelBackupEnd(AH, pstate);
677 ECB :
678 : /* reconnect the leader and see if we missed something */
679 GIC 4 : restore_toc_entries_postfork(AH, &pending_list);
680 CBC 4 : Assert(AH->connection != NULL);
681 ECB : }
682 : else
683 : {
684 : /*
685 : * In serial mode, process everything in three phases: normal items,
686 : * then ACLs, then post-ACL items. We might be able to skip one or
687 : * both extra phases in some cases, eg data-only restores.
688 : */
689 GIC 117 : bool haveACL = false;
690 CBC 117 : bool havePostACL = false;
691 ECB :
692 GIC 21630 : for (te = AH->toc->next; te != AH->toc; te = te->next)
693 ECB : {
694 GIC 21514 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
695 CBC 949 : continue; /* ignore if not to be dumped at all */
696 ECB :
697 GIC 20565 : switch (_tocEntryRestorePass(te))
698 ECB : {
699 GIC 18337 : case RESTORE_PASS_MAIN:
700 CBC 18337 : (void) restore_toc_entry(AH, te, false);
701 18336 : break;
702 1964 : case RESTORE_PASS_ACL:
703 1964 : haveACL = true;
704 1964 : break;
705 264 : case RESTORE_PASS_POST_ACL:
706 264 : havePostACL = true;
707 264 : break;
708 ECB : }
709 : }
710 :
711 GIC 116 : if (haveACL)
712 ECB : {
713 GIC 20849 : for (te = AH->toc->next; te != AH->toc; te = te->next)
714 ECB : {
715 GIC 41024 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
716 CBC 20241 : _tocEntryRestorePass(te) == RESTORE_PASS_ACL)
717 1964 : (void) restore_toc_entry(AH, te, false);
718 ECB : }
719 : }
720 :
721 GIC 116 : if (havePostACL)
722 ECB : {
723 GIC 18320 : for (te = AH->toc->next; te != AH->toc; te = te->next)
724 ECB : {
725 GIC 36230 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 &&
726 CBC 17950 : _tocEntryRestorePass(te) == RESTORE_PASS_POST_ACL)
727 264 : (void) restore_toc_entry(AH, te, false);
728 ECB : }
729 : }
730 : }
731 :
732 GIC 120 : if (ropt->single_txn)
733 ECB : {
734 UIC 0 : if (AH->connection)
735 UBC 0 : CommitTransaction(AHX);
736 EUB : else
737 UIC 0 : ahprintf(AH, "COMMIT;\n\n");
738 EUB : }
739 :
740 GIC 120 : if (AH->public.verbose)
741 CBC 17 : dumpTimestamp(AH, "Completed on", time(NULL));
742 ECB :
743 GIC 120 : ahprintf(AH, "--\n-- PostgreSQL database dump complete\n--\n\n");
744 ECB :
745 : /*
746 : * Clean up & we're done.
747 : */
748 GIC 120 : AH->stage = STAGE_FINALIZING;
749 ECB :
750 GNC 120 : if (ropt->filename || ropt->compression_spec.algorithm != PG_COMPRESSION_NONE)
751 CBC 101 : RestoreOutput(AH, sav);
752 ECB :
753 GIC 120 : if (ropt->useDB)
754 CBC 10 : DisconnectDatabase(&AH->public);
755 120 : }
756 ECB :
757 : /*
758 : * Restore a single TOC item. Used in both parallel and non-parallel restore;
759 : * is_parallel is true if we are in a worker child process.
760 : *
761 : * Returns 0 normally, but WORKER_CREATE_DONE or WORKER_INHIBIT_DATA if
762 : * the parallel parent has to make the corresponding status update.
763 : */
764 : static int
765 GIC 20661 : restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
766 ECB : {
767 GIC 20661 : RestoreOptions *ropt = AH->public.ropt;
768 CBC 20661 : int status = WORKER_OK;
769 ECB : int reqs;
770 : bool defnDumped;
771 :
772 GIC 20661 : AH->currentTE = te;
773 ECB :
774 : /* Dump any relevant dump warnings to stderr */
775 GIC 20661 : if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
776 ECB : {
777 UIC 0 : if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
778 UBC 0 : pg_log_warning("warning from original dump file: %s", te->defn);
779 0 : else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
780 0 : pg_log_warning("warning from original dump file: %s", te->copyStmt);
781 EUB : }
782 :
783 : /* Work out what, if anything, we want from this entry */
784 GIC 20661 : reqs = te->reqs;
785 ECB :
786 GIC 20661 : defnDumped = false;
787 ECB :
788 : /*
789 : * If it has a schema component that we want, then process that
790 : */
791 GIC 20661 : if ((reqs & REQ_SCHEMA) != 0)
792 ECB : {
793 : /* Show namespace in log message if available */
794 GIC 17039 : if (te->namespace)
795 CBC 15867 : pg_log_info("creating %s \"%s.%s\"",
796 ECB : te->desc, te->namespace, te->tag);
797 : else
798 GIC 1172 : pg_log_info("creating %s \"%s\"",
799 ECB : te->desc, te->tag);
800 :
801 GIC 17039 : _printTocEntry(AH, te, false);
802 CBC 17039 : defnDumped = true;
803 ECB :
804 GIC 17039 : if (strcmp(te->desc, "TABLE") == 0)
805 ECB : {
806 GIC 4036 : if (AH->lastErrorTE == te)
807 ECB : {
808 : /*
809 : * We failed to create the table. If
810 : * --no-data-for-failed-tables was given, mark the
811 : * corresponding TABLE DATA to be ignored.
812 : *
813 : * In the parallel case this must be done in the parent, so we
814 : * just set the return value.
815 : */
816 UIC 0 : if (ropt->noDataForFailedTables)
817 EUB : {
818 UIC 0 : if (is_parallel)
819 UBC 0 : status = WORKER_INHIBIT_DATA;
820 EUB : else
821 UIC 0 : inhibit_data_for_failed_table(AH, te);
822 EUB : }
823 : }
824 : else
825 : {
826 : /*
827 : * We created the table successfully. Mark the corresponding
828 : * TABLE DATA for possible truncation.
829 : *
830 : * In the parallel case this must be done in the parent, so we
831 : * just set the return value.
832 : */
833 GIC 4036 : if (is_parallel)
834 LBC 0 : status = WORKER_CREATE_DONE;
835 EUB : else
836 GIC 4036 : mark_create_done(AH, te);
837 ECB : }
838 : }
839 :
840 : /*
841 : * If we created a DB, connect to it. Also, if we changed DB
842 : * properties, reconnect to ensure that relevant GUC settings are
843 : * applied to our session.
844 : */
845 GIC 17039 : if (strcmp(te->desc, "DATABASE") == 0 ||
846 CBC 17005 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
847 ECB : {
848 GIC 44 : pg_log_info("connecting to new database \"%s\"", te->tag);
849 CBC 44 : _reconnectToDB(AH, te->tag);
850 ECB : }
851 : }
852 :
853 : /*
854 : * If it has a data component that we want, then process that
855 : */
856 GIC 20661 : if ((reqs & REQ_DATA) != 0)
857 ECB : {
858 : /*
859 : * hadDumper will be set if there is genuine data component for this
860 : * node. Otherwise, we need to check the defn field for statements
861 : * that need to be executed in data-only restores.
862 : */
863 GIC 3607 : if (te->hadDumper)
864 ECB : {
865 : /*
866 : * If we can output the data, then restore it.
867 : */
868 GIC 3106 : if (AH->PrintTocDataPtr != NULL)
869 ECB : {
870 GIC 3106 : _printTocEntry(AH, te, true);
871 ECB :
872 GIC 3106 : if (strcmp(te->desc, "BLOBS") == 0 ||
873 CBC 3067 : strcmp(te->desc, "BLOB COMMENTS") == 0)
874 ECB : {
875 GIC 39 : pg_log_info("processing %s", te->desc);
876 ECB :
877 GIC 39 : _selectOutputSchema(AH, "pg_catalog");
878 ECB :
879 : /* Send BLOB COMMENTS data to ExecuteSimpleCommands() */
880 GIC 39 : if (strcmp(te->desc, "BLOB COMMENTS") == 0)
881 LBC 0 : AH->outputKind = OUTPUT_OTHERDATA;
882 EUB :
883 GIC 39 : AH->PrintTocDataPtr(AH, te);
884 ECB :
885 GIC 39 : AH->outputKind = OUTPUT_SQLCMDS;
886 ECB : }
887 : else
888 : {
889 : bool use_truncate;
890 :
891 GIC 3067 : _disableTriggersIfNecessary(AH, te);
892 ECB :
893 : /* Select owner and schema as necessary */
894 GIC 3067 : _becomeOwner(AH, te);
895 CBC 3067 : _selectOutputSchema(AH, te->namespace);
896 ECB :
897 GIC 3067 : pg_log_info("processing data for table \"%s.%s\"",
898 ECB : te->namespace, te->tag);
899 :
900 : /*
901 : * In parallel restore, if we created the table earlier in
902 : * this run (so that we know it is empty) and we are not
903 : * restoring a load-via-partition-root data item then we
904 : * wrap the COPY in a transaction and precede it with a
905 : * TRUNCATE. If wal_level is set to minimal this prevents
906 : * WAL-logging the COPY. This obtains a speedup similar
907 : * to that from using single_txn mode in non-parallel
908 : * restores.
909 : *
910 : * We mustn't do this for load-via-partition-root cases
911 : * because some data might get moved across partition
912 : * boundaries, risking deadlock and/or loss of previously
913 : * loaded data. (We assume that all partitions of a
914 : * partitioned table will be treated the same way.)
915 : */
916 GIC 3083 : use_truncate = is_parallel && te->created &&
917 CBC 16 : !is_load_via_partition_root(te);
918 ECB :
919 GIC 3067 : if (use_truncate)
920 ECB : {
921 : /*
922 : * Parallel restore is always talking directly to a
923 : * server, so no need to see if we should issue BEGIN.
924 : */
925 GIC 10 : StartTransaction(&AH->public);
926 ECB :
927 : /*
928 : * Issue TRUNCATE with ONLY so that child tables are
929 : * not wiped.
930 : */
931 GIC 10 : ahprintf(AH, "TRUNCATE TABLE ONLY %s;\n\n",
932 CBC 10 : fmtQualifiedId(te->namespace, te->tag));
933 ECB : }
934 :
935 : /*
936 : * If we have a copy statement, use it.
937 : */
938 GIC 3067 : if (te->copyStmt && strlen(te->copyStmt) > 0)
939 ECB : {
940 GIC 3010 : ahprintf(AH, "%s", te->copyStmt);
941 CBC 3010 : AH->outputKind = OUTPUT_COPYDATA;
942 ECB : }
943 : else
944 GIC 57 : AH->outputKind = OUTPUT_OTHERDATA;
945 ECB :
946 GIC 3067 : AH->PrintTocDataPtr(AH, te);
947 ECB :
948 : /*
949 : * Terminate COPY if needed.
950 : */
951 GIC 6075 : if (AH->outputKind == OUTPUT_COPYDATA &&
952 CBC 3009 : RestoringToDB(AH))
953 9 : EndDBCopyMode(&AH->public, te->tag);
954 3066 : AH->outputKind = OUTPUT_SQLCMDS;
955 ECB :
956 : /* close out the transaction started above */
957 GIC 3066 : if (use_truncate)
958 CBC 10 : CommitTransaction(&AH->public);
959 ECB :
960 GIC 3066 : _enableTriggersIfNecessary(AH, te);
961 ECB : }
962 : }
963 : }
964 GIC 501 : else if (!defnDumped)
965 ECB : {
966 : /* If we haven't already dumped the defn part, do so now */
967 GIC 501 : pg_log_info("executing %s %s", te->desc, te->tag);
968 CBC 501 : _printTocEntry(AH, te, false);
969 ECB : }
970 : }
971 :
972 GIC 20660 : if (AH->public.n_errors > 0 && status == WORKER_OK)
973 LBC 0 : status = WORKER_IGNORED_ERRORS;
974 EUB :
975 GIC 20660 : return status;
976 ECB : }
977 :
978 : /*
979 : * Allocate a new RestoreOptions block.
980 : * This is mainly so we can initialize it, but also for future expansion,
981 : */
982 : RestoreOptions *
983 GIC 166 : NewRestoreOptions(void)
984 ECB : {
985 : RestoreOptions *opts;
986 :
987 GIC 166 : opts = (RestoreOptions *) pg_malloc0(sizeof(RestoreOptions));
988 ECB :
989 : /* set any fields that shouldn't default to zeroes */
990 GIC 166 : opts->format = archUnknown;
991 CBC 166 : opts->cparams.promptPassword = TRI_DEFAULT;
992 166 : opts->dumpSections = DUMP_UNSECTIONED;
993 GNC 166 : opts->compression_spec.algorithm = PG_COMPRESSION_NONE;
994 166 : opts->compression_spec.level = 0;
995 ECB :
996 CBC 166 : return opts;
997 ECB : }
998 :
999 : static void
1000 GIC 3067 : _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1001 : {
1002 3067 : RestoreOptions *ropt = AH->public.ropt;
1003 ECB :
1004 : /* This hack is only needed in a data-only restore */
1005 CBC 3067 : if (!ropt->dataOnly || !ropt->disable_triggers)
1006 GIC 3043 : return;
1007 :
1008 CBC 24 : pg_log_info("disabling triggers for %s", te->tag);
1009 ECB :
1010 : /*
1011 : * Become superuser if possible, since they are the only ones who can
1012 : * disable constraint triggers. If -S was not given, assume the initial
1013 : * user identity is a superuser. (XXX would it be better to become the
1014 : * table owner?)
1015 : */
1016 GIC 24 : _becomeUser(AH, ropt->superuser);
1017 :
1018 : /*
1019 ECB : * Disable them.
1020 : */
1021 GIC 24 : ahprintf(AH, "ALTER TABLE %s DISABLE TRIGGER ALL;\n\n",
1022 24 : fmtQualifiedId(te->namespace, te->tag));
1023 : }
1024 ECB :
1025 : static void
1026 GIC 3066 : _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te)
1027 : {
1028 3066 : RestoreOptions *ropt = AH->public.ropt;
1029 ECB :
1030 : /* This hack is only needed in a data-only restore */
1031 CBC 3066 : if (!ropt->dataOnly || !ropt->disable_triggers)
1032 GIC 3042 : return;
1033 :
1034 CBC 24 : pg_log_info("enabling triggers for %s", te->tag);
1035 ECB :
1036 : /*
1037 : * Become superuser if possible, since they are the only ones who can
1038 : * disable constraint triggers. If -S was not given, assume the initial
1039 : * user identity is a superuser. (XXX would it be better to become the
1040 : * table owner?)
1041 : */
1042 GIC 24 : _becomeUser(AH, ropt->superuser);
1043 :
1044 : /*
1045 ECB : * Enable them.
1046 : */
1047 GIC 24 : ahprintf(AH, "ALTER TABLE %s ENABLE TRIGGER ALL;\n\n",
1048 24 : fmtQualifiedId(te->namespace, te->tag));
1049 : }
1050 ECB :
1051 : /*
1052 : * Detect whether a TABLE DATA TOC item is performing "load via partition
1053 : * root", that is the target table is an ancestor partition rather than the
1054 : * table the TOC item is nominally for.
1055 : *
1056 : * In newer archive files this can be detected by checking for a special
1057 : * comment placed in te->defn. In older files we have to fall back to seeing
1058 : * if the COPY statement targets the named table or some other one. This
1059 : * will not work for data dumped as INSERT commands, so we could give a false
1060 : * negative in that case; fortunately, that's a rarely-used option.
1061 : */
1062 : static bool
1063 GIC 16 : is_load_via_partition_root(TocEntry *te)
1064 : {
1065 16 : if (te->defn &&
1066 CBC 6 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
1067 GIC 6 : return true;
1068 CBC 10 : if (te->copyStmt && *te->copyStmt)
1069 ECB : {
1070 CBC 6 : PQExpBuffer copyStmt = createPQExpBuffer();
1071 ECB : bool result;
1072 :
1073 : /*
1074 : * Build the initial part of the COPY as it would appear if the
1075 : * nominal target table is the actual target. If we see anything
1076 : * else, it must be a load-via-partition-root case.
1077 : */
1078 GIC 6 : appendPQExpBuffer(copyStmt, "COPY %s ",
1079 6 : fmtQualifiedId(te->namespace, te->tag));
1080 6 : result = strncmp(te->copyStmt, copyStmt->data, copyStmt->len) != 0;
1081 CBC 6 : destroyPQExpBuffer(copyStmt);
1082 6 : return result;
1083 ECB : }
1084 : /* Assume it's not load-via-partition-root */
1085 CBC 4 : return false;
1086 : }
1087 :
1088 ECB : /*
1089 : * This is a routine that is part of the dumper interface, hence the 'Archive*' parameter.
1090 : */
1091 :
1092 : /* Public */
1093 : void
1094 GIC 1716439 : WriteData(Archive *AHX, const void *data, size_t dLen)
1095 : {
1096 1716439 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1097 ECB :
1098 GIC 1716439 : if (!AH->currToc)
1099 LBC 0 : pg_fatal("internal error -- WriteData cannot be called outside the context of a DataDumper routine");
1100 :
1101 CBC 1716439 : AH->WriteDataPtr(AH, data, dLen);
1102 GBC 1716439 : }
1103 :
1104 ECB : /*
1105 : * Create a new TOC entry. The TOC was designed as a TOC, but is now the
1106 : * repository for all metadata. But the name has stuck.
1107 : *
1108 : * The new entry is added to the Archive's TOC list. Most callers can ignore
1109 : * the result value because nothing else need be done, but a few want to
1110 : * manipulate the TOC entry further.
1111 : */
1112 :
1113 : /* Public */
1114 : TocEntry *
1115 GIC 21324 : ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
1116 : ArchiveOpts *opts)
1117 : {
1118 CBC 21324 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1119 : TocEntry *newToc;
1120 :
1121 21324 : newToc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
1122 :
1123 GIC 21324 : AH->tocCount++;
1124 CBC 21324 : if (dumpId > AH->maxDumpId)
1125 GIC 3626 : AH->maxDumpId = dumpId;
1126 ECB :
1127 CBC 21324 : newToc->prev = AH->toc->prev;
1128 21324 : newToc->next = AH->toc;
1129 GIC 21324 : AH->toc->prev->next = newToc;
1130 CBC 21324 : AH->toc->prev = newToc;
1131 ECB :
1132 CBC 21324 : newToc->catalogId = catalogId;
1133 21324 : newToc->dumpId = dumpId;
1134 GIC 21324 : newToc->section = opts->section;
1135 ECB :
1136 CBC 21324 : newToc->tag = pg_strdup(opts->tag);
1137 21324 : newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
1138 GIC 21324 : newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
1139 CBC 21324 : newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
1140 21324 : newToc->owner = opts->owner ? pg_strdup(opts->owner) : NULL;
1141 21324 : newToc->desc = pg_strdup(opts->description);
1142 21324 : newToc->defn = opts->createStmt ? pg_strdup(opts->createStmt) : NULL;
1143 21324 : newToc->dropStmt = opts->dropStmt ? pg_strdup(opts->dropStmt) : NULL;
1144 21324 : newToc->copyStmt = opts->copyStmt ? pg_strdup(opts->copyStmt) : NULL;
1145 ECB :
1146 CBC 21324 : if (opts->nDeps > 0)
1147 ECB : {
1148 GIC 6569 : newToc->dependencies = (DumpId *) pg_malloc(opts->nDeps * sizeof(DumpId));
1149 CBC 6569 : memcpy(newToc->dependencies, opts->deps, opts->nDeps * sizeof(DumpId));
1150 GIC 6569 : newToc->nDeps = opts->nDeps;
1151 ECB : }
1152 : else
1153 : {
1154 GIC 14755 : newToc->dependencies = NULL;
1155 14755 : newToc->nDeps = 0;
1156 : }
1157 ECB :
1158 CBC 21324 : newToc->dataDumper = opts->dumpFn;
1159 GIC 21324 : newToc->dataDumperArg = opts->dumpArg;
1160 21324 : newToc->hadDumper = opts->dumpFn ? true : false;
1161 ECB :
1162 CBC 21324 : newToc->formatData = NULL;
1163 21324 : newToc->dataLength = 0;
1164 :
1165 21324 : if (AH->ArchiveEntryPtr != NULL)
1166 4931 : AH->ArchiveEntryPtr(AH, newToc);
1167 :
1168 21324 : return newToc;
1169 ECB : }
1170 :
1171 : /* Public */
1172 : void
1173 GIC 5 : PrintTOCSummary(Archive *AHX)
1174 : {
1175 5 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1176 CBC 5 : RestoreOptions *ropt = AH->public.ropt;
1177 : TocEntry *te;
1178 GNC 5 : pg_compress_specification out_compression_spec = {0};
1179 ECB : teSection curSection;
1180 : CompressFileHandle *sav;
1181 : const char *fmtName;
1182 : char stamp_str[64];
1183 :
1184 : /* TOC is always uncompressed */
1185 GNC 5 : out_compression_spec.algorithm = PG_COMPRESSION_NONE;
1186 :
1187 GIC 5 : sav = SaveOutput(AH);
1188 5 : if (ropt->filename)
1189 UNC 0 : SetOutput(AH, ropt->filename, out_compression_spec);
1190 :
1191 GIC 5 : if (strftime(stamp_str, sizeof(stamp_str), PGDUMP_STRFTIME_FMT,
1192 CBC 5 : localtime(&AH->createDate)) == 0)
1193 UIC 0 : strcpy(stamp_str, "[unknown]");
1194 ECB :
1195 CBC 5 : ahprintf(AH, ";\n; Archive created at %s\n", stamp_str);
1196 GNC 10 : ahprintf(AH, "; dbname: %s\n; TOC Entries: %d\n; Compression: %s\n",
1197 GIC 5 : sanitize_line(AH->archdbname, false),
1198 : AH->tocCount,
1199 : get_compress_algorithm_name(AH->compression_spec.algorithm));
1200 ECB :
1201 GBC 5 : switch (AH->format)
1202 : {
1203 CBC 4 : case archCustom:
1204 4 : fmtName = "CUSTOM";
1205 4 : break;
1206 GIC 1 : case archDirectory:
1207 1 : fmtName = "DIRECTORY";
1208 1 : break;
1209 LBC 0 : case archTar:
1210 UIC 0 : fmtName = "TAR";
1211 LBC 0 : break;
1212 0 : default:
1213 0 : fmtName = "UNKNOWN";
1214 ECB : }
1215 :
1216 CBC 5 : ahprintf(AH, "; Dump Version: %d.%d-%d\n",
1217 GBC 5 : ARCHIVE_MAJOR(AH->version), ARCHIVE_MINOR(AH->version), ARCHIVE_REV(AH->version));
1218 5 : ahprintf(AH, "; Format: %s\n", fmtName);
1219 5 : ahprintf(AH, "; Integer: %d bytes\n", (int) AH->intSize);
1220 5 : ahprintf(AH, "; Offset: %d bytes\n", (int) AH->offSize);
1221 5 : if (AH->archiveRemoteVersion)
1222 GIC 5 : ahprintf(AH, "; Dumped from database version: %s\n",
1223 : AH->archiveRemoteVersion);
1224 CBC 5 : if (AH->archiveDumpVersion)
1225 5 : ahprintf(AH, "; Dumped by pg_dump version: %s\n",
1226 ECB : AH->archiveDumpVersion);
1227 :
1228 CBC 5 : ahprintf(AH, ";\n;\n; Selected TOC Entries:\n;\n");
1229 ECB :
1230 CBC 5 : curSection = SECTION_PRE_DATA;
1231 GIC 1310 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1232 ECB : {
1233 CBC 1305 : if (te->section != SECTION_NONE)
1234 GIC 950 : curSection = te->section;
1235 1305 : if (ropt->verbose ||
1236 CBC 1305 : (_tocEntryRequired(te, curSection, AH) & (REQ_SCHEMA | REQ_DATA)) != 0)
1237 : {
1238 ECB : char *sanitized_name;
1239 : char *sanitized_schema;
1240 : char *sanitized_owner;
1241 :
1242 : /*
1243 : */
1244 CBC 1280 : sanitized_name = sanitize_line(te->tag, false);
1245 GIC 1280 : sanitized_schema = sanitize_line(te->namespace, true);
1246 1280 : sanitized_owner = sanitize_line(te->owner, false);
1247 :
1248 1280 : ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
1249 : te->catalogId.tableoid, te->catalogId.oid,
1250 : te->desc, sanitized_schema, sanitized_name,
1251 : sanitized_owner);
1252 ECB :
1253 CBC 1280 : free(sanitized_name);
1254 1280 : free(sanitized_schema);
1255 GIC 1280 : free(sanitized_owner);
1256 ECB : }
1257 GIC 1305 : if (ropt->verbose && te->nDeps > 0)
1258 : {
1259 : int i;
1260 :
1261 LBC 0 : ahprintf(AH, ";\tdepends on:");
1262 0 : for (i = 0; i < te->nDeps; i++)
1263 0 : ahprintf(AH, " %d", te->dependencies[i]);
1264 UIC 0 : ahprintf(AH, "\n");
1265 ECB : }
1266 : }
1267 :
1268 : /* Enforce strict names checking */
1269 GBC 5 : if (ropt->strict_names)
1270 UBC 0 : StrictNamesCheck(ropt);
1271 EUB :
1272 GBC 5 : if (ropt->filename)
1273 UIC 0 : RestoreOutput(AH, sav);
1274 GIC 5 : }
1275 :
1276 : /***********
1277 : * Large Object Archival
1278 EUB : ***********/
1279 :
1280 : /* Called by a dumper to signal start of a LO */
1281 : int
1282 GNC 80 : StartLO(Archive *AHX, Oid oid)
1283 : {
1284 GIC 80 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1285 :
1286 GNC 80 : if (!AH->StartLOPtr)
1287 UIC 0 : pg_fatal("large-object output not supported in chosen format");
1288 :
1289 GNC 80 : AH->StartLOPtr(AH, AH->currToc, oid);
1290 ECB :
1291 GIC 80 : return 1;
1292 ECB : }
1293 :
1294 : /* Called by a dumper to signal end of a LO */
1295 EUB : int
1296 GNC 80 : EndLO(Archive *AHX, Oid oid)
1297 ECB : {
1298 GIC 80 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1299 ECB :
1300 GNC 80 : if (AH->EndLOPtr)
1301 80 : AH->EndLOPtr(AH, AH->currToc, oid);
1302 :
1303 GIC 80 : return 1;
1304 ECB : }
1305 :
1306 : /**********
1307 : * Large Object Restoration
1308 : **********/
1309 :
1310 : /*
1311 : * Called by a format handler before any LOs are restored
1312 : */
1313 : void
1314 GNC 10 : StartRestoreLOs(ArchiveHandle *AH)
1315 : {
1316 GIC 10 : RestoreOptions *ropt = AH->public.ropt;
1317 :
1318 10 : if (!ropt->single_txn)
1319 : {
1320 10 : if (AH->connection)
1321 UIC 0 : StartTransaction(&AH->public);
1322 ECB : else
1323 GIC 10 : ahprintf(AH, "BEGIN;\n\n");
1324 ECB : }
1325 :
1326 GNC 10 : AH->loCount = 0;
1327 GIC 10 : }
1328 ECB :
1329 EUB : /*
1330 : * Called by a format handler after all LOs are restored
1331 ECB : */
1332 : void
1333 GNC 10 : EndRestoreLOs(ArchiveHandle *AH)
1334 ECB : {
1335 CBC 10 : RestoreOptions *ropt = AH->public.ropt;
1336 :
1337 GIC 10 : if (!ropt->single_txn)
1338 : {
1339 10 : if (AH->connection)
1340 UIC 0 : CommitTransaction(&AH->public);
1341 ECB : else
1342 GIC 10 : ahprintf(AH, "COMMIT;\n\n");
1343 ECB : }
1344 :
1345 CBC 10 : pg_log_info(ngettext("restored %d large object",
1346 : "restored %d large objects",
1347 : AH->loCount),
1348 : AH->loCount);
1349 GIC 10 : }
1350 ECB :
1351 :
1352 : /*
1353 : * Called by a format handler to initiate restoration of a LO
1354 : */
1355 : void
1356 GNC 20 : StartRestoreLO(ArchiveHandle *AH, Oid oid, bool drop)
1357 ECB : {
1358 GNC 20 : bool old_lo_style = (AH->version < K_VERS_1_12);
1359 : Oid loOid;
1360 :
1361 20 : AH->loCount++;
1362 :
1363 : /* Initialize the LO Buffer */
1364 CBC 20 : AH->lo_buf_used = 0;
1365 :
1366 20 : pg_log_info("restoring large object with OID %u", oid);
1367 :
1368 : /* With an old archive we must do drop and create logic here */
1369 GNC 20 : if (old_lo_style && drop)
1370 UNC 0 : DropLOIfExists(AH, oid);
1371 :
1372 CBC 20 : if (AH->connection)
1373 : {
1374 UNC 0 : if (old_lo_style)
1375 : {
1376 UIC 0 : loOid = lo_create(AH->connection, oid);
1377 LBC 0 : if (loOid == 0 || loOid != oid)
1378 UBC 0 : pg_fatal("could not create large object %u: %s",
1379 : oid, PQerrorMessage(AH->connection));
1380 ECB : }
1381 UIC 0 : AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
1382 UBC 0 : if (AH->loFd == -1)
1383 UIC 0 : pg_fatal("could not open large object %u: %s",
1384 EUB : oid, PQerrorMessage(AH->connection));
1385 : }
1386 : else
1387 : {
1388 GNC 20 : if (old_lo_style)
1389 UBC 0 : ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
1390 EUB : oid, INV_WRITE);
1391 : else
1392 GIC 20 : ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
1393 : oid, INV_WRITE);
1394 : }
1395 :
1396 GNC 20 : AH->writingLO = true;
1397 GBC 20 : }
1398 :
1399 : void
1400 GNC 20 : EndRestoreLO(ArchiveHandle *AH, Oid oid)
1401 : {
1402 GIC 20 : if (AH->lo_buf_used > 0)
1403 : {
1404 ECB : /* Write remaining bytes from the LO buffer */
1405 CBC 10 : dump_lo_buf(AH);
1406 : }
1407 :
1408 GNC 20 : AH->writingLO = false;
1409 :
1410 CBC 20 : if (AH->connection)
1411 : {
1412 UIC 0 : lo_close(AH->connection, AH->loFd);
1413 LBC 0 : AH->loFd = -1;
1414 : }
1415 : else
1416 ECB : {
1417 GIC 20 : ahprintf(AH, "SELECT pg_catalog.lo_close(0);\n\n");
1418 ECB : }
1419 GIC 20 : }
1420 EUB :
1421 : /***********
1422 : * Sorting and Reordering
1423 : ***********/
1424 :
1425 ECB : void
1426 UIC 0 : SortTocFromFile(Archive *AHX)
1427 ECB : {
1428 UIC 0 : ArchiveHandle *AH = (ArchiveHandle *) AHX;
1429 0 : RestoreOptions *ropt = AH->public.ropt;
1430 : FILE *fh;
1431 : StringInfoData linebuf;
1432 :
1433 : /* Allocate space for the 'wanted' array, and init it */
1434 UBC 0 : ropt->idWanted = (bool *) pg_malloc0(sizeof(bool) * AH->maxDumpId);
1435 :
1436 EUB : /* Setup the file */
1437 UBC 0 : fh = fopen(ropt->tocFile, PG_BINARY_R);
1438 UIC 0 : if (!fh)
1439 0 : pg_fatal("could not open TOC file \"%s\": %m", ropt->tocFile);
1440 :
1441 0 : initStringInfo(&linebuf);
1442 EUB :
1443 UIC 0 : while (pg_get_line_buf(fh, &linebuf))
1444 : {
1445 EUB : char *cmnt;
1446 : char *endptr;
1447 : DumpId id;
1448 : TocEntry *te;
1449 :
1450 : /* Truncate line at comment, if any */
1451 UBC 0 : cmnt = strchr(linebuf.data, ';');
1452 UIC 0 : if (cmnt != NULL)
1453 : {
1454 0 : cmnt[0] = '\0';
1455 0 : linebuf.len = cmnt - linebuf.data;
1456 : }
1457 :
1458 : /* Ignore if all blank */
1459 UBC 0 : if (strspn(linebuf.data, " \t\r\n") == linebuf.len)
1460 0 : continue;
1461 :
1462 EUB : /* Get an ID, check it's valid and not already seen */
1463 UBC 0 : id = strtol(linebuf.data, &endptr, 10);
1464 UIC 0 : if (endptr == linebuf.data || id <= 0 || id > AH->maxDumpId ||
1465 0 : ropt->idWanted[id - 1])
1466 : {
1467 UBC 0 : pg_log_warning("line ignored: %s", linebuf.data);
1468 0 : continue;
1469 : }
1470 :
1471 EUB : /* Find TOC entry */
1472 UBC 0 : te = getTocEntryByDumpId(AH, id);
1473 0 : if (!te)
1474 UIC 0 : pg_fatal("could not find entry for ID %d",
1475 EUB : id);
1476 :
1477 : /* Mark it wanted */
1478 UIC 0 : ropt->idWanted[id - 1] = true;
1479 :
1480 EUB : /*
1481 : * Move each item to the end of the list as it is selected, so that
1482 : * they are placed in the desired order. Any unwanted items will end
1483 : * up at the front of the list, which may seem unintuitive but it's
1484 : * what we need. In an ordinary serial restore that makes no
1485 : * difference, but in a parallel restore we need to mark unrestored
1486 : * items' dependencies as satisfied before we start examining
1487 : * restorable items. Otherwise they could have surprising
1488 : * side-effects on the order in which restorable items actually get
1489 : * restored.
1490 : */
1491 UIC 0 : _moveBefore(AH->toc, te);
1492 : }
1493 :
1494 0 : pg_free(linebuf.data);
1495 :
1496 0 : if (fclose(fh) != 0)
1497 0 : pg_fatal("could not close TOC file: %m");
1498 0 : }
1499 EUB :
1500 : /**********************
1501 : * Convenience functions that look like standard IO functions
1502 : * for writing data when in dump mode.
1503 : **********************/
1504 :
1505 : /* Public */
1506 : void
1507 GIC 21600 : archputs(const char *s, Archive *AH)
1508 : {
1509 21600 : WriteData(AH, s, strlen(s));
1510 21600 : }
1511 :
1512 : /* Public */
1513 : int
1514 2985 : archprintf(Archive *AH, const char *fmt,...)
1515 ECB : {
1516 GIC 2985 : int save_errno = errno;
1517 ECB : char *p;
1518 CBC 2985 : size_t len = 128; /* initial assumption about buffer size */
1519 : size_t cnt;
1520 :
1521 : for (;;)
1522 LBC 0 : {
1523 : va_list args;
1524 ECB :
1525 : /* Allocate work buffer. */
1526 CBC 2985 : p = (char *) pg_malloc(len);
1527 :
1528 : /* Try to format the data. */
1529 GIC 2985 : errno = save_errno;
1530 GBC 2985 : va_start(args, fmt);
1531 GIC 2985 : cnt = pvsnprintf(p, len, fmt, args);
1532 2985 : va_end(args);
1533 :
1534 CBC 2985 : if (cnt < len)
1535 GIC 2985 : break; /* success */
1536 :
1537 ECB : /* Release buffer and loop around to try again with larger len. */
1538 LBC 0 : free(p);
1539 0 : len = cnt;
1540 ECB : }
1541 :
1542 CBC 2985 : WriteData(AH, p, cnt);
1543 2985 : free(p);
1544 GIC 2985 : return (int) cnt;
1545 : }
1546 EUB :
1547 :
1548 : /*******************************
1549 : * Stuff below here should be 'private' to the archiver routines
1550 ECB : *******************************/
1551 :
1552 : static void
1553 GNC 101 : SetOutput(ArchiveHandle *AH, const char *filename,
1554 : const pg_compress_specification compression_spec)
1555 : {
1556 : CompressFileHandle *CFH;
1557 : const char *mode;
1558 101 : int fn = -1;
1559 :
1560 GIC 101 : if (filename)
1561 : {
1562 101 : if (strcmp(filename, "-") == 0)
1563 UIC 0 : fn = fileno(stdout);
1564 : }
1565 0 : else if (AH->FH)
1566 0 : fn = fileno(AH->FH);
1567 LBC 0 : else if (AH->fSpec)
1568 : {
1569 UIC 0 : filename = AH->fSpec;
1570 ECB : }
1571 EUB : else
1572 UIC 0 : fn = fileno(stdout);
1573 EUB :
1574 GNC 101 : if (AH->mode == archModeAppend)
1575 34 : mode = PG_BINARY_A;
1576 : else
1577 67 : mode = PG_BINARY_W;
1578 ECB :
1579 GNC 101 : CFH = InitCompressFileHandle(compression_spec);
1580 :
1581 101 : if (!CFH->open_func(filename, fn, mode, CFH))
1582 ECB : {
1583 LBC 0 : if (filename)
1584 UBC 0 : pg_fatal("could not open output file \"%s\": %m", filename);
1585 : else
1586 LBC 0 : pg_fatal("could not open output file: %m");
1587 ECB : }
1588 :
1589 GNC 101 : AH->OF = CFH;
1590 GIC 101 : }
1591 :
1592 : static CompressFileHandle *
1593 126 : SaveOutput(ArchiveHandle *AH)
1594 : {
1595 GNC 126 : return (CompressFileHandle *) AH->OF;
1596 ECB : }
1597 :
1598 : static void
1599 GNC 101 : RestoreOutput(ArchiveHandle *AH, CompressFileHandle *savedOutput)
1600 ECB : {
1601 GNC 101 : errno = 0;
1602 101 : if (!EndCompressFileHandle(AH->OF))
1603 LBC 0 : pg_fatal("could not close output file: %m");
1604 :
1605 GNC 101 : AH->OF = savedOutput;
1606 GIC 101 : }
1607 :
1608 ECB :
1609 :
1610 : /*
1611 : * Print formatted text to the output file (usually stdout).
1612 : */
1613 : int
1614 CBC 111361 : ahprintf(ArchiveHandle *AH, const char *fmt,...)
1615 : {
1616 GIC 111361 : int save_errno = errno;
1617 : char *p;
1618 111361 : size_t len = 128; /* initial assumption about buffer size */
1619 : size_t cnt;
1620 :
1621 ECB : for (;;)
1622 GIC 6048 : {
1623 ECB : va_list args;
1624 :
1625 : /* Allocate work buffer. */
1626 GIC 117409 : p = (char *) pg_malloc(len);
1627 :
1628 : /* Try to format the data. */
1629 117409 : errno = save_errno;
1630 117409 : va_start(args, fmt);
1631 117409 : cnt = pvsnprintf(p, len, fmt, args);
1632 CBC 117409 : va_end(args);
1633 :
1634 117409 : if (cnt < len)
1635 GIC 111361 : break; /* success */
1636 :
1637 : /* Release buffer and loop around to try again with larger len. */
1638 GBC 6048 : free(p);
1639 6048 : len = cnt;
1640 : }
1641 :
1642 GIC 111361 : ahwrite(p, 1, cnt, AH);
1643 111361 : free(p);
1644 GBC 111361 : return (int) cnt;
1645 EUB : }
1646 :
1647 : /*
1648 : * Single place for logic which says 'We are restoring to a direct DB connection'.
1649 : */
1650 ECB : static int
1651 GIC 1805211 : RestoringToDB(ArchiveHandle *AH)
1652 ECB : {
1653 GIC 1805211 : RestoreOptions *ropt = AH->public.ropt;
1654 :
1655 1805211 : return (ropt && ropt->useDB && AH->connection);
1656 : }
1657 :
1658 ECB : /*
1659 : * Dump the current contents of the LO data buffer while writing a LO
1660 : */
1661 : static void
1662 CBC 10 : dump_lo_buf(ArchiveHandle *AH)
1663 : {
1664 10 : if (AH->connection)
1665 ECB : {
1666 : int res;
1667 :
1668 UIC 0 : res = lo_write(AH->connection, AH->loFd, AH->lo_buf, AH->lo_buf_used);
1669 0 : pg_log_debug(ngettext("wrote %zu byte of large object data (result = %d)",
1670 : "wrote %zu bytes of large object data (result = %d)",
1671 : AH->lo_buf_used),
1672 : AH->lo_buf_used, res);
1673 : /* We assume there are no short writes, only errors */
1674 0 : if (res != AH->lo_buf_used)
1675 LBC 0 : warn_or_exit_horribly(AH, "could not write to large object: %s",
1676 UIC 0 : PQerrorMessage(AH->connection));
1677 ECB : }
1678 : else
1679 : {
1680 GIC 10 : PQExpBuffer buf = createPQExpBuffer();
1681 ECB :
1682 GIC 10 : appendByteaLiteralAHX(buf,
1683 ECB : (const unsigned char *) AH->lo_buf,
1684 : AH->lo_buf_used,
1685 EUB : AH);
1686 :
1687 : /* Hack: turn off writingLO so ahwrite doesn't recurse to here */
1688 GNC 10 : AH->writingLO = false;
1689 GBC 10 : ahprintf(AH, "SELECT pg_catalog.lowrite(0, %s);\n", buf->data);
1690 GNC 10 : AH->writingLO = true;
1691 EUB :
1692 GIC 10 : destroyPQExpBuffer(buf);
1693 : }
1694 CBC 10 : AH->lo_buf_used = 0;
1695 10 : }
1696 :
1697 ECB :
1698 : /*
1699 : * Write buffer to the output file (usually stdout). This is used for
1700 : * outputting 'restore' scripts etc. It is even possible for an archive
1701 : * format to create a custom output routine to 'fake' a restore if it
1702 : * wants to generate a script (see TAR output).
1703 : */
1704 : void
1705 GIC 1803486 : ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
1706 ECB : {
1707 CBC 1803486 : int bytes_written = 0;
1708 :
1709 GNC 1803486 : if (AH->writingLO)
1710 ECB : {
1711 GIC 16 : size_t remaining = size * nmemb;
1712 ECB :
1713 CBC 16 : while (AH->lo_buf_used + remaining > AH->lo_buf_size)
1714 : {
1715 UIC 0 : size_t avail = AH->lo_buf_size - AH->lo_buf_used;
1716 ECB :
1717 UBC 0 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, avail);
1718 LBC 0 : ptr = (const void *) ((const char *) ptr + avail);
1719 UIC 0 : remaining -= avail;
1720 0 : AH->lo_buf_used += avail;
1721 0 : dump_lo_buf(AH);
1722 EUB : }
1723 :
1724 GIC 16 : memcpy((char *) AH->lo_buf + AH->lo_buf_used, ptr, remaining);
1725 16 : AH->lo_buf_used += remaining;
1726 EUB :
1727 GIC 16 : bytes_written = size * nmemb;
1728 : }
1729 GBC 1803470 : else if (AH->CustomOutPtr)
1730 GIC 1614 : bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
1731 EUB :
1732 : /*
1733 : * If we're doing a restore, and it's direct to DB, and we're connected
1734 : * then send it to the DB.
1735 : */
1736 GNC 1801856 : else if (RestoringToDB(AH))
1737 3603 : bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
1738 EUB : else
1739 : {
1740 GNC 1798253 : CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
1741 :
1742 1798253 : if (CFH->write_func(ptr, size * nmemb, CFH))
1743 1798253 : bytes_written = size * nmemb;
1744 EUB : }
1745 :
1746 GBC 1803486 : if (bytes_written != size * nmemb)
1747 UIC 0 : WRITE_ERROR_EXIT;
1748 GBC 1803486 : }
1749 :
1750 EUB : /* on some error, we may decide to go on... */
1751 : void
1752 UIC 0 : warn_or_exit_horribly(ArchiveHandle *AH, const char *fmt,...)
1753 : {
1754 : va_list ap;
1755 :
1756 0 : switch (AH->stage)
1757 : {
1758 EUB :
1759 UBC 0 : case STAGE_NONE:
1760 : /* Do nothing special */
1761 0 : break;
1762 EUB :
1763 UBC 0 : case STAGE_INITIALIZING:
1764 UIC 0 : if (AH->stage != AH->lastErrorStage)
1765 UBC 0 : pg_log_info("while INITIALIZING:");
1766 0 : break;
1767 :
1768 0 : case STAGE_PROCESSING:
1769 0 : if (AH->stage != AH->lastErrorStage)
1770 UIC 0 : pg_log_info("while PROCESSING TOC:");
1771 0 : break;
1772 :
1773 0 : case STAGE_FINALIZING:
1774 0 : if (AH->stage != AH->lastErrorStage)
1775 0 : pg_log_info("while FINALIZING:");
1776 0 : break;
1777 : }
1778 0 : if (AH->currentTE != NULL && AH->currentTE != AH->lastErrorTE)
1779 : {
1780 0 : pg_log_info("from TOC entry %d; %u %u %s %s %s",
1781 : AH->currentTE->dumpId,
1782 : AH->currentTE->catalogId.tableoid,
1783 : AH->currentTE->catalogId.oid,
1784 : AH->currentTE->desc ? AH->currentTE->desc : "(no desc)",
1785 : AH->currentTE->tag ? AH->currentTE->tag : "(no tag)",
1786 : AH->currentTE->owner ? AH->currentTE->owner : "(no owner)");
1787 : }
1788 0 : AH->lastErrorStage = AH->stage;
1789 UBC 0 : AH->lastErrorTE = AH->currentTE;
1790 :
1791 UIC 0 : va_start(ap, fmt);
1792 UBC 0 : pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, ap);
1793 0 : va_end(ap);
1794 :
1795 UIC 0 : if (AH->public.exit_on_error)
1796 UBC 0 : exit_nicely(1);
1797 EUB : else
1798 UBC 0 : AH->public.n_errors++;
1799 0 : }
1800 EUB :
#ifdef NOT_USED

/*
 * Unlink te from the TOC list and relink it immediately after pos.
 * Compiled out (NOT_USED); the AH argument is not referenced.
 */
static void
_moveAfter(ArchiveHandle *AH, TocEntry *pos, TocEntry *te)
{
	TocEntry   *oldPrev = te->prev;
	TocEntry   *oldNext = te->next;

	/* Detach te from its neighbours. */
	oldPrev->next = oldNext;
	oldNext->prev = oldPrev;

	/* Splice te in between pos and pos->next. */
	te->prev = pos;
	te->next = pos->next;
	pos->next->prev = te;
	pos->next = te;
}
#endif
1817 :
1818 : static void
1819 LBC 0 : _moveBefore(TocEntry *pos, TocEntry *te)
1820 ECB : {
1821 : /* Unlink te from list */
1822 LBC 0 : te->prev->next = te->next;
1823 UIC 0 : te->next->prev = te->prev;
1824 :
1825 ECB : /* and insert it before "pos" */
1826 UBC 0 : te->prev = pos->prev;
1827 UIC 0 : te->next = pos;
1828 0 : pos->prev->next = te;
1829 LBC 0 : pos->prev = te;
1830 UIC 0 : }
1831 :
1832 : /*
1833 : * Build index arrays for the TOC list
1834 : *
1835 : * This should be invoked only after we have created or read in all the TOC
1836 : * items.
1837 ECB : *
1838 : * The arrays are indexed by dump ID (so entry zero is unused). Note that the
1839 : * array entries run only up to maxDumpId. We might see dependency dump IDs
1840 : * beyond that (if the dump was partial); so always check the array bound
1841 : * before trying to touch an array entry.
1842 : */
1843 : static void
1844 GIC 139 : buildTocEntryArrays(ArchiveHandle *AH)
1845 : {
1846 CBC 139 : DumpId maxDumpId = AH->maxDumpId;
1847 EUB : TocEntry *te;
1848 :
1849 CBC 139 : AH->tocsByDumpId = (TocEntry **) pg_malloc0((maxDumpId + 1) * sizeof(TocEntry *));
1850 GIC 139 : AH->tableDataId = (DumpId *) pg_malloc0((maxDumpId + 1) * sizeof(DumpId));
1851 :
1852 CBC 26360 : for (te = AH->toc->next; te != AH->toc; te = te->next)
1853 : {
1854 : /* this check is purely paranoia, maxDumpId should be correct */
1855 26221 : if (te->dumpId <= 0 || te->dumpId > maxDumpId)
1856 UIC 0 : pg_fatal("bad dumpId");
1857 :
1858 ECB : /* tocsByDumpId indexes all TOCs by their dump ID */
1859 CBC 26221 : AH->tocsByDumpId[te->dumpId] = te;
1860 :
1861 ECB : /*
1862 : * tableDataId provides the TABLE DATA item's dump ID for each TABLE
1863 : * TOC entry that has a DATA item. We compute this by reversing the
1864 EUB : * TABLE DATA item's dependency, knowing that a TABLE DATA item has
1865 : * just one dependency and it is the TABLE item.
1866 : */
1867 GIC 26221 : if (strcmp(te->desc, "TABLE DATA") == 0 && te->nDeps > 0)
1868 ECB : {
1869 GIC 3367 : DumpId tableId = te->dependencies[0];
1870 ECB :
1871 : /*
1872 : * The TABLE item might not have been in the archive, if this was
1873 : * a data-only dump; but its dump ID should be less than its data
1874 : * item's dump ID, so there should be a place for it in the array.
1875 : */
1876 GIC 3367 : if (tableId <= 0 || tableId > maxDumpId)
1877 UIC 0 : pg_fatal("bad table dumpId for TABLE DATA item");
1878 :
1879 CBC 3367 : AH->tableDataId[tableId] = te->dumpId;
1880 : }
1881 : }
1882 GIC 139 : }
1883 :
1884 ECB : TocEntry *
1885 GIC 10706 : getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
1886 : {
1887 ECB : /* build index arrays if we didn't already */
1888 GIC 10706 : if (AH->tocsByDumpId == NULL)
1889 CBC 20 : buildTocEntryArrays(AH);
1890 ECB :
1891 GIC 10706 : if (id > 0 && id <= AH->maxDumpId)
1892 CBC 10706 : return AH->tocsByDumpId[id];
1893 :
1894 UIC 0 : return NULL;
1895 : }
1896 ECB :
1897 : int
1898 GIC 10538 : TocIDRequired(ArchiveHandle *AH, DumpId id)
1899 : {
1900 10538 : TocEntry *te = getTocEntryByDumpId(AH, id);
1901 :
1902 10538 : if (!te)
1903 CBC 4916 : return 0;
1904 :
1905 GIC 5622 : return te->reqs;
1906 ECB : }
1907 :
1908 : size_t
1909 GBC 6334 : WriteOffset(ArchiveHandle *AH, pgoff_t o, int wasSet)
1910 : {
1911 EUB : int off;
1912 :
1913 : /* Save the flag */
1914 GBC 6334 : AH->WriteBytePtr(AH, wasSet);
1915 :
1916 : /* Write out pgoff_t smallest byte first, prevents endian mismatch */
1917 57006 : for (off = 0; off < sizeof(pgoff_t); off++)
1918 EUB : {
1919 GIC 50672 : AH->WriteBytePtr(AH, o & 0xFF);
1920 50672 : o >>= 8;
1921 : }
1922 6334 : return sizeof(pgoff_t) + 1;
1923 : }
1924 :
1925 : int
1926 4211 : ReadOffset(ArchiveHandle *AH, pgoff_t * o)
1927 : {
1928 ECB : int i;
1929 : int off;
1930 : int offsetFlg;
1931 :
1932 : /* Initialize to zero */
1933 GIC 4211 : *o = 0;
1934 :
1935 : /* Check for old version */
1936 CBC 4211 : if (AH->version < K_VERS_1_7)
1937 : {
1938 EUB : /* Prior versions wrote offsets using WriteInt */
1939 UBC 0 : i = ReadInt(AH);
1940 : /* -1 means not set */
1941 UIC 0 : if (i < 0)
1942 0 : return K_OFFSET_POS_NOT_SET;
1943 0 : else if (i == 0)
1944 0 : return K_OFFSET_NO_DATA;
1945 ECB :
1946 : /* Cast to pgoff_t because it was written as an int. */
1947 LBC 0 : *o = (pgoff_t) i;
1948 0 : return K_OFFSET_POS_SET;
1949 : }
1950 :
1951 EUB : /*
1952 : * Read the flag indicating the state of the data pointer. Check if valid
1953 : * and die if not.
1954 : *
1955 : * This used to be handled by a negative or zero pointer, now we use an
1956 ECB : * extra byte specifically for the state.
1957 : */
1958 GIC 4211 : offsetFlg = AH->ReadBytePtr(AH) & 0xFF;
1959 :
1960 CBC 4211 : switch (offsetFlg)
1961 : {
1962 GIC 4211 : case K_OFFSET_POS_NOT_SET:
1963 : case K_OFFSET_NO_DATA:
1964 : case K_OFFSET_POS_SET:
1965 :
1966 4211 : break;
1967 :
1968 UIC 0 : default:
1969 0 : pg_fatal("unexpected data offset flag %d", offsetFlg);
1970 : }
1971 :
1972 : /*
1973 ECB : * Read the bytes
1974 : */
1975 CBC 37899 : for (off = 0; off < AH->offSize; off++)
1976 ECB : {
1977 GIC 33688 : if (off < sizeof(pgoff_t))
1978 33688 : *o |= ((pgoff_t) (AH->ReadBytePtr(AH))) << (off * 8);
1979 ECB : else
1980 : {
1981 LBC 0 : if (AH->ReadBytePtr(AH) != 0)
1982 UIC 0 : pg_fatal("file offset in dump file is too large");
1983 ECB : }
1984 : }
1985 :
1986 GIC 4211 : return offsetFlg;
1987 ECB : }
1988 :
1989 : size_t
1990 GIC 143787 : WriteInt(ArchiveHandle *AH, int i)
1991 ECB : {
1992 : int b;
1993 :
1994 : /*
1995 : * This is a bit yucky, but I don't want to make the binary format very
1996 : * dependent on representation, and not knowing much about it, I write out
1997 : * a sign byte. If you change this, don't forget to change the file
1998 : * version #, and modify ReadInt to read the new format AS WELL AS the old
1999 : * formats.
2000 : */
2001 :
2002 : /* SIGN byte */
2003 CBC 143787 : if (i < 0)
2004 : {
2005 32269 : AH->WriteBytePtr(AH, 1);
2006 32269 : i = -i;
2007 ECB : }
2008 : else
2009 GIC 111518 : AH->WriteBytePtr(AH, 0);
2010 :
2011 CBC 718935 : for (b = 0; b < AH->intSize; b++)
2012 ECB : {
2013 GIC 575148 : AH->WriteBytePtr(AH, i & 0xFF);
2014 CBC 575148 : i >>= 8;
2015 : }
2016 :
2017 GIC 143787 : return AH->intSize + 1;
2018 ECB : }
2019 :
2020 : int
2021 GIC 111973 : ReadInt(ArchiveHandle *AH)
2022 ECB : {
2023 GIC 111973 : int res = 0;
2024 ECB : int bv,
2025 : b;
2026 CBC 111973 : int sign = 0; /* Default positive */
2027 111973 : int bitShift = 0;
2028 ECB :
2029 GIC 111973 : if (AH->version > K_VERS_1_0)
2030 : /* Read a sign byte */
2031 CBC 111973 : sign = AH->ReadBytePtr(AH);
2032 :
2033 559865 : for (b = 0; b < AH->intSize; b++)
2034 : {
2035 GIC 447892 : bv = AH->ReadBytePtr(AH) & 0xFF;
2036 447892 : if (bv != 0)
2037 CBC 109296 : res = res + (bv << bitShift);
2038 GIC 447892 : bitShift += 8;
2039 : }
2040 :
2041 111973 : if (sign)
2042 CBC 25809 : res = -res;
2043 ECB :
2044 CBC 111973 : return res;
2045 : }
2046 :
2047 ECB : size_t
2048 CBC 118932 : WriteStr(ArchiveHandle *AH, const char *c)
2049 : {
2050 ECB : size_t res;
2051 :
2052 GIC 118932 : if (c)
2053 ECB : {
2054 GIC 86663 : int len = strlen(c);
2055 :
2056 86663 : res = WriteInt(AH, len);
2057 CBC 86663 : AH->WriteBufPtr(AH, c, len);
2058 GIC 86663 : res += len;
2059 : }
2060 : else
2061 32269 : res = WriteInt(AH, -1);
2062 ECB :
2063 GBC 118932 : return res;
2064 : }
2065 ECB :
2066 : char *
2067 GIC 92650 : ReadStr(ArchiveHandle *AH)
2068 : {
2069 ECB : char *buf;
2070 : int l;
2071 :
2072 GIC 92650 : l = ReadInt(AH);
2073 92650 : if (l < 0)
2074 CBC 25809 : buf = NULL;
2075 : else
2076 ECB : {
2077 GIC 66841 : buf = (char *) pg_malloc(l + 1);
2078 CBC 66841 : AH->ReadBufPtr(AH, (void *) buf, l);
2079 :
2080 66841 : buf[l] = '\0';
2081 ECB : }
2082 :
2083 CBC 92650 : return buf;
2084 ECB : }
2085 :
2086 : static bool
2087 GNC 12 : _fileExistsInDirectory(const char *dir, const char *filename)
2088 : {
2089 : struct stat st;
2090 : char buf[MAXPGPATH];
2091 :
2092 12 : if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
2093 UNC 0 : pg_fatal("directory name too long: \"%s\"", dir);
2094 :
2095 GNC 12 : return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
2096 : }
2097 :
2098 ECB : static int
2099 GIC 27 : _discoverArchiveFormat(ArchiveHandle *AH)
2100 : {
2101 : FILE *fh;
2102 ECB : char sig[6]; /* More than enough */
2103 : size_t cnt;
2104 GIC 27 : int wantClose = 0;
2105 :
2106 27 : pg_log_debug("attempting to ascertain archive format");
2107 :
2108 GNC 27 : free(AH->lookahead);
2109 ECB :
2110 CBC 27 : AH->readHeader = 0;
2111 27 : AH->lookaheadSize = 512;
2112 GIC 27 : AH->lookahead = pg_malloc0(512);
2113 GBC 27 : AH->lookaheadLen = 0;
2114 27 : AH->lookaheadPos = 0;
2115 :
2116 GIC 27 : if (AH->fSpec)
2117 EUB : {
2118 : struct stat st;
2119 :
2120 GIC 27 : wantClose = 1;
2121 EUB :
2122 : /*
2123 : * Check if the specified archive is a directory. If so, check if
2124 : * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
2125 : */
2126 GIC 27 : if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
2127 : {
2128 GNC 12 : AH->format = archDirectory;
2129 12 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat"))
2130 GIC 12 : return AH->format;
2131 EUB : #ifdef HAVE_LIBZ
2132 UNC 0 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
2133 0 : return AH->format;
2134 : #endif
2135 : #ifdef USE_LZ4
2136 0 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
2137 0 : return AH->format;
2138 : #endif
2139 : #ifdef USE_ZSTD
2140 0 : if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
2141 UIC 0 : return AH->format;
2142 : #endif
2143 0 : pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
2144 : AH->fSpec);
2145 : fh = NULL; /* keep compiler quiet */
2146 ECB : }
2147 : else
2148 : {
2149 CBC 15 : fh = fopen(AH->fSpec, PG_BINARY_R);
2150 GIC 15 : if (!fh)
2151 UIC 0 : pg_fatal("could not open input file \"%s\": %m", AH->fSpec);
2152 ECB : }
2153 : }
2154 : else
2155 : {
2156 UIC 0 : fh = stdin;
2157 0 : if (!fh)
2158 0 : pg_fatal("could not open input file: %m");
2159 : }
2160 :
2161 CBC 15 : if ((cnt = fread(sig, 1, 5, fh)) != 5)
2162 : {
2163 LBC 0 : if (ferror(fh))
2164 UIC 0 : pg_fatal("could not read input file: %m");
2165 ECB : else
2166 LBC 0 : pg_fatal("input file is too short (read %lu, expected 5)",
2167 ECB : (unsigned long) cnt);
2168 : }
2169 :
2170 : /* Save it, just in case we need it later */
2171 GIC 15 : memcpy(&AH->lookahead[0], sig, 5);
2172 15 : AH->lookaheadLen = 5;
2173 EUB :
2174 GIC 15 : if (strncmp(sig, "PGDMP", 5) == 0)
2175 : {
2176 ECB : /* It's custom format, stop here */
2177 GIC 14 : AH->format = archCustom;
2178 GBC 14 : AH->readHeader = 1;
2179 EUB : }
2180 : else
2181 : {
2182 : /*
2183 : * *Maybe* we have a tar archive format file or a text dump ... So,
2184 ECB : * read first 512 byte header...
2185 EUB : */
2186 GIC 1 : cnt = fread(&AH->lookahead[AH->lookaheadLen], 1, 512 - AH->lookaheadLen, fh);
2187 ECB : /* read failure is checked below */
2188 GIC 1 : AH->lookaheadLen += cnt;
2189 :
2190 1 : if (AH->lookaheadLen >= strlen(TEXT_DUMPALL_HEADER) &&
2191 CBC 1 : (strncmp(AH->lookahead, TEXT_DUMP_HEADER, strlen(TEXT_DUMP_HEADER)) == 0 ||
2192 GIC 1 : strncmp(AH->lookahead, TEXT_DUMPALL_HEADER, strlen(TEXT_DUMPALL_HEADER)) == 0))
2193 ECB : {
2194 EUB : /*
2195 : * looks like it's probably a text format dump. so suggest they
2196 ECB : * try psql
2197 : */
2198 UIC 0 : pg_fatal("input file appears to be a text format dump. Please use psql.");
2199 : }
2200 ECB :
2201 GIC 1 : if (AH->lookaheadLen != 512)
2202 : {
2203 UIC 0 : if (feof(fh))
2204 0 : pg_fatal("input file does not appear to be a valid archive (too short?)");
2205 : else
2206 0 : READ_ERROR_EXIT(fh);
2207 : }
2208 ECB :
2209 GIC 1 : if (!isValidTarHeader(AH->lookahead))
2210 UIC 0 : pg_fatal("input file does not appear to be a valid archive");
2211 :
2212 GIC 1 : AH->format = archTar;
2213 : }
2214 :
2215 ECB : /* Close the file if we opened it */
2216 GIC 15 : if (wantClose)
2217 ECB : {
2218 GIC 15 : if (fclose(fh) != 0)
2219 UIC 0 : pg_fatal("could not close input file: %m");
2220 ECB : /* Forget lookahead, since we'll re-read header after re-opening */
2221 GIC 15 : AH->readHeader = 0;
2222 CBC 15 : AH->lookaheadLen = 0;
2223 : }
2224 :
2225 15 : return AH->format;
2226 ECB : }
2227 :
2228 :
2229 : /*
2230 : * Allocate an archive handle
2231 : */
2232 : static ArchiveHandle *
2233 GIC 164 : _allocAH(const char *FileSpec, const ArchiveFormat fmt,
2234 : const pg_compress_specification compression_spec,
2235 : bool dosync, ArchiveMode mode,
2236 : SetupWorkerPtrType setupWorkerPtr)
2237 ECB : {
2238 : ArchiveHandle *AH;
2239 : CompressFileHandle *CFH;
2240 GNC 164 : pg_compress_specification out_compress_spec = {0};
2241 ECB :
2242 GIC 164 : pg_log_debug("allocating AH for %s, format %d",
2243 ECB : FileSpec ? FileSpec : "(stdio)", fmt);
2244 :
2245 GIC 164 : AH = (ArchiveHandle *) pg_malloc0(sizeof(ArchiveHandle));
2246 :
2247 164 : AH->version = K_VERS_SELF;
2248 :
2249 : /* initialize for backwards compatible string processing */
2250 164 : AH->public.encoding = 0; /* PG_SQL_ASCII */
2251 164 : AH->public.std_strings = false;
2252 :
2253 ECB : /* sql error handling */
2254 GIC 164 : AH->public.exit_on_error = true;
2255 CBC 164 : AH->public.n_errors = 0;
2256 ECB :
2257 CBC 164 : AH->archiveDumpVersion = PG_VERSION;
2258 ECB :
2259 GIC 164 : AH->createDate = time(NULL);
2260 ECB :
2261 GIC 164 : AH->intSize = sizeof(int);
2262 CBC 164 : AH->offSize = sizeof(pgoff_t);
2263 164 : if (FileSpec)
2264 : {
2265 141 : AH->fSpec = pg_strdup(FileSpec);
2266 ECB :
2267 : /*
2268 : * Not used; maybe later....
2269 : *
2270 : * AH->workDir = pg_strdup(FileSpec); for(i=strlen(FileSpec) ; i > 0 ;
2271 : * i--) if (AH->workDir[i-1] == '/')
2272 : */
2273 : }
2274 : else
2275 GBC 23 : AH->fSpec = NULL;
2276 ECB :
2277 GIC 164 : AH->currUser = NULL; /* unknown */
2278 164 : AH->currSchema = NULL; /* ditto */
2279 164 : AH->currTablespace = NULL; /* ditto */
2280 164 : AH->currTableAm = NULL; /* ditto */
2281 :
2282 164 : AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2283 :
2284 164 : AH->toc->next = AH->toc;
2285 164 : AH->toc->prev = AH->toc;
2286 :
2287 164 : AH->mode = mode;
2288 GNC 164 : AH->compression_spec = compression_spec;
2289 GIC 164 : AH->dosync = dosync;
2290 :
2291 164 : memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
2292 :
2293 : /* Open stdout with no compression for AH output handle */
2294 GNC 164 : out_compress_spec.algorithm = PG_COMPRESSION_NONE;
2295 164 : CFH = InitCompressFileHandle(out_compress_spec);
2296 164 : if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
2297 UNC 0 : pg_fatal("could not open stdout for appending: %m");
2298 GNC 164 : AH->OF = CFH;
2299 ECB :
2300 : /*
2301 : * On Windows, we need to use binary mode to read/write non-text files,
2302 : * which include all archive formats as well as compressed plain text.
2303 : * Force stdin/stdout into binary mode if that is what we are using.
2304 : */
2305 : #ifdef WIN32
2306 : if ((fmt != archNull || compression_spec.algorithm != PG_COMPRESSION_NONE) &&
2307 : (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0))
2308 : {
2309 : if (mode == archModeWrite)
2310 : _setmode(fileno(stdout), O_BINARY);
2311 : else
2312 : _setmode(fileno(stdin), O_BINARY);
2313 : }
2314 : #endif
2315 :
2316 CBC 164 : AH->SetupWorkerPtr = setupWorkerPtr;
2317 :
2318 164 : if (fmt == archUnknown)
2319 27 : AH->format = _discoverArchiveFormat(AH);
2320 ECB : else
2321 GIC 137 : AH->format = fmt;
2322 EUB :
2323 GBC 164 : switch (AH->format)
2324 : {
2325 GIC 28 : case archCustom:
2326 CBC 28 : InitArchiveFmt_Custom(AH);
2327 GIC 28 : break;
2328 :
2329 107 : case archNull:
2330 107 : InitArchiveFmt_Null(AH);
2331 107 : break;
2332 :
2333 CBC 24 : case archDirectory:
2334 GIC 24 : InitArchiveFmt_Directory(AH);
2335 24 : break;
2336 :
2337 CBC 5 : case archTar:
2338 5 : InitArchiveFmt_Tar(AH);
2339 GIC 4 : break;
2340 :
2341 UIC 0 : default:
2342 0 : pg_fatal("unrecognized file format \"%d\"", fmt);
2343 : }
2344 :
2345 GIC 163 : return AH;
2346 : }
2347 :
2348 : /*
2349 : * Write out all data (tables & LOs)
2350 ECB : */
2351 : void
2352 GIC 25 : WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate)
2353 : {
2354 ECB : TocEntry *te;
2355 :
2356 GIC 25 : if (pstate && pstate->numWorkers > 1)
2357 CBC 9 : {
2358 EUB : /*
2359 : * In parallel mode, this code runs in the leader process. We
2360 ECB : * construct an array of candidate TEs, then sort it into decreasing
2361 : * size order, then dispatch each TE to a data-transfer worker. By
2362 : * dumping larger tables first, we avoid getting into a situation
2363 : * where we're down to one job and it's big, losing parallelism.
2364 : */
2365 : TocEntry **tes;
2366 : int ntes;
2367 :
2368 GIC 9 : tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *));
2369 9 : ntes = 0;
2370 CBC 1207 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2371 : {
2372 : /* Consider only TEs with dataDumper functions ... */
2373 1198 : if (!te->dataDumper)
2374 GIC 1076 : continue;
2375 : /* ... and ignore ones not enabled for dump */
2376 122 : if ((te->reqs & REQ_DATA) == 0)
2377 UIC 0 : continue;
2378 ECB :
2379 GIC 122 : tes[ntes++] = te;
2380 : }
2381 ECB :
2382 CBC 9 : if (ntes > 1)
2383 GNC 8 : qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompare);
2384 :
2385 CBC 131 : for (int i = 0; i < ntes; i++)
2386 GIC 122 : DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP,
2387 : mark_dump_job_done, NULL);
2388 ECB :
2389 GIC 9 : pg_free(tes);
2390 :
2391 : /* Now wait for workers to finish. */
2392 9 : WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
2393 : }
2394 : else
2395 : {
2396 : /* Non-parallel mode: just dump all candidate TEs sequentially. */
2397 3749 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2398 ECB : {
2399 : /* Must have same filter conditions as above */
2400 GIC 3733 : if (!te->dataDumper)
2401 3572 : continue;
2402 161 : if ((te->reqs & REQ_DATA) == 0)
2403 CBC 2 : continue;
2404 :
2405 GIC 159 : WriteDataChunksForTocEntry(AH, te);
2406 ECB : }
2407 EUB : }
2408 GIC 25 : }
2409 ECB :
2410 :
2411 : /*
2412 : * Callback function that's invoked in the leader process after a step has
2413 : * been parallel dumped.
2414 : *
2415 : * We don't need to do anything except check for worker failure.
2416 : */
2417 : static void
2418 CBC 122 : mark_dump_job_done(ArchiveHandle *AH,
2419 : TocEntry *te,
2420 ECB : int status,
2421 : void *callback_data)
2422 : {
2423 CBC 122 : pg_log_info("finished item %d %s %s",
2424 : te->dumpId, te->desc, te->tag);
2425 :
2426 GIC 122 : if (status != 0)
2427 LBC 0 : pg_fatal("worker process failed: exit code %d",
2428 ECB : status);
2429 GIC 122 : }
2430 :
2431 ECB :
2432 : void
2433 GIC 281 : WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
2434 : {
2435 : StartDataPtrType startPtr;
2436 : EndDataPtrType endPtr;
2437 ECB :
2438 GIC 281 : AH->currToc = te;
2439 ECB :
2440 CBC 281 : if (strcmp(te->desc, "BLOBS") == 0)
2441 : {
2442 GNC 10 : startPtr = AH->StartLOsPtr;
2443 10 : endPtr = AH->EndLOsPtr;
2444 : }
2445 : else
2446 ECB : {
2447 GIC 271 : startPtr = AH->StartDataPtr;
2448 271 : endPtr = AH->EndDataPtr;
2449 : }
2450 :
2451 281 : if (startPtr != NULL)
2452 281 : (*startPtr) (AH, te);
2453 :
2454 ECB : /*
2455 : * The user-provided DataDumper routine needs to call AH->WriteData
2456 : */
2457 CBC 281 : te->dataDumper((Archive *) AH, te->dataDumperArg);
2458 ECB :
2459 GIC 281 : if (endPtr != NULL)
2460 281 : (*endPtr) (AH, te);
2461 :
2462 281 : AH->currToc = NULL;
2463 CBC 281 : }
2464 :
2465 ECB : void
2466 GIC 37 : WriteToc(ArchiveHandle *AH)
2467 ECB : {
2468 : TocEntry *te;
2469 : char workbuf[32];
2470 : int tocCount;
2471 : int i;
2472 :
2473 : /* count entries that will actually be dumped */
2474 CBC 37 : tocCount = 0;
2475 8137 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2476 ECB : {
2477 CBC 8100 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) != 0)
2478 GIC 8096 : tocCount++;
2479 ECB : }
2480 :
2481 : /* printf("%d TOC Entries to save\n", tocCount); */
2482 :
2483 CBC 37 : WriteInt(AH, tocCount);
2484 ECB :
2485 CBC 8137 : for (te = AH->toc->next; te != AH->toc; te = te->next)
2486 ECB : {
2487 CBC 8100 : if ((te->reqs & (REQ_SCHEMA | REQ_DATA | REQ_SPECIAL)) == 0)
2488 4 : continue;
2489 ECB :
2490 GIC 8096 : WriteInt(AH, te->dumpId);
2491 8096 : WriteInt(AH, te->dataDumper ? 1 : 0);
2492 ECB :
2493 : /* OID is recorded as a string for historical reasons */
2494 CBC 8096 : sprintf(workbuf, "%u", te->catalogId.tableoid);
2495 8096 : WriteStr(AH, workbuf);
2496 GIC 8096 : sprintf(workbuf, "%u", te->catalogId.oid);
2497 CBC 8096 : WriteStr(AH, workbuf);
2498 :
2499 8096 : WriteStr(AH, te->tag);
2500 8096 : WriteStr(AH, te->desc);
2501 GIC 8096 : WriteInt(AH, te->section);
2502 CBC 8096 : WriteStr(AH, te->defn);
2503 GIC 8096 : WriteStr(AH, te->dropStmt);
2504 8096 : WriteStr(AH, te->copyStmt);
2505 CBC 8096 : WriteStr(AH, te->namespace);
2506 GIC 8096 : WriteStr(AH, te->tablespace);
2507 8096 : WriteStr(AH, te->tableam);
2508 8096 : WriteStr(AH, te->owner);
2509 8096 : WriteStr(AH, "false");
2510 :
2511 : /* Dump list of dependencies */
2512 19943 : for (i = 0; i < te->nDeps; i++)
2513 : {
2514 11847 : sprintf(workbuf, "%d", te->dependencies[i]);
2515 CBC 11847 : WriteStr(AH, workbuf);
2516 ECB : }
2517 GIC 8096 : WriteStr(AH, NULL); /* Terminate List */
2518 ECB :
2519 GIC 8096 : if (AH->WriteExtraTocPtr)
2520 CBC 8096 : AH->WriteExtraTocPtr(AH, te);
2521 ECB : }
2522 GIC 37 : }
2523 ECB :
2524 : void
2525 GIC 31 : ReadToc(ArchiveHandle *AH)
2526 : {
2527 ECB : int i;
2528 EUB : char *tmp;
2529 : DumpId *deps;
2530 : int depIdx;
2531 ECB : int depSize;
2532 : TocEntry *te;
2533 : bool is_supported;
2534 :
2535 CBC 31 : AH->tocCount = ReadInt(AH);
2536 31 : AH->maxDumpId = 0;
2537 ECB :
2538 GIC 6271 : for (i = 0; i < AH->tocCount; i++)
2539 : {
2540 GBC 6240 : te = (TocEntry *) pg_malloc0(sizeof(TocEntry));
2541 CBC 6240 : te->dumpId = ReadInt(AH);
2542 ECB :
2543 CBC 6240 : if (te->dumpId > AH->maxDumpId)
2544 GIC 1494 : AH->maxDumpId = te->dumpId;
2545 ECB :
2546 : /* Sanity check */
2547 GIC 6240 : if (te->dumpId <= 0)
2548 LBC 0 : pg_fatal("entry ID %d out of range -- perhaps a corrupt TOC",
2549 : te->dumpId);
2550 ECB :
2551 GIC 6240 : te->hadDumper = ReadInt(AH);
2552 :
2553 6240 : if (AH->version >= K_VERS_1_8)
2554 : {
2555 6240 : tmp = ReadStr(AH);
2556 6240 : sscanf(tmp, "%u", &te->catalogId.tableoid);
2557 6240 : free(tmp);
2558 : }
2559 EUB : else
2560 UBC 0 : te->catalogId.tableoid = InvalidOid;
2561 GBC 6240 : tmp = ReadStr(AH);
2562 6240 : sscanf(tmp, "%u", &te->catalogId.oid);
2563 6240 : free(tmp);
2564 EUB :
2565 GBC 6240 : te->tag = ReadStr(AH);
2566 6240 : te->desc = ReadStr(AH);
2567 EUB :
2568 GBC 6240 : if (AH->version >= K_VERS_1_11)
2569 EUB : {
2570 GBC 6240 : te->section = ReadInt(AH);
2571 EUB : }
2572 : else
2573 : {
2574 : /*
2575 : * Rules for pre-8.4 archives wherein pg_dump hasn't classified
2576 : * the entries into sections. This list need not cover entry
2577 : * types added later than 8.4.
2578 ECB : */
2579 LBC 0 : if (strcmp(te->desc, "COMMENT") == 0 ||
2580 UIC 0 : strcmp(te->desc, "ACL") == 0 ||
2581 LBC 0 : strcmp(te->desc, "ACL LANGUAGE") == 0)
2582 0 : te->section = SECTION_NONE;
2583 UIC 0 : else if (strcmp(te->desc, "TABLE DATA") == 0 ||
2584 LBC 0 : strcmp(te->desc, "BLOBS") == 0 ||
2585 0 : strcmp(te->desc, "BLOB COMMENTS") == 0)
2586 UIC 0 : te->section = SECTION_DATA;
2587 LBC 0 : else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
2588 0 : strcmp(te->desc, "CHECK CONSTRAINT") == 0 ||
2589 UIC 0 : strcmp(te->desc, "FK CONSTRAINT") == 0 ||
2590 LBC 0 : strcmp(te->desc, "INDEX") == 0 ||
2591 0 : strcmp(te->desc, "RULE") == 0 ||
2592 UIC 0 : strcmp(te->desc, "TRIGGER") == 0)
2593 LBC 0 : te->section = SECTION_POST_DATA;
2594 ECB : else
2595 LBC 0 : te->section = SECTION_PRE_DATA;
2596 EUB : }
2597 :
2598 GIC 6240 : te->defn = ReadStr(AH);
2599 CBC 6240 : te->dropStmt = ReadStr(AH);
2600 :
2601 6240 : if (AH->version >= K_VERS_1_3)
2602 GBC 6240 : te->copyStmt = ReadStr(AH);
2603 :
2604 CBC 6240 : if (AH->version >= K_VERS_1_6)
2605 GIC 6240 : te->namespace = ReadStr(AH);
2606 :
2607 CBC 6240 : if (AH->version >= K_VERS_1_10)
2608 GBC 6240 : te->tablespace = ReadStr(AH);
2609 :
2610 GIC 6240 : if (AH->version >= K_VERS_1_14)
2611 CBC 6240 : te->tableam = ReadStr(AH);
2612 :
2613 6240 : te->owner = ReadStr(AH);
2614 6240 : is_supported = true;
2615 6240 : if (AH->version < K_VERS_1_9)
2616 UIC 0 : is_supported = false;
2617 : else
2618 ECB : {
2619 CBC 6240 : tmp = ReadStr(AH);
2620 ECB :
2621 CBC 6240 : if (strcmp(tmp, "true") == 0)
2622 UIC 0 : is_supported = false;
2623 EUB :
2624 GBC 6240 : free(tmp);
2625 : }
2626 ECB :
2627 CBC 6240 : if (!is_supported)
2628 LBC 0 : pg_log_warning("restoring tables WITH OIDS is not supported anymore");
2629 :
2630 : /* Read TOC entry dependencies */
2631 CBC 6240 : if (AH->version >= K_VERS_1_5)
2632 : {
2633 6240 : depSize = 100;
2634 6240 : deps = (DumpId *) pg_malloc(sizeof(DumpId) * depSize);
2635 6240 : depIdx = 0;
2636 : for (;;)
2637 : {
2638 GIC 15648 : tmp = ReadStr(AH);
2639 CBC 15648 : if (!tmp)
2640 6240 : break; /* end of list */
2641 9408 : if (depIdx >= depSize)
2642 : {
2643 UIC 0 : depSize *= 2;
2644 0 : deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depSize);
2645 : }
2646 GBC 9408 : sscanf(tmp, "%d", &deps[depIdx]);
2647 9408 : free(tmp);
2648 GIC 9408 : depIdx++;
2649 ECB : }
2650 :
2651 CBC 6240 : if (depIdx > 0) /* We have a non-null entry */
2652 ECB : {
2653 GIC 5067 : deps = (DumpId *) pg_realloc(deps, sizeof(DumpId) * depIdx);
2654 CBC 5067 : te->dependencies = deps;
2655 GIC 5067 : te->nDeps = depIdx;
2656 : }
2657 : else
2658 ECB : {
2659 CBC 1173 : free(deps);
2660 1173 : te->dependencies = NULL;
2661 1173 : te->nDeps = 0;
2662 : }
2663 : }
2664 ECB : else
2665 : {
2666 LBC 0 : te->dependencies = NULL;
2667 0 : te->nDeps = 0;
2668 ECB : }
2669 CBC 6240 : te->dataLength = 0;
2670 :
2671 6240 : if (AH->ReadExtraTocPtr)
2672 GIC 6240 : AH->ReadExtraTocPtr(AH, te);
2673 :
2674 CBC 6240 : pg_log_debug("read TOC entry %d (ID %d) for %s %s",
2675 : i, te->dumpId, te->desc, te->tag);
2676 :
2677 ECB : /* link completed entry into TOC circular list */
2678 GIC 6240 : te->prev = AH->toc->prev;
2679 CBC 6240 : AH->toc->prev->next = te;
2680 GIC 6240 : AH->toc->prev = te;
2681 6240 : te->next = AH->toc;
2682 ECB :
2683 : /* special processing immediately upon read for some items */
2684 CBC 6240 : if (strcmp(te->desc, "ENCODING") == 0)
2685 31 : processEncodingEntry(AH, te);
2686 GIC 6209 : else if (strcmp(te->desc, "STDSTRINGS") == 0)
2687 CBC 31 : processStdStringsEntry(AH, te);
2688 6178 : else if (strcmp(te->desc, "SEARCHPATH") == 0)
2689 31 : processSearchPathEntry(AH, te);
2690 EUB : }
2691 GIC 31 : }
2692 ECB :
2693 : static void
2694 GIC 31 : processEncodingEntry(ArchiveHandle *AH, TocEntry *te)
2695 EUB : {
2696 : /* te->defn should have the form SET client_encoding = 'foo'; */
2697 GIC 31 : char *defn = pg_strdup(te->defn);
2698 ECB : char *ptr1;
2699 CBC 31 : char *ptr2 = NULL;
2700 : int encoding;
2701 :
2702 31 : ptr1 = strchr(defn, '\'');
2703 GIC 31 : if (ptr1)
2704 31 : ptr2 = strchr(++ptr1, '\'');
2705 31 : if (ptr2)
2706 : {
2707 CBC 31 : *ptr2 = '\0';
2708 31 : encoding = pg_char_to_encoding(ptr1);
2709 31 : if (encoding < 0)
2710 UBC 0 : pg_fatal("unrecognized encoding \"%s\"",
2711 EUB : ptr1);
2712 GIC 31 : AH->public.encoding = encoding;
2713 EUB : }
2714 : else
2715 LBC 0 : pg_fatal("invalid ENCODING item: %s",
2716 : te->defn);
2717 :
2718 CBC 31 : free(defn);
2719 GIC 31 : }
2720 :
2721 : static void
2722 31 : processStdStringsEntry(ArchiveHandle *AH, TocEntry *te)
2723 : {
2724 ECB : /* te->defn should have the form SET standard_conforming_strings = 'x'; */
2725 : char *ptr1;
2726 :
2727 GIC 31 : ptr1 = strchr(te->defn, '\'');
2728 GBC 31 : if (ptr1 && strncmp(ptr1, "'on'", 4) == 0)
2729 GIC 31 : AH->public.std_strings = true;
2730 UIC 0 : else if (ptr1 && strncmp(ptr1, "'off'", 5) == 0)
2731 0 : AH->public.std_strings = false;
2732 EUB : else
2733 UIC 0 : pg_fatal("invalid STDSTRINGS item: %s",
2734 EUB : te->defn);
2735 GIC 31 : }
2736 EUB :
2737 : static void
2738 GBC 31 : processSearchPathEntry(ArchiveHandle *AH, TocEntry *te)
2739 : {
2740 : /*
2741 EUB : * te->defn should contain a command to set search_path. We just copy it
2742 : * verbatim for use later.
2743 : */
2744 GBC 31 : AH->public.searchpath = pg_strdup(te->defn);
2745 31 : }
2746 :
2747 : static void
2748 UBC 0 : StrictNamesCheck(RestoreOptions *ropt)
2749 : {
2750 EUB : const char *missing_name;
2751 :
2752 UBC 0 : Assert(ropt->strict_names);
2753 :
2754 UIC 0 : if (ropt->schemaNames.head != NULL)
2755 EUB : {
2756 UIC 0 : missing_name = simple_string_list_not_touched(&ropt->schemaNames);
2757 UBC 0 : if (missing_name != NULL)
2758 0 : pg_fatal("schema \"%s\" not found", missing_name);
2759 EUB : }
2760 :
2761 UIC 0 : if (ropt->tableNames.head != NULL)
2762 EUB : {
2763 UIC 0 : missing_name = simple_string_list_not_touched(&ropt->tableNames);
2764 UBC 0 : if (missing_name != NULL)
2765 0 : pg_fatal("table \"%s\" not found", missing_name);
2766 EUB : }
2767 :
2768 UBC 0 : if (ropt->indexNames.head != NULL)
2769 : {
2770 UIC 0 : missing_name = simple_string_list_not_touched(&ropt->indexNames);
2771 0 : if (missing_name != NULL)
2772 0 : pg_fatal("index \"%s\" not found", missing_name);
2773 : }
2774 :
2775 0 : if (ropt->functionNames.head != NULL)
2776 : {
2777 0 : missing_name = simple_string_list_not_touched(&ropt->functionNames);
2778 LBC 0 : if (missing_name != NULL)
2779 UIC 0 : pg_fatal("function \"%s\" not found", missing_name);
2780 ECB : }
2781 :
2782 UIC 0 : if (ropt->triggerNames.head != NULL)
2783 : {
2784 LBC 0 : missing_name = simple_string_list_not_touched(&ropt->triggerNames);
2785 0 : if (missing_name != NULL)
2786 0 : pg_fatal("trigger \"%s\" not found", missing_name);
2787 ECB : }
2788 UIC 0 : }
2789 :
2790 : /*
2791 : * Determine whether we want to restore this TOC entry.
2792 : *
2793 : * Returns 0 if entry should be skipped, or some combination of the
2794 ECB : * REQ_SCHEMA and REQ_DATA bits if we want to restore schema and/or data
2795 : * portions of this TOC entry, or REQ_SPECIAL if it's a special entry.
2796 : */
2797 : static int
2798 CBC 27564 : _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
2799 : {
2800 27564 : int res = REQ_SCHEMA | REQ_DATA;
2801 GIC 27564 : RestoreOptions *ropt = AH->public.ropt;
2802 :
2803 : /* These items are treated specially */
2804 27564 : if (strcmp(te->desc, "ENCODING") == 0 ||
2805 27415 : strcmp(te->desc, "STDSTRINGS") == 0 ||
2806 27266 : strcmp(te->desc, "SEARCHPATH") == 0)
2807 447 : return REQ_SPECIAL;
2808 ECB :
2809 EUB : /*
2810 : * DATABASE and DATABASE PROPERTIES also have a special rule: they are
2811 : * restored in createDB mode, and not restored otherwise, independently of
2812 ECB : * all else.
2813 EUB : */
2814 GIC 27117 : if (strcmp(te->desc, "DATABASE") == 0 ||
2815 27036 : strcmp(te->desc, "DATABASE PROPERTIES") == 0)
2816 : {
2817 99 : if (ropt->createDB)
2818 74 : return REQ_SCHEMA;
2819 ECB : else
2820 GBC 25 : return 0;
2821 EUB : }
2822 :
2823 : /*
2824 : * Process exclusions that affect certain classes of TOC entries.
2825 : */
2826 ECB :
2827 EUB : /* If it's an ACL, maybe ignore it */
2828 GIC 27018 : if (ropt->aclsSkip && _tocEntryIsACL(te))
2829 UIC 0 : return 0;
2830 ECB :
2831 EUB : /* If it's a comment, maybe ignore it */
2832 GIC 27018 : if (ropt->no_comments && strcmp(te->desc, "COMMENT") == 0)
2833 UIC 0 : return 0;
2834 ECB :
2835 : /*
2836 : * If it's a publication or a table part of a publication, maybe ignore
2837 : * it.
2838 : */
2839 CBC 27018 : if (ropt->no_publications &&
2840 LBC 0 : (strcmp(te->desc, "PUBLICATION") == 0 ||
2841 0 : strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
2842 0 : strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0))
2843 0 : return 0;
2844 ECB :
2845 : /* If it's a security label, maybe ignore it */
2846 CBC 27018 : if (ropt->no_security_labels && strcmp(te->desc, "SECURITY LABEL") == 0)
2847 LBC 0 : return 0;
2848 EUB :
2849 : /* If it's a subscription, maybe ignore it */
2850 GBC 27018 : if (ropt->no_subscriptions && strcmp(te->desc, "SUBSCRIPTION") == 0)
2851 UIC 0 : return 0;
2852 :
2853 : /* Ignore it if section is not to be dumped/restored */
2854 CBC 27018 : switch (curSection)
2855 EUB : {
2856 GIC 16742 : case SECTION_PRE_DATA:
2857 16742 : if (!(ropt->dumpSections & DUMP_PRE_DATA))
2858 338 : return 0;
2859 16404 : break;
2860 CBC 3974 : case SECTION_DATA:
2861 3974 : if (!(ropt->dumpSections & DUMP_DATA))
2862 64 : return 0;
2863 GIC 3910 : break;
2864 6302 : case SECTION_POST_DATA:
2865 CBC 6302 : if (!(ropt->dumpSections & DUMP_POST_DATA))
2866 GIC 142 : return 0;
2867 CBC 6160 : break;
2868 LBC 0 : default:
2869 : /* shouldn't get here, really, but ignore it */
2870 0 : return 0;
2871 ECB : }
2872 :
2873 : /* Ignore it if rejected by idWanted[] (cf. SortTocFromFile) */
2874 GIC 26474 : if (ropt->idWanted && !ropt->idWanted[te->dumpId - 1])
2875 UIC 0 : return 0;
2876 :
2877 : /*
2878 : * Check options for selective dump/restore.
2879 : */
2880 GIC 26474 : if (strcmp(te->desc, "ACL") == 0 ||
2881 23858 : strcmp(te->desc, "COMMENT") == 0 ||
2882 22708 : strcmp(te->desc, "SECURITY LABEL") == 0)
2883 : {
2884 : /* Database properties react to createDB, not selectivity options. */
2885 3766 : if (strncmp(te->tag, "DATABASE ", 9) == 0)
2886 : {
2887 47 : if (!ropt->createDB)
2888 21 : return 0;
2889 : }
2890 GBC 3719 : else if (ropt->schemaNames.head != NULL ||
2891 3719 : ropt->schemaExcludeNames.head != NULL ||
2892 3719 : ropt->selTypes)
2893 : {
2894 : /*
2895 : * In a selective dump/restore, we want to restore these dependent
2896 : * TOC entry types only if their parent object is being restored.
2897 : * Without selectivity options, we let through everything in the
2898 ECB : * archive. Note there may be such entries with no parent, eg
2899 : * non-default ACLs for built-in objects.
2900 : *
2901 EUB : * This code depends on the parent having been marked already,
2902 : * which should be the case; if it isn't, perhaps due to
2903 : * SortTocFromFile rearrangement, skipping the dependent entry
2904 : * seems prudent anyway.
2905 : *
2906 : * Ideally we'd handle, eg, table CHECK constraints this way too.
2907 ECB : * But it's hard to tell which of their dependencies is the one to
2908 EUB : * consult.
2909 : */
2910 UBC 0 : if (te->nDeps != 1 ||
2911 UIC 0 : TocIDRequired(AH, te->dependencies[0]) == 0)
2912 LBC 0 : return 0;
2913 : }
2914 EUB : }
2915 : else
2916 : {
2917 : /* Apply selective-restore rules for standalone TOC entries. */
2918 GBC 22708 : if (ropt->schemaNames.head != NULL)
2919 EUB : {
2920 : /* If no namespace is specified, it means all. */
2921 UBC 0 : if (!te->namespace)
2922 UIC 0 : return 0;
2923 UBC 0 : if (!simple_string_list_member(&ropt->schemaNames, te->namespace))
2924 0 : return 0;
2925 EUB : }
2926 :
2927 GBC 22708 : if (ropt->schemaExcludeNames.head != NULL &&
2928 UIC 0 : te->namespace &&
2929 UBC 0 : simple_string_list_member(&ropt->schemaExcludeNames, te->namespace))
2930 UIC 0 : return 0;
2931 EUB :
2932 GBC 22708 : if (ropt->selTypes)
2933 EUB : {
2934 UBC 0 : if (strcmp(te->desc, "TABLE") == 0 ||
2935 0 : strcmp(te->desc, "TABLE DATA") == 0 ||
2936 UIC 0 : strcmp(te->desc, "VIEW") == 0 ||
2937 UBC 0 : strcmp(te->desc, "FOREIGN TABLE") == 0 ||
2938 0 : strcmp(te->desc, "MATERIALIZED VIEW") == 0 ||
2939 0 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0 ||
2940 UIC 0 : strcmp(te->desc, "SEQUENCE") == 0 ||
2941 UBC 0 : strcmp(te->desc, "SEQUENCE SET") == 0)
2942 EUB : {
2943 UBC 0 : if (!ropt->selTable)
2944 0 : return 0;
2945 0 : if (ropt->tableNames.head != NULL &&
2946 UIC 0 : !simple_string_list_member(&ropt->tableNames, te->tag))
2947 UBC 0 : return 0;
2948 : }
2949 0 : else if (strcmp(te->desc, "INDEX") == 0)
2950 EUB : {
2951 UBC 0 : if (!ropt->selIndex)
2952 0 : return 0;
2953 0 : if (ropt->indexNames.head != NULL &&
2954 UIC 0 : !simple_string_list_member(&ropt->indexNames, te->tag))
2955 0 : return 0;
2956 EUB : }
2957 UIC 0 : else if (strcmp(te->desc, "FUNCTION") == 0 ||
2958 0 : strcmp(te->desc, "AGGREGATE") == 0 ||
2959 0 : strcmp(te->desc, "PROCEDURE") == 0)
2960 : {
2961 0 : if (!ropt->selFunction)
2962 0 : return 0;
2963 0 : if (ropt->functionNames.head != NULL &&
2964 0 : !simple_string_list_member(&ropt->functionNames, te->tag))
2965 0 : return 0;
2966 ECB : }
2967 UIC 0 : else if (strcmp(te->desc, "TRIGGER") == 0)
2968 : {
2969 0 : if (!ropt->selTrigger)
2970 0 : return 0;
2971 0 : if (ropt->triggerNames.head != NULL &&
2972 0 : !simple_string_list_member(&ropt->triggerNames, te->tag))
2973 0 : return 0;
2974 : }
2975 ECB : else
2976 LBC 0 : return 0;
2977 ECB : }
2978 : }
2979 :
2980 : /*
2981 : * Determine whether the TOC entry contains schema and/or data components,
2982 EUB : * and mask off inapplicable REQ bits. If it had a dataDumper, assume
2983 ECB : * it's both schema and data. Otherwise it's probably schema-only, but
2984 : * there are exceptions.
2985 : */
2986 GIC 26453 : if (!te->hadDumper)
2987 : {
2988 : /*
2989 : * Special Case: If 'SEQUENCE SET' or anything to do with LOs, then
2990 : * it is considered a data entry. We don't need to check for the
2991 : * BLOBS entry or old-style BLOB COMMENTS, because they will have
2992 ECB : * hadDumper = true ... but we do need to check new-style BLOB ACLs,
2993 : * comments, etc.
2994 : */
2995 GIC 22965 : if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
2996 22543 : strcmp(te->desc, "BLOB") == 0 ||
2997 22423 : (strcmp(te->desc, "ACL") == 0 &&
2998 2616 : strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
2999 22373 : (strcmp(te->desc, "COMMENT") == 0 &&
3000 CBC 1129 : strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3001 GBC 22310 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3002 UIC 0 : strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
3003 GIC 655 : res = res & REQ_DATA;
3004 ECB : else
3005 GIC 22310 : res = res & ~REQ_DATA;
3006 : }
3007 :
3008 : /*
3009 : * If there's no definition command, there's no schema component. Treat
3010 : * "load via partition root" comments as not schema.
3011 : */
3012 26453 : if (!te->defn || !te->defn[0] ||
3013 22977 : strncmp(te->defn, "-- load via partition root ", 27) == 0)
3014 CBC 3488 : res = res & ~REQ_SCHEMA;
3015 ECB :
3016 : /*
3017 : * Special case: <Init> type with <Max OID> tag; this is obsolete and we
3018 : * always ignore it.
3019 : */
3020 CBC 26453 : if ((strcmp(te->desc, "<Init>") == 0) && (strcmp(te->tag, "Max OID") == 0))
3021 LBC 0 : return 0;
3022 EUB :
3023 ECB : /* Mask it if we only want schema */
3024 GIC 26453 : if (ropt->schemaOnly)
3025 : {
3026 : /*
3027 ECB : * The sequence_data option overrides schemaOnly for SEQUENCE SET.
3028 : *
3029 : * In binary-upgrade mode, even with schemaOnly set, we do not mask
3030 : * out large objects. (Only large object definitions, comments and
3031 : * other metadata should be generated in binary-upgrade mode, not the
3032 : * actual data, but that need not concern us here.)
3033 : */
3034 GIC 2343 : if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
3035 2295 : !(ropt->binary_upgrade &&
3036 2063 : (strcmp(te->desc, "BLOB") == 0 ||
3037 2058 : (strcmp(te->desc, "ACL") == 0 &&
3038 75 : strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3039 CBC 2057 : (strcmp(te->desc, "COMMENT") == 0 &&
3040 GIC 42 : strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
3041 2054 : (strcmp(te->desc, "SECURITY LABEL") == 0 &&
3042 LBC 0 : strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
3043 CBC 2286 : res = res & REQ_SCHEMA;
3044 ECB : }
3045 :
3046 : /* Mask it if we only want data */
3047 CBC 26453 : if (ropt->dataOnly)
3048 121 : res = res & REQ_DATA;
3049 :
3050 GIC 26453 : return res;
3051 : }
3052 :
3053 : /*
3054 : * Identify which pass we should restore this TOC entry in.
3055 : *
3056 ECB : * See notes with the RestorePass typedef in pg_backup_archiver.h.
3057 : */
3058 EUB : static RestorePass
3059 GIC 58946 : _tocEntryRestorePass(TocEntry *te)
3060 : {
3061 ECB : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3062 GIC 58946 : if (strcmp(te->desc, "ACL") == 0 ||
3063 53640 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3064 53640 : strcmp(te->desc, "DEFAULT ACL") == 0)
3065 5708 : return RESTORE_PASS_ACL;
3066 53238 : if (strcmp(te->desc, "EVENT TRIGGER") == 0 ||
3067 53137 : strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0)
3068 785 : return RESTORE_PASS_POST_ACL;
3069 :
3070 : /*
3071 : * Comments need to be emitted in the same pass as their parent objects.
3072 ECB : * ACLs haven't got comments, and neither do matview data objects, but
3073 : * event triggers do. (Fortunately, event triggers haven't got ACLs, or
3074 : * we'd need yet another weird special case.)
3075 : */
3076 CBC 52453 : if (strcmp(te->desc, "COMMENT") == 0 &&
3077 2276 : strncmp(te->tag, "EVENT TRIGGER ", 14) == 0)
3078 LBC 0 : return RESTORE_PASS_POST_ACL;
3079 ECB :
3080 : /* All else can be handled in the main pass. */
3081 GIC 52453 : return RESTORE_PASS_MAIN;
3082 : }
3083 :
3084 : /*
3085 : * Identify TOC entries that are ACLs.
3086 : *
3087 ECB : * Note: it seems worth duplicating some code here to avoid a hard-wired
3088 : * assumption that these are exactly the same entries that we restore during
3089 : * the RESTORE_PASS_ACL phase.
3090 : */
3091 : static bool
3092 GIC 20858 : _tocEntryIsACL(TocEntry *te)
3093 : {
3094 ECB : /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
3095 CBC 20858 : if (strcmp(te->desc, "ACL") == 0 ||
3096 19028 : strcmp(te->desc, "ACL LANGUAGE") == 0 ||
3097 GIC 19028 : strcmp(te->desc, "DEFAULT ACL") == 0)
3098 1964 : return true;
3099 CBC 18894 : return false;
3100 : }
3101 :
3102 : /*
3103 ECB : * Issue SET commands for parameters that we want to have set the same way
3104 : * at all times during execution of a restore script.
3105 : */
3106 : static void
3107 CBC 179 : _doSetFixedOutputState(ArchiveHandle *AH)
3108 EUB : {
3109 GIC 179 : RestoreOptions *ropt = AH->public.ropt;
3110 :
3111 ECB : /*
3112 : * Disable timeouts to allow for slow commands, idle parallel workers, etc
3113 : */
3114 GIC 179 : ahprintf(AH, "SET statement_timeout = 0;\n");
3115 CBC 179 : ahprintf(AH, "SET lock_timeout = 0;\n");
3116 GIC 179 : ahprintf(AH, "SET idle_in_transaction_session_timeout = 0;\n");
3117 :
3118 ECB : /* Select the correct character set encoding */
3119 GIC 179 : ahprintf(AH, "SET client_encoding = '%s';\n",
3120 : pg_encoding_to_char(AH->public.encoding));
3121 ECB :
3122 : /* Select the correct string literal syntax */
3123 GBC 179 : ahprintf(AH, "SET standard_conforming_strings = %s;\n",
3124 GIC 179 : AH->public.std_strings ? "on" : "off");
3125 :
3126 ECB : /* Select the role to be used during restore */
3127 GBC 179 : if (ropt && ropt->use_role)
3128 UIC 0 : ahprintf(AH, "SET ROLE %s;\n", fmtId(ropt->use_role));
3129 ECB :
3130 : /* Select the dump-time search_path */
3131 CBC 179 : if (AH->public.searchpath)
3132 179 : ahprintf(AH, "%s", AH->public.searchpath);
3133 :
3134 : /* Make sure function checking is disabled */
3135 GIC 179 : ahprintf(AH, "SET check_function_bodies = false;\n");
3136 :
3137 : /* Ensure that all valid XML data will be accepted */
3138 179 : ahprintf(AH, "SET xmloption = content;\n");
3139 :
3140 ECB : /* Avoid annoying notices etc */
3141 GIC 179 : ahprintf(AH, "SET client_min_messages = warning;\n");
3142 CBC 179 : if (!AH->public.std_strings)
3143 UIC 0 : ahprintf(AH, "SET escape_string_warning = off;\n");
3144 ECB :
3145 : /* Adjust row-security state */
3146 GIC 179 : if (ropt && ropt->enable_row_security)
3147 UIC 0 : ahprintf(AH, "SET row_security = on;\n");
3148 : else
3149 CBC 179 : ahprintf(AH, "SET row_security = off;\n");
3150 ECB :
3151 GIC 179 : ahprintf(AH, "\n");
3152 GBC 179 : }
3153 ECB :
3154 : /*
3155 : * Issue a SET SESSION AUTHORIZATION command. Caller is responsible
3156 : * for updating state if appropriate. If user is NULL or an empty string,
3157 : * the specification DEFAULT will be used.
3158 : */
3159 EUB : static void
3160 GIC 1 : _doSetSessionAuth(ArchiveHandle *AH, const char *user)
3161 EUB : {
3162 GIC 1 : PQExpBuffer cmd = createPQExpBuffer();
3163 EUB :
3164 GIC 1 : appendPQExpBufferStr(cmd, "SET SESSION AUTHORIZATION ");
3165 :
3166 EUB : /*
3167 : * SQL requires a string literal here. Might as well be correct.
3168 : */
3169 CBC 1 : if (user && *user)
3170 GIC 1 : appendStringLiteralAHX(cmd, user, AH);
3171 ECB : else
3172 LBC 0 : appendPQExpBufferStr(cmd, "DEFAULT");
3173 GIC 1 : appendPQExpBufferChar(cmd, ';');
3174 :
3175 1 : if (RestoringToDB(AH))
3176 : {
3177 : PGresult *res;
3178 :
3179 UIC 0 : res = PQexec(AH->connection, cmd->data);
3180 :
3181 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3182 : /* NOT warn_or_exit_horribly... use -O instead to skip this. */
3183 LBC 0 : pg_fatal("could not set session user to \"%s\": %s",
3184 : user, PQerrorMessage(AH->connection));
3185 ECB :
3186 LBC 0 : PQclear(res);
3187 : }
3188 : else
3189 GIC 1 : ahprintf(AH, "%s\n\n", cmd->data);
3190 :
3191 CBC 1 : destroyPQExpBuffer(cmd);
3192 1 : }
3193 ECB :
3194 :
3195 : /*
3196 : * Issue the commands to connect to the specified database.
3197 : *
3198 : * If we're currently restoring right into a database, this will
3199 : * actually establish a connection. Otherwise it puts a \connect into
3200 : * the script output.
3201 : */
3202 : static void
3203 GIC 44 : _reconnectToDB(ArchiveHandle *AH, const char *dbname)
3204 : {
3205 CBC 44 : if (RestoringToDB(AH))
3206 13 : ReconnectToServer(AH, dbname);
3207 : else
3208 ECB : {
3209 : PQExpBufferData connectbuf;
3210 :
3211 CBC 31 : initPQExpBuffer(&connectbuf);
3212 31 : appendPsqlMetaConnect(&connectbuf, dbname);
3213 GIC 31 : ahprintf(AH, "%s\n", connectbuf.data);
3214 31 : termPQExpBuffer(&connectbuf);
3215 ECB : }
3216 :
3217 : /*
3218 : * NOTE: currUser keeps track of what the imaginary session user in our
3219 : * script is. It's now effectively reset to the original userID.
3220 : */
3221 GNC 44 : free(AH->currUser);
3222 GIC 44 : AH->currUser = NULL;
3223 ECB :
3224 : /* don't assume we still know the output schema, tablespace, etc either */
3225 GNC 44 : free(AH->currSchema);
3226 GIC 44 : AH->currSchema = NULL;
3227 ECB :
3228 GNC 44 : free(AH->currTableAm);
3229 CBC 44 : AH->currTableAm = NULL;
3230 :
3231 GNC 44 : free(AH->currTablespace);
3232 GIC 44 : AH->currTablespace = NULL;
3233 :
3234 ECB : /* re-establish fixed state */
3235 CBC 44 : _doSetFixedOutputState(AH);
3236 GIC 44 : }
3237 :
3238 : /*
3239 : * Become the specified user, and update state to avoid redundant commands
3240 : *
3241 : * NULL or empty argument is taken to mean restoring the session default
3242 : */
3243 ECB : static void
3244 GIC 48 : _becomeUser(ArchiveHandle *AH, const char *user)
3245 ECB : {
3246 GIC 48 : if (!user)
3247 LBC 0 : user = ""; /* avoid null pointers */
3248 ECB :
3249 GIC 48 : if (AH->currUser && strcmp(AH->currUser, user) == 0)
3250 GBC 47 : return; /* no need to do anything */
3251 :
3252 GIC 1 : _doSetSessionAuth(AH, user);
3253 :
3254 : /*
3255 : * NOTE: currUser keeps track of what the imaginary session user in our
3256 : * script is
3257 : */
3258 GNC 1 : free(AH->currUser);
3259 GIC 1 : AH->currUser = pg_strdup(user);
3260 : }
3261 :
3262 : /*
3263 : * Become the owner of the given TOC entry object. If
3264 : * changes in ownership are not allowed, this doesn't do anything.
3265 : */
3266 : static void
3267 23973 : _becomeOwner(ArchiveHandle *AH, TocEntry *te)
3268 ECB : {
3269 CBC 23973 : RestoreOptions *ropt = AH->public.ropt;
3270 :
3271 GBC 23973 : if (ropt && (ropt->noOwner || !ropt->use_setsessauth))
3272 23973 : return;
3273 EUB :
3274 UIC 0 : _becomeUser(AH, te->owner);
3275 EUB : }
3276 :
3277 :
3278 : /*
3279 : * Issue the commands to select the specified schema as the current schema
3280 : * in the target database.
3281 : */
3282 : static void
3283 GIC 24012 : _selectOutputSchema(ArchiveHandle *AH, const char *schemaName)
3284 : {
3285 : PQExpBuffer qry;
3286 EUB :
3287 : /*
3288 : * If there was a SEARCHPATH TOC entry, we're supposed to just stay with
3289 : * that search_path rather than switching to entry-specific paths.
3290 : * Otherwise, it's an old archive that will not restore correctly unless
3291 : * we set the search_path as it's expecting.
3292 : */
3293 GBC 24012 : if (AH->public.searchpath)
3294 GIC 24012 : return;
3295 :
3296 UBC 0 : if (!schemaName || *schemaName == '\0' ||
3297 UIC 0 : (AH->currSchema && strcmp(AH->currSchema, schemaName) == 0))
3298 UBC 0 : return; /* no need to do anything */
3299 EUB :
3300 UIC 0 : qry = createPQExpBuffer();
3301 EUB :
3302 UIC 0 : appendPQExpBuffer(qry, "SET search_path = %s",
3303 : fmtId(schemaName));
3304 0 : if (strcmp(schemaName, "pg_catalog") != 0)
3305 0 : appendPQExpBufferStr(qry, ", pg_catalog");
3306 :
3307 0 : if (RestoringToDB(AH))
3308 : {
3309 ECB : PGresult *res;
3310 :
3311 LBC 0 : res = PQexec(AH->connection, qry->data);
3312 :
3313 UIC 0 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3314 0 : warn_or_exit_horribly(AH,
3315 : "could not set search_path to \"%s\": %s",
3316 0 : schemaName, PQerrorMessage(AH->connection));
3317 ECB :
3318 UBC 0 : PQclear(res);
3319 : }
3320 ECB : else
3321 LBC 0 : ahprintf(AH, "%s;\n\n", qry->data);
3322 :
3323 UNC 0 : free(AH->currSchema);
3324 LBC 0 : AH->currSchema = pg_strdup(schemaName);
3325 :
3326 0 : destroyPQExpBuffer(qry);
3327 ECB : }
3328 :
3329 : /*
3330 : * Issue the commands to select the specified tablespace as the current one
3331 : * in the target database.
3332 : */
3333 : static void
3334 CBC 20646 : _selectTablespace(ArchiveHandle *AH, const char *tablespace)
3335 : {
3336 GIC 20646 : RestoreOptions *ropt = AH->public.ropt;
3337 : PQExpBuffer qry;
3338 : const char *want,
3339 EUB : *have;
3340 :
3341 : /* do nothing in --no-tablespaces mode */
3342 CBC 20646 : if (ropt->noTablespace)
3343 UIC 0 : return;
3344 :
3345 GIC 20646 : have = AH->currTablespace;
3346 CBC 20646 : want = tablespace;
3347 :
3348 ECB : /* no need to do anything for non-tablespace object */
3349 GBC 20646 : if (!want)
3350 GIC 14513 : return;
3351 EUB :
3352 GIC 6133 : if (have && strcmp(want, have) == 0)
3353 CBC 6056 : return; /* no need to do anything */
3354 :
3355 GIC 77 : qry = createPQExpBuffer();
3356 ECB :
3357 GIC 77 : if (strcmp(want, "") == 0)
3358 ECB : {
3359 : /* We want the tablespace to be the database's default */
3360 GIC 77 : appendPQExpBufferStr(qry, "SET default_tablespace = ''");
3361 ECB : }
3362 : else
3363 : {
3364 : /* We want an explicit tablespace */
3365 UIC 0 : appendPQExpBuffer(qry, "SET default_tablespace = %s", fmtId(want));
3366 : }
3367 :
3368 CBC 77 : if (RestoringToDB(AH))
3369 : {
3370 ECB : PGresult *res;
3371 :
3372 GIC 11 : res = PQexec(AH->connection, qry->data);
3373 :
3374 11 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3375 UIC 0 : warn_or_exit_horribly(AH,
3376 ECB : "could not set default_tablespace to %s: %s",
3377 LBC 0 : fmtId(want), PQerrorMessage(AH->connection));
3378 :
3379 CBC 11 : PQclear(res);
3380 ECB : }
3381 : else
3382 CBC 66 : ahprintf(AH, "%s;\n\n", qry->data);
3383 ECB :
3384 GNC 77 : free(AH->currTablespace);
3385 CBC 77 : AH->currTablespace = pg_strdup(want);
3386 :
3387 77 : destroyPQExpBuffer(qry);
3388 ECB : }
3389 :
3390 : /*
3391 : * Set the proper default_table_access_method value for the table.
3392 : */
3393 : static void
3394 CBC 20646 : _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
3395 : {
3396 20646 : RestoreOptions *ropt = AH->public.ropt;
3397 EUB : PQExpBuffer cmd;
3398 : const char *want,
3399 : *have;
3400 :
3401 ECB : /* do nothing in --no-table-access-method mode */
3402 GIC 20646 : if (ropt->noTableAm)
3403 256 : return;
3404 ECB :
3405 GIC 20390 : have = AH->currTableAm;
3406 CBC 20390 : want = tableam;
3407 :
3408 20390 : if (!want)
3409 16530 : return;
3410 :
3411 GIC 3860 : if (have && strcmp(want, have) == 0)
3412 3636 : return;
3413 :
3414 224 : cmd = createPQExpBuffer();
3415 224 : appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
3416 :
3417 224 : if (RestoringToDB(AH))
3418 : {
3419 : PGresult *res;
3420 ECB :
3421 GIC 9 : res = PQexec(AH->connection, cmd->data);
3422 ECB :
3423 GIC 9 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
3424 UIC 0 : warn_or_exit_horribly(AH,
3425 ECB : "could not set default_table_access_method: %s",
3426 LBC 0 : PQerrorMessage(AH->connection));
3427 ECB :
3428 CBC 9 : PQclear(res);
3429 ECB : }
3430 : else
3431 CBC 215 : ahprintf(AH, "%s\n\n", cmd->data);
3432 ECB :
3433 CBC 224 : destroyPQExpBuffer(cmd);
3434 ECB :
3435 GNC 224 : free(AH->currTableAm);
3436 GIC 224 : AH->currTableAm = pg_strdup(want);
3437 ECB : }
3438 :
3439 : /*
3440 : * Extract an object description for a TOC entry, and append it to buf.
3441 : *
3442 : * This is used for ALTER ... OWNER TO.
3443 : *
3444 : * If the object type has no owner, do nothing.
3445 : */
3446 : static void
3447 GNC 11391 : _getObjectDescription(PQExpBuffer buf, const TocEntry *te)
3448 ECB : {
3449 CBC 11391 : const char *type = te->desc;
3450 ECB :
3451 : /* objects that don't require special decoration */
3452 GIC 11391 : if (strcmp(type, "COLLATION") == 0 ||
3453 11320 : strcmp(type, "CONVERSION") == 0 ||
3454 11286 : strcmp(type, "DOMAIN") == 0 ||
3455 CBC 11167 : strcmp(type, "FOREIGN TABLE") == 0 ||
3456 GNC 11131 : strcmp(type, "MATERIALIZED VIEW") == 0 ||
3457 10819 : strcmp(type, "SEQUENCE") == 0 ||
3458 10614 : strcmp(type, "STATISTICS") == 0 ||
3459 10492 : strcmp(type, "TABLE") == 0 ||
3460 CBC 6484 : strcmp(type, "TEXT SEARCH DICTIONARY") == 0 ||
3461 6405 : strcmp(type, "TEXT SEARCH CONFIGURATION") == 0 ||
3462 GNC 6351 : strcmp(type, "TYPE") == 0 ||
3463 5999 : strcmp(type, "VIEW") == 0 ||
3464 ECB : /* non-schema-specified objects */
3465 CBC 5730 : strcmp(type, "DATABASE") == 0 ||
3466 GIC 5696 : strcmp(type, "PROCEDURAL LANGUAGE") == 0 ||
3467 5663 : strcmp(type, "SCHEMA") == 0 ||
3468 CBC 5509 : strcmp(type, "EVENT TRIGGER") == 0 ||
3469 GIC 5476 : strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
3470 5437 : strcmp(type, "SERVER") == 0 ||
3471 5396 : strcmp(type, "PUBLICATION") == 0 ||
3472 GNC 5264 : strcmp(type, "SUBSCRIPTION") == 0)
3473 : {
3474 CBC 6226 : appendPQExpBuffer(buf, "%s ", type);
3475 6226 : if (te->namespace && *te->namespace)
3476 5661 : appendPQExpBuffer(buf, "%s.", fmtId(te->namespace));
3477 GIC 6226 : appendPQExpBufferStr(buf, fmtId(te->tag));
3478 : }
3479 : /* LOs just have a name, but it's numeric so must not use fmtId */
3480 GNC 5165 : else if (strcmp(type, "BLOB") == 0)
3481 : {
3482 CBC 85 : appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
3483 ECB : }
3484 : /*
3485 : * These object types require additional decoration. Fortunately, the
3486 : * information needed is exactly what's in the DROP command.
3487 : */
3488 GNC 5080 : else if (strcmp(type, "AGGREGATE") == 0 ||
3489 4810 : strcmp(type, "FUNCTION") == 0 ||
3490 3319 : strcmp(type, "OPERATOR") == 0 ||
3491 3230 : strcmp(type, "OPERATOR CLASS") == 0 ||
3492 3113 : strcmp(type, "OPERATOR FAMILY") == 0 ||
3493 3010 : strcmp(type, "PROCEDURE") == 0)
3494 : {
3495 : /* Chop "DROP " off the front and make a modifiable copy */
3496 GIC 2149 : char *first = pg_strdup(te->dropStmt + 5);
3497 EUB : char *last;
3498 :
3499 : /* point to last character in string */
3500 GIC 2149 : last = first + strlen(first) - 1;
3501 :
3502 : /* Strip off any ';' or '\n' at the end */
3503 6447 : while (last >= first && (*last == '\n' || *last == ';'))
3504 4298 : last--;
3505 2149 : *(last + 1) = '\0';
3506 :
3507 2149 : appendPQExpBufferStr(buf, first);
3508 ECB :
3509 GIC 2149 : free(first);
3510 CBC 2149 : return;
3511 : }
3512 : /* these object types don't have separate owners */
3513 GNC 2931 : else if (strcmp(type, "CAST") == 0 ||
3514 2931 : strcmp(type, "CHECK CONSTRAINT") == 0 ||
3515 2906 : strcmp(type, "NOT NULL CONSTRAINT") == 0 ||
3516 2906 : strcmp(type, "CONSTRAINT") == 0 ||
3517 2071 : strcmp(type, "DATABASE PROPERTIES") == 0 ||
3518 2070 : strcmp(type, "DEFAULT") == 0 ||
3519 1923 : strcmp(type, "FK CONSTRAINT") == 0 ||
3520 1775 : strcmp(type, "INDEX") == 0 ||
3521 850 : strcmp(type, "RULE") == 0 ||
3522 645 : strcmp(type, "TRIGGER") == 0 ||
3523 282 : strcmp(type, "ROW SECURITY") == 0 ||
3524 282 : strcmp(type, "POLICY") == 0 ||
3525 33 : strcmp(type, "USER MAPPING") == 0)
3526 : {
3527 : /* do nothing */
3528 : }
3529 : else
3530 UNC 0 : pg_fatal("don't know how to set owner for object type \"%s\"", type);
3531 ECB : }
3532 :
3533 : /*
3534 : * Emit the SQL commands to create the object represented by a TOC entry
3535 : *
3536 : * This now also includes issuing an ALTER OWNER command to restore the
3537 : * object's ownership, if wanted. But note that the object's permissions
3538 : * will remain at default, until the matching ACL TOC entry is restored.
3539 : */
3540 : static void
3541 GIC 20646 : _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
3542 ECB : {
3543 CBC 20646 : RestoreOptions *ropt = AH->public.ropt;
3544 :
3545 ECB : /* Select owner, schema, tablespace and default AM as necessary */
3546 GIC 20646 : _becomeOwner(AH, te);
3547 CBC 20646 : _selectOutputSchema(AH, te->namespace);
3548 20646 : _selectTablespace(AH, te->tablespace);
3549 GIC 20646 : _selectTableAccessMethod(AH, te->tableam);
3550 ECB :
3551 : /* Emit header comment for item */
3552 CBC 20646 : if (!AH->noTocComments)
3553 : {
3554 : const char *pfx;
3555 : char *sanitized_name;
3556 ECB : char *sanitized_schema;
3557 : char *sanitized_owner;
3558 :
3559 CBC 18713 : if (isData)
3560 GIC 3090 : pfx = "Data for ";
3561 : else
3562 15623 : pfx = "";
3563 ECB :
3564 CBC 18713 : ahprintf(AH, "--\n");
3565 18713 : if (AH->public.verbose)
3566 : {
3567 779 : ahprintf(AH, "-- TOC entry %d (class %u OID %u)\n",
3568 : te->dumpId, te->catalogId.tableoid, te->catalogId.oid);
3569 GIC 779 : if (te->nDeps > 0)
3570 : {
3571 ECB : int i;
3572 :
3573 CBC 451 : ahprintf(AH, "-- Dependencies:");
3574 GIC 1150 : for (i = 0; i < te->nDeps; i++)
3575 CBC 699 : ahprintf(AH, " %d", te->dependencies[i]);
3576 GIC 451 : ahprintf(AH, "\n");
3577 : }
3578 : }
3579 EUB :
3580 GBC 18713 : sanitized_name = sanitize_line(te->tag, false);
3581 18713 : sanitized_schema = sanitize_line(te->namespace, true);
3582 GIC 18713 : sanitized_owner = sanitize_line(ropt->noOwner ? NULL : te->owner, true);
3583 ECB :
3584 GIC 18713 : ahprintf(AH, "-- %sName: %s; Type: %s; Schema: %s; Owner: %s",
3585 ECB : pfx, sanitized_name, te->desc, sanitized_schema,
3586 : sanitized_owner);
3587 :
3588 GIC 18713 : free(sanitized_name);
3589 18713 : free(sanitized_schema);
3590 18713 : free(sanitized_owner);
3591 :
3592 18713 : if (te->tablespace && strlen(te->tablespace) > 0 && !ropt->noTablespace)
3593 : {
3594 : char *sanitized_tablespace;
3595 :
3596 UIC 0 : sanitized_tablespace = sanitize_line(te->tablespace, false);
3597 0 : ahprintf(AH, "; Tablespace: %s", sanitized_tablespace);
3598 LBC 0 : free(sanitized_tablespace);
3599 ECB : }
3600 GIC 18713 : ahprintf(AH, "\n");
3601 ECB :
3602 GIC 18713 : if (AH->PrintExtraTocPtr != NULL)
3603 3164 : AH->PrintExtraTocPtr(AH, te);
3604 18713 : ahprintf(AH, "--\n\n");
3605 ECB : }
3606 :
3607 : /*
3608 : * Actually print the definition.
3609 : *
3610 : * Really crude hack for suppressing AUTHORIZATION clause that old pg_dump
3611 : * versions put into CREATE SCHEMA. Don't mutate the variant for schema
3612 : * "public" that is a comment. We have to do this when --no-owner mode is
3613 : * selected. This is ugly, but I see no other good way ...
3614 : */
3615 GIC 20646 : if (ropt->noOwner &&
3616 CBC 272 : strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
3617 ECB : {
3618 GBC 2 : ahprintf(AH, "CREATE SCHEMA %s;\n\n\n", fmtId(te->tag));
3619 EUB : }
3620 ECB : else
3621 : {
3622 GIC 20644 : if (te->defn && strlen(te->defn) > 0)
3623 17544 : ahprintf(AH, "%s\n\n", te->defn);
3624 : }
3625 ECB :
3626 : /*
3627 : * If we aren't using SET SESSION AUTH to determine ownership, we must
3628 : * instead issue an ALTER OWNER command. Schema "public" is special; when
3629 : * a dump emits a comment in lieu of creating it, we use ALTER OWNER even
3630 : * when using SET SESSION for all other objects. We assume that anything
3631 : * without a DROP command is not a separately ownable object.
3632 : */
3633 GIC 20646 : if (!ropt->noOwner &&
3634 20374 : (!ropt->use_setsessauth ||
3635 UIC 0 : (strcmp(te->desc, "SCHEMA") == 0 &&
3636 0 : strncmp(te->defn, "--", 2) == 0)) &&
3637 GIC 20374 : te->owner && strlen(te->owner) > 0 &&
3638 19990 : te->dropStmt && strlen(te->dropStmt) > 0)
3639 ECB : {
3640 : PQExpBufferData temp;
3641 :
3642 GNC 11391 : initPQExpBuffer(&temp);
3643 11391 : _getObjectDescription(&temp, te);
3644 : /*
3645 : * If _getObjectDescription() didn't fill the buffer, then there is no
3646 : * owner.
3647 : */
3648 11391 : if (temp.data[0])
3649 8460 : ahprintf(AH, "ALTER %s OWNER TO %s;\n\n", temp.data, fmtId(te->owner));
3650 11391 : termPQExpBuffer(&temp);
3651 ECB : }
3652 :
3653 : /*
3654 : * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
3655 : * commands, so we can no longer assume we know the current auth setting.
3656 : */
3657 CBC 20646 : if (_tocEntryIsACL(te))
3658 ECB : {
3659 GNC 1964 : free(AH->currUser);
3660 CBC 1964 : AH->currUser = NULL;
3661 ECB : }
3662 CBC 20646 : }
3663 ECB :
/*
 * Sanitize a string to be included in an SQL comment or TOC listing, by
 * replacing any newlines (CR or LF) with spaces.  This ensures each logical
 * output line is in fact one physical output line.  It prevents corruption
 * of the dump, which could, in the worst case, present an SQL injection
 * vulnerability if someone were to incautiously load a dump containing
 * objects with maliciously crafted names.
 *
 * The result is always a freshly malloc'd string.  For NULL input it is "-"
 * when want_hyphen is true, and "" otherwise.
 *
 * Note that we currently don't bother to quote names, meaning that the name
 * fields aren't automatically parseable.  "pg_restore -L" doesn't care
 * because it only examines the dumpId field, but someday we might want to
 * try harder.
 */
static char *
sanitize_line(const char *str, bool want_hyphen)
{
	char	   *copy;
	char	   *hit;

	if (str == NULL)
		return pg_strdup(want_hyphen ? "-" : "");

	copy = pg_strdup(str);

	/* Jump from one CR/LF to the next, blanking each */
	for (hit = strpbrk(copy, "\r\n"); hit != NULL; hit = strpbrk(hit + 1, "\r\n"))
		*hit = ' ';

	return copy;
}
3699 :
3700 : /*
3701 EUB : * Write the file header for a custom-format archive
3702 : */
3703 : void
3704 CBC 25 : WriteHead(ArchiveHandle *AH)
3705 ECB : {
3706 EUB : struct tm crtm;
3707 :
3708 GIC 25 : AH->WriteBufPtr(AH, "PGDMP", 5); /* Magic code */
3709 CBC 25 : AH->WriteBytePtr(AH, ARCHIVE_MAJOR(AH->version));
3710 GBC 25 : AH->WriteBytePtr(AH, ARCHIVE_MINOR(AH->version));
3711 GIC 25 : AH->WriteBytePtr(AH, ARCHIVE_REV(AH->version));
3712 CBC 25 : AH->WriteBytePtr(AH, AH->intSize);
3713 25 : AH->WriteBytePtr(AH, AH->offSize);
3714 GIC 25 : AH->WriteBytePtr(AH, AH->format);
3715 GNC 25 : AH->WriteBytePtr(AH, AH->compression_spec.algorithm);
3716 GIC 25 : crtm = *localtime(&AH->createDate);
3717 CBC 25 : WriteInt(AH, crtm.tm_sec);
3718 GIC 25 : WriteInt(AH, crtm.tm_min);
3719 CBC 25 : WriteInt(AH, crtm.tm_hour);
3720 GBC 25 : WriteInt(AH, crtm.tm_mday);
3721 GIC 25 : WriteInt(AH, crtm.tm_mon);
3722 25 : WriteInt(AH, crtm.tm_year);
3723 CBC 25 : WriteInt(AH, crtm.tm_isdst);
3724 25 : WriteStr(AH, PQdb(AH->connection));
3725 GBC 25 : WriteStr(AH, AH->public.remoteVersionStr);
3726 GIC 25 : WriteStr(AH, PG_VERSION);
3727 25 : }
3728 EUB :
3729 : void
3730 GIC 31 : ReadHead(ArchiveHandle *AH)
3731 EUB : {
3732 : char *errmsg;
3733 : char vmaj,
3734 : vmin,
3735 : vrev;
3736 : int fmt;
3737 :
3738 : /*
3739 : * If we haven't already read the header, do so.
3740 ECB : *
3741 : * NB: this code must agree with _discoverArchiveFormat(). Maybe find a
3742 : * way to unify the cases?
3743 EUB : */
3744 GIC 31 : if (!AH->readHeader)
3745 EUB : {
3746 : char tmpMag[7];
3747 :
3748 CBC 31 : AH->ReadBufPtr(AH, tmpMag, 5);
3749 :
3750 GIC 31 : if (strncmp(tmpMag, "PGDMP", 5) != 0)
3751 UIC 0 : pg_fatal("did not find magic string in file header");
3752 ECB : }
3753 :
3754 CBC 31 : vmaj = AH->ReadBytePtr(AH);
3755 31 : vmin = AH->ReadBytePtr(AH);
3756 ECB :
3757 CBC 31 : if (vmaj > 1 || (vmaj == 1 && vmin > 0)) /* Version > 1.0 */
3758 31 : vrev = AH->ReadBytePtr(AH);
3759 : else
3760 UIC 0 : vrev = 0;
3761 :
3762 GIC 31 : AH->version = MAKE_ARCHIVE_VERSION(vmaj, vmin, vrev);
3763 :
3764 31 : if (AH->version < K_VERS_1_0 || AH->version > K_VERS_MAX)
3765 UIC 0 : pg_fatal("unsupported version (%d.%d) in file header",
3766 : vmaj, vmin);
3767 :
3768 GIC 31 : AH->intSize = AH->ReadBytePtr(AH);
3769 31 : if (AH->intSize > 32)
3770 UIC 0 : pg_fatal("sanity check on integer size (%lu) failed",
3771 : (unsigned long) AH->intSize);
3772 :
3773 CBC 31 : if (AH->intSize > sizeof(int))
3774 LBC 0 : pg_log_warning("archive was made on a machine with larger integers, some operations might fail");
3775 :
3776 GBC 31 : if (AH->version >= K_VERS_1_7)
3777 31 : AH->offSize = AH->ReadBytePtr(AH);
3778 EUB : else
3779 UBC 0 : AH->offSize = AH->intSize;
3780 :
3781 GIC 31 : fmt = AH->ReadBytePtr(AH);
3782 :
3783 CBC 31 : if (AH->format != fmt)
3784 UIC 0 : pg_fatal("expected format (%d) differs from format found in file (%d)",
3785 ECB : AH->format, fmt);
3786 :
3787 GNC 31 : if (AH->version >= K_VERS_1_15)
3788 31 : AH->compression_spec.algorithm = AH->ReadBytePtr(AH);
3789 UNC 0 : else if (AH->version >= K_VERS_1_2)
3790 ECB : {
3791 : /* Guess the compression method based on the level */
3792 UIC 0 : if (AH->version < K_VERS_1_4)
3793 UNC 0 : AH->compression_spec.level = AH->ReadBytePtr(AH);
3794 ECB : else
3795 UNC 0 : AH->compression_spec.level = ReadInt(AH);
3796 :
3797 0 : if (AH->compression_spec.level != 0)
3798 0 : AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
3799 ECB : }
3800 : else
3801 UNC 0 : AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
3802 :
3803 GNC 31 : errmsg = supports_compression(AH->compression_spec);
3804 31 : if (errmsg)
3805 : {
3806 UNC 0 : pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
3807 : errmsg);
3808 0 : pg_free(errmsg);
3809 : }
3810 ECB :
3811 GIC 31 : if (AH->version >= K_VERS_1_4)
3812 : {
3813 : struct tm crtm;
3814 :
3815 CBC 31 : crtm.tm_sec = ReadInt(AH);
3816 31 : crtm.tm_min = ReadInt(AH);
3817 31 : crtm.tm_hour = ReadInt(AH);
3818 GIC 31 : crtm.tm_mday = ReadInt(AH);
3819 31 : crtm.tm_mon = ReadInt(AH);
3820 31 : crtm.tm_year = ReadInt(AH);
3821 31 : crtm.tm_isdst = ReadInt(AH);
3822 :
3823 : /*
3824 ECB : * Newer versions of glibc have mktime() report failure if tm_isdst is
3825 EUB : * inconsistent with the prevailing timezone, e.g. tm_isdst = 1 when
3826 : * TZ=UTC. This is problematic when restoring an archive under a
3827 ECB : * different timezone setting. If we get a failure, try again with
3828 : * tm_isdst set to -1 ("don't know").
3829 : *
3830 : * XXX with or without this hack, we reconstruct createDate
3831 : * incorrectly when the prevailing timezone is different from
3832 : * pg_dump's. Next time we bump the archive version, we should flush
3833 : * this representation and store a plain seconds-since-the-Epoch
3834 : * timestamp instead.
3835 : */
3836 GIC 31 : AH->createDate = mktime(&crtm);
3837 31 : if (AH->createDate == (time_t) -1)
3838 : {
3839 LBC 0 : crtm.tm_isdst = -1;
3840 0 : AH->createDate = mktime(&crtm);
3841 0 : if (AH->createDate == (time_t) -1)
3842 UIC 0 : pg_log_warning("invalid creation date in header");
3843 : }
3844 : }
3845 :
3846 GIC 31 : if (AH->version >= K_VERS_1_4)
3847 : {
3848 31 : AH->archdbname = ReadStr(AH);
3849 : }
3850 :
3851 31 : if (AH->version >= K_VERS_1_10)
3852 : {
3853 CBC 31 : AH->archiveRemoteVersion = ReadStr(AH);
3854 GIC 31 : AH->archiveDumpVersion = ReadStr(AH);
3855 : }
3856 31 : }
3857 :
3858 ECB :
3859 : /*
3860 : * checkSeek
3861 : * check to see if ftell/fseek can be performed.
3862 : */
3863 : bool
3864 GIC 33 : checkSeek(FILE *fp)
3865 : {
3866 : pgoff_t tpos;
3867 :
3868 : /* Check that ftello works on this file */
3869 33 : tpos = ftello(fp);
3870 33 : if (tpos < 0)
3871 1 : return false;
3872 :
3873 : /*
3874 : * Check that fseeko(SEEK_SET) works, too. NB: we used to try to test
3875 : * this with fseeko(fp, 0, SEEK_CUR). But some platforms treat that as a
3876 : * successful no-op even on files that are otherwise unseekable.
3877 : */
3878 32 : if (fseeko(fp, tpos, SEEK_SET) != 0)
3879 UIC 0 : return false;
3880 :
3881 GIC 32 : return true;
3882 : }
3883 ECB :
3884 :
3885 : /*
3886 : * dumpTimestamp
3887 : */
3888 : static void
3889 CBC 34 : dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim)
3890 : {
3891 : char buf[64];
3892 ECB :
3893 CBC 34 : if (strftime(buf, sizeof(buf), PGDUMP_STRFTIME_FMT, localtime(&tim)) != 0)
3894 GIC 34 : ahprintf(AH, "-- %s %s\n\n", msg, buf);
3895 CBC 34 : }
3896 ECB :
3897 : /*
3898 : * Main engine for parallel restore.
3899 : *
3900 : * Parallel restore is done in three phases. In this first phase,
3901 : * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be
3902 : * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all
3903 : * PRE_DATA items other than ACLs.) Entries we can't process now are
3904 : * added to the pending_list for later phases to deal with.
3905 : */
3906 EUB : static void
3907 GBC 4 : restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
3908 : {
3909 : bool skipped_some;
3910 : TocEntry *next_work_item;
3911 :
3912 GIC 4 : pg_log_debug("entering restore_toc_entries_prefork");
3913 :
3914 : /* Adjust dependency information */
3915 4 : fix_dependencies(AH);
3916 ECB :
3917 EUB : /*
3918 : * Do all the early stuff in a single connection in the parent. There's no
3919 ECB : * great point in running it in parallel, in fact it will actually run
3920 : * faster in a single connection because we avoid all the connection and
3921 : * setup overhead. Also, pre-9.2 pg_dump versions were not very good
3922 : * about showing all the dependencies of SECTION_PRE_DATA items, so we do
3923 : * not risk trying to process them out-of-order.
3924 : *
3925 : * Stuff that we can't do immediately gets added to the pending_list.
3926 : * Note: we don't yet filter out entries that aren't going to be restored.
3927 : * They might participate in dependency chains connecting entries that
3928 : * should be restored, so we treat them as live until we actually process
3929 : * them.
3930 : *
3931 : * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear
3932 : * before DATA items, and all DATA items before POST_DATA items. That is
3933 : * not certain to be true in older archives, though, and in any case use
3934 : * of a list file would destroy that ordering (cf. SortTocFromFile). So
3935 : * this loop cannot assume that it holds.
3936 : */
3937 GIC 4 : AH->restorePass = RESTORE_PASS_MAIN;
3938 4 : skipped_some = false;
3939 100 : for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
3940 : {
3941 96 : bool do_now = true;
3942 :
3943 CBC 96 : if (next_work_item->section != SECTION_PRE_DATA)
3944 : {
3945 : /* DATA and POST_DATA items are just ignored for now */
3946 46 : if (next_work_item->section == SECTION_DATA ||
3947 30 : next_work_item->section == SECTION_POST_DATA)
3948 ECB : {
3949 CBC 46 : do_now = false;
3950 46 : skipped_some = true;
3951 ECB : }
3952 : else
3953 : {
3954 : /*
3955 : * SECTION_NONE items, such as comments, can be processed now
3956 : * if we are still in the PRE_DATA part of the archive. Once
3957 : * we've skipped any items, we have to consider whether the
3958 : * comment's dependencies are satisfied, so skip it for now.
3959 : */
3960 UIC 0 : if (skipped_some)
3961 0 : do_now = false;
3962 : }
3963 : }
3964 :
3965 : /*
3966 : * Also skip items that need to be forced into later passes. We need
3967 : * not set skipped_some in this case, since by assumption no main-pass
3968 ECB : * items could depend on these.
3969 : */
3970 GIC 96 : if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN)
3971 UIC 0 : do_now = false;
3972 :
3973 GIC 96 : if (do_now)
3974 ECB : {
3975 : /* OK, restore the item and update its dependencies */
3976 GIC 50 : pg_log_info("processing item %d %s %s",
3977 ECB : next_work_item->dumpId,
3978 : next_work_item->desc, next_work_item->tag);
3979 :
3980 GIC 50 : (void) restore_toc_entry(AH, next_work_item, false);
3981 :
3982 : /* Reduce dependencies, but don't move anything to ready_list */
3983 50 : reduce_dependencies(AH, next_work_item, NULL);
3984 : }
3985 : else
3986 : {
3987 ECB : /* Nope, so add it to pending_list */
3988 CBC 46 : pending_list_append(pending_list, next_work_item);
3989 : }
3990 : }
3991 :
3992 : /*
3993 : * Now close parent connection in prep for parallel steps. We do this
3994 : * mainly to ensure that we don't exceed the specified number of parallel
3995 : * connections.
3996 : */
3997 4 : DisconnectDatabase(&AH->public);
3998 :
3999 : /* blow away any transient state from the old connection */
4000 GNC 4 : free(AH->currUser);
4001 CBC 4 : AH->currUser = NULL;
4002 GNC 4 : free(AH->currSchema);
4003 GIC 4 : AH->currSchema = NULL;
4004 GNC 4 : free(AH->currTablespace);
4005 GBC 4 : AH->currTablespace = NULL;
4006 GNC 4 : free(AH->currTableAm);
4007 GIC 4 : AH->currTableAm = NULL;
4008 GBC 4 : }
4009 :
4010 EUB : /*
4011 : * Main engine for parallel restore.
4012 : *
4013 ECB : * Parallel restore is done in three phases. In this second phase,
4014 : * we process entries by dispatching them to parallel worker children
4015 : * (processes on Unix, threads on Windows), each of which connects
4016 : * separately to the database. Inter-entry dependencies are respected,
4017 : * and so is the RestorePass multi-pass structure. When we can no longer
4018 : * make any entries ready to process, we exit. Normally, there will be
4019 : * nothing left to do; but if there is, the third phase will mop up.
4020 : */
4021 : static void
4022 GIC 4 : restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
4023 : TocEntry *pending_list)
4024 : {
4025 : ParallelReadyList ready_list;
4026 : TocEntry *next_work_item;
4027 ECB :
4028 CBC 4 : pg_log_debug("entering restore_toc_entries_parallel");
4029 :
4030 : /* Set up ready_list with enough room for all known TocEntrys */
4031 4 : ready_list_init(&ready_list, AH->tocCount);
4032 :
4033 ECB : /*
4034 : * The pending_list contains all items that we need to restore. Move all
4035 : * items that are available to process immediately into the ready_list.
4036 : * After this setup, the pending list is everything that needs to be done
4037 : * but is blocked by one or more dependencies, while the ready list
4038 : * contains items that have no remaining dependencies and are OK to
4039 : * process in the current restore pass.
4040 : */
4041 GIC 4 : AH->restorePass = RESTORE_PASS_MAIN;
4042 4 : move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4043 :
4044 : /*
4045 : * main parent loop
4046 : *
4047 : * Keep going until there is no worker still running AND there is no work
4048 : * left to be done. Note invariant: at top of loop, there should always
4049 : * be at least one worker available to dispatch a job to.
4050 : */
4051 4 : pg_log_info("entering main parallel loop");
4052 :
4053 : for (;;)
4054 : {
4055 : /* Look for an item ready to be dispatched to a worker */
4056 68 : next_work_item = pop_next_work_item(&ready_list, pstate);
4057 68 : if (next_work_item != NULL)
4058 ECB : {
4059 : /* If not to be restored, don't waste time launching a worker */
4060 GIC 46 : if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0)
4061 : {
4062 UIC 0 : pg_log_info("skipping item %d %s %s",
4063 ECB : next_work_item->dumpId,
4064 : next_work_item->desc, next_work_item->tag);
4065 : /* Update its dependencies as though we'd completed it */
4066 UIC 0 : reduce_dependencies(AH, next_work_item, &ready_list);
4067 ECB : /* Loop around to see if anything else can be dispatched */
4068 LBC 0 : continue;
4069 : }
4070 :
4071 GIC 46 : pg_log_info("launching item %d %s %s",
4072 : next_work_item->dumpId,
4073 : next_work_item->desc, next_work_item->tag);
4074 :
4075 : /* Dispatch to some worker */
4076 46 : DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
4077 : mark_restore_job_done, &ready_list);
4078 : }
4079 22 : else if (IsEveryWorkerIdle(pstate))
4080 ECB : {
4081 : /*
4082 : * Nothing is ready and no worker is running, so we're done with
4083 : * the current pass or maybe with the whole process.
4084 : */
4085 CBC 12 : if (AH->restorePass == RESTORE_PASS_LAST)
4086 GIC 4 : break; /* No more parallel processing is possible */
4087 :
4088 : /* Advance to next restore pass */
4089 8 : AH->restorePass++;
4090 ECB : /* That probably allows some stuff to be made ready */
4091 GIC 8 : move_to_ready_list(pending_list, &ready_list, AH->restorePass);
4092 : /* Loop around to see if anything's now ready */
4093 CBC 8 : continue;
4094 : }
4095 : else
4096 : {
4097 : /*
4098 : * We have nothing ready, but at least one child is working, so
4099 : * wait for some subjob to finish.
4100 : */
4101 ECB : }
4102 :
4103 EUB : /*
4104 : * Before dispatching another job, check to see if anything has
4105 : * finished. We should check every time through the loop so as to
4106 : * reduce dependencies as soon as possible. If we were unable to
4107 ECB : * dispatch any job this time through, wait until some worker finishes
4108 : * (and, hopefully, unblocks some pending item). If we did dispatch
4109 : * something, continue as soon as there's at least one idle worker.
4110 : * Note that in either case, there's guaranteed to be at least one
4111 : * idle worker when we return to the top of the loop. This ensures we
4112 : * won't block inside DispatchJobForTocEntry, which would be
4113 : * undesirable: we'd rather postpone dispatching until we see what's
4114 : * been unblocked by finished jobs.
4115 : */
4116 GIC 56 : WaitForWorkers(AH, pstate,
4117 : next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
4118 : }
4119 ECB :
4120 : /* There should now be nothing in ready_list. */
4121 CBC 4 : Assert(ready_list.first_te > ready_list.last_te);
4122 :
4123 4 : ready_list_free(&ready_list);
4124 ECB :
4125 GIC 4 : pg_log_info("finished main parallel loop");
4126 4 : }
4127 ECB :
4128 : /*
4129 : * Main engine for parallel restore.
4130 : *
4131 : * Parallel restore is done in three phases. In this third phase,
4132 : * we mop up any remaining TOC entries by processing them serially.
4133 : * This phase normally should have nothing to do, but if we've somehow
4134 : * gotten stuck due to circular dependencies or some such, this provides
4135 : * at least some chance of completing the restore successfully.
4136 : */
4137 : static void
4138 GIC 4 : restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
4139 ECB : {
4140 GIC 4 : RestoreOptions *ropt = AH->public.ropt;
4141 ECB : TocEntry *te;
4142 :
4143 GIC 4 : pg_log_debug("entering restore_toc_entries_postfork");
4144 :
4145 : /*
4146 ECB : * Now reconnect the single parent connection.
4147 : */
4148 CBC 4 : ConnectDatabase((Archive *) AH, &ropt->cparams, true);
4149 ECB :
4150 : /* re-establish fixed state */
4151 CBC 4 : _doSetFixedOutputState(AH);
4152 ECB :
4153 : /*
4154 : * Make sure there is no work left due to, say, circular dependencies, or
4155 : * some other pathological condition. If so, do it in the single parent
4156 : * connection. We don't sweat about RestorePass ordering; it's likely we
4157 : * already violated that.
4158 : */
4159 CBC 4 : for (te = pending_list->pending_next; te != pending_list; te = te->pending_next)
4160 ECB : {
4161 LBC 0 : pg_log_info("processing missed item %d %s %s",
4162 ECB : te->dumpId, te->desc, te->tag);
4163 UIC 0 : (void) restore_toc_entry(AH, te, false);
4164 : }
4165 GIC 4 : }
4166 :
4167 : /*
4168 : * Check if te1 has an exclusive lock requirement for an item that te2 also
4169 ECB : * requires, whether or not te2's requirement is for an exclusive lock.
4170 : */
4171 : static bool
4172 CBC 154 : has_lock_conflicts(TocEntry *te1, TocEntry *te2)
4173 ECB : {
4174 : int j,
4175 : k;
4176 :
4177 GIC 391 : for (j = 0; j < te1->nLockDeps; j++)
4178 : {
4179 1075 : for (k = 0; k < te2->nDeps; k++)
4180 : {
4181 838 : if (te1->lockDeps[j] == te2->dependencies[k])
4182 CBC 2 : return true;
4183 : }
4184 ECB : }
4185 CBC 152 : return false;
4186 : }
4187 :
4188 :
4189 ECB : /*
4190 : * Initialize the header of the pending-items list.
4191 : *
4192 : * This is a circular list with a dummy TocEntry as header, just like the
4193 : * main TOC list; but we use separate list links so that an entry can be in
4194 : * the main TOC list as well as in the pending list.
4195 : */
4196 : static void
4197 GIC 4 : pending_list_header_init(TocEntry *l)
4198 ECB : {
4199 GIC 4 : l->pending_prev = l->pending_next = l;
4200 CBC 4 : }
4201 :
4202 ECB : /* Append te to the end of the pending-list headed by l */
4203 : static void
4204 GIC 46 : pending_list_append(TocEntry *l, TocEntry *te)
4205 : {
4206 46 : te->pending_prev = l->pending_prev;
4207 46 : l->pending_prev->pending_next = te;
4208 46 : l->pending_prev = te;
4209 46 : te->pending_next = l;
4210 46 : }
4211 ECB :
4212 : /* Remove te from the pending-list */
4213 EUB : static void
4214 GIC 46 : pending_list_remove(TocEntry *te)
4215 EUB : {
4216 GIC 46 : te->pending_prev->pending_next = te->pending_next;
4217 CBC 46 : te->pending_next->pending_prev = te->pending_prev;
4218 46 : te->pending_prev = NULL;
4219 GIC 46 : te->pending_next = NULL;
4220 46 : }
4221 :
4222 ECB :
4223 : /*
4224 : * Initialize the ready_list with enough room for up to tocCount entries.
4225 : */
4226 : static void
4227 GIC 4 : ready_list_init(ParallelReadyList *ready_list, int tocCount)
4228 ECB : {
4229 CBC 4 : ready_list->tes = (TocEntry **)
4230 GIC 4 : pg_malloc(tocCount * sizeof(TocEntry *));
4231 4 : ready_list->first_te = 0;
4232 CBC 4 : ready_list->last_te = -1;
4233 GIC 4 : ready_list->sorted = false;
4234 CBC 4 : }
4235 :
4236 : /*
4237 : * Free storage for a ready_list.
4238 ECB : */
4239 : static void
4240 CBC 4 : ready_list_free(ParallelReadyList *ready_list)
4241 ECB : {
4242 GIC 4 : pg_free(ready_list->tes);
4243 4 : }
4244 ECB :
4245 : /* Add te to the ready_list */
4246 : static void
4247 CBC 46 : ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
4248 : {
4249 GIC 46 : ready_list->tes[++ready_list->last_te] = te;
4250 ECB : /* List is (probably) not sorted anymore. */
4251 CBC 46 : ready_list->sorted = false;
4252 46 : }
4253 ECB :
4254 : /* Remove the i'th entry in the ready_list */
4255 EUB : static void
4256 GIC 46 : ready_list_remove(ParallelReadyList *ready_list, int i)
4257 : {
4258 46 : int f = ready_list->first_te;
4259 :
4260 46 : Assert(i >= f && i <= ready_list->last_te);
4261 :
4262 : /*
4263 : * In the typical case where the item to be removed is the first ready
4264 : * entry, we need only increment first_te to remove it. Otherwise, move
4265 : * the entries before it to compact the list. (This preserves sortedness,
4266 : * if any.) We could alternatively move the entries after i, but there
4267 ECB : * are typically many more of those.
4268 : */
4269 GIC 46 : if (i > f)
4270 : {
4271 UIC 0 : TocEntry **first_te_ptr = &ready_list->tes[f];
4272 :
4273 0 : memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
4274 ECB : }
4275 GIC 46 : ready_list->first_te++;
4276 46 : }
4277 ECB :
4278 : /* Sort the ready_list into the desired order */
4279 : static void
4280 CBC 68 : ready_list_sort(ParallelReadyList *ready_list)
4281 : {
4282 GIC 68 : if (!ready_list->sorted)
4283 ECB : {
4284 GIC 22 : int n = ready_list->last_te - ready_list->first_te + 1;
4285 ECB :
4286 GIC 22 : if (n > 1)
4287 14 : qsort(ready_list->tes + ready_list->first_te, n,
4288 ECB : sizeof(TocEntry *),
4289 : TocEntrySizeCompare);
4290 GIC 22 : ready_list->sorted = true;
4291 : }
4292 68 : }
4293 :
4294 : /* qsort comparator for sorting TocEntries by dataLength */
4295 : static int
4296 540 : TocEntrySizeCompare(const void *p1, const void *p2)
4297 : {
4298 540 : const TocEntry *te1 = *(const TocEntry *const *) p1;
4299 540 : const TocEntry *te2 = *(const TocEntry *const *) p2;
4300 :
4301 : /* Sort by decreasing dataLength */
4302 CBC 540 : if (te1->dataLength > te2->dataLength)
4303 GIC 47 : return -1;
4304 493 : if (te1->dataLength < te2->dataLength)
4305 54 : return 1;
4306 :
4307 : /* For equal dataLengths, sort by dumpId, just to be stable */
4308 CBC 439 : if (te1->dumpId < te2->dumpId)
4309 GIC 183 : return -1;
4310 256 : if (te1->dumpId > te2->dumpId)
4311 256 : return 1;
4312 :
4313 LBC 0 : return 0;
4314 : }
4315 ECB :
4316 :
4317 : /*
4318 : * Move all immediately-ready items from pending_list to ready_list.
4319 : *
4320 : * Items are considered ready if they have no remaining dependencies and
4321 : * they belong in the current restore pass. (See also reduce_dependencies,
4322 : * which applies the same logic one-at-a-time.)
4323 : */
4324 : static void
4325 CBC 12 : move_to_ready_list(TocEntry *pending_list,
4326 : ParallelReadyList *ready_list,
4327 ECB : RestorePass pass)
4328 : {
4329 : TocEntry *te;
4330 : TocEntry *next_te;
4331 :
4332 CBC 58 : for (te = pending_list->pending_next; te != pending_list; te = next_te)
4333 ECB : {
4334 : /* must save list link before possibly removing te from list */
4335 GIC 46 : next_te = te->pending_next;
4336 :
4337 CBC 66 : if (te->depCount == 0 &&
4338 20 : _tocEntryRestorePass(te) == pass)
4339 : {
4340 : /* Remove it from pending_list ... */
4341 20 : pending_list_remove(te);
4342 ECB : /* ... and add to ready_list */
4343 GIC 20 : ready_list_insert(ready_list, te);
4344 : }
4345 ECB : }
4346 CBC 12 : }
4347 :
4348 : /*
4349 : * Find the next work item (if any) that is capable of being run now,
4350 : * and remove it from the ready_list.
4351 : *
4352 : * Returns the item, or NULL if nothing is runnable.
4353 : *
4354 : * To qualify, the item must have no remaining dependencies
4355 : * and no requirements for locks that are incompatible with
4356 : * items currently running. Items in the ready_list are known to have
4357 : * no remaining dependencies, but we have to check for lock conflicts.
4358 : */
4359 ECB : static TocEntry *
4360 GIC 68 : pop_next_work_item(ParallelReadyList *ready_list,
4361 : ParallelState *pstate)
4362 : {
4363 ECB : /*
4364 : * Sort the ready_list so that we'll tackle larger jobs first.
4365 : */
4366 CBC 68 : ready_list_sort(ready_list);
4367 :
4368 : /*
4369 ECB : * Search the ready_list until we find a suitable item.
4370 : */
4371 CBC 70 : for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
4372 : {
4373 GIC 48 : TocEntry *te = ready_list->tes[i];
4374 48 : bool conflicts = false;
4375 :
4376 : /*
4377 : * Check to see if the item would need exclusive lock on something
4378 : * that a currently running item also needs lock on, or vice versa. If
4379 : * so, we don't want to schedule them together.
4380 : */
4381 186 : for (int k = 0; k < pstate->numWorkers; k++)
4382 ECB : {
4383 GIC 140 : TocEntry *running_te = pstate->te[k];
4384 :
4385 140 : if (running_te == NULL)
4386 62 : continue;
4387 CBC 154 : if (has_lock_conflicts(te, running_te) ||
4388 GIC 76 : has_lock_conflicts(running_te, te))
4389 ECB : {
4390 GIC 2 : conflicts = true;
4391 2 : break;
4392 ECB : }
4393 EUB : }
4394 ECB :
4395 GIC 48 : if (conflicts)
4396 GBC 2 : continue;
4397 EUB :
4398 : /* passed all tests, so this item can run */
4399 CBC 46 : ready_list_remove(ready_list, i);
4400 GBC 46 : return te;
4401 ECB : }
4402 EUB :
4403 GIC 22 : pg_log_debug("no item ready");
4404 22 : return NULL;
4405 ECB : }
4406 :
4407 :
4408 : /*
4409 : * Restore a single TOC item in parallel with others
4410 : *
4411 : * this is run in the worker, i.e. in a thread (Windows) or a separate process
4412 : * (everything else). A worker process executes several such work items during
4413 : * a parallel backup or restore. Once we terminate here and report back that
4414 : * our work is finished, the leader process will assign us a new work item.
4415 : */
4416 : int
4417 GIC 46 : parallel_restore(ArchiveHandle *AH, TocEntry *te)
4418 : {
4419 : int status;
4420 :
4421 46 : Assert(AH->connection != NULL);
4422 :
4423 ECB : /* Count only errors associated with this TOC entry */
4424 GIC 46 : AH->public.n_errors = 0;
4425 :
4426 : /* Restore the TOC item */
4427 46 : status = restore_toc_entry(AH, te, true);
4428 :
4429 46 : return status;
4430 : }
4431 :
4432 ECB :
4433 : /*
4434 : * Callback function that's invoked in the leader process after a step has
4435 : * been parallel restored.
4436 : *
4437 : * Update status and reduce the dependency count of any dependent items.
4438 : */
4439 : static void
4440 GIC 46 : mark_restore_job_done(ArchiveHandle *AH,
4441 : TocEntry *te,
4442 : int status,
4443 : void *callback_data)
4444 : {
4445 46 : ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
4446 ECB :
4447 GIC 46 : pg_log_info("finished item %d %s %s",
4448 : te->dumpId, te->desc, te->tag);
4449 :
4450 46 : if (status == WORKER_CREATE_DONE)
4451 UIC 0 : mark_create_done(AH, te);
4452 GIC 46 : else if (status == WORKER_INHIBIT_DATA)
4453 ECB : {
4454 UIC 0 : inhibit_data_for_failed_table(AH, te);
4455 UBC 0 : AH->public.n_errors++;
4456 : }
4457 GBC 46 : else if (status == WORKER_IGNORED_ERRORS)
4458 UIC 0 : AH->public.n_errors++;
4459 GIC 46 : else if (status != 0)
4460 UIC 0 : pg_fatal("worker process failed: exit code %d",
4461 EUB : status);
4462 :
4463 GBC 46 : reduce_dependencies(AH, te, ready_list);
4464 GIC 46 : }
4465 EUB :
4466 :
4467 : /*
4468 : * Process the dependency information into a form useful for parallel restore.
4469 : *
4470 : * This function takes care of fixing up some missing or badly designed
4471 : * dependencies, and then prepares subsidiary data structures that will be
4472 : * used in the main parallel-restore logic, including:
4473 : * 1. We build the revDeps[] arrays of incoming dependency dumpIds.
4474 : * 2. We set up depCount fields that are the number of as-yet-unprocessed
4475 : * dependencies for each TOC entry.
4476 : *
4477 : * We also identify locking dependencies so that we can avoid trying to
4478 : * schedule conflicting items at the same time.
4479 : */
4480 : static void
4481 GIC 4 : fix_dependencies(ArchiveHandle *AH)
4482 : {
4483 : TocEntry *te;
4484 : int i;
4485 :
4486 : /*
4487 : * Initialize the depCount/revDeps/nRevDeps fields, and make sure the TOC
4488 ECB : * items are marked as not being in any parallel-processing list.
4489 : */
4490 CBC 100 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4491 : {
4492 96 : te->depCount = te->nDeps;
4493 GIC 96 : te->revDeps = NULL;
4494 CBC 96 : te->nRevDeps = 0;
4495 96 : te->pending_prev = NULL;
4496 GIC 96 : te->pending_next = NULL;
4497 EUB : }
4498 :
4499 : /*
4500 : * POST_DATA items that are shown as depending on a table need to be
4501 : * re-pointed to depend on that table's data, instead. This ensures they
4502 : * won't get scheduled until the data has been loaded.
4503 : */
4504 GIC 4 : repoint_table_dependencies(AH);
4505 ECB :
4506 : /*
4507 : * Pre-8.4 versions of pg_dump neglected to set up a dependency from BLOB
4508 : * COMMENTS to BLOBS. Cope. (We assume there's only one BLOBS and only
4509 : * one BLOB COMMENTS in such files.)
4510 : */
4511 GIC 4 : if (AH->version < K_VERS_1_11)
4512 : {
4513 UIC 0 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4514 : {
4515 0 : if (strcmp(te->desc, "BLOB COMMENTS") == 0 && te->nDeps == 0)
4516 ECB : {
4517 : TocEntry *te2;
4518 :
4519 UIC 0 : for (te2 = AH->toc->next; te2 != AH->toc; te2 = te2->next)
4520 ECB : {
4521 UIC 0 : if (strcmp(te2->desc, "BLOBS") == 0)
4522 ECB : {
4523 UIC 0 : te->dependencies = (DumpId *) pg_malloc(sizeof(DumpId));
4524 LBC 0 : te->dependencies[0] = te2->dumpId;
4525 UIC 0 : te->nDeps++;
4526 LBC 0 : te->depCount++;
4527 UIC 0 : break;
4528 : }
4529 : }
4530 0 : break;
4531 : }
4532 : }
4533 : }
4534 ECB :
4535 : /*
4536 : * At this point we start to build the revDeps reverse-dependency arrays,
4537 : * so all changes of dependencies must be complete.
4538 : */
4539 :
4540 : /*
4541 : * Count the incoming dependencies for each item. Also, it is possible
4542 : * that the dependencies list items that are not in the archive at all
4543 : * (that should not happen in 9.2 and later, but is highly likely in older
4544 : * archives). Subtract such items from the depCounts.
4545 : */
4546 GIC 100 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4547 : {
4548 288 : for (i = 0; i < te->nDeps; i++)
4549 : {
4550 192 : DumpId depid = te->dependencies[i];
4551 :
4552 192 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4553 CBC 192 : AH->tocsByDumpId[depid]->nRevDeps++;
4554 : else
4555 UIC 0 : te->depCount--;
4556 : }
4557 : }
4558 :
4559 ECB : /*
4560 : * Allocate space for revDeps[] arrays, and reset nRevDeps so we can use
4561 : * it as a counter below.
4562 : */
4563 CBC 100 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4564 : {
4565 96 : if (te->nRevDeps > 0)
4566 52 : te->revDeps = (DumpId *) pg_malloc(te->nRevDeps * sizeof(DumpId));
4567 96 : te->nRevDeps = 0;
4568 : }
4569 ECB :
4570 : /*
4571 : * Build the revDeps[] arrays of incoming-dependency dumpIds. This had
4572 : * better agree with the loops above.
4573 : */
4574 CBC 100 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4575 : {
4576 GIC 288 : for (i = 0; i < te->nDeps; i++)
4577 : {
4578 192 : DumpId depid = te->dependencies[i];
4579 ECB :
4580 GIC 192 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL)
4581 : {
4582 192 : TocEntry *otherte = AH->tocsByDumpId[depid];
4583 :
4584 192 : otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
4585 : }
4586 : }
4587 ECB : }
4588 :
4589 : /*
4590 : * Lastly, work out the locking dependencies.
4591 : */
4592 GIC 100 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4593 : {
4594 96 : te->lockDeps = NULL;
4595 96 : te->nLockDeps = 0;
4596 96 : identify_locking_dependencies(AH, te);
4597 ECB : }
4598 CBC 4 : }
4599 :
4600 : /*
4601 ECB : * Change dependencies on table items to depend on table data items instead,
4602 EUB : * but only in POST_DATA items.
4603 : *
4604 : * Also, for any item having such dependency(s), set its dataLength to the
4605 : * largest dataLength of the table data items it depends on. This ensures
4606 : * that parallel restore will prioritize larger jobs (index builds, FK
4607 : * constraint checks, etc) over smaller ones, avoiding situations where we
4608 : * end a restore with only one active job working on a large table.
4609 : */
4610 ECB : static void
4611 GBC 4 : repoint_table_dependencies(ArchiveHandle *AH)
4612 : {
4613 : TocEntry *te;
4614 : int i;
4615 : DumpId olddep;
4616 :
4617 GIC 100 : for (te = AH->toc->next; te != AH->toc; te = te->next)
4618 : {
4619 96 : if (te->section != SECTION_POST_DATA)
4620 66 : continue;
4621 160 : for (i = 0; i < te->nDeps; i++)
4622 : {
4623 CBC 130 : olddep = te->dependencies[i];
4624 130 : if (olddep <= AH->maxDumpId &&
4625 130 : AH->tableDataId[olddep] != 0)
4626 : {
4627 62 : DumpId tabledataid = AH->tableDataId[olddep];
4628 GIC 62 : TocEntry *tabledatate = AH->tocsByDumpId[tabledataid];
4629 ECB :
4630 CBC 62 : te->dependencies[i] = tabledataid;
4631 62 : te->dataLength = Max(te->dataLength, tabledatate->dataLength);
4632 62 : pg_log_debug("transferring dependency %d -> %d to %d",
4633 : te->dumpId, olddep, tabledataid);
4634 : }
4635 ECB : }
4636 : }
4637 GBC 4 : }
4638 EUB :
4639 : /*
4640 : * Identify which objects we'll need exclusive lock on in order to restore
4641 ECB : * the given TOC entry (*other* than the one identified by the TOC entry
4642 : * itself). Record their dump IDs in the entry's lockDeps[] array.
4643 : */
4644 : static void
4645 GIC 96 : identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
4646 : {
4647 : DumpId *lockids;
4648 : int nlockids;
4649 : int i;
4650 :
4651 ECB : /*
4652 : * We only care about this for POST_DATA items. PRE_DATA items are not
4653 : * run in parallel, and DATA items are all independent by assumption.
4654 : */
4655 GIC 96 : if (te->section != SECTION_POST_DATA)
4656 CBC 66 : return;
4657 :
4658 ECB : /* Quick exit if no dependencies at all */
4659 GIC 30 : if (te->nDeps == 0)
4660 LBC 0 : return;
4661 :
4662 ECB : /*
4663 : * Most POST_DATA items are ALTER TABLEs or some moral equivalent of that,
4664 : * and hence require exclusive lock. However, we know that CREATE INDEX
4665 : * does not. (Maybe someday index-creating CONSTRAINTs will fall in that
4666 : * category too ... but today is not that day.)
4667 : */
4668 GIC 30 : if (strcmp(te->desc, "INDEX") == 0)
4669 UIC 0 : return;
4670 :
4671 : /*
4672 : * We assume the entry requires exclusive lock on each TABLE or TABLE DATA
4673 ECB : * item listed among its dependencies. Originally all of these would have
4674 : * been TABLE items, but repoint_table_dependencies would have repointed
4675 : * them to the TABLE DATA items if those are present (which they might not
4676 : * be, eg in a schema-only dump). Note that all of the entries we are
4677 : * processing here are POST_DATA; otherwise there might be a significant
4678 : * difference between a dependency on a table and a dependency on its
4679 : * data, so that closer analysis would be needed here.
4680 : */
4681 CBC 30 : lockids = (DumpId *) pg_malloc(te->nDeps * sizeof(DumpId));
4682 GIC 30 : nlockids = 0;
4683 160 : for (i = 0; i < te->nDeps; i++)
4684 ECB : {
4685 GIC 130 : DumpId depid = te->dependencies[i];
4686 :
4687 130 : if (depid <= AH->maxDumpId && AH->tocsByDumpId[depid] != NULL &&
4688 130 : ((strcmp(AH->tocsByDumpId[depid]->desc, "TABLE DATA") == 0) ||
4689 68 : strcmp(AH->tocsByDumpId[depid]->desc, "TABLE") == 0))
4690 82 : lockids[nlockids++] = depid;
4691 ECB : }
4692 :
4693 CBC 30 : if (nlockids == 0)
4694 : {
4695 LBC 0 : free(lockids);
4696 UIC 0 : return;
4697 ECB : }
4698 :
4699 CBC 30 : te->lockDeps = pg_realloc(lockids, nlockids * sizeof(DumpId));
4700 GIC 30 : te->nLockDeps = nlockids;
4701 : }
4702 :
4703 : /*
4704 : * Remove the specified TOC entry from the depCounts of items that depend on
4705 : * it, thereby possibly making them ready-to-run. Any pending item that
4706 EUB : * becomes ready should be moved to the ready_list, if that's provided.
4707 : */
4708 : static void
4709 GIC 96 : reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
4710 : ParallelReadyList *ready_list)
4711 EUB : {
4712 : int i;
4713 :
4714 GIC 96 : pg_log_debug("reducing dependencies for %d", te->dumpId);
4715 EUB :
4716 GIC 288 : for (i = 0; i < te->nRevDeps; i++)
4717 EUB : {
4718 GIC 192 : TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]];
4719 :
4720 192 : Assert(otherte->depCount > 0);
4721 192 : otherte->depCount--;
4722 :
4723 : /*
4724 : * It's ready if it has no remaining dependencies, and it belongs in
4725 : * the current restore pass, and it is currently a member of the
4726 ECB : * pending list (that check is needed to prevent double restore in
4727 : * some cases where a list-file forces out-of-order restoring).
4728 : * However, if ready_list == NULL then caller doesn't want any list
4729 : * memberships changed.
4730 : */
4731 CBC 192 : if (otherte->depCount == 0 &&
4732 74 : _tocEntryRestorePass(otherte) == AH->restorePass &&
4733 GIC 74 : otherte->pending_prev != NULL &&
4734 : ready_list != NULL)
4735 ECB : {
4736 : /* Remove it from pending list ... */
4737 GIC 26 : pending_list_remove(otherte);
4738 ECB : /* ... and add to ready_list */
4739 CBC 26 : ready_list_insert(ready_list, otherte);
4740 ECB : }
4741 : }
4742 CBC 96 : }
4743 ECB :
4744 : /*
4745 : * Set the created flag on the DATA member corresponding to the given
4746 : * TABLE member
4747 EUB : */
4748 : static void
4749 GIC 4036 : mark_create_done(ArchiveHandle *AH, TocEntry *te)
4750 ECB : {
4751 GIC 4036 : if (AH->tableDataId[te->dumpId] != 0)
4752 : {
4753 2976 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4754 :
4755 2976 : ted->created = true;
4756 ECB : }
4757 GIC 4036 : }
4758 :
4759 ECB : /*
4760 : * Mark the DATA member corresponding to the given TABLE member
4761 : * as not wanted
4762 : */
4763 : static void
4764 LBC 0 : inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te)
4765 : {
4766 0 : pg_log_info("table \"%s\" could not be created, will not restore its data",
4767 ECB : te->tag);
4768 :
4769 UIC 0 : if (AH->tableDataId[te->dumpId] != 0)
4770 : {
4771 0 : TocEntry *ted = AH->tocsByDumpId[AH->tableDataId[te->dumpId]];
4772 :
4773 0 : ted->reqs = 0;
4774 : }
4775 0 : }
4776 ECB :
/*
 * Clone and de-clone routines used in parallel restoration.
 *
 * Enough of the structure is cloned to ensure that there is no
 * conflict between different threads each with their own clone.
 */

/*
 * CloneArchive: build a worker-private copy of AH.
 *
 * The copy begins as a flat (memcpy) copy of AH.  SQL-parsing state and
 * connection-related fields are then reset, savedPassword is duplicated,
 * and the clone opens its own database connection using
 * AH->public.ropt->cparams.  Finally the format-specific ClonePtr hook runs.
 *
 * Returns the new handle, which must have a live connection; its
 * clone-local storage is released by DeCloneArchive.
 *
 * NOTE(review): pointer members that are not reset here remain shared with
 * AH; presumably ClonePtr takes care of format-specific ones -- confirm.
 */
ArchiveHandle *
CloneArchive(ArchiveHandle *AH)
{
	ArchiveHandle *clone;

	/* Make a "flat" copy */
	clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
	memcpy(clone, AH, sizeof(ArchiveHandle));

	/*
	 * Handle format-independent fields.  The clone gets fresh SQL-parsing
	 * state; DeCloneArchive destroys any curCmd buffer it acquires later.
	 */
	memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));

	/* The clone will have its own connection, so disregard connection state */
	clone->connection = NULL;
	clone->connCancel = NULL;
	clone->currUser = NULL;
	clone->currSchema = NULL;
	clone->currTableAm = NULL;
	clone->currTablespace = NULL;

	/*
	 * savedPassword must be local in case we change it while connecting.
	 * This has to happen before ConnectDatabase below, and DeCloneArchive
	 * frees the duplicate.
	 */
	if (clone->savedPassword)
		clone->savedPassword = pg_strdup(clone->savedPassword);

	/* clone has its own error count, too */
	clone->public.n_errors = 0;

	/*
	 * Connect our new clone object to the database, using the same connection
	 * parameters used for the original connection.
	 */
	ConnectDatabase((Archive *) clone, &clone->public.ropt->cparams, true);

	/* re-establish fixed state */
	if (AH->mode == archModeRead)
		_doSetFixedOutputState(clone);
	/* in write case, setupDumpWorker will fix up connection state */

	/* Let the format-specific code have a chance too */
	clone->ClonePtr(clone);

	/* A usable clone must leave here with its own live connection */
	Assert(clone->connection != NULL);
	return clone;
}
4827 :
4828 : /*
4829 : * Release clone-local storage.
4830 : *
4831 : * Note: we assume any clone-local connection was already closed.
4832 : */
4833 : void
4834 28 : DeCloneArchive(ArchiveHandle *AH)
4835 : {
4836 : /* Should not have an open database connection */
4837 28 : Assert(AH->connection == NULL);
4838 :
4839 : /* Clear format-specific state */
4840 28 : AH->DeClonePtr(AH);
4841 :
4842 : /* Clear state allocated by CloneArchive */
4843 28 : if (AH->sqlparse.curCmd)
4844 3 : destroyPQExpBuffer(AH->sqlparse.curCmd);
4845 :
4846 : /* Clear any connection-local state */
4847 GNC 28 : free(AH->currUser);
4848 28 : free(AH->currSchema);
4849 28 : free(AH->currTablespace);
4850 28 : free(AH->currTableAm);
4851 28 : free(AH->savedPassword);
4852 :
4853 GIC 28 : free(AH);
4854 28 : }
|