Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * bulk_write.c
4 : : * Efficiently and reliably populate a new relation
5 : : *
6 : : * The assumption is that no other backends access the relation while we are
7 : : * loading it, so we can take some shortcuts. Do not mix operations through
8 : : * the regular buffer manager and the bulk loading interface!
9 : : *
10 : : * We bypass the buffer manager to avoid the locking overhead, and call
11 : : * smgrextend() directly. A downside is that the pages will need to be
12 : : * re-read into shared buffers on first use after the build finishes. That's
13 : : * usually a good tradeoff for large relations, and for small relations, the
14 : : * overhead isn't very significant compared to creating the relation in the
15 : : * first place.
16 : : *
17 : : * The pages are WAL-logged if needed. To save on WAL header overhead, we
18 : : * WAL-log several pages in one record.
19 : : *
20 : : * One tricky point is that because we bypass the buffer manager, we need to
21 : : * register the relation for fsyncing at the next checkpoint ourselves, and
22 : : * make sure that the relation is correctly fsync'd by us or the checkpointer
23 : : * even if a checkpoint happens concurrently.
24 : : *
25 : : *
26 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
27 : : * Portions Copyright (c) 1994, Regents of the University of California
28 : : *
29 : : *
30 : : * IDENTIFICATION
31 : : * src/backend/storage/smgr/bulk_write.c
32 : : *
33 : : *-------------------------------------------------------------------------
34 : : */
35 : : #include "postgres.h"
36 : :
37 : : #include "access/xloginsert.h"
38 : : #include "access/xlogrecord.h"
39 : : #include "storage/bufmgr.h"
40 : : #include "storage/bufpage.h"
41 : : #include "storage/bulk_write.h"
42 : : #include "storage/proc.h"
43 : : #include "storage/smgr.h"
44 : : #include "utils/rel.h"
45 : :
46 : : #define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
47 : :
48 : : static const PGIOAlignedBlock zero_buffer = {{0}}; /* worth BLCKSZ */
49 : :
/*
 * One queued page write, not yet WAL-logged or handed to smgr.
 */
typedef struct PendingWrite
{
	BulkWriteBuffer buf;		/* page contents; owned by us once queued */
	BlockNumber blkno;			/* target block number in the fork */
	bool		page_std;		/* is this a standard page layout? (passed
								 * through to WAL logging, which can then
								 * omit the hole between pd_lower/pd_upper) */
} PendingWrite;
56 : :
/*
 * Bulk writer state for one relation fork.
 */
struct BulkWriteState
{
	/* Information about the target relation we're writing */
	SMgrRelation smgr;
	ForkNumber	forknum;
	bool		use_wal;		/* WAL-log the pages before writing them? */

	/* We keep several writes queued, and WAL-log them in batches */
	int			npending;		/* number of valid entries below */
	PendingWrite pending_writes[MAX_PENDING_WRITES];

	/* Current size of the relation, i.e. highest block written + 1 */
	BlockNumber pages_written;

	/*
	 * The RedoRecPtr at the time that the bulk operation started.  Compared
	 * against the current RedoRecPtr at finish time to detect a concurrent
	 * checkpoint (see smgr_bulk_finish).
	 */
	XLogRecPtr	start_RedoRecPtr;

	/* Context that smgr_bulk_get_buf allocates page buffers in */
	MemoryContext memcxt;
};
79 : :
80 : : static void smgr_bulk_flush(BulkWriteState *bulkstate);
81 : :
82 : : /*
83 : : * Start a bulk write operation on a relation fork.
84 : : */
85 : : BulkWriteState *
51 heikki.linnakangas@i 86 :GNC 23008 : smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
87 : : {
88 : 23008 : return smgr_bulk_start_smgr(RelationGetSmgr(rel),
89 : : forknum,
90 [ + + + + : 23008 : RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
+ + + + +
+ ]
91 : : }
92 : :
93 : : /*
94 : : * Start a bulk write operation on a relation fork.
95 : : *
96 : : * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
97 : : */
98 : : BulkWriteState *
99 : 23094 : smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
100 : : {
101 : : BulkWriteState *state;
102 : :
103 : 23094 : state = palloc(sizeof(BulkWriteState));
104 : 23094 : state->smgr = smgr;
105 : 23094 : state->forknum = forknum;
106 : 23094 : state->use_wal = use_wal;
107 : :
108 : 23094 : state->npending = 0;
109 : 23094 : state->pages_written = 0;
110 : :
111 : 23094 : state->start_RedoRecPtr = GetRedoRecPtr();
112 : :
113 : : /*
114 : : * Remember the memory context. We will use it to allocate all the
115 : : * buffers later.
116 : : */
117 : 23094 : state->memcxt = CurrentMemoryContext;
118 : :
119 : 23094 : return state;
120 : : }
121 : :
/*
 * Finish bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
	/* WAL-log and flush any remaining pages */
	smgr_bulk_flush(bulkstate);

	/*
	 * When we wrote out the pages, we passed skipFsync=true to avoid the
	 * overhead of registering all the writes with the checkpointer. Register
	 * the whole relation now.
	 *
	 * There is one hole in that idea: If a checkpoint occurred while we were
	 * writing the pages, it already missed fsyncing the pages we had written
	 * before the checkpoint started. A crash later on would replay the WAL
	 * starting from the checkpoint, therefore it wouldn't replay our earlier
	 * WAL records. So if a checkpoint started after the bulk write, fsync
	 * the files now.
	 *
	 * Temporary relations are skipped entirely: they are never fsync'd.
	 */
	if (!SmgrIsTemp(bulkstate->smgr))
	{
		/*
		 * Prevent a checkpoint from starting between the GetRedoRecPtr() and
		 * smgrregistersync() calls.
		 *
		 * NB: the order of operations below is deliberate: the flag must be
		 * set before re-reading the redo pointer, and in the common path it
		 * is cleared only after smgrregistersync() has made the checkpointer
		 * responsible for the fsync.
		 */
		Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
		MyProc->delayChkptFlags |= DELAY_CHKPT_START;

		if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
		{
			/*
			 * A checkpoint occurred and it didn't know about our writes, so
			 * fsync() the relation ourselves.  (Safe to clear the flag first
			 * here, since we do the sync synchronously ourselves.)
			 */
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
			smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
			elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
		}
		else
		{
			smgrregistersync(bulkstate->smgr, bulkstate->forknum);
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
		}
	}
}
172 : :
173 : : static int
174 : 98694 : buffer_cmp(const void *a, const void *b)
175 : : {
176 : 98694 : const PendingWrite *bufa = (const PendingWrite *) a;
177 : 98694 : const PendingWrite *bufb = (const PendingWrite *) b;
178 : :
179 : : /* We should not see duplicated writes for the same block */
180 [ - + ]: 98694 : Assert(bufa->blkno != bufb->blkno);
181 [ + + ]: 98694 : if (bufa->blkno > bufb->blkno)
182 : 45886 : return 1;
183 : : else
184 : 52808 : return -1;
185 : : }
186 : :
/*
 * Finish all the pending writes.
 *
 * WAL-logs the queued pages in one record (if use_wal), then writes them out
 * through smgrextend()/smgrwrite() with skipFsync=true; durability is
 * arranged later by smgr_bulk_finish().
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
	int			npending = bulkstate->npending;
	PendingWrite *pending_writes = bulkstate->pending_writes;

	if (npending == 0)
		return;

	/* Sort by block number, so the relation is extended sequentially below */
	if (npending > 1)
		qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

	/* WAL-log the whole batch as a single record, saving per-record overhead */
	if (bulkstate->use_wal)
	{
		BlockNumber blknos[MAX_PENDING_WRITES];
		Page		pages[MAX_PENDING_WRITES];
		bool		page_std = true;

		for (int i = 0; i < npending; i++)
		{
			blknos[i] = pending_writes[i].blkno;
			pages[i] = pending_writes[i].buf->data;

			/*
			 * If any of the pages use !page_std, we log them all as such.
			 * That's a bit wasteful, but in practice, a mix of standard and
			 * non-standard page layout is rare. None of the built-in AMs do
			 * that.
			 */
			if (!pending_writes[i].page_std)
				page_std = false;
		}
		log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
					 npending, blknos, pages, page_std);
	}

	/* Now write the pages out, in block-number order */
	for (int i = 0; i < npending; i++)
	{
		BlockNumber blkno = pending_writes[i].blkno;
		Page		page = pending_writes[i].buf->data;

		/* Checksum is computed only now, after the page contents are final */
		PageSetChecksumInplace(page, blkno);

		if (blkno >= bulkstate->pages_written)
		{
			/*
			 * If we have to write pages nonsequentially, fill in the space
			 * with zeroes until we come back and overwrite. This is not
			 * logically necessary on standard Unix filesystems (unwritten
			 * space will read as zeroes anyway), but it should help to avoid
			 * fragmentation. The dummy pages aren't WAL-logged though.
			 */
			while (blkno > bulkstate->pages_written)
			{
				/* don't set checksum for all-zero page */
				smgrextend(bulkstate->smgr, bulkstate->forknum,
						   bulkstate->pages_written++,
						   &zero_buffer,
						   true);
			}

			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
			bulkstate->pages_written = pending_writes[i].blkno + 1;
		}
		else
		{
			/* Block was zero-filled earlier; overwrite it in place */
			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
		}
		/* We own the buffer (see smgr_bulk_write); release it now */
		pfree(page);
	}

	bulkstate->npending = 0;
}
261 : :
262 : : /*
263 : : * Queue write of 'buf'.
264 : : *
265 : : * NB: this takes ownership of 'buf'!
266 : : *
267 : : * You are only allowed to write a given block once as part of one bulk write
268 : : * operation.
269 : : */
270 : : void
271 : 53597 : smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
272 : : {
273 : : PendingWrite *w;
274 : :
275 : 53597 : w = &bulkstate->pending_writes[bulkstate->npending++];
276 : 53597 : w->buf = buf;
277 : 53597 : w->blkno = blocknum;
278 : 53597 : w->page_std = page_std;
279 : :
280 [ + + ]: 53597 : if (bulkstate->npending == MAX_PENDING_WRITES)
281 : 556 : smgr_bulk_flush(bulkstate);
282 : 53597 : }
283 : :
284 : : /*
285 : : * Allocate a new buffer which can later be written with smgr_bulk_write().
286 : : *
287 : : * There is no function to free the buffer. When you pass it to
288 : : * smgr_bulk_write(), it takes ownership and frees it when it's no longer
289 : : * needed.
290 : : *
291 : : * This is currently implemented as a simple palloc, but could be implemented
292 : : * using a ring buffer or larger chunks in the future, so don't rely on it.
293 : : */
294 : : BulkWriteBuffer
295 : 53597 : smgr_bulk_get_buf(BulkWriteState *bulkstate)
296 : : {
297 : 53597 : return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
298 : : }
|