Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pgarch.c
4 : *
5 : * PostgreSQL WAL archiver
6 : *
7 : * All functions relating to archiver are included here
8 : *
9 : * - All functions executed by archiver process
10 : *
11 : * - archiver is forked from postmaster, and the two
12 : * processes then communicate using signals. All functions
13 : * executed by postmaster are included in this file.
14 : *
15 : * Initial author: Simon Riggs simon@2ndquadrant.com
16 : *
17 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
18 : * Portions Copyright (c) 1994, Regents of the University of California
19 : *
20 : *
21 : * IDENTIFICATION
22 : * src/backend/postmaster/pgarch.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 : #include "postgres.h"
27 :
28 : #include <time.h>
29 : #include <sys/stat.h>
30 : #include <unistd.h>
31 :
32 : #include "access/xlog.h"
33 : #include "access/xlog_internal.h"
34 : #include "archive/archive_module.h"
35 : #include "archive/shell_archive.h"
36 : #include "lib/binaryheap.h"
37 : #include "libpq/pqsignal.h"
38 : #include "pgstat.h"
39 : #include "postmaster/interrupt.h"
40 : #include "postmaster/pgarch.h"
41 : #include "storage/fd.h"
42 : #include "storage/ipc.h"
43 : #include "storage/latch.h"
44 : #include "storage/pmsignal.h"
45 : #include "storage/proc.h"
46 : #include "storage/procsignal.h"
47 : #include "storage/shmem.h"
48 : #include "storage/spin.h"
49 : #include "utils/guc.h"
50 : #include "utils/memutils.h"
51 : #include "utils/ps_status.h"
52 :
53 :
/* ----------
 * Timer definitions.  Both intervals are expressed in seconds.
 * ----------
 */

/* How often to force a poll of the archive status directory. */
#define PGARCH_AUTOWAKE_INTERVAL	60

/* How often to attempt to restart a failed archiver. */
#define PGARCH_RESTART_INTERVAL		10

/*
 * Upper bound on the attempts made to archive one WAL file before the
 * archiver gives up for the current cycle.
 */
#define NUM_ARCHIVE_RETRIES			3

/*
 * Upper bound on the attempts made to remove one orphan archive status
 * file before the archiver gives up for the current cycle.
 */
#define NUM_ORPHAN_CLEANUP_RETRIES	3

/*
 * Maximum number of .ready files to gather per directory scan.
 */
#define NUM_FILES_PER_DIRECTORY_SCAN	64
79 :
80 : /* Shared memory area for archiver process */
81 : typedef struct PgArchData
82 : {
83 : int pgprocno; /* pgprocno of archiver process */
84 :
85 : /*
86 : * Forces a directory scan in pgarch_readyXlog(). Protected by arch_lck.
87 : */
88 : bool force_dir_scan;
89 :
90 : slock_t arch_lck;
91 : } PgArchData;
92 :
93 : char *XLogArchiveLibrary = "";
94 :
95 :
96 : /* ----------
97 : * Local data
98 : * ----------
99 : */
100 : static time_t last_sigterm_time = 0;
101 : static PgArchData *PgArch = NULL;
102 : static const ArchiveModuleCallbacks *ArchiveCallbacks;
103 : static ArchiveModuleState *archive_module_state;
104 :
105 :
106 : /*
107 : * Stuff for tracking multiple files to archive from each scan of
108 : * archive_status. Minimizing the number of directory scans when there are
109 : * many files to archive can significantly improve archival rate.
110 : *
111 : * arch_heap is a max-heap that is used during the directory scan to track
112 : * the highest-priority files to archive. After the directory scan
113 : * completes, the file names are stored in ascending order of priority in
114 : * arch_files. pgarch_readyXlog() returns files from arch_files until it
115 : * is empty, at which point another directory scan must be performed.
116 : *
117 : * We only need this data in the archiver process, so make it a palloc'd
118 : * struct rather than a bunch of static arrays.
119 : */
120 : struct arch_files_state
121 : {
122 : binaryheap *arch_heap;
123 : int arch_files_size; /* number of live entries in arch_files[] */
124 : char *arch_files[NUM_FILES_PER_DIRECTORY_SCAN];
125 : /* buffers underlying heap, and later arch_files[], entries: */
126 : char arch_filenames[NUM_FILES_PER_DIRECTORY_SCAN][MAX_XFN_CHARS + 1];
127 : };
128 :
129 : static struct arch_files_state *arch_files = NULL;
130 :
131 : /*
132 : * Flags set by interrupt handlers for later service in the main loop.
133 : */
134 : static volatile sig_atomic_t ready_to_stop = false;
135 :
136 : /* ----------
137 : * Local function forward declarations
138 : * ----------
139 : */
140 : static void pgarch_waken_stop(SIGNAL_ARGS);
141 : static void pgarch_MainLoop(void);
142 : static void pgarch_ArchiverCopyLoop(void);
143 : static bool pgarch_archiveXlog(char *xlog);
144 : static bool pgarch_readyXlog(char *xlog);
145 : static void pgarch_archiveDone(char *xlog);
146 : static void pgarch_die(int code, Datum arg);
147 : static void HandlePgArchInterrupts(void);
148 : static int ready_file_comparator(Datum a, Datum b, void *arg);
149 : static void LoadArchiveLibrary(void);
150 : static void pgarch_call_module_shutdown_cb(int code, Datum arg);
151 :
152 : /* Report shared memory space needed by PgArchShmemInit */
153 : Size
755 fujii 154 GIC 6390 : PgArchShmemSize(void)
155 : {
156 6390 : Size size = 0;
6838 tgl 157 ECB :
755 fujii 158 GIC 6390 : size = add_size(size, sizeof(PgArchData));
6838 tgl 159 ECB :
755 fujii 160 GIC 6390 : return size;
755 fujii 161 ECB : }
162 :
163 : /* Allocate and initialize archiver-related shared memory */
164 : void
755 fujii 165 GIC 1826 : PgArchShmemInit(void)
166 : {
167 : bool found;
6838 tgl 168 ECB :
755 fujii 169 GIC 1826 : PgArch = (PgArchData *)
170 1826 : ShmemInitStruct("Archiver Data", PgArchShmemSize(), &found);
171 :
755 fujii 172 CBC 1826 : if (!found)
755 fujii 173 ECB : {
174 : /* First time through, so initialize */
755 fujii 175 CBC 3652 : MemSet(PgArch, 0, PgArchShmemSize());
755 fujii 176 GIC 1826 : PgArch->pgprocno = INVALID_PGPROCNO;
514 rhaas 177 1826 : SpinLockInit(&PgArch->arch_lck);
6838 tgl 178 ECB : }
6838 tgl 179 CBC 1826 : }
6838 tgl 180 ECB :
181 : /*
755 fujii 182 : * PgArchCanRestart
183 : *
184 : * Return true and archiver is allowed to restart if enough time has
185 : * passed since it was launched last to reach PGARCH_RESTART_INTERVAL.
186 : * Otherwise return false.
187 : *
188 : * This is a safety valve to protect against continuous respawn attempts if the
189 : * archiver is dying immediately at launch. Note that since we will retry to
190 : * launch the archiver from the postmaster main loop, we will get another
191 : * chance later.
192 : */
193 : bool
755 fujii 194 GIC 28 : PgArchCanRestart(void)
195 : {
196 : static time_t last_pgarch_start_time = 0;
755 fujii 197 CBC 28 : time_t curtime = time(NULL);
198 :
199 : /*
755 fujii 200 ECB : * Return false and don't restart archiver if too soon since last archiver
201 : * start.
202 : */
755 fujii 203 GIC 28 : if ((unsigned int) (curtime - last_pgarch_start_time) <
204 : (unsigned int) PGARCH_RESTART_INTERVAL)
755 fujii 205 UIC 0 : return false;
6838 tgl 206 ECB :
755 fujii 207 GIC 28 : last_pgarch_start_time = curtime;
755 fujii 208 GBC 28 : return true;
209 : }
6838 tgl 210 ECB :
211 :
212 : /* Main entry point for archiver process */
213 : void
755 fujii 214 GIC 10 : PgArchiverMain(void)
215 : {
216 : /*
6797 bruce 217 ECB : * Ignore all signals usually bound to some action in the postmaster,
218 : * except for SIGHUP, SIGTERM, SIGUSR1, SIGUSR2, and SIGQUIT.
219 : */
1209 rhaas 220 GIC 10 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
6797 bruce 221 10 : pqsignal(SIGINT, SIG_IGN);
1209 rhaas 222 10 : pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
935 tgl 223 ECB : /* SIGQUIT handler was already set up by InitPostmasterChild */
6797 bruce 224 CBC 10 : pqsignal(SIGALRM, SIG_IGN);
225 10 : pqsignal(SIGPIPE, SIG_IGN);
755 fujii 226 GIC 10 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
5567 tgl 227 CBC 10 : pqsignal(SIGUSR2, pgarch_waken_stop);
755 fujii 228 ECB :
1604 tgl 229 : /* Reset some signals that are accepted by postmaster but not here */
6797 bruce 230 CBC 10 : pqsignal(SIGCHLD, SIG_DFL);
231 :
232 : /* Unblock signals (they were blocked when the postmaster forked us) */
65 tmunro 233 GNC 10 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
234 :
235 : /* We shouldn't be launched unnecessarily. */
755 fujii 236 CBC 10 : Assert(XLogArchivingActive());
237 :
238 : /* Arrange to clean up at archiver exit */
239 10 : on_shmem_exit(pgarch_die, 0);
240 :
241 : /*
755 fujii 242 ECB : * Advertise our pgprocno so that backends can use our latch to wake us up
243 : * while we're sleeping.
244 : */
755 fujii 245 GIC 10 : PgArch->pgprocno = MyProc->pgprocno;
246 :
247 : /* Create workspace for pgarch_readyXlog() */
466 tgl 248 CBC 10 : arch_files = palloc(sizeof(struct arch_files_state));
466 tgl 249 GIC 10 : arch_files->arch_files_size = 0;
250 :
514 rhaas 251 ECB : /* Initialize our max-heap for prioritizing files to archive. */
466 tgl 252 CBC 10 : arch_files->arch_heap = binaryheap_allocate(NUM_FILES_PER_DIRECTORY_SCAN,
253 : ready_file_comparator, NULL);
254 :
430 rhaas 255 ECB : /* Load the archive_library. */
430 rhaas 256 GIC 10 : LoadArchiveLibrary();
257 :
172 michael 258 10 : pgarch_MainLoop();
6838 tgl 259 ECB :
755 fujii 260 GIC 10 : proc_exit(0);
6838 tgl 261 ECB : }
262 :
755 fujii 263 : /*
264 : * Wake up the archiver
265 : */
266 : void
755 fujii 267 GIC 59 : PgArchWakeup(void)
268 : {
269 59 : int arch_pgprocno = PgArch->pgprocno;
4260 tgl 270 ECB :
271 : /*
755 fujii 272 : * We don't acquire ProcArrayLock here. It's actually fine because
273 : * procLatch isn't ever freed, so we just can potentially set the wrong
274 : * process' (or no process') latch. Even in that case the archiver will
275 : * be relaunched shortly and will start archiving.
276 : */
755 fujii 277 GIC 59 : if (arch_pgprocno != INVALID_PGPROCNO)
278 46 : SetLatch(&ProcGlobal->allProcs[arch_pgprocno].procLatch);
6838 tgl 279 59 : }
6838 tgl 280 ECB :
755 fujii 281 :
5567 tgl 282 : /* SIGUSR2 signal handler for archiver process */
283 : static void
5567 tgl 284 GIC 10 : pgarch_waken_stop(SIGNAL_ARGS)
285 : {
4260 286 10 : int save_errno = errno;
4260 tgl 287 ECB :
288 : /* set flag to do a final cycle and shut down afterwards */
5567 tgl 289 CBC 10 : ready_to_stop = true;
3007 andres 290 GIC 10 : SetLatch(MyLatch);
291 :
4260 tgl 292 CBC 10 : errno = save_errno;
5567 293 10 : }
294 :
6838 tgl 295 ECB : /*
296 : * pgarch_MainLoop
297 : *
298 : * Main loop for archiver
299 : */
300 : static void
6838 tgl 301 GIC 10 : pgarch_MainLoop(void)
302 : {
5567 tgl 303 ECB : bool time_to_stop;
304 :
305 : /*
306 : * There shouldn't be anything for the archiver to do except to wait for a
307 : * signal ... however, the archiver exists to protect our data, so it
308 : * wakes up occasionally to allow itself to be proactive.
309 : */
310 : do
311 : {
3007 andres 312 GIC 43 : ResetLatch(MyLatch);
313 :
5567 tgl 314 ECB : /* When we get SIGUSR2, we do one more archive cycle, then exit */
5567 tgl 315 GIC 43 : time_to_stop = ready_to_stop;
316 :
660 fujii 317 ECB : /* Check for barrier events and config update */
660 fujii 318 GIC 43 : HandlePgArchInterrupts();
319 :
5567 tgl 320 ECB : /*
321 : * If we've gotten SIGTERM, we normally just sit and do nothing until
322 : * SIGUSR2 arrives. However, that means a random SIGTERM would
323 : * disable archiving indefinitely, which doesn't seem like a good
324 : * idea. If more than 60 seconds pass since SIGTERM, exit anyway, so
325 : * that the postmaster can start a new archiver if needed.
326 : */
1209 rhaas 327 GIC 43 : if (ShutdownRequestPending)
328 : {
5567 tgl 329 LBC 0 : time_t curtime = time(NULL);
330 :
5567 tgl 331 UBC 0 : if (last_sigterm_time == 0)
5567 tgl 332 UIC 0 : last_sigterm_time = curtime;
5567 tgl 333 UBC 0 : else if ((unsigned int) (curtime - last_sigterm_time) >=
5567 tgl 334 EUB : (unsigned int) 60)
5567 tgl 335 UBC 0 : break;
336 : }
5567 tgl 337 EUB :
338 : /* Do what we're here for */
755 fujii 339 GIC 43 : pgarch_ArchiverCopyLoop();
6838 tgl 340 ECB :
341 : /*
342 : * Sleep until a signal is received, or until a poll is forced by
343 : * PGARCH_AUTOWAKE_INTERVAL, or until postmaster dies.
344 : */
3955 bruce 345 CBC 43 : if (!time_to_stop) /* Don't wait during last iteration */
346 : {
347 : int rc;
67 michael 348 ECB :
67 michael 349 GNC 33 : rc = WaitLatch(MyLatch,
350 : WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
351 : PGARCH_AUTOWAKE_INTERVAL * 1000L,
352 : WAIT_EVENT_ARCHIVER_MAIN);
353 33 : if (rc & WL_POSTMASTER_DEATH)
67 michael 354 UNC 0 : time_to_stop = true;
6838 tgl 355 ECB : }
5567 356 :
357 : /*
358 : * The archiver quits either when the postmaster dies (not expected)
359 : * or after completing one more archiving cycle after receiving
360 : * SIGUSR2.
361 : */
1598 tmunro 362 GIC 43 : } while (!time_to_stop);
6838 tgl 363 10 : }
6838 tgl 364 ECB :
365 : /*
366 : * pgarch_ArchiverCopyLoop
367 : *
368 : * Archives all outstanding xlogs then returns
369 : */
370 : static void
6838 tgl 371 GIC 43 : pgarch_ArchiverCopyLoop(void)
372 : {
373 : char xlog[MAX_XFN_CHARS + 1];
374 :
375 : /* force directory scan in the first call to pgarch_readyXlog() */
466 376 43 : arch_files->arch_files_size = 0;
514 rhaas 377 ECB :
378 : /*
6797 bruce 379 : * loop through all xlogs with archive_status of .ready and archive
6385 380 : * them...mostly we expect this to be a single file, though it is possible
381 : * some backend will add files onto the list of those that need archiving
382 : * while we are still copying earlier archives
6797 383 : */
6158 tgl 384 GIC 67 : while (pgarch_readyXlog(xlog))
385 : {
6797 bruce 386 24 : int failures = 0;
1581 michael 387 24 : int failures_orphan = 0;
388 :
389 : for (;;)
6838 tgl 390 1 : {
391 : struct stat stat_buf;
392 : char pathname[MAXPGPATH];
393 :
5567 tgl 394 ECB : /*
5567 tgl 395 EUB : * Do not initiate any more archive commands after receiving
396 : * SIGTERM, nor after the postmaster has died unexpectedly. The
397 : * first condition is to try to keep from having init SIGKILL the
398 : * command, and the second is to avoid conflicts with another
399 : * archiver spawned by a newer postmaster.
400 : */
1209 rhaas 401 GIC 25 : if (ShutdownRequestPending || !PostmasterIsAlive())
6158 tgl 402 LBC 0 : return;
403 :
404 : /*
660 fujii 405 ECB : * Check for barrier events and config update. This is so that
406 : * we'll adopt a new setting for archive_command as soon as
407 : * possible, even if there is a backlog of files to be archived.
4716 tgl 408 EUB : */
660 fujii 409 GIC 25 : HandlePgArchInterrupts();
4716 tgl 410 EUB :
411 : /* can't do anything if not configured ... */
51 michael 412 GNC 25 : if (ArchiveCallbacks->check_configured_cb != NULL &&
413 25 : !ArchiveCallbacks->check_configured_cb(archive_module_state))
414 : {
4716 tgl 415 UIC 0 : ereport(WARNING,
416 : (errmsg("archive_mode enabled, yet archiving is not configured")));
417 0 : return;
418 : }
419 :
420 : /*
421 : * Since archive status files are not removed in a durable manner,
1581 michael 422 ECB : * a system crash could leave behind .ready files for WAL segments
423 : * that have already been recycled or removed. In this case,
1581 michael 424 EUB : * simply remove the orphan status file and move on. unlink() is
425 : * used here as even on subsequent crashes the same orphan files
426 : * would get removed, so there is no need to worry about
427 : * durability.
428 : */
1581 michael 429 GIC 25 : snprintf(pathname, MAXPGPATH, XLOGDIR "/%s", xlog);
1581 michael 430 GBC 25 : if (stat(pathname, &stat_buf) != 0 && errno == ENOENT)
1581 michael 431 UIC 0 : {
432 : char xlogready[MAXPGPATH];
433 :
434 0 : StatusFilePath(xlogready, xlog, ".ready");
1581 michael 435 UBC 0 : if (unlink(xlogready) == 0)
436 : {
1581 michael 437 UIC 0 : ereport(WARNING,
1581 michael 438 EUB : (errmsg("removed orphan archive status file \"%s\"",
439 : xlogready)));
440 :
441 : /* leave loop and move to the next status file */
1581 michael 442 UIC 0 : break;
443 : }
444 :
1581 michael 445 UBC 0 : if (++failures_orphan >= NUM_ORPHAN_CLEANUP_RETRIES)
446 : {
1581 michael 447 UIC 0 : ereport(WARNING,
448 : (errmsg("removal of orphan archive status file \"%s\" failed too many times, will try again later",
1581 michael 449 EUB : xlogready)));
450 :
451 : /* give up cleanup of orphan status files */
1581 michael 452 UIC 0 : return;
1581 michael 453 ECB : }
454 :
455 : /* wait a bit before retrying */
1581 michael 456 LBC 0 : pg_usleep(1000000L);
1581 michael 457 UIC 0 : continue;
458 : }
459 :
6838 tgl 460 GIC 25 : if (pgarch_archiveXlog(xlog))
461 : {
6838 tgl 462 ECB : /* successful */
6838 tgl 463 GIC 24 : pgarch_archiveDone(xlog);
3357 fujii 464 ECB :
465 : /*
466 : * Tell the cumulative stats system about the WAL file that we
467 : * successfully archived
468 : */
368 andres 469 GIC 24 : pgstat_report_archiver(xlog, false);
470 :
6838 tgl 471 24 : break; /* out of inner retry loop */
6838 tgl 472 ECB : }
473 : else
474 : {
475 : /*
368 andres 476 EUB : * Tell the cumulative stats system about the WAL file that we
477 : * failed to archive
478 : */
368 andres 479 GBC 1 : pgstat_report_archiver(xlog, true);
480 :
6838 tgl 481 CBC 1 : if (++failures >= NUM_ARCHIVE_RETRIES)
482 : {
6838 tgl 483 UIC 0 : ereport(WARNING,
484 : (errmsg("archiving write-ahead log file \"%s\" failed too many times, will try again later",
485 : xlog)));
486 0 : return; /* give up archiving for now */
487 : }
6716 tgl 488 GIC 1 : pg_usleep(1000000L); /* wait a bit before retrying */
489 : }
490 : }
491 : }
492 : }
493 :
494 : /*
6838 tgl 495 ECB : * pgarch_archiveXlog
496 : *
497 : * Invokes archive_file_cb to copy one archive file to wherever it should go
498 : *
499 : * Returns true if successful
500 : */
501 : static bool
6838 tgl 502 GIC 25 : pgarch_archiveXlog(char *xlog)
503 : {
6797 bruce 504 ECB : char pathname[MAXPGPATH];
5591 tgl 505 : char activitymsg[MAXFNAMELEN + 16];
506 : bool ret;
6838 507 :
6488 tgl 508 CBC 25 : snprintf(pathname, MAXPGPATH, XLOGDIR "/%s", xlog);
6838 tgl 509 ECB :
510 : /* Report archive activity in PS display */
5591 tgl 511 CBC 25 : snprintf(activitymsg, sizeof(activitymsg), "archiving %s", xlog);
1124 peter 512 25 : set_ps_display(activitymsg);
513 :
51 michael 514 GNC 25 : ret = ArchiveCallbacks->archive_file_cb(archive_module_state, xlog, pathname);
436 rhaas 515 GIC 25 : if (ret)
516 24 : snprintf(activitymsg, sizeof(activitymsg), "last was %s", xlog);
517 : else
5591 tgl 518 1 : snprintf(activitymsg, sizeof(activitymsg), "failed on %s", xlog);
1124 peter 519 25 : set_ps_display(activitymsg);
520 :
436 rhaas 521 25 : return ret;
522 : }
523 :
524 : /*
525 : * pgarch_readyXlog
526 : *
527 : * Return name of the oldest xlog file that has not yet been archived.
528 : * No notification is set that file archiving is now in progress, so
529 : * this would need to be extended if multiple concurrent archival
530 : * tasks were created. If a failure occurs, we will completely
531 : * re-copy the file at the next available opportunity.
532 : *
533 : * It is important that we return the oldest, so that we archive xlogs
534 : * in order that they were written, for two reasons:
535 : * 1) to maintain the sequential chain of xlogs required for recovery
536 : * 2) because the oldest ones will sooner become candidates for
537 : * recycling at time of checkpoint
538 : *
539 : * NOTE: the "oldest" comparison will consider any .history file to be older
1567 michael 540 ECB : * than any other file except another .history file. Segments on a timeline
541 : * with a smaller ID will be older than all segments on a timeline with a
542 : * larger ID; the net result being that past timelines are given higher
543 : * priority for archiving. This seems okay, or at least not obviously worth
544 : * changing.
545 : */
546 : static bool
6838 tgl 547 GIC 67 : pgarch_readyXlog(char *xlog)
548 : {
549 : char XLogArchiveStatusDir[MAXPGPATH];
550 : DIR *rldir;
6797 bruce 551 ECB : struct dirent *rlde;
514 rhaas 552 : bool force_dir_scan;
6838 tgl 553 :
514 rhaas 554 : /*
555 : * If a directory scan was requested, clear the stored file names and
556 : * proceed.
557 : */
514 rhaas 558 GIC 67 : SpinLockAcquire(&PgArch->arch_lck);
559 67 : force_dir_scan = PgArch->force_dir_scan;
560 67 : PgArch->force_dir_scan = false;
561 67 : SpinLockRelease(&PgArch->arch_lck);
562 :
563 67 : if (force_dir_scan)
466 tgl 564 2 : arch_files->arch_files_size = 0;
514 rhaas 565 ECB :
566 : /*
567 : * If we still have stored file names from the previous directory scan,
568 : * try to return one of those. We check to make sure the status file is
569 : * still present, as the archive_command for a previous file may have
570 : * already marked it done.
571 : */
466 tgl 572 CBC 67 : while (arch_files->arch_files_size > 0)
514 rhaas 573 ECB : {
574 : struct stat st;
575 : char status_file[MAXPGPATH];
576 : char *arch_file;
577 :
466 tgl 578 CBC 1 : arch_files->arch_files_size--;
466 tgl 579 GIC 1 : arch_file = arch_files->arch_files[arch_files->arch_files_size];
514 rhaas 580 GBC 1 : StatusFilePath(status_file, arch_file, ".ready");
514 rhaas 581 EUB :
514 rhaas 582 GIC 1 : if (stat(status_file, &st) == 0)
583 : {
584 1 : strcpy(xlog, arch_file);
585 1 : return true;
586 : }
514 rhaas 587 LBC 0 : else if (errno != ENOENT)
514 rhaas 588 UIC 0 : ereport(ERROR,
589 : (errcode_for_file_access(),
590 : errmsg("could not stat file \"%s\": %m", status_file)));
591 : }
592 :
466 tgl 593 ECB : /* arch_heap is probably empty, but let's make sure */
466 tgl 594 CBC 66 : binaryheap_reset(arch_files->arch_heap);
595 :
514 rhaas 596 ECB : /*
597 : * Open the archive status directory and read through the list of files
598 : * with the .ready suffix, looking for the earliest files.
599 : */
6488 tgl 600 GIC 66 : snprintf(XLogArchiveStatusDir, MAXPGPATH, XLOGDIR "/archive_status");
6838 601 66 : rldir = AllocateDir(XLogArchiveStatusDir);
602 :
6503 tgl 603 CBC 327 : while ((rlde = ReadDir(rldir, XLogArchiveStatusDir)) != NULL)
604 : {
6797 bruce 605 261 : int basenamelen = (int) strlen(rlde->d_name) - 6;
606 : char basename[MAX_XFN_CHARS + 1];
607 : char *arch_file;
6838 tgl 608 ECB :
1567 michael 609 EUB : /* Ignore entries with unexpected number of characters */
1567 michael 610 GIC 261 : if (basenamelen < MIN_XFN_CHARS ||
611 : basenamelen > MAX_XFN_CHARS)
1567 michael 612 CBC 237 : continue;
1567 michael 613 ECB :
614 : /* Ignore entries with unexpected characters */
1567 michael 615 GIC 110 : if (strspn(rlde->d_name, VALID_XFN_CHARS) < basenamelen)
1567 michael 616 LBC 0 : continue;
1567 michael 617 ECB :
618 : /* Ignore anything not suffixed with .ready */
1567 michael 619 GIC 110 : if (strcmp(rlde->d_name + basenamelen, ".ready") != 0)
620 86 : continue;
621 :
1567 michael 622 ECB : /* Truncate off the .ready */
1567 michael 623 GIC 24 : memcpy(basename, rlde->d_name, basenamelen);
624 24 : basename[basenamelen] = '\0';
1567 michael 625 ECB :
626 : /*
514 rhaas 627 : * Store the file in our max-heap if it has a high enough priority.
628 : */
466 tgl 629 GIC 24 : if (arch_files->arch_heap->bh_size < NUM_FILES_PER_DIRECTORY_SCAN)
6838 tgl 630 ECB : {
514 rhaas 631 EUB : /* If the heap isn't full yet, quickly add it. */
466 tgl 632 GIC 24 : arch_file = arch_files->arch_filenames[arch_files->arch_heap->bh_size];
514 rhaas 633 GBC 24 : strcpy(arch_file, basename);
466 tgl 634 GIC 24 : binaryheap_add_unordered(arch_files->arch_heap, CStringGetDatum(arch_file));
635 :
636 : /* If we just filled the heap, make it a valid one. */
637 24 : if (arch_files->arch_heap->bh_size == NUM_FILES_PER_DIRECTORY_SCAN)
466 tgl 638 UIC 0 : binaryheap_build(arch_files->arch_heap);
639 : }
466 tgl 640 UBC 0 : else if (ready_file_comparator(binaryheap_first(arch_files->arch_heap),
514 rhaas 641 EUB : CStringGetDatum(basename), NULL) > 0)
1567 michael 642 : {
643 : /*
644 : * Remove the lowest priority file and add the current one to the
332 tgl 645 ECB : * heap.
646 : */
466 tgl 647 UIC 0 : arch_file = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
514 rhaas 648 LBC 0 : strcpy(arch_file, basename);
466 tgl 649 0 : binaryheap_add(arch_files->arch_heap, CStringGetDatum(arch_file));
650 : }
651 : }
6838 tgl 652 GIC 66 : FreeDir(rldir);
653 :
654 : /* If no files were found, simply return. */
466 tgl 655 CBC 66 : if (arch_files->arch_heap->bh_size == 0)
514 rhaas 656 43 : return false;
657 :
658 : /*
659 : * If we didn't fill the heap, we didn't make it a valid one. Do that
660 : * now.
661 : */
466 tgl 662 23 : if (arch_files->arch_heap->bh_size < NUM_FILES_PER_DIRECTORY_SCAN)
663 23 : binaryheap_build(arch_files->arch_heap);
514 rhaas 664 ECB :
665 : /*
666 : * Fill arch_files array with the files to archive in ascending order of
332 tgl 667 : * priority.
514 rhaas 668 : */
466 tgl 669 GIC 23 : arch_files->arch_files_size = arch_files->arch_heap->bh_size;
466 tgl 670 CBC 47 : for (int i = 0; i < arch_files->arch_files_size; i++)
466 tgl 671 GIC 24 : arch_files->arch_files[i] = DatumGetCString(binaryheap_remove_first(arch_files->arch_heap));
672 :
673 : /* Return the highest priority file. */
674 23 : arch_files->arch_files_size--;
675 23 : strcpy(xlog, arch_files->arch_files[arch_files->arch_files_size]);
676 :
514 rhaas 677 23 : return true;
678 : }
679 :
680 : /*
681 : * ready_file_comparator
514 rhaas 682 ECB : *
683 : * Compares the archival priority of the given files to archive. If "a"
684 : * has a higher priority than "b", a negative value will be returned. If
685 : * "b" has a higher priority than "a", a positive value will be returned.
686 : * If "a" and "b" have equivalent values, 0 will be returned.
687 : */
688 : static int
514 rhaas 689 GIC 1 : ready_file_comparator(Datum a, Datum b, void *arg)
514 rhaas 690 ECB : {
332 tgl 691 GBC 1 : char *a_str = DatumGetCString(a);
332 tgl 692 GIC 1 : char *b_str = DatumGetCString(b);
693 1 : bool a_history = IsTLHistoryFileName(a_str);
332 tgl 694 CBC 1 : bool b_history = IsTLHistoryFileName(b_str);
695 :
696 : /* Timeline history files always have the highest priority. */
514 rhaas 697 GIC 1 : if (a_history != b_history)
514 rhaas 698 UIC 0 : return a_history ? -1 : 1;
699 :
700 : /* Priority is given to older files. */
514 rhaas 701 GIC 1 : return strcmp(a_str, b_str);
702 : }
703 :
704 : /*
514 rhaas 705 ECB : * PgArchForceDirScan
706 : *
707 : * When called, the next call to pgarch_readyXlog() will perform a
708 : * directory scan. This is useful for ensuring that important files such
709 : * as timeline history files are archived as quickly as possible.
710 : */
711 : void
514 rhaas 712 GIC 10 : PgArchForceDirScan(void)
713 : {
714 10 : SpinLockAcquire(&PgArch->arch_lck);
715 10 : PgArch->force_dir_scan = true;
716 10 : SpinLockRelease(&PgArch->arch_lck);
6838 tgl 717 10 : }
718 :
719 : /*
720 : * pgarch_archiveDone
6838 tgl 721 ECB : *
722 : * Emit notification that an xlog file has been successfully archived.
723 : * We do this by renaming the status file from NNN.ready to NNN.done.
724 : * Eventually, a checkpoint process will notice this and delete both the
725 : * NNN.done file and the xlog file itself.
726 : */
727 : static void
6838 tgl 728 GIC 24 : pgarch_archiveDone(char *xlog)
729 : {
730 : char rlogready[MAXPGPATH];
731 : char rlogdone[MAXPGPATH];
732 :
6836 733 24 : StatusFilePath(rlogready, xlog, ".ready");
734 24 : StatusFilePath(rlogdone, xlog, ".done");
735 :
736 : /*
737 : * To avoid extra overhead, we don't durably rename the .ready file to
738 : * .done. Archive commands and libraries must gracefully handle attempts
739 : * to re-archive files (e.g., if the server crashes just before this
740 : * function is called), so it should be okay if the .ready file reappears
741 : * after a crash.
742 : */
257 fujii 743 GNC 24 : if (rename(rlogready, rlogdone) < 0)
257 fujii 744 UNC 0 : ereport(WARNING,
745 : (errcode_for_file_access(),
746 : errmsg("could not rename file \"%s\" to \"%s\": %m",
747 : rlogready, rlogdone)));
6838 tgl 748 CBC 24 : }
755 fujii 749 EUB :
750 :
751 : /*
752 : * pgarch_die
755 fujii 753 ECB : *
754 : * Exit-time cleanup handler
755 : */
756 : static void
755 fujii 757 GIC 10 : pgarch_die(int code, Datum arg)
758 : {
759 10 : PgArch->pgprocno = INVALID_PGPROCNO;
760 10 : }
761 :
660 fujii 762 ECB : /*
763 : * Interrupt handler for WAL archiver process.
764 : *
765 : * This is called in the loops pgarch_MainLoop and pgarch_ArchiverCopyLoop.
766 : * It checks for barrier events, config update and request for logging of
767 : * memory contexts, but not shutdown request because how to handle
768 : * shutdown request is different between those loops.
769 : */
770 : static void
660 fujii 771 GIC 68 : HandlePgArchInterrupts(void)
772 : {
773 68 : if (ProcSignalBarrierPending)
660 fujii 774 UIC 0 : ProcessProcSignalBarrier();
775 :
430 rhaas 776 ECB : /* Perform logging of memory contexts of this process */
430 rhaas 777 GIC 68 : if (LogMemoryContextPending)
430 rhaas 778 LBC 0 : ProcessLogMemoryContextInterrupt();
430 rhaas 779 EUB :
660 fujii 780 GIC 68 : if (ConfigReloadPending)
781 : {
430 rhaas 782 CBC 2 : char *archiveLib = pstrdup(XLogArchiveLibrary);
430 rhaas 783 EUB : bool archiveLibChanged;
784 :
660 fujii 785 CBC 2 : ConfigReloadPending = false;
660 fujii 786 GIC 2 : ProcessConfigFile(PGC_SIGHUP);
430 rhaas 787 ECB :
145 peter 788 GNC 2 : if (XLogArchiveLibrary[0] != '\0' && XLogArchiveCommand[0] != '\0')
145 peter 789 UNC 0 : ereport(ERROR,
790 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
791 : errmsg("both archive_command and archive_library set"),
792 : errdetail("Only one of archive_command, archive_library may be set.")));
793 :
430 rhaas 794 GIC 2 : archiveLibChanged = strcmp(XLogArchiveLibrary, archiveLib) != 0;
795 2 : pfree(archiveLib);
430 rhaas 796 ECB :
430 rhaas 797 CBC 2 : if (archiveLibChanged)
798 : {
430 rhaas 799 ECB : /*
430 rhaas 800 EUB : * Ideally, we would simply unload the previous archive module and
801 : * load the new one, but there is presently no mechanism for
802 : * unloading a library (see the comment above
803 : * internal_load_library()). To deal with this, we simply restart
804 : * the archiver. The new archive module will be loaded when the
172 michael 805 ECB : * new archiver process starts up. Note that this triggers the
806 : * module's shutdown callback, if defined.
807 : */
430 rhaas 808 LBC 0 : ereport(LOG,
809 : (errmsg("restarting archiver process because value of "
810 : "\"archive_library\" was changed")));
811 :
430 rhaas 812 UIC 0 : proc_exit(0);
813 : }
814 : }
430 rhaas 815 GIC 68 : }
816 :
817 : /*
818 : * LoadArchiveLibrary
430 rhaas 819 EUB : *
820 : * Loads the archiving callbacks into our local ArchiveCallbacks.
821 : */
822 : static void
430 rhaas 823 GBC 10 : LoadArchiveLibrary(void)
824 : {
825 : ArchiveModuleInit archive_init;
430 rhaas 826 ECB :
145 peter 827 GNC 10 : if (XLogArchiveLibrary[0] != '\0' && XLogArchiveCommand[0] != '\0')
145 peter 828 UNC 0 : ereport(ERROR,
829 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
830 : errmsg("both archive_command and archive_library set"),
831 : errdetail("Only one of archive_command, archive_library may be set.")));
832 :
833 : /*
834 : * If shell archiving is enabled, use our special initialization function.
835 : * Otherwise, load the library and call its _PG_archive_module_init().
836 : */
430 rhaas 837 GIC 10 : if (XLogArchiveLibrary[0] == '\0')
430 rhaas 838 CBC 9 : archive_init = shell_archive_init;
839 : else
430 rhaas 840 GIC 1 : archive_init = (ArchiveModuleInit)
841 1 : load_external_function(XLogArchiveLibrary,
430 rhaas 842 ECB : "_PG_archive_module_init", false, NULL);
430 rhaas 843 EUB :
430 rhaas 844 GIC 10 : if (archive_init == NULL)
430 rhaas 845 UIC 0 : ereport(ERROR,
846 : (errmsg("archive modules have to define the symbol %s", "_PG_archive_module_init")));
847 :
51 michael 848 GNC 10 : ArchiveCallbacks = (*archive_init) ();
849 :
850 10 : if (ArchiveCallbacks->archive_file_cb == NULL)
430 rhaas 851 UIC 0 : ereport(ERROR,
430 rhaas 852 ECB : (errmsg("archive modules must register an archive callback")));
172 michael 853 :
51 michael 854 GNC 10 : archive_module_state = (ArchiveModuleState *) palloc0(sizeof(ArchiveModuleState));
855 10 : if (ArchiveCallbacks->startup_cb != NULL)
856 1 : ArchiveCallbacks->startup_cb(archive_module_state);
857 :
172 michael 858 GIC 10 : before_shmem_exit(pgarch_call_module_shutdown_cb, 0);
430 rhaas 859 CBC 10 : }
430 rhaas 860 ECB :
861 : /*
862 : * Call the shutdown callback of the loaded archive module, if defined.
863 : */
430 rhaas 864 EUB : static void
172 michael 865 GIC 10 : pgarch_call_module_shutdown_cb(int code, Datum arg)
866 : {
51 michael 867 GNC 10 : if (ArchiveCallbacks->shutdown_cb != NULL)
868 10 : ArchiveCallbacks->shutdown_cb(archive_module_state);
660 fujii 869 CBC 10 : }
|