Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * local_source.c
4 : : * Functions for using a local data directory as the source.
5 : : *
6 : : * Portions Copyright (c) 2013-2024, PostgreSQL Global Development Group
7 : : *
8 : : *-------------------------------------------------------------------------
9 : : */
10 : : #include "postgres_fe.h"
11 : :
12 : : #include <fcntl.h>
13 : : #include <unistd.h>
14 : :
15 : : #include "datapagemap.h"
16 : : #include "file_ops.h"
17 : : #include "filemap.h"
18 : : #include "pg_rewind.h"
19 : : #include "rewind_source.h"
20 : :
21 : : typedef struct
22 : : {
23 : : rewind_source common; /* common interface functions */
24 : :
25 : : const char *datadir; /* path to the source data directory */
26 : : } local_source;
27 : :
28 : : static void local_traverse_files(rewind_source *source,
29 : : process_file_callback_t callback);
30 : : static char *local_fetch_file(rewind_source *source, const char *path,
31 : : size_t *filesize);
32 : : static void local_queue_fetch_file(rewind_source *source, const char *path,
33 : : size_t len);
34 : : static void local_queue_fetch_range(rewind_source *source, const char *path,
35 : : off_t off, size_t len);
36 : : static void local_finish_fetch(rewind_source *source);
37 : : static void local_destroy(rewind_source *source);
38 : :
39 : : rewind_source *
1257 heikki.linnakangas@i 40 :CBC 11 : init_local_source(const char *datadir)
41 : : {
42 : : local_source *src;
43 : :
44 : 11 : src = pg_malloc0(sizeof(local_source));
45 : :
46 : 11 : src->common.traverse_files = local_traverse_files;
47 : 11 : src->common.fetch_file = local_fetch_file;
740 dgustafsson@postgres 48 : 11 : src->common.queue_fetch_file = local_queue_fetch_file;
49 : 11 : src->common.queue_fetch_range = local_queue_fetch_range;
1257 heikki.linnakangas@i 50 : 11 : src->common.finish_fetch = local_finish_fetch;
51 : 11 : src->common.get_current_wal_insert_lsn = NULL;
52 : 11 : src->common.destroy = local_destroy;
53 : :
54 : 11 : src->datadir = datadir;
55 : :
56 : 11 : return &src->common;
57 : : }
58 : :
59 : : static void
60 : 7 : local_traverse_files(rewind_source *source, process_file_callback_t callback)
61 : : {
348 dgustafsson@postgres 62 : 7 : traverse_datadir(((local_source *) source)->datadir, callback);
1257 heikki.linnakangas@i 63 : 7 : }
64 : :
65 : : static char *
66 : 23 : local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
67 : : {
68 : 23 : return slurpFile(((local_source *) source)->datadir, path, filesize);
69 : : }
70 : :
71 : : /*
72 : : * Copy a file from source to target.
73 : : *
74 : : * 'len' is the expected length of the file.
75 : : */
76 : : static void
740 dgustafsson@postgres 77 : 2219 : local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
78 : : {
79 : 2219 : const char *datadir = ((local_source *) source)->datadir;
80 : : PGIOAlignedBlock buf;
81 : : char srcpath[MAXPGPATH];
82 : : int srcfd;
83 : : size_t written_len;
84 : :
85 : 2219 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
86 : :
87 : : /* Open source file for reading */
88 : 2219 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
89 [ - + ]: 2219 : if (srcfd < 0)
740 dgustafsson@postgres 90 :UBC 0 : pg_fatal("could not open source file \"%s\": %m",
91 : : srcpath);
92 : :
93 : : /* Truncate and open the target file for writing */
740 dgustafsson@postgres 94 :CBC 2219 : open_target_file(path, true);
95 : :
96 : 2219 : written_len = 0;
97 : : for (;;)
98 : 45608 : {
99 : : ssize_t read_len;
100 : :
101 : 47827 : read_len = read(srcfd, buf.data, sizeof(buf));
102 : :
103 [ - + ]: 47827 : if (read_len < 0)
740 dgustafsson@postgres 104 :UBC 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
740 dgustafsson@postgres 105 [ + + ]:CBC 47827 : else if (read_len == 0)
106 : 2219 : break; /* EOF reached */
107 : :
108 : 45608 : write_target_range(buf.data, written_len, read_len);
109 : 45608 : written_len += read_len;
110 : : }
111 : :
112 : : /*
113 : : * A local source is not expected to change while we're rewinding, so
114 : : * check that the size of the file matches our earlier expectation.
115 : : */
116 [ + + ]: 2219 : if (written_len != len)
117 : 1 : pg_fatal("size of source file \"%s\" changed concurrently: %d bytes expected, %d copied",
118 : : srcpath, (int) len, (int) written_len);
119 : :
120 [ - + ]: 2218 : if (close(srcfd) != 0)
740 dgustafsson@postgres 121 :UBC 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
740 dgustafsson@postgres 122 :CBC 2218 : }
123 : :
124 : : /*
125 : : * Copy a file from source to target, starting at 'off', for 'len' bytes.
126 : : */
127 : : static void
128 : 840 : local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
129 : : size_t len)
130 : : {
1257 heikki.linnakangas@i 131 : 840 : const char *datadir = ((local_source *) source)->datadir;
132 : : PGIOAlignedBlock buf;
133 : : char srcpath[MAXPGPATH];
134 : : int srcfd;
135 : 840 : off_t begin = off;
136 : 840 : off_t end = off + len;
137 : :
138 : 840 : snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
139 : :
140 : 840 : srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
141 [ - + ]: 840 : if (srcfd < 0)
1257 heikki.linnakangas@i 142 :UBC 0 : pg_fatal("could not open source file \"%s\": %m",
143 : : srcpath);
144 : :
1257 heikki.linnakangas@i 145 [ - + ]:CBC 840 : if (lseek(srcfd, begin, SEEK_SET) == -1)
1257 heikki.linnakangas@i 146 :UBC 0 : pg_fatal("could not seek in source file: %m");
147 : :
1257 heikki.linnakangas@i 148 :CBC 840 : open_target_file(path, false);
149 : :
150 [ + + ]: 1926 : while (end - begin > 0)
151 : : {
152 : : ssize_t readlen;
153 : : size_t thislen;
154 : :
155 [ + + ]: 1086 : if (end - begin > sizeof(buf))
740 dgustafsson@postgres 156 : 246 : thislen = sizeof(buf);
157 : : else
158 : 840 : thislen = end - begin;
159 : :
160 : 1086 : readlen = read(srcfd, buf.data, thislen);
161 : :
1257 heikki.linnakangas@i 162 [ - + ]: 1086 : if (readlen < 0)
1257 heikki.linnakangas@i 163 :UBC 0 : pg_fatal("could not read file \"%s\": %m", srcpath);
1257 heikki.linnakangas@i 164 [ - + ]:CBC 1086 : else if (readlen == 0)
1257 heikki.linnakangas@i 165 :UBC 0 : pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
166 : :
1257 heikki.linnakangas@i 167 :CBC 1086 : write_target_range(buf.data, begin, readlen);
168 : 1086 : begin += readlen;
169 : : }
170 : :
171 [ - + ]: 840 : if (close(srcfd) != 0)
1257 heikki.linnakangas@i 172 :UBC 0 : pg_fatal("could not close file \"%s\": %m", srcpath);
1257 heikki.linnakangas@i 173 :CBC 840 : }
174 : :
175 : : static void
176 : 6 : local_finish_fetch(rewind_source *source)
177 : : {
178 : : /*
179 : : * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
180 : : */
181 : 6 : }
182 : :
183 : : static void
184 : 6 : local_destroy(rewind_source *source)
185 : : {
186 : 6 : pfree(source);
187 : 6 : }
|