Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * local_source.c
4 : * Functions for using a local data directory as the source.
5 : *
6 : * Portions Copyright (c) 2013-2023, PostgreSQL Global Development Group
7 : *
8 : *-------------------------------------------------------------------------
9 : */
10 : #include "postgres_fe.h"
11 :
12 : #include <fcntl.h>
13 : #include <unistd.h>
14 :
15 : #include "datapagemap.h"
16 : #include "file_ops.h"
17 : #include "filemap.h"
18 : #include "pg_rewind.h"
19 : #include "rewind_source.h"
20 :
21 : typedef struct
22 : {
23 : rewind_source common; /* common interface functions */
24 :
25 : const char *datadir; /* path to the source data directory */
26 : } local_source;
27 :
28 : static void local_traverse_files(rewind_source *source,
29 : process_file_callback_t callback);
30 : static char *local_fetch_file(rewind_source *source, const char *path,
31 : size_t *filesize);
32 : static void local_queue_fetch_file(rewind_source *source, const char *path,
33 : size_t len);
34 : static void local_queue_fetch_range(rewind_source *source, const char *path,
35 : off_t off, size_t len);
36 : static void local_finish_fetch(rewind_source *source);
37 : static void local_destroy(rewind_source *source);
38 :
39 : rewind_source *
886 heikki.linnakangas 40 CBC 11 : init_local_source(const char *datadir)
41 : {
42 : local_source *src;
43 :
44 11 : src = pg_malloc0(sizeof(local_source));
45 :
46 11 : src->common.traverse_files = local_traverse_files;
47 11 : src->common.fetch_file = local_fetch_file;
369 dgustafsson 48 11 : src->common.queue_fetch_file = local_queue_fetch_file;
49 11 : src->common.queue_fetch_range = local_queue_fetch_range;
886 heikki.linnakangas 50 11 : src->common.finish_fetch = local_finish_fetch;
51 11 : src->common.get_current_wal_insert_lsn = NULL;
52 11 : src->common.destroy = local_destroy;
53 :
54 11 : src->datadir = datadir;
55 :
56 11 : return &src->common;
57 : }
58 :
59 : static void
60 7 : local_traverse_files(rewind_source *source, process_file_callback_t callback)
61 : {
62 7 : traverse_datadir(((local_source *) source)->datadir, &process_source_file);
63 7 : }
64 :
65 : static char *
66 23 : local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
67 : {
68 23 : return slurpFile(((local_source *) source)->datadir, path, filesize);
69 : }
70 :
71 : /*
72 : * Copy a file from source to target.
73 : *
74 : * 'len' is the expected length of the file.
75 : */
76 : static void
369 dgustafsson 77 2222 : local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
78 : {
79 2222 : const char *datadir = ((local_source *) source)->datadir;
80 : PGIOAlignedBlock buf;
81 : char srcpath[MAXPGPATH];
82 : int srcfd;
83 : size_t written_len;
84 :
85 2222 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
86 :
87 : /* Open source file for reading */
88 2222 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
89 2222 : if (srcfd < 0)
369 dgustafsson 90 UBC 0 : pg_fatal("could not open source file \"%s\": %m",
91 : srcpath);
92 :
93 : /* Truncate and open the target file for writing */
369 dgustafsson 94 CBC 2222 : open_target_file(path, true);
95 :
96 2222 : written_len = 0;
97 : for (;;)
98 45607 : {
99 : ssize_t read_len;
100 :
101 47829 : read_len = read(srcfd, buf.data, sizeof(buf));
102 :
103 47829 : if (read_len < 0)
369 dgustafsson 104 UBC 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
369 dgustafsson 105 CBC 47829 : else if (read_len == 0)
106 2222 : break; /* EOF reached */
107 :
108 45607 : write_target_range(buf.data, written_len, read_len);
109 45607 : written_len += read_len;
110 : }
111 :
112 : /*
113 : * A local source is not expected to change while we're rewinding, so
114 : * check that the size of the file matches our earlier expectation.
115 : */
116 2222 : if (written_len != len)
117 1 : pg_fatal("size of source file \"%s\" changed concurrently: %d bytes expected, %d copied",
118 : srcpath, (int) len, (int) written_len);
119 :
120 2221 : if (close(srcfd) != 0)
369 dgustafsson 121 UBC 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
369 dgustafsson 122 CBC 2221 : }
123 :
124 : /*
125 : * Copy a file from source to target, starting at 'off', for 'len' bytes.
126 : */
127 : static void
128 828 : local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
129 : size_t len)
130 : {
886 heikki.linnakangas 131 828 : const char *datadir = ((local_source *) source)->datadir;
132 : PGIOAlignedBlock buf;
133 : char srcpath[MAXPGPATH];
134 : int srcfd;
135 828 : off_t begin = off;
136 828 : off_t end = off + len;
137 :
138 828 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
139 :
140 828 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
141 828 : if (srcfd < 0)
886 heikki.linnakangas 142 UBC 0 : pg_fatal("could not open source file \"%s\": %m",
143 : srcpath);
144 :
886 heikki.linnakangas 145 CBC 828 : if (lseek(srcfd, begin, SEEK_SET) == -1)
886 heikki.linnakangas 146 UBC 0 : pg_fatal("could not seek in source file: %m");
147 :
886 heikki.linnakangas 148 CBC 828 : open_target_file(path, false);
149 :
150 1902 : while (end - begin > 0)
151 : {
152 : ssize_t readlen;
153 : size_t thislen;
154 :
155 1074 : if (end - begin > sizeof(buf))
369 dgustafsson 156 246 : thislen = sizeof(buf);
157 : else
158 828 : thislen = end - begin;
159 :
160 1074 : readlen = read(srcfd, buf.data, thislen);
161 :
886 heikki.linnakangas 162 1074 : if (readlen < 0)
886 heikki.linnakangas 163 UBC 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
886 heikki.linnakangas 164 CBC 1074 : else if (readlen == 0)
886 heikki.linnakangas 165 UBC 0 : pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
166 :
886 heikki.linnakangas 167 CBC 1074 : write_target_range(buf.data, begin, readlen);
168 1074 : begin += readlen;
169 : }
170 :
171 828 : if (close(srcfd) != 0)
886 heikki.linnakangas 172 UBC 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
886 heikki.linnakangas 173 CBC 828 : }
174 :
175 : static void
176 6 : local_finish_fetch(rewind_source *source)
177 : {
178 : /*
179 : * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
180 : */
181 6 : }
182 :
183 : static void
184 6 : local_destroy(rewind_source *source)
185 : {
186 6 : pfree(source);
187 6 : }
|