Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * basic_archive.c
4 : *
5 : * This file demonstrates a basic archive library implementation that is
6 : * roughly equivalent to the following shell command:
7 : *
8 : * test ! -f /path/to/dest && cp /path/to/src /path/to/dest
9 : *
10 : * One notable difference between this module and the shell command above
11 : * is that this module first copies the file to a temporary destination,
12 : * syncs it to disk, and then durably moves it to the final destination.
13 : *
14 : * Another notable difference is that if /path/to/dest already exists
15 : * but has contents identical to /path/to/src, archiving will succeed,
16 : * whereas the command shown above would fail. This prevents problems if
17 : * a file is successfully archived and then the system crashes before
18 : * a durable record of the success has been made.
19 : *
20 : * Copyright (c) 2022-2023, PostgreSQL Global Development Group
21 : *
22 : * IDENTIFICATION
23 : * contrib/basic_archive/basic_archive.c
24 : *
25 : *-------------------------------------------------------------------------
26 : */
27 : #include "postgres.h"
28 :
29 : #include <sys/stat.h>
30 : #include <sys/time.h>
31 : #include <unistd.h>
32 :
33 : #include "archive/archive_module.h"
34 : #include "common/int.h"
35 : #include "miscadmin.h"
36 : #include "storage/copydir.h"
37 : #include "storage/fd.h"
38 : #include "utils/guc.h"
39 : #include "utils/memutils.h"
40 :
430 rhaas 41 CBC 1 : PG_MODULE_MAGIC;
42 :
43 : typedef struct BasicArchiveData
44 : {
45 : MemoryContext context;
46 : } BasicArchiveData;
47 :
48 : static char *archive_directory = NULL;
49 :
50 : static void basic_archive_startup(ArchiveModuleState *state);
51 : static bool basic_archive_configured(ArchiveModuleState *state);
52 : static bool basic_archive_file(ArchiveModuleState *state, const char *file, const char *path);
53 : static void basic_archive_file_internal(const char *file, const char *path);
54 : static bool check_archive_directory(char **newval, void **extra, GucSource source);
55 : static bool compare_files(const char *file1, const char *file2);
56 : static void basic_archive_shutdown(ArchiveModuleState *state);
57 :
58 : static const ArchiveModuleCallbacks basic_archive_callbacks = {
59 : .startup_cb = basic_archive_startup,
60 : .check_configured_cb = basic_archive_configured,
61 : .archive_file_cb = basic_archive_file,
62 : .shutdown_cb = basic_archive_shutdown
63 : };
64 :
65 : /*
66 : * _PG_init
67 : *
68 : * Defines the module's GUC.
69 : */
70 : void
430 rhaas 71 GIC 1 : _PG_init(void)
72 : {
73 michael 73 1 : DefineCustomStringVariable("basic_archive.archive_directory",
74 : gettext_noop("Archive file destination directory."),
75 : NULL,
76 : &archive_directory,
77 : "",
78 : PGC_SIGHUP,
79 : 0,
80 : check_archive_directory, NULL, NULL);
430 rhaas 81 ECB :
73 michael 82 GIC 1 : MarkGUCPrefixReserved("basic_archive");
430 rhaas 83 1 : }
84 :
85 : /*
86 : * _PG_archive_module_init
87 : *
430 rhaas 88 ECB : * Returns the module's archiving callbacks.
89 : */
90 : const ArchiveModuleCallbacks *
51 michael 91 GNC 1 : _PG_archive_module_init(void)
92 : {
93 1 : return &basic_archive_callbacks;
94 : }
95 :
96 : /*
97 : * basic_archive_startup
98 : *
99 : * Creates the module's memory context.
100 : */
101 : void
102 1 : basic_archive_startup(ArchiveModuleState *state)
103 : {
104 : BasicArchiveData *data;
105 :
106 1 : data = (BasicArchiveData *) MemoryContextAllocZero(TopMemoryContext,
107 : sizeof(BasicArchiveData));
108 1 : data->context = AllocSetContextCreate(TopMemoryContext,
109 : "basic_archive",
110 : ALLOCSET_DEFAULT_SIZES);
111 1 : state->private_data = (void *) data;
430 rhaas 112 CBC 1 : }
113 :
430 rhaas 114 ECB : /*
115 : * check_archive_directory
116 : *
117 : * Checks that the provided archive directory exists.
118 : */
119 : static bool
430 rhaas 120 GIC 2 : check_archive_directory(char **newval, void **extra, GucSource source)
121 : {
122 : struct stat st;
430 rhaas 123 ECB :
124 : /*
125 : * The default value is an empty string, so we have to accept that value.
126 : * Our check_configured callback also checks for this and prevents
332 tgl 127 : * archiving from proceeding if it is still empty.
128 : */
430 rhaas 129 CBC 2 : if (*newval == NULL || *newval[0] == '\0')
430 rhaas 130 GIC 1 : return true;
131 :
430 rhaas 132 ECB : /*
133 : * Make sure the file paths won't be too long. The docs indicate that the
134 : * file names to be archived can be up to 64 characters long.
135 : */
430 rhaas 136 GIC 1 : if (strlen(*newval) + 64 + 2 >= MAXPGPATH)
137 : {
263 michael 138 UIC 0 : GUC_check_errdetail("Archive directory too long.");
430 rhaas 139 0 : return false;
140 : }
430 rhaas 141 ECB :
142 : /*
143 : * Do a basic sanity check that the specified archive directory exists. It
144 : * could be removed at some point in the future, so we still need to be
145 : * prepared for it not to exist in the actual archiving logic.
146 : */
430 rhaas 147 GIC 1 : if (stat(*newval, &st) != 0 || !S_ISDIR(st.st_mode))
148 : {
263 michael 149 UIC 0 : GUC_check_errdetail("Specified archive directory does not exist.");
430 rhaas 150 LBC 0 : return false;
430 rhaas 151 ECB : }
152 :
430 rhaas 153 GIC 1 : return true;
154 : }
155 :
156 : /*
430 rhaas 157 ECB : * basic_archive_configured
158 : *
430 rhaas 159 EUB : * Checks that archive_directory is not blank.
160 : */
161 : static bool
51 michael 162 GNC 2 : basic_archive_configured(ArchiveModuleState *state)
163 : {
430 rhaas 164 GIC 2 : return archive_directory != NULL && archive_directory[0] != '\0';
165 : }
166 :
167 : /*
430 rhaas 168 ECB : * basic_archive_file
169 : *
430 rhaas 170 EUB : * Archives one file.
171 : */
172 : static bool
51 michael 173 GNC 2 : basic_archive_file(ArchiveModuleState *state, const char *file, const char *path)
430 rhaas 174 ECB : {
175 : sigjmp_buf local_sigjmp_buf;
176 : MemoryContext oldcontext;
51 michael 177 GNC 2 : BasicArchiveData *data = (BasicArchiveData *) state->private_data;
178 2 : MemoryContext basic_archive_context = data->context;
179 :
180 : /*
181 : * We run basic_archive_file_internal() in our own memory context so that
182 : * we can easily reset it during error recovery (thus avoiding memory
183 : * leaks).
184 : */
73 michael 185 CBC 2 : oldcontext = MemoryContextSwitchTo(basic_archive_context);
186 :
430 rhaas 187 ECB : /*
188 : * Since the archiver operates at the bottom of the exception stack,
189 : * ERRORs turn into FATALs and cause the archiver process to restart.
190 : * However, using ereport(ERROR, ...) when there are problems is easy to
191 : * code and maintain. Therefore, we create our own exception handler to
192 : * catch ERRORs and return false instead of restarting the archiver
193 : * whenever there is a failure.
194 : */
430 rhaas 195 GIC 2 : if (sigsetjmp(local_sigjmp_buf, 1) != 0)
430 rhaas 196 ECB : {
197 : /* Since not using PG_TRY, must reset error stack by hand */
430 rhaas 198 UIC 0 : error_context_stack = NULL;
199 :
430 rhaas 200 ECB : /* Prevent interrupts while cleaning up */
430 rhaas 201 LBC 0 : HOLD_INTERRUPTS();
202 :
203 : /* Report the error and clear ErrorContext for next time */
430 rhaas 204 UIC 0 : EmitErrorReport();
205 0 : FlushErrorState();
206 :
207 : /* Close any files left open by copy_file() or compare_files() */
430 rhaas 208 LBC 0 : AtEOSubXact_Files(false, InvalidSubTransactionId, InvalidSubTransactionId);
209 :
210 : /* Reset our memory context and switch back to the original one */
430 rhaas 211 UIC 0 : MemoryContextSwitchTo(oldcontext);
73 michael 212 0 : MemoryContextReset(basic_archive_context);
213 :
214 : /* Remove our exception handler */
430 rhaas 215 0 : PG_exception_stack = NULL;
216 :
217 : /* Now we can allow interrupts again */
430 rhaas 218 LBC 0 : RESUME_INTERRUPTS();
219 :
220 : /* Report failure so that the archiver retries this file */
430 rhaas 221 UBC 0 : return false;
222 : }
223 :
430 rhaas 224 EUB : /* Enable our exception handler */
430 rhaas 225 GIC 2 : PG_exception_stack = &local_sigjmp_buf;
226 :
430 rhaas 227 EUB : /* Archive the file! */
430 rhaas 228 GBC 2 : basic_archive_file_internal(file, path);
229 :
230 : /* Remove our exception handler */
231 2 : PG_exception_stack = NULL;
232 :
233 : /* Reset our memory context and switch back to the original one */
234 2 : MemoryContextSwitchTo(oldcontext);
73 michael 235 2 : MemoryContextReset(basic_archive_context);
236 :
430 rhaas 237 GIC 2 : return true;
430 rhaas 238 EUB : }
239 :
240 : static void
430 rhaas 241 GBC 2 : basic_archive_file_internal(const char *file, const char *path)
242 : {
243 : char destination[MAXPGPATH];
430 rhaas 244 EUB : char temp[MAXPGPATH + 256];
245 : struct stat st;
246 : struct timeval tv;
247 : uint64 epoch; /* milliseconds */
430 rhaas 248 ECB :
430 rhaas 249 GIC 2 : ereport(DEBUG3,
250 : (errmsg("archiving \"%s\" via basic_archive", file)));
430 rhaas 251 ECB :
430 rhaas 252 GIC 2 : snprintf(destination, MAXPGPATH, "%s/%s", archive_directory, file);
253 :
430 rhaas 254 ECB : /*
255 : * First, check if the file has already been archived. If it already
256 : * exists and has the same contents as the file we're trying to archive,
332 tgl 257 : * we can return success (after ensuring the file is persisted to disk).
258 : * This scenario is possible if the server crashed after archiving the
259 : * file but before renaming its .ready file to .done.
430 rhaas 260 : *
261 : * If the archive file already exists but has different contents,
262 : * something might be wrong, so we just fail.
263 : */
430 rhaas 264 CBC 2 : if (stat(destination, &st) == 0)
265 : {
430 rhaas 266 UIC 0 : if (compare_files(path, destination))
267 : {
268 0 : ereport(DEBUG3,
269 : (errmsg("archive file \"%s\" already exists with identical contents",
270 : destination)));
271 :
430 rhaas 272 LBC 0 : fsync_fname(destination, false);
430 rhaas 273 UIC 0 : fsync_fname(archive_directory, true);
274 :
430 rhaas 275 LBC 0 : return;
276 : }
277 :
430 rhaas 278 UIC 0 : ereport(ERROR,
279 : (errmsg("archive file \"%s\" already exists", destination)));
280 : }
430 rhaas 281 GIC 2 : else if (errno != ENOENT)
430 rhaas 282 UIC 0 : ereport(ERROR,
283 : (errcode_for_file_access(),
284 : errmsg("could not stat file \"%s\": %m", destination)));
285 :
286 : /*
430 rhaas 287 ECB : * Pick a sufficiently unique name for the temporary file so that a
288 : * collision is unlikely. This helps avoid problems in case a temporary
430 rhaas 289 EUB : * file was left around after a crash or another server happens to be
290 : * archiving to the same directory.
291 : */
430 rhaas 292 GIC 2 : gettimeofday(&tv, NULL);
293 4 : if (pg_mul_u64_overflow((uint64) 1000, (uint64) tv.tv_sec, &epoch) ||
174 michael 294 2 : pg_add_u64_overflow(epoch, (uint64) (tv.tv_usec / 1000), &epoch))
430 rhaas 295 UBC 0 : elog(ERROR, "could not generate temporary file name for archiving");
430 rhaas 296 EUB :
430 rhaas 297 GIC 2 : snprintf(temp, sizeof(temp), "%s/%s.%s.%d." UINT64_FORMAT,
430 rhaas 298 EUB : archive_directory, "archtemp", file, MyProcPid, epoch);
299 :
300 : /*
332 tgl 301 : * Copy the file to its temporary destination. Note that this will fail
302 : * if temp already exists.
303 : */
81 michael 304 GNC 2 : copy_file(path, temp);
430 rhaas 305 EUB :
306 : /*
307 : * Sync the temporary file to disk and move it to its final destination.
308 : * Note that this will overwrite any existing file, but this is only
309 : * possible if someone else created the file since the stat() above.
310 : */
278 michael 311 GNC 2 : (void) durable_rename(temp, destination, ERROR);
312 :
430 rhaas 313 GIC 2 : ereport(DEBUG1,
314 : (errmsg("archived \"%s\" via basic_archive", file)));
315 : }
430 rhaas 316 ECB :
317 : /*
318 : * compare_files
430 rhaas 319 EUB : *
320 : * Returns whether the contents of the files are the same.
430 rhaas 321 ECB : */
322 : static bool
430 rhaas 323 UIC 0 : compare_files(const char *file1, const char *file2)
324 : {
325 : #define CMP_BUF_SIZE (4096)
326 : char buf1[CMP_BUF_SIZE];
327 : char buf2[CMP_BUF_SIZE];
430 rhaas 328 ECB : int fd1;
329 : int fd2;
430 rhaas 330 UIC 0 : bool ret = true;
331 :
332 0 : fd1 = OpenTransientFile(file1, O_RDONLY | PG_BINARY);
333 0 : if (fd1 < 0)
334 0 : ereport(ERROR,
430 rhaas 335 ECB : (errcode_for_file_access(),
336 : errmsg("could not open file \"%s\": %m", file1)));
337 :
430 rhaas 338 UIC 0 : fd2 = OpenTransientFile(file2, O_RDONLY | PG_BINARY);
339 0 : if (fd2 < 0)
340 0 : ereport(ERROR,
341 : (errcode_for_file_access(),
342 : errmsg("could not open file \"%s\": %m", file2)));
343 :
344 : for (;;)
345 0 : {
332 tgl 346 0 : int nbytes = 0;
332 tgl 347 UBC 0 : int buf1_len = 0;
332 tgl 348 UIC 0 : int buf2_len = 0;
349 :
430 rhaas 350 0 : while (buf1_len < CMP_BUF_SIZE)
351 : {
352 0 : nbytes = read(fd1, buf1 + buf1_len, CMP_BUF_SIZE - buf1_len);
353 0 : if (nbytes < 0)
430 rhaas 354 UBC 0 : ereport(ERROR,
355 : (errcode_for_file_access(),
430 rhaas 356 EUB : errmsg("could not read file \"%s\": %m", file1)));
430 rhaas 357 UBC 0 : else if (nbytes == 0)
358 0 : break;
359 :
430 rhaas 360 UIC 0 : buf1_len += nbytes;
361 : }
430 rhaas 362 EUB :
430 rhaas 363 UBC 0 : while (buf2_len < CMP_BUF_SIZE)
430 rhaas 364 EUB : {
430 rhaas 365 UIC 0 : nbytes = read(fd2, buf2 + buf2_len, CMP_BUF_SIZE - buf2_len);
366 0 : if (nbytes < 0)
367 0 : ereport(ERROR,
368 : (errcode_for_file_access(),
430 rhaas 369 EUB : errmsg("could not read file \"%s\": %m", file2)));
430 rhaas 370 UBC 0 : else if (nbytes == 0)
371 0 : break;
430 rhaas 372 EUB :
430 rhaas 373 UIC 0 : buf2_len += nbytes;
430 rhaas 374 EUB : }
375 :
430 rhaas 376 UBC 0 : if (buf1_len != buf2_len || memcmp(buf1, buf2, buf1_len) != 0)
430 rhaas 377 EUB : {
430 rhaas 378 UBC 0 : ret = false;
430 rhaas 379 UIC 0 : break;
380 : }
430 rhaas 381 UBC 0 : else if (buf1_len == 0)
382 0 : break;
383 : }
430 rhaas 384 EUB :
430 rhaas 385 UIC 0 : if (CloseTransientFile(fd1) != 0)
386 0 : ereport(ERROR,
430 rhaas 387 EUB : (errcode_for_file_access(),
388 : errmsg("could not close file \"%s\": %m", file1)));
389 :
430 rhaas 390 UBC 0 : if (CloseTransientFile(fd2) != 0)
391 0 : ereport(ERROR,
392 : (errcode_for_file_access(),
393 : errmsg("could not close file \"%s\": %m", file2)));
430 rhaas 394 EUB :
430 rhaas 395 UBC 0 : return ret;
396 : }
397 :
398 : /*
399 : * basic_archive_shutdown
400 : *
401 : * Frees our allocated state.
402 : */
403 : static void
51 michael 404 GNC 1 : basic_archive_shutdown(ArchiveModuleState *state)
405 : {
406 1 : BasicArchiveData *data = (BasicArchiveData *) state->private_data;
407 : MemoryContext basic_archive_context;
408 :
409 : /*
410 : * If we didn't get to storing the pointer to our allocated state, we don't
411 : * have anything to clean up.
412 : */
413 1 : if (data == NULL)
51 michael 414 UNC 0 : return;
415 :
51 michael 416 GNC 1 : basic_archive_context = data->context;
417 1 : Assert(CurrentMemoryContext != basic_archive_context);
418 :
419 1 : if (MemoryContextIsValid(basic_archive_context))
420 1 : MemoryContextDelete(basic_archive_context);
421 1 : data->context = NULL;
422 :
423 : /*
424 : * Finally, free the state.
425 : */
426 1 : pfree(data);
427 1 : state->private_data = NULL;
428 : }
|